This example computes the statistics we care about and saves part of the results back to CSV. It shows how to read and write CSV files with Spark 2.0+ and how to use Spark SQL to aggregate the data.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object AmazonFeeSQL {
def main(arg: Array[String]): Unit = {
val spark = SparkSession.builder().appName("UmengPV").master("local[*]").getOrCreate()
//create a schema for the data to be read
val taxiSchema = StructType(Array(
StructField("Category", StringType, true),
StructField("Name", StringType, true),
StructField("ASIN", StringType, true),
StructField("Seller", StringType, true),
StructField("Tracking ID", StringType, true),
StructField("Date Shipped", StringType, true),
StructField("Price", StringType, true),
StructField("Items Shipped", IntegerType, true),
StructField("Returns", IntegerType, true),
StructField("Revenue", DoubleType, true),
StructField("Ad Fees", DoubleType, true),
StructField("Device Type Group", StringType, true),
StructField("Direct", StringType, true)
))
val path = "E:\\newcode\\MyFirstProject\\data\\amazon\\fee"
//.option("header","true") treats the first line as the header row instead of data
val data = spark.read.option("header","true").schema(taxiSchema).csv(path)
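//Instead of the hand-written schema, Spark 2.0+ can also infer column types from the file
//itself (at the cost of an extra pass over the data), for example:
//val inferred = spark.read.option("header", "true").option("inferSchema", "true").csv(path)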
//data.show()
data.createTempView("amazon_fee")
val df = data.toDF()
//categories ordered by popularity (number of records), descending
val resultRdd = df.sqlContext.sql("select Category, count(Category) as cateNum from amazon_fee GROUP BY Category order by cateNum DESC")
resultRdd.show()
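//The same aggregation can also be expressed with the DataFrame API instead of SQL, for example:
//df.groupBy("Category").count().orderBy(org.apache.spark.sql.functions.desc("count")).show()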
//products in the most popular category ('Home')
val top1Rdd = df.sqlContext.sql("select * from amazon_fee WHERE Category = 'Home'")
top1Rdd.show()
//products ranked by revenue, descending
val earnTopRdd = df.sqlContext.sql("SELECT * FROM amazon_fee ORDER BY Revenue DESC")
earnTopRdd.show()
//products with the most returns
val returnTopRdd = df.sqlContext.sql("SELECT * FROM amazon_fee WHERE Returns > 0 ORDER BY Returns DESC")
returnTopRdd.show()
//count the number of products in each price range
val priceRangeRdd = df.sqlContext.sql(
"""SELECT price_range, count(*) AS number FROM (
|SELECT CASE WHEN Price >= 0 AND Price <= 4.99 THEN '0-5'
|WHEN Price >= 5 AND Price <= 10 THEN '005-10'
|WHEN Price >= 10 AND Price <= 14.99 THEN '010-15'
|WHEN Price >= 15 AND Price <= 19.99 THEN '015-20'
|WHEN Price >= 20 AND Price <= 24.99 THEN '020-25'
|WHEN Price >= 25 AND Price <= 49.99 THEN '025-50'
|WHEN Price >= 50 AND Price <= 99.99 THEN '050-100'
|ELSE '100+' END AS price_range FROM amazon_fee) AS price_summaries
|GROUP BY price_range ORDER BY price_range""".stripMargin)
priceRangeRdd.show()
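//Note: Price is declared as StringType in the schema above, so the numeric comparisons in the
//query rely on Spark SQL's implicit casting; declaring the column as DoubleType, or writing
//CAST(Price AS DOUBLE), makes the intent explicit.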
//products from the top two best-selling categories
val top2Rdd = df.sqlContext.sql("SELECT * FROM amazon_fee WHERE Category = 'Home' OR Category = 'Toys & Games'")
top2Rdd.show()
//Spark 2.0+ ships with a built-in CSV data source, so the external com.databricks.spark.csv package is not needed
top2Rdd.write.option("header", "true").csv("E:\\newcode\\MyFirstProject\\data\\Home_ToysGames_BestSeller")
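//Note: the csv() writer creates a directory containing one part-*.csv file per partition;
//call coalesce(1) on the DataFrame before write if a single output file is preferred.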
}
}
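To double-check the export, the directory Spark just wrote can be read back with the same built-in CSV reader. A minimal sketch (the object name VerifyExport is only an illustration; the path and the header option match the write call above):

import org.apache.spark.sql.SparkSession

object VerifyExport {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("VerifyExport").master("local[*]").getOrCreate()
    //the reader accepts a directory and picks up every part-*.csv file inside it
    val exported = spark.read.option("header", "true").csv("E:\\newcode\\MyFirstProject\\data\\Home_ToysGames_BestSeller")
    exported.show()
    spark.stop()
  }
}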