gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Spark

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Spark"
  • ( 页面6 )
Spark, 数据挖掘 11月 17,2020

Spark道路拥堵模式聚类案例

从基础的道路通行状态统计、分析、聚类等维度开展对某个城市道路拥堵情况的分析和研究。

13.3.2 数据预处理根据给定的某地图路况数据,首先进行数据预处理工作,清洗原始数据并去除缺失数据、错误数据,根据道路ID进行数据汇集,计算拥堵指数。1)清除缺失数据:清除字段为空记录;

2)清除错误数据:清除字段错误记录;

3)根据道路ID进行道路拥堵指数聚合;

4)根据时间进行道路拥堵指数排序。

13.3.3 特征构建仍然以半小时为最小时间粒度(每日24小时划分为48维时间片),并对道路拥堵指数按时间片进行聚合计算,同时按照48维时间片进行拥堵指数排列。具体处理过程以及代码如下:

package com.koala.ch13

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Calendar

import breeze.linalg.Counter
import org.apache.log4j.{Level, Logger}

object CrowdModel {

  def main(args: Array[String]){
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 2) {
      System.err.println("Usage:CrowdModel <InPath> <OutPut> <Model>")
      System.exit(1)
    }

    // 2rd_data/ch13/user_location_sample.txt output/ch13/CrowdModel local[2]
    val Array(input,output,mode) = args

    //初始化SparkContext
    val conf = new SparkConf()
      .setMaster(mode)//.setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // 位置筛选
    // 清洗数据,通过split(",")切分数据,得到 User_id Time_stamp Cell_id三个维度的数据列表。
    // (Time_stamp,Cell_id,User_id)-> (User_id,Time_stamp,Cell_id)
    // 20160501055052,209059,898
    val data = sc.textFile(input).map(_.split(",")).map {
      x => (x(2), x(0), x(1))
    }
    //data.coalesce(1).saveAsTextFile(output)

    // 根据Time_stamp分析当前日期为工作日或节假日,并添加time标签标识HH:mm,work_flag标签标识工作日(work_falg=1)或节假日(work_flag=0)
    // 输出:(User_id,work_flag,date_time,Cell_id)
    val preData = data.map {
      case (preUser_id, preTime_stamp, preCell_id) => {

        //将日期转变成星期,获取工作日(星期一至星期五)和非工作日(星期六、星期日)
        // 20160501 055052
        val sdf = new SimpleDateFormat("yyyyMMddHHmmss") //24小时工作制
        val date = sdf.parse(preTime_stamp)
        val cal = Calendar.getInstance
        cal.setTime(date)
        var w = cal.get(Calendar.DAY_OF_WEEK) - 1
        // 工作日默认为1 非工作日默认为0
        var work_flag = 1
        if (w <= 0 || w >= 6) {
          work_flag = 0
        }

        // 按照30分钟间隔处理时间
        val time_ = preTime_stamp.substring(8, 12)
        // 截取指定位置的元素,前包括后不包括
        var minute_ = "00"
        if (time_.substring(2).toInt >= 30) {
          minute_ = "30"
        }
        val date_time = time_.toString.substring(0, 2) + minute_
        ((preUser_id, work_flag, date_time, preCell_id), 1)
      }
    }
    //preData.coalesce(1).saveAsTextFile(output)

    //使用reduceByKey(_+_)对(User_id,work_flag,date_time,Cell_id)访问次数进行聚合,根据聚合结果,选择用户某段时间在30分钟内划分访问次数最多的基站为标准访问地点。
    val aggData = preData.reduceByKey(_ + _)
      .map { x => ((x._1._1, x._1._2, x._1._3), (x._1._4, x._2)) }
      .reduceByKey((a, b) => if (a._2 > b._2) a else b)//选取访问次数最多的cell
    //aggData.coalesce(1).saveAsTextFile(output)

    //获取用户工作日24小时访问地点cell_id、节假日24小时访问地点cell_id,以30分钟为最小时间粒度划分时间片,得到user_id工作日48维时间片访问cell_id和节假日48维时间片访问cell_id,共计96维时间片。
    //(User_id,work_flag,date_time),(Cell_id,nums)->(User_id,work_flag),(date_time,Cell_id)

    val slotData = aggData.map { x => ((x._1._1, x._1._2), (x._1._3 + ":" + x._2._1)) }.reduceByKey(_ + ";" + _)
    //slotData.coalesce(1).saveAsTextFile(output)

    // 位置编码
    // 根据聚合结果,提取所有用户访问的基站进行重新编码,获得用户访问位置列表cell_id,并进行排序去重
    // (User_id,work_flag,date_time),(Cell_id,nums)
    val minCell = aggData.map(x => x._2._1).sortBy(x=>x.toLong,true).collect().distinct
    println(minCell.toList)
    //使用zip方法从1开始对用户访问地址进行编码,并将编码进行保存。
    val index_list = minCell.zip(Stream from 1).toMap
    println(index_list)
    //得到的index_list即是用户访问位置编码特征向量。
  }
}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object CleanCongestionData {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 2) {
      System.err.println("Usage:CleanCongestionData <InPath> <OutPut> <Model>")
      System.exit(1)
    }

    // 2rd_data/ch13/road_congestion_sample.txt output/ch13/CongestionModel local[2]
    val Array(input,output,mode) = args

    // 初始化SparkContext
    val conf = new SparkConf()
      .setMaster(mode)//.setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    // 计算link的拥堵情况,指定道路、工作日状态、时间片时,link拥堵指数的平均值(四舍五入)取整,
    // key (linkid, work_flag, hour) value (congestion)
    // 85349482;1;20.5;1
    val data = sc.textFile(input).map(_.split(";"))
      .map {x => ((x(0),x(1),x(2)),x(3))}
      .groupByKey().mapValues(x=>{
        val a = x.toList.reduceLeft((sum,i)=>sum +i)//拥堵指数求和
        val b = x.toList.length
        Math.round(a.toInt/b)//平均拥堵指数
      })
    //data.coalesce(1).saveAsTextFile(output)

    // 根据key聚合数据后,使用hour 进行排序 并删除hour数据
    // key (linkid,work_flag, hour) value (congestion)->(linkid) value(work_flag,congestion)
    val collectData = data.sortBy(x=>x._1._3).map(x => ((x._1._1),(x._1._2+":"+x._2))).reduceByKey(_ + ";" + _)
    collectData.coalesce(1).saveAsTextFile(output)
  }
}
作者 east
Spark 10月 26,2020

Spark Streaming调优经验

Spark Streaming调优

操作场景

Streaming作为一种mini-batch方式的流式处理框架,它主要的特点是:秒级时延和高吞吐量。因此Streaming调优的目标:在秒级延迟的情景下,提高Streaming的吞吐能力,在单位时间处理尽可能多的数据。

说明:

本章节适用于输入数据源为Kafka的使用场景。

操作步骤

一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。

对Streaming调优,就必须使该三个部件的性能都最优化。

  • 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点:
    • 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。
    • 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。
    详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html
  • 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API:
    • KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。
    • ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。
    • DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。
    从实现上来看,DirectKafka的性能会是最好的,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html
  • 处理器调优 Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Streaming之中,例如:
    • 数据序列化
    • 配置内存
    • 设置并行度
    • 使用External Shuffle Service提升性能
    说明: 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。
  • 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
作者 east
Spark 10月 26,2020

Spark Core调优经验

使用mapPartitions,按每个分区计算结果

如果每条记录的开销太大,例:

rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}

则可以使用MapPartitions,按每个分区计算结果,如

rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
write(item.toString); conn.close)

使用mapPartitions可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

使用coalesce调整分片的数量

coalesce可以调整分片的数量。coalesce函数有两个参数:

coalesce(numPartitions: Int, shuffle: Boolean = false)

当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。

遇到下列场景,可选择使用coalesce算子:

  • 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。
  • 当输入切片个数太大,导致程序无法正常运行时使用。
  • 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。

localDir配置

Spark的Shuffle过程需要写本地磁盘,Shuffle是Spark性能的瓶颈,I/O是Shuffle的瓶颈。配置多个磁盘则可以并行的把数据写入磁盘。如果节点中挂载多个磁盘,则在每个磁盘配置一个Spark的localDir,这将有效分散Shuffle文件的存放,提高磁盘I/O的效率。如果只有一个磁盘,配置了多个目录,性能提升效果不明显。

Collect小数据

大数据量不适用collect操作。

collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。

使用reduceByKey

reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey().map(x=>(x._1,x._2.size))这类实现方式。

广播map代替数组

当每条记录需要查表,如果是Driver端用广播方式传递的数据,数据结构优先采用set/map而不是Iterator,因为Set/Map的查询速率接近O(1),而Iterator是O(n)。

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。

优化数据结构

  • 把数据按列存放,读取数据时就可以只扫描需要的列。
  • 使用Hash Shuffle时,通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能。最终文件数为reduce tasks数目。
作者 east
Kafka, Spark 9月 5,2020

Spark Streaming读取kafka直连案例

Spark Streaming读取kafka有2种方式:基于Receiver的方式和 基于Direct的方式 。 Direct 更高效, 负责追踪消费的offset 可以用redis或mysql来保存。

 
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
object DirectKafka {
def main(args: Array[String]): Unit = {
if (args.length < 2){ System.err.println( s""" |DirectKafka
| is a list of one or more kafka brokers
| is a list of one or more kafka topics
""".stripMargin)
System.exit(1)
}


/*启动zookeeper
cd /root/kafka/kafka_2.11-1.0.0/bin
./zookeeper-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/zookeeper.properties

启动kafka
cd /root/kafka/kafka_2.11-1.0.0/bin
./kafka-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/server.properties

启动kafka生产者
./kafka-console-producer.sh --broker-list master:9092 --topic kafka_test

提交任务
spark-submit --class com.kafka.DirectKafka \
--master spark://master:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/root/datafile/SparkStreamingKafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
master:9092 kafka_test
*/

val sparkConf = new SparkConf().setAppName("SparkStreaming-DirectKafka")
val sc = new SparkContext(sparkConf)
val Array(brokers, topics) = args
val ssc = new StreamingContext(sc, Seconds(2))
val topicset = topics.split(",").toSet
val KafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, KafkaParams, topicset)
directKafkaStream.print()
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(._2) .flatMap(.split(" "))
.map(x => (x, 1L))
.reduceByKey(_ + _)
.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.take(10).foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}


import com.alibaba.fastjson.JSON
import main.scala.until.dataschema
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import scalikejdbc.config.{DBs, DBsWithEnv}
import scalikejdbc._
import main.scala.until.ParamsUtils
import main.scala.until.SparkUtils

object readkafka {
  def main(args: Array[String]): Unit = {
    if (args.length != 1){
      System.err.println("please input data args")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkStreaming-test")
      .setMaster("local[*]")
      .set("spark.testing.memory","2147480000")
    val sc = new SparkContext(sparkConf)

//topic : spark_example_topic , countly_event ,countly_imp
//broker : 172.31.2.6:9292,172.31.2.7:9292,172.31.2.8:9292

//    val ssc = new StreamingContext(sc, Seconds(2))
    val ssc = new StreamingContext(sc, Seconds(2))

    val messages = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)
    messages.print()

//    SparkUtils.apply(null).getDirectStream()

//-------------------------------------------------------------------------------

    // messages 从kafka获取数据,将数据转为RDD
    messages.foreachRDD((rdd, batchTime) => {
      import org.apache.spark.streaming.kafka.HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // 获取偏移量信息
      /**
        * OffsetRange 是对topic name,partition id,fromOffset(当前消费的开始偏移),untilOffset(当前消费的结束偏移)的封装。
        * *  所以OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移
        */
      println("===========================> count: " + rdd.map(x => x + "1").count())
     // offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      for (offset <- offsetRanges) {
        // 遍历offsetRanges,里面有多个partition
        println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
        DBs.setupAll()
        // 将partition及对应的untilOffset存到MySQL中
        val saveoffset = DB localTx {
          implicit session =>
           sql"DELETE FROM offsetinfo WHERE topic = ${offset.topic} AND partitionname = ${offset.partition}".update.apply()
            sql"INSERT INTO offsetinfo (topic, partitionname, untilOffset) VALUES (${offset.topic},${offset.partition},${offset.untilOffset})".update.apply()
        }
      }
    })

    // 处理从kafka获取的message信息
    val parameter = messages.flatMap(line => {
      //获取服务端事件日期 reqts_day
      val reqts_day = try {
        new DateTime(JSON.parseObject(line._2).getJSONObject("i").getLong("timestamp") * 1000).toDateTime.toString("yyyy-MM-dd HH:mm:ss")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //获取 设备号
      val cookieid = try {
        JSON.parseObject(line._2).getJSONObject("d").get("d")    //将Json字符串转化为相应的对象  .getString("kid")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //组合成一个字符串
      val data = reqts_day + "##" + cookieid
      Some(data)       //some是一定有值的, some.get获取值,如果没有值,会报异常
    }).map(_.split("##")).map(x => (x(0),x(1)))

    println("------------------")

    parameter.foreachRDD{ rdd =>

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      // 转换成DataFrame
      val SaveParameter = rdd.map(w => dataschema(w._1.toString,w._2.toString)).toDF("data_date","cookies_num")
      // 注册视图
      SaveParameter.createOrReplaceTempView("dau_tmp_table")
      val insertsql =sqlContext.sql("select * from dau_tmp_table")
      insertsql.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/userprofile_test","dau_tmp_table",ParamsUtils.mysql.mysqlProp)

    }

    messages.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
作者 east
Spark, 大数据开发 6月 25,2020

大数据利用基站或GPS推断是家和工作地

如果有某个手机用户的GPS轨迹或基站,是可以利用大数据来推断他的家和工作地。

思路应该从时空2个维度进行考虑:

1、从时间上考虑:对于一般人来说,一天最早的出发地通常是家,晚上最后的目的地通常是家。而上班的地方,普通是6-10点出发的,在17点到22点后回来的,工作地通常是停留时间很长。(可以把轨迹按天归类,并按每天时间排序,从而计算最早、最晚、白天停留时间最长的地点)





2、从空间上考虑:家和工作点应该是2个不同的聚类中心。 可以利用轨迹绘制集群中每个集群中GPS或基站数据点的时间分布。 应该可以推断出从早上9点到晚上18点,用户停留在集群1区域,而在午夜到早上8点,用户倾向于留在集群2。从而大概率推断出集群1是家,集群2是工作地(可以用 DBSCAN算法来识别此数据集中的聚类。 DBSCAN是一种聚类算法,对于聚类具有许多异常值的空间数据特别有用 )

作者 east
Spark 7月 7,2019

idea开发spark配置问题

问题1:scala版本跟spark版本不一致

使用maven方式,注意切注意spark与scala有版本对应关系, 详情参考Spark官网相关说明:https://spark.apache.org/docs/latest/index.html
scala版本还要跟工程配置Library添加的Scala版本一致。

问题2:更新依赖等半天没更新完


在pom.xml中添加maven 依赖包时,我就发现不管是否用了翻墙,下载速度都好慢,就1M的东西能下半天,很是苦恼,于是到网上搜资料,然后让我查到了。说是使用阿里的maven镜像就可以了。我于是亲自试了下,速度快的飞起!!!
右键项目选中maven选项,然后选择“open settings.xml”或者 “create settings.xml”,然后把如下代码粘贴进去就可以了。重启IDE,感受速度飞起来的感觉吧!!!
<?xml version=”1.0″ encoding=”UTF-8″?><settings xmlns=”http://maven.apache.org/SETTINGS/1.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance” xsi:schemaLocation=”http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd”> <mirrors> <!– mirror | Specifies a repository mirror site to use instead of a given repository. The repository that | this mirror serves has an ID that matches the mirrorOf element of this mirror. IDs are used | for inheritance and direct lookup purposes, and must be unique across the set of mirrors. | <mirror> <id>mirrorId</id> <mirrorOf>repositoryId</mirrorOf> <name>Human Readable Name for this Mirror.</name> <url>http://my.repository.com/repo/path</url> </mirror> –>
<mirror> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <mirrorOf>central</mirrorOf> </mirror>
<mirror> <id>uk</id> <mirrorOf>central</mirrorOf> <name>Human Readable Name for this Mirror.</name> <url>http://uk.maven.org/maven2/</url> </mirror>
<mirror> <id>CN</id> <name>OSChina Central</name> <url>http://maven.oschina.net/content/groups/public/</url> <mirrorOf>central</mirrorOf> </mirror>
<mirror> <id>nexus</id> <name>internal nexus repository</name> <!– <url>http://192.168.1.100:8081/nexus/content/groups/public/</url>–> <url>http://repo.maven.apache.org/maven2</url> <mirrorOf>central</mirrorOf> </mirror>
</mirrors></settings>

作者 east
Spark 5月 7,2019

解决Spark读取CSV文件中文乱码的完整例子

park.read.option(“header”,”true”).csv(path) 的默认方法,如果读取的源数据是utf-8k中文的,能正常显示,但如果Spark读取带有GBK或GB2312等中文编码的话,就会有Spark GBK乱码或Spark GB2312乱码。下面示例一个完整例子,如果Spark读取不是GBK编码的,只需要替换下面的中文编码。

import java.sql.DriverManager

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType, _}

import scala.collection.mutable.ArrayBuffer


/**
* 通过友盟统计1天访问明细
*/
object UmengRangfeiSQL {
def main(arg: Array[String]): Unit = {
val spark = SparkSession.builder().appName(“UmengRangfeiSQL”).master(“local[*]”).getOrCreate(); //为读取的数据创建schema
// println(System.getProperty(“file.encoding”))
// val pps = System.getProperties
// pps.setProperty(“file.encoding”, “GB2312”)
val taxiSchema = StructType(Array(
StructField(“PageUrl”, StringType, true),
StructField(“PV”, IntegerType, true),
StructField(“UV”, IntegerType, true),
StructField(“IP”, IntegerType, true),
StructField(“PageViews”, DoubleType, true),
StructField(“Output PV”, IntegerType, true),
StructField(“Stay Time”, StringType, true)
))
val path = “E:\\newcode\\MyFirstProject\\data\\rangfei”
//跳过第一行的标题 .option(“header”,”true”)
// val data = spark.read.option(“header”,”true”).schema(taxiSchema).csv(path)

val mySchema = new ArrayBuffer[String]();
mySchema.append(“PageUrl”);
mySchema.append(“PV”);
mySchema.append(“UV”);
mySchema.append(“IP”);
mySchema.append(“PageViews”);
mySchema.append(“Output PV”);
mySchema.append(“Stay Time”);

val data = readCSV(spark, “TRUE”, mySchema,”GBK”, path)
data.show()




data.createTempView(“umng_rangfei”)
val df = data.toDF()

df.persist()

//按受欢迎的分类倒序排列
val resultRdd = df.sqlContext.sql(“select * from umng_rangfei order by PageViews DESC”)
resultRdd.show()

//过虑查找深度好文
val haowenRdd = df.sqlContext.sql(“select * from umng_rangfei WHERE PageUrl LIKE ‘%haowen%’ AND PV > 100 order by PageUrl DESC”)
haowenRdd.show()

spark.sparkContext.hadoopConfiguration.setBoolean(“mapreduce.input.fileinputformat.input.dir.recursive”, true)
deleteOutPutPath(spark.sparkContext,”E:\\newcode\\MyFirstProject\\data\\output\\haowen”)


//加上repartition来控制只有1个输出文件
haowenRdd.repartition(1).write.format(“com.databricks.spark.csv”).save(“E:\\newcode\\MyFirstProject\\data\\output\\haowen”)

val womanRdd = df.sqlContext.sql(“select * from umng_rangfei WHERE PageUrl LIKE ‘%/woman/?p=%’ AND PV > 100 order by PageUrl DESC”)
womanRdd.show()

deleteOutPutPath(spark.sparkContext,”E:\\newcode\\MyFirstProject\\data\\output\\woman”)

//加上repartition来控制只有1个输出文件
womanRdd.repartition(1).write.format(“com.databricks.spark.csv”).save(“E:\\newcode\\MyFirstProject\\data\\output\\woman”)



}

/**
* 删除文件夹或文件
*
@param sc
* @param outputPath
*/
def deleteOutPutPath(sc: SparkContext,outputPath: String):Unit={
val path = new Path(outputPath)
val hadoopConf = sc.hadoopConfiguration
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
if(hdfs.exists(path)){
hdfs.delete(path,true)
}
}



def readCSV(spark:SparkSession,headerSchema:String,mySchema: ArrayBuffer[String],code:String,file:String) ={
val rddArr:RDD[Array[String]] = spark.sparkContext.hadoopFile(file, classOf[TextInputFormat],
classOf[LongWritable], classOf[Text]).map(
pair => new String(pair._2.getBytes, 0, pair._2.getLength, code))
//处理同一个单元格 同时出现 引号 逗号串列问题 切割
.map(_.trim.split(“,(?=([^\”]*\”[^\”]*\”)*[^\”]*$)”,-1))
val fieldArr = rddArr.first()
//Row.fromSeq(_) 如果只是 map(Row(_)),会导致 spark.createDataFrame(rddRow,schema)错误
val rddRow = rddArr.filter(!_.reduce(_+_).equals(fieldArr.reduce(_+_))).map(Row.fromSeq(_))
val schemaList = ArrayBuffer[StructField]()
if(“TRUE”.equals(headerSchema)){
for(i <- 0 until fieldArr.length){
println(“fieldArr(i)=” + fieldArr(i))
schemaList.append(StructField(mySchema(i),DataTypes.StringType))
}
}else{
for(i <- 0 until fieldArr.length){
schemaList.append(StructField(s”_c$i”,DataTypes.StringType))
println(“fieldArr(i)=” + fieldArr(i))
}
}
val schema = StructType(schemaList)
spark.createDataFrame(rddRow,schema)
}




}

作者 east
Spark 5月 7,2019

解决Spark读取文本中文乱码的完整例子

spark.read.textFile() 的默认方法,如果读取的源数据是utf-8k中文的,能正常显示,但如果带有GBK或GB2312等中文编码的话,就会有乱码。下面示例一个完整例子,如果不是GBK编码的,只需要替换下面的中文编码。



//导入隐饰操作,否则RDD无法调用toDF方法


object ExcelStockEarn extends AppConf {

def main(args: Array[String]): Unit = {
readExcel;
}

def readExcel = {


import spark.implicits._


import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
val path = "E:\\newcode\\MyFirstProject\\data\\stockearn"
val inputRdd = spark.sparkContext.hadoopFile(path, classOf[TextInputFormat],
classOf[LongWritable], classOf[Text]).map(
pair => new String(pair._2.getBytes, 0, pair._2.getLength, "GBK"))


//我们要统计脏数据的数量,所以我在这里定义了一个计数器
val accum = spark.sparkContext.longAccumulator("Error Accum")

val listRdd =inputRdd
.map({
line =>
val fields = line.split("\\s+")
if(fields.length == 14) {
CaseFlow(fields(0).toString, fields(1).toString, fields(2).toString, fields(3).toInt, fields(4).toDouble, fields(5).toString, fields(6).toDouble, fields(7).toDouble,
fields(8).toString, fields(9).toString, fields(10).toString, fields(11).toString, fields(12).toString, "")
}else{
accum.add(1L)
CaseFlow(fields(0).toString, "", "", 0, 0, "", 0, 0, "", "", "", "", "", "")
}

})
val dataRdd = listRdd.filter(_.stockCode.length > 1).toDF()

dataRdd.createTempView("option_stock")
val df = dataRdd.toDF()
df.persist()

val resultRdd = df.sqlContext.sql("select * from option_stock ")
resultRdd.show();

val groupRdd = df.sqlContext.sql("select stockCode, SUM(dealAmount) from option_stock group by StockCode order by StockCode")
groupRdd.show();

}




case class CaseFlow(dealDate : String ,stockCode : String , stockName: String , dealNum : Int , dealPrice : Double, dealContent : String,
dealAmount :Double, remainAmount: Double, standby1: String, standby2: String, standby3: String, standby4: String, standby5: String,standby6: String);

}
作者 east
Spark 5月 6,2019

Spark读取文件来统计股票资金流水

数据格式:

操作代码:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.types._

import scala.io.Source
//导入隐饰操作,否则RDD无法调用toDF方法


object ExcelStockEarn extends AppConf {

def main(args: Array[String]): Unit = {
readExcel;
}

def readExcel = {


import spark.implicits._
val path = "E:\\newcode\\MyFirstProject\\data\\stockearn"

//我们要统计脏数据的数量,所以我在这里定义了一个计数器
val accum = spark.sparkContext.longAccumulator("Error Accum")

val listRdd = spark.read.textFile(path).rdd
.map({
line =>
val fields = line.split("\\s+")
if(fields.length == 14) {
CaseFlow(fields(0).toString, fields(1).toString, fields(2).toString, fields(3).toInt, fields(4).toDouble, fields(5).toString, fields(6).toDouble, fields(7).toDouble,
fields(8).toString, fields(9).toString, fields(10).toString, fields(11).toString, fields(12).toString, "")
}else{
accum.add(1L)
CaseFlow(fields(0).toString, "", "", 0, 0, "", 0, 0, "", "", "", "", "", "")
}

}).toDF();


listRdd.createTempView("option_stock")
val df = listRdd.toDF()
df.persist()

val resultRdd = df.sqlContext.sql("select * from option_stock ")
resultRdd.show();

val groupRdd = df.sqlContext.sql("select stockCode, SUM(dealAmount) from option_stock group by StockCode order by StockCode")
groupRdd.show();

}


case class CaseFlow(dealDate : String ,stockCode : String , stockName: String , dealNum : Int , dealPrice : Double, dealContent : String,
dealAmount :Double, remainAmount: Double, standby1: String, standby2: String, standby3: String, standby4: String, standby5: String,standby6: String);

}
作者 east
Spark 4月 25,2019

Spark清洗数据实例-评分数据

 
数据清洗, 是大数据分析过程中重要的环节,主要作用是去除不需要的数据,填充缺失内容,确定缺失值的范围并制定好应对策略。


/**
* 接收用户的评分信息
*/
case class UserRating(userId:Int, movieId:Int, rating:Double)

import com.zxl.caseclass.{UserRating, Users}
import com.zxl.conf.AppConf
import com.zxl.datacleaner.UserETL._
import org.apache.spark.sql.SaveMode

/**
* 数据格式如下
* 1,1193,5,978300760
* 1,661,3,978302109
* 1,914,3,978301968
* 1,3408,4,978300275
* 1,2355,5,978824291
* 1,1197,3,978302268
* 1,1287,5,978302039
* 1,2804,5,978300719
* 1,594,4,978302268
* 1,919,4,978301368
* 1,595,5,978824268
* 1,938,4,978301752
* 1,2398,4,978302281
* 1,2918,4,978302124
* 1,1035,5,978301753
* 1,2791,4,978302188
* 1,2687,3,978824268
* 1,2018,4,978301777
* 1,3105,5,978301713
* 1,2797,4,978302039
*/
object RatingETL extends AppConf {

def main(args: Array[String]) {

import sqlContext.implicits._

// 2 读取样本数据
// val data_path = "hdfs://movie1:9000/movie/data/ratings.txt"
val data_path = "data/ratings.dat"
val data = sc.textFile(data_path, 8)
val userdata = data.map(_.split(",")).map(f => UserRating(f(0).toInt,f(1).toInt,f(2).toDouble)).cache()

val userDF = userdata.toDF()
// 存储结果至数据库
userDF.write.mode(SaveMode.Append).jdbc(jdbcURL, ratingTable, prop)
}
}
作者 east
Spark 4月 19,2019

Spark分析个股的活跃性

在前文scala获取免费的股票日k线数据 ,本文做进一步扩展,统计一下股票振幅,统计最近20天振幅大于4个点有多少天。对于炒股喜欢短线的人来说,振幅大的股票,越好做T+0操作。


import java.util
import java.util.Collections

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import stock.SinaStock

import scala.io.Source
object KLineAnalyse {
def main(args: Array[String]): Unit = {
println("查询日k线股票 http://data.gtimg.cn/flashdata/hushen/daily/19/sh603000.js")
val sinaStockStream = Source.fromURL("http://data.gtimg.cn/flashdata/hushen/daily/19/sh603000.js","utf-8")
val sinaLines=sinaStockStream.getLines
val spark = SparkSession.builder().appName("kline").master("local[*]").getOrCreate(); //为读取的数据创建schema
//val sc = new SparkContext(SparkUtils.getSparkConf("SequenceFileUsage"))
val list = new util.ArrayList[KLineModel]()
for(line <- sinaLines) { /** 将每行数据解析成SinaStock对象,并答应对应的股票信息 **/
if(line.length > 20) {
// println(new KLineModel(line).toString)
list.add(new KLineModel(line));
}
}
Collections.reverse(list);
import scala.collection.JavaConverters
import scala.collection.Seq
import spark.implicits._
// List 转 Seq
val tmpSeq = JavaConverters.asScalaIteratorConverter(list.iterator).asScala.toSeq
sinaStockStream.close()
val mySparkRdd = spark.sparkContext.parallelize(tmpSeq);
val top2Rdd = mySparkRdd.take(20)
val rateRdd = top2Rdd.map(a => (a.dateStr, (a.highPrice - a.lowPrice)/a.openPrice * 100))
//过滤每天振幅大于几个点的
val resultRdd = rateRdd.filter(_._2 >= 4)
println(resultRdd.toBuffer)
val wordPairs = resultRdd.map(word => (word, 1))
val wordCounts = wordPairs.length / 20.0f;
println("wordCounts: " + wordCounts)

}

}
作者 east
Spark 4月 18,2019

scala获取免费的股票日k线数据

接口的的抓取使用了Scala标准库的Source


class KLineModel {
var dateStr ="";
var openPrice = 0f;
var closePrice = 0f;
var highPrice = 0f;
var lowPrice = 0f;

private var stockInfo :String =""

def this(stockInfo:String)
{
this()
this.stockInfo=stockInfo /** 根据腾讯的数据接口解析数据 **/
val stockDetail=stockInfo.split(Array(' ',' ',' ',' ',' '))
if (stockDetail.length>4){
this.dateStr=stockDetail(0)
this.openPrice=stockDetail(1).toFloat
this.closePrice =stockDetail(2).toFloat
this.highPrice=stockDetail(3).toFloat
this.lowPrice =stockDetail(4).toFloat

}
}


override def toString = s"KLineModel($dateStr, $openPrice, $closePrice, $highPrice, $lowPrice)"
import scala.io.Source
object KLineAnalyse {
def main(args: Array[String]): Unit = {
println("查询日k线股票 http://data.gtimg.cn/flashdata/hushen/daily/19/sh603000.js")
val sinaStockStream = Source.fromURL("http://data.gtimg.cn/flashdata/hushen/daily/19/sh603000.js","utf-8")
val sinaLines=sinaStockStream.getLines
for(line <- sinaLines) { /** 将每行数据解析成SinaStock对象,并答应对应的股票信息 **/
if(line.length > 20) {
println(new KLineModel(line).toString)
}
}
sinaStockStream.close()
}

}
作者 east

上一 1 … 5 6 7 … 9 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.