A Complete Example of Fixing Garbled Chinese Text When Reading Files with Spark

By default, spark.read.textFile() decodes input as UTF-8. If the source data is UTF-8, Chinese text displays correctly, but files encoded in GBK, GB2312, or similar Chinese encodings come out garbled. Below is a complete working example; if your files use an encoding other than GBK, just substitute the appropriate charset name.
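To see where the garbling comes from: GBK encodes most Chinese characters as two bytes, and spark.read.textFile() always interprets bytes as UTF-8, so the byte sequences are misread. A minimal sketch of the effect in plain Scala (no Spark needed):

// "中文" encoded as GBK, then decoded with the wrong and the right charset
val gbkBytes = "中文".getBytes("GBK")
println(new String(gbkBytes, "UTF-8")) // garbled: GBK bytes are not valid UTF-8
println(new String(gbkBytes, "GBK"))   // prints 中文 correctly

The complete example follows; the key step is reading the file as raw bytes with hadoopFile and decoding each line explicitly.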



object ExcelStockEarn extends AppConf {

  def main(args: Array[String]): Unit = {
    readExcel()
  }

  def readExcel(): Unit = {
    // Import implicit conversions; otherwise the RDD cannot call toDF
    import spark.implicits._

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    val path = "E:\\newcode\\MyFirstProject\\data\\stockearn"
    // Read raw bytes via hadoopFile, then decode each line explicitly as GBK
    val inputRdd = spark.sparkContext.hadoopFile(path, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])
      .map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, "GBK"))

    // We want to count the dirty records, so define an accumulator here
    val accum = spark.sparkContext.longAccumulator("Error Accum")

    val listRdd = inputRdd.map { line =>
      val fields = line.split("\\s+")
      if (fields.length == 14) {
        CaseFlow(fields(0), fields(1), fields(2), fields(3).toInt, fields(4).toDouble,
          fields(5), fields(6).toDouble, fields(7).toDouble,
          fields(8), fields(9), fields(10), fields(11), fields(12), "")
      } else {
        // Malformed line: record it in the accumulator and emit a placeholder row
        accum.add(1L)
        CaseFlow(fields(0), "", "", 0, 0, "", 0, 0, "", "", "", "", "", "")
      }
    }

    val df = listRdd.filter(_.stockCode.length > 1).toDF()
    df.persist()
    df.createTempView("option_stock")

    val resultDf = spark.sql("select * from option_stock")
    resultDf.show()
    // After an action has run, accum.value holds the dirty-record count

    val groupDf = spark.sql(
      "select stockCode, SUM(dealAmount) from option_stock group by stockCode order by stockCode")
    groupDf.show()
  }

  case class CaseFlow(dealDate: String, stockCode: String, stockName: String,
                      dealNum: Int, dealPrice: Double, dealContent: String,
                      dealAmount: Double, remainAmount: Double,
                      standby1: String, standby2: String, standby3: String,
                      standby4: String, standby5: String, standby6: String)

}
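As an aside, if the source is a delimited file, Spark 2.x and later can decode GBK directly through the CSV reader's encoding option, sparing you the hadoopFile detour. A minimal sketch, assuming a tab-delimited layout (the delimiter here is an assumption; match it to your data):

// Alternative for delimited files: let the CSV reader decode GBK itself
val csvDf = spark.read
  .option("encoding", "GBK")  // decode input as GBK instead of the UTF-8 default
  .option("delimiter", "\t")  // hypothetical delimiter; adjust to your file layout
  .csv("E:\\newcode\\MyFirstProject\\data\\stockearn")
csvDf.show()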
