数据清洗是大数据分析过程中的重要环节,主要作用是去除不需要的数据、填充缺失内容,并确定缺失值的范围、制定好相应的应对策略。
/**
 * A single user rating record: the score `rating` that user `userId`
 * gave to movie `movieId`. (The raw data's trailing timestamp column
 * is intentionally not modelled here.)
 */
case class UserRating(userId: Int, movieId: Int, rating: Double)
import com.zxl.caseclass.{UserRating, Users}
import com.zxl.conf.AppConf
import com.zxl.datacleaner.UserETL._
import org.apache.spark.sql.SaveMode
/**
 * Sample input format (one record per line): userId,movieId,rating,timestamp
* 1,1193,5,978300760
* 1,661,3,978302109
* 1,914,3,978301968
* 1,3408,4,978300275
* 1,2355,5,978824291
* 1,1197,3,978302268
* 1,1287,5,978302039
* 1,2804,5,978300719
* 1,594,4,978302268
* 1,919,4,978301368
* 1,595,5,978824268
* 1,938,4,978301752
* 1,2398,4,978302281
* 1,2918,4,978302124
* 1,1035,5,978301753
* 1,2791,4,978302188
* 1,2687,3,978824268
* 1,2018,4,978301777
* 1,3105,5,978301713
* 1,2797,4,978302039
*/
/**
 * ETL job: reads the raw ratings file, parses each line into a
 * [[UserRating]] (dropping the trailing timestamp column) and appends
 * the result to the ratings table via JDBC.
 */
object RatingETL extends AppConf {
  // NOTE: `: Unit =` added — Scala's procedure syntax is deprecated.
  def main(args: Array[String]): Unit = {
    import sqlContext.implicits._
    // Read the sample data (8 partitions); switch to the HDFS path in production.
    // val data_path = "hdfs://movie1:9000/movie/data/ratings.txt"
    val data_path = "data/ratings.dat"
    val data = sc.textFile(data_path, 8)
    // Keep only well-formed lines (at least userId,movieId,rating) so blank or
    // truncated lines cannot crash the parse. The RDD is consumed exactly once
    // below, so the previous .cache() was unnecessary and has been removed.
    val userdata = data
      .map(_.split(","))
      .filter(_.length >= 3)
      .map(f => UserRating(f(0).toInt, f(1).toInt, f(2).toDouble))
    val userDF = userdata.toDF()
    // Append the cleaned ratings to the database table.
    userDF.write.mode(SaveMode.Append).jdbc(jdbcURL, ratingTable, prop)
  }
}