Using Spark to Read Files and Aggregate Stock Fund Flow Records

Data format:
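The original post does not include a sample, but from the parsing code below, each record is one line of 14 whitespace-separated fields matching the CaseFlow case class (dealDate, stockCode, stockName, dealNum, dealPrice, dealContent, dealAmount, remainAmount, and six standby columns). A hypothetical line might look like:

2021-06-01 600519 Moutai 100 2100.50 BUY 210050.00 500000.00 - - - - - -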

Code:

// spark.implicits._ is imported inside readExcel below; without it the RDD
// produced from the text file cannot call toDF


object ExcelStockEarn extends AppConf {

  def main(args: Array[String]): Unit = {
    readExcel()
  }

  def readExcel(): Unit = {

    import spark.implicits._
    val path = "E:\\newcode\\MyFirstProject\\data\\stockearn"

    // We want to count the number of dirty lines, so define an accumulator here
    val accum = spark.sparkContext.longAccumulator("Error Accum")

    val flowDF = spark.read.textFile(path).rdd
      .map { line =>
        val fields = line.split("\\s+")
        if (fields.length == 14) {
          CaseFlow(fields(0), fields(1), fields(2), fields(3).toInt, fields(4).toDouble,
            fields(5), fields(6).toDouble, fields(7).toDouble,
            fields(8), fields(9), fields(10), fields(11), fields(12), "")
        } else {
          // Dirty line: bump the counter and emit a placeholder row
          accum.add(1L)
          CaseFlow(fields(0), "", "", 0, 0, "", 0, 0, "", "", "", "", "", "")
        }
      }.toDF()


    flowDF.createTempView("option_stock")
    flowDF.persist()

    val resultDF = spark.sql("select * from option_stock")
    resultDF.show()

    val groupDF = spark.sql(
      "select stockCode, SUM(dealAmount) as totalAmount " +
      "from option_stock group by stockCode order by stockCode")
    groupDF.show()

    // Accumulator values are only reliable after an action (show above) has run
    println(s"Dirty lines counted: ${accum.value}")
  }


  case class CaseFlow(dealDate: String, stockCode: String, stockName: String,
                      dealNum: Int, dealPrice: Double, dealContent: String,
                      dealAmount: Double, remainAmount: Double,
                      standby1: String, standby2: String, standby3: String,
                      standby4: String, standby5: String, standby6: String)

}
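The AppConf trait the object extends is not shown in the post. A minimal sketch of what it presumably provides (a shared SparkSession named spark; the app name and local master are assumptions for a local Windows setup):

import org.apache.spark.sql.SparkSession

trait AppConf {
  // Lazily build one SparkSession shared by the application;
  // `spark` is the name the code above depends on
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("ExcelStockEarn")   // assumed app name
    .master("local[*]")          // assumed local mode for a dev machine
    .getOrCreate()
}

The same aggregation can also be written with the DataFrame API instead of SQL; a sketch, assuming flowDF is the DataFrame built above:

import org.apache.spark.sql.functions.sum

flowDF.groupBy("stockCode")
  .agg(sum("dealAmount").as("totalAmount"))
  .orderBy("stockCode")
  .show()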

