Spark分析个股的活跃性

在前文scala获取免费的股票日k线数据 ,本文做进一步扩展,统计一下股票振幅,统计最近20天振幅大于4个点有多少天。对于炒股喜欢短线的人来说,振幅大的股票,越好做T+0操作。


import java.util
import java.util.Collections

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import stock.SinaStock

import scala.io.Source
object KLineAnalyse {
def main(args: Array[String]): Unit = {
println("查询日k线股票 http://data.gtimg.cn/flashdata/hushen/daily/19/sh603000.js")
val sinaStockStream = Source.fromURL("http://data.gtimg.cn/flashdata/hushen/daily/19/sh603000.js","utf-8")
val sinaLines=sinaStockStream.getLines
val spark = SparkSession.builder().appName("kline").master("local[*]").getOrCreate(); //为读取的数据创建schema
//val sc = new SparkContext(SparkUtils.getSparkConf("SequenceFileUsage"))
val list = new util.ArrayList[KLineModel]()
for(line <- sinaLines) { /** 将每行数据解析成SinaStock对象,并答应对应的股票信息 **/
if(line.length > 20) {
// println(new KLineModel(line).toString)
list.add(new KLineModel(line));
}
}
Collections.reverse(list);
import scala.collection.JavaConverters
import scala.collection.Seq
import spark.implicits._
// List 转 Seq
val tmpSeq = JavaConverters.asScalaIteratorConverter(list.iterator).asScala.toSeq
sinaStockStream.close()
val mySparkRdd = spark.sparkContext.parallelize(tmpSeq);
val top2Rdd = mySparkRdd.take(20)
val rateRdd = top2Rdd.map(a => (a.dateStr, (a.highPrice - a.lowPrice)/a.openPrice * 100))
//过滤每天振幅大于几个点的
val resultRdd = rateRdd.filter(_._2 >= 4)
println(resultRdd.toBuffer)
val wordPairs = resultRdd.map(word => (word, 1))
val wordCounts = wordPairs.length / 20.0f;
println("wordCounts: " + wordCounts)

}

}

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注