A Complete Example of Fixing Garbled Chinese When Spark Reads CSV Files

spark.read.option("header", "true").csv(path) is the default way to read a CSV file. If the source data is UTF-8 encoded Chinese, it displays correctly, but if Spark reads files encoded in GBK, GB2312, or another Chinese charset, the text comes out garbled. Below is a complete working example; if your files are not GBK encoded, simply replace the charset name used in the code.
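
A side note before the full example: recent Spark versions also expose an encoding (alias charset) option on the CSV reader. Whether it actually takes effect can depend on the Spark version and on whether multiLine is used, so treat the snippet below as a hedged quick check rather than a guaranteed fix; the hadoopFile-based approach in the complete example further down does not rely on it.

// Quick check (assumes a recent Spark version): let the CSV data source decode GBK itself
val gbkDf = spark.read
  .option("header", "true")
  .option("encoding", "GBK")   // alias: "charset"
  .csv("E:\\newcode\\MyFirstProject\\data\\rangfei")
gbkDf.show()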

import java.sql.DriverManager

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType, _}

import scala.collection.mutable.ArrayBuffer


/**
 * One-day visit detail report from Umeng analytics
 */
object UmengRangfeiSQL {
  def main(arg: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UmengRangfeiSQL").master("local[*]").getOrCreate()
    // Build a schema for the data to be read
    // println(System.getProperty("file.encoding"))
    // val pps = System.getProperties
    // pps.setProperty("file.encoding", "GB2312")
    val taxiSchema = StructType(Array(
      StructField("PageUrl", StringType, true),
      StructField("PV", IntegerType, true),
      StructField("UV", IntegerType, true),
      StructField("IP", IntegerType, true),
      StructField("PageViews", DoubleType, true),
      StructField("Output PV", IntegerType, true),
      StructField("Stay Time", StringType, true)
    ))
    val path = "E:\\newcode\\MyFirstProject\\data\\rangfei"
    // Skip the header row with .option("header", "true")
    // val data = spark.read.option("header", "true").schema(taxiSchema).csv(path)

    val mySchema = new ArrayBuffer[String]()
    mySchema.append("PageUrl")
    mySchema.append("PV")
    mySchema.append("UV")
    mySchema.append("IP")
    mySchema.append("PageViews")
    mySchema.append("Output PV")
    mySchema.append("Stay Time")

    val data = readCSV(spark, "TRUE", mySchema, "GBK", path)
    data.show()

    data.createTempView("umng_rangfei")
    val df = data.toDF()

    df.persist()

    // Sort by popularity (PageViews) in descending order
    val resultRdd = df.sqlContext.sql("select * from umng_rangfei order by PageViews DESC")
    resultRdd.show()

    // Filter for in-depth "haowen" articles
    val haowenRdd = df.sqlContext.sql("select * from umng_rangfei WHERE PageUrl LIKE '%haowen%' AND PV > 100 order by PageUrl DESC")
    haowenRdd.show()

    spark.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
    deleteOutPutPath(spark.sparkContext, "E:\\newcode\\MyFirstProject\\data\\output\\haowen")

    // Use repartition(1) so that only a single output file is written
    haowenRdd.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\haowen")

    val womanRdd = df.sqlContext.sql("select * from umng_rangfei WHERE PageUrl LIKE '%/woman/?p=%' AND PV > 100 order by PageUrl DESC")
    womanRdd.show()

    deleteOutPutPath(spark.sparkContext, "E:\\newcode\\MyFirstProject\\data\\output\\woman")

    // Use repartition(1) so that only a single output file is written
    womanRdd.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\woman")
  }

  /**
   * Delete a directory or file
   *
   * @param sc
   * @param outputPath
   */
  def deleteOutPutPath(sc: SparkContext, outputPath: String): Unit = {
    val path = new Path(outputPath)
    val hadoopConf = sc.hadoopConfiguration
    val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    if (hdfs.exists(path)) {
      hdfs.delete(path, true)
    }
  }



  def readCSV(spark: SparkSession, headerSchema: String, mySchema: ArrayBuffer[String], code: String, file: String) = {
    // Read each line as raw bytes via hadoopFile and decode it with the given charset (e.g. "GBK")
    val rddArr: RDD[Array[String]] = spark.sparkContext.hadoopFile(file, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text]).map(
      pair => new String(pair._2.getBytes, 0, pair._2.getLength, code))
      // Split on commas, but keep quoted fields intact so commas inside quotes do not shift columns
      .map(_.trim.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)", -1))
    val fieldArr = rddArr.first()
    // Use Row.fromSeq(_); a plain map(Row(_)) would make spark.createDataFrame(rddRow, schema) fail
    val rddRow = rddArr.filter(!_.reduce(_ + _).equals(fieldArr.reduce(_ + _))).map(Row.fromSeq(_))
    val schemaList = ArrayBuffer[StructField]()
    if ("TRUE".equals(headerSchema)) {
      for (i <- 0 until fieldArr.length) {
        println("fieldArr(i)=" + fieldArr(i))
        schemaList.append(StructField(mySchema(i), DataTypes.StringType))
      }
    } else {
      for (i <- 0 until fieldArr.length) {
        schemaList.append(StructField(s"_c$i", DataTypes.StringType))
        println("fieldArr(i)=" + fieldArr(i))
      }
    }
    val schema = StructType(schemaList)
    spark.createDataFrame(rddRow, schema)
  }




}
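
As noted at the top, if the source files use a different Chinese encoding, only the charset argument passed to readCSV needs to change. For example, for GB2312 (a sketch reusing spark, mySchema, and path from main above):

// Any charset name supported by java.nio.charset.Charset can be passed as the code argument
val dataGb2312 = readCSV(spark, "TRUE", mySchema, "GB2312", path)
dataGb2312.show()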
