Spark Example 2: Analyzing the Fee-Details CSV Exported from Amazon Associates

We compute the statistics we care about and save part of the results back out as CSV. From this example you can learn how Spark 2.0+ reads and writes CSV files natively, and how to run aggregations with Spark SQL.
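Before the full walkthrough, here is a minimal sketch of the Spark 2.0+ CSV round trip; the paths and the use of inferSchema are placeholder choices for illustration, not part of the original example:

import org.apache.spark.sql.SparkSession

object CsvRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvRoundTrip").master("local[*]").getOrCreate()
    // Treat the first line as a header and let Spark infer column types
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv("in/data.csv")
    // Write back out as CSV; Spark emits a directory of part files
    df.write.option("header", "true").csv("out/result")
    spark.stop()
  }
}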

 

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object AmazonFeeSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("AmazonFeeSQL").master("local[*]").getOrCreate()

    // Define a schema for the exported fee-details data
    val feeSchema = StructType(Array(
      StructField("Category", StringType, true),
      StructField("Name", StringType, true),
      StructField("ASIN", StringType, true),
      StructField("Seller", StringType, true),
      StructField("Tracking ID", StringType, true),
      StructField("Date Shipped", StringType, true),
      StructField("Price", StringType, true),
      StructField("Items Shipped", IntegerType, true),
      StructField("Returns", IntegerType, true),
      StructField("Revenue", DoubleType, true),
      StructField("Ad Fees", DoubleType, true),
      StructField("Device Type Group", StringType, true),
      StructField("Direct", StringType, true)
    ))
    val path = "E:\\newcode\\MyFirstProject\\data\\amazon\\fee"
    // .option("header", "true") treats the first line as the column header instead of data
    val data = spark.read.option("header", "true").schema(feeSchema).csv(path)
    //data.show()
    // Register the DataFrame as a temporary view so it can be queried with SQL
    data.createTempView("amazon_fee")

    // Categories ranked by popularity (number of line items), descending
    val resultDf = spark.sql("SELECT Category, count(Category) AS cateNum FROM amazon_fee GROUP BY Category ORDER BY cateNum DESC")
    resultDf.show()
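    // The same ranking via the DataFrame API, as a hypothetical alternative
    // (requires: import org.apache.spark.sql.functions.desc):
    // data.groupBy("Category").count().orderBy(desc("count")).show()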

    // Items in the most popular category ('Home', per the ranking above)
    val top1Df = spark.sql("SELECT * FROM amazon_fee WHERE Category = 'Home'")
    top1Df.show()

    // Items ranked by revenue, highest first
    val earnTopDf = spark.sql("SELECT * FROM amazon_fee ORDER BY Revenue DESC")
    earnTopDf.show()

    // Items with the most returns
    val returnTopDf = spark.sql("SELECT * FROM amazon_fee WHERE Returns > 0 ORDER BY Returns DESC")
    returnTopDf.show()

    // Count items per price bucket; labels are zero-padded so they sort correctly.
    // Price is read as a string, so Spark SQL casts it to a number for the comparisons.
    // Half-open intervals avoid the gaps and overlaps of the ad-hoc boundaries.
    val priceRangeDf = spark.sql(
      """SELECT price_range, count(*) AS number
        |FROM (
        |  SELECT CASE
        |    WHEN Price >= 0  AND Price < 5   THEN '000-5'
        |    WHEN Price >= 5  AND Price < 10  THEN '005-10'
        |    WHEN Price >= 10 AND Price < 15  THEN '010-15'
        |    WHEN Price >= 15 AND Price < 20  THEN '015-20'
        |    WHEN Price >= 20 AND Price < 25  THEN '020-25'
        |    WHEN Price >= 25 AND Price < 50  THEN '025-50'
        |    WHEN Price >= 50 AND Price < 100 THEN '050-100'
        |    ELSE '100+'
        |  END AS price_range
        |  FROM amazon_fee
        |) AS price_summaries
        |GROUP BY price_range ORDER BY price_range""".stripMargin)
    priceRangeDf.show()


    // Items from the top-2 categories by purchases
    val top2Df = spark.sql("SELECT * FROM amazon_fee WHERE Category = 'Home' OR Category = 'Toys & Games'")
    top2Df.show()
    // Spark 2.0+ writes CSV natively; the external com.databricks.spark.csv package is no longer needed
    top2Df.write.option("header", "true").csv("E:\\newcode\\MyFirstProject\\data\\Home_ToysGames_BestSeller")


    spark.stop()
  }
}
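To run the job outside an IDE, package it and launch it with spark-submit; the jar name and Scala version below are placeholders for whatever your build actually produces:

spark-submit --class AmazonFeeSQL --master "local[*]" target/scala-2.11/myfirstproject_2.11-0.1.jar

Note that Spark's CSV writer creates a directory of part-* files rather than a single CSV file; merge them afterwards if one file is needed.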

