gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 开发博客
  • bug清单
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Kafka

分类归档Kafka

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Kafka"
Kafka, Spark 9月 5,2020

Spark Streaming读取kafka直连案例

Spark Streaming读取kafka有2种方式:基于Receiver的方式和 基于Direct的方式 。 Direct 更高效, 负责追踪消费的offset 可以用redis或mysql来保存。

 
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
object DirectKafka {
def main(args: Array[String]): Unit = {
if (args.length < 2){ System.err.println( s""" |DirectKafka
| is a list of one or more kafka brokers
| is a list of one or more kafka topics
""".stripMargin)
System.exit(1)
}


/*启动zookeeper
cd /root/kafka/kafka_2.11-1.0.0/bin
./zookeeper-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/zookeeper.properties

启动kafka
cd /root/kafka/kafka_2.11-1.0.0/bin
./kafka-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/server.properties

启动kafka生产者
./kafka-console-producer.sh --broker-list master:9092 --topic kafka_test

提交任务
spark-submit --class com.kafka.DirectKafka \
--master spark://master:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/root/datafile/SparkStreamingKafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
master:9092 kafka_test
*/

val sparkConf = new SparkConf().setAppName("SparkStreaming-DirectKafka")
val sc = new SparkContext(sparkConf)
val Array(brokers, topics) = args
val ssc = new StreamingContext(sc, Seconds(2))
val topicset = topics.split(",").toSet
val KafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, KafkaParams, topicset)
directKafkaStream.print()
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(._2) .flatMap(.split(" "))
.map(x => (x, 1L))
.reduceByKey(_ + _)
.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.take(10).foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}


import com.alibaba.fastjson.JSON
import main.scala.until.dataschema
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import scalikejdbc.config.{DBs, DBsWithEnv}
import scalikejdbc._
import main.scala.until.ParamsUtils
import main.scala.until.SparkUtils

object readkafka {
  def main(args: Array[String]): Unit = {
    if (args.length != 1){
      System.err.println("please input data args")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkStreaming-test")
      .setMaster("local[*]")
      .set("spark.testing.memory","2147480000")
    val sc = new SparkContext(sparkConf)

//topic : spark_example_topic , countly_event ,countly_imp
//broker : 172.31.2.6:9292,172.31.2.7:9292,172.31.2.8:9292

//    val ssc = new StreamingContext(sc, Seconds(2))
    val ssc = new StreamingContext(sc, Seconds(2))

    val messages = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)
    messages.print()

//    SparkUtils.apply(null).getDirectStream()

//-------------------------------------------------------------------------------

    // messages 从kafka获取数据,将数据转为RDD
    messages.foreachRDD((rdd, batchTime) => {
      import org.apache.spark.streaming.kafka.HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // 获取偏移量信息
      /**
        * OffsetRange 是对topic name,partition id,fromOffset(当前消费的开始偏移),untilOffset(当前消费的结束偏移)的封装。
        * *  所以OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移
        */
      println("===========================> count: " + rdd.map(x => x + "1").count())
     // offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      for (offset <- offsetRanges) {
        // 遍历offsetRanges,里面有多个partition
        println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
        DBs.setupAll()
        // 将partition及对应的untilOffset存到MySQL中
        val saveoffset = DB localTx {
          implicit session =>
           sql"DELETE FROM offsetinfo WHERE topic = ${offset.topic} AND partitionname = ${offset.partition}".update.apply()
            sql"INSERT INTO offsetinfo (topic, partitionname, untilOffset) VALUES (${offset.topic},${offset.partition},${offset.untilOffset})".update.apply()
        }
      }
    })

    // 处理从kafka获取的message信息
    val parameter = messages.flatMap(line => {
      //获取服务端事件日期 reqts_day
      val reqts_day = try {
        new DateTime(JSON.parseObject(line._2).getJSONObject("i").getLong("timestamp") * 1000).toDateTime.toString("yyyy-MM-dd HH:mm:ss")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //获取 设备号
      val cookieid = try {
        JSON.parseObject(line._2).getJSONObject("d").get("d")    //将Json字符串转化为相应的对象  .getString("kid")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //组合成一个字符串
      val data = reqts_day + "##" + cookieid
      Some(data)       //some是一定有值的, some.get获取值,如果没有值,会报异常
    }).map(_.split("##")).map(x => (x(0),x(1)))

    println("------------------")

    parameter.foreachRDD{ rdd =>

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      // 转换成DataFrame
      val SaveParameter = rdd.map(w => dataschema(w._1.toString,w._2.toString)).toDF("data_date","cookies_num")
      // 注册视图
      SaveParameter.createOrReplaceTempView("dau_tmp_table")
      val insertsql =sqlContext.sql("select * from dau_tmp_table")
      insertsql.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/userprofile_test","dau_tmp_table",ParamsUtils.mysql.mysqlProp)

    }

    messages.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
作者 east

标签

flex布局 github mysql O2O UI控件 不含后台 交流 体育 共享经济 出行 单机类 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 旅游 日历 时钟 流量主 物流 用户系统 电商 画图 画布(canvas) 社交 签到 算命 联网 装修 解锁 评论 读书 读音 资讯 阅读 预订

官方QQ群

1群:74052405

薅羊毛交流群: 952493060

近期文章

  • android http请求带中文参数会乱码(url编码)
  • mysql主从复制
  • 修改my.ini导致启动不了服务
  • Android聊天输入框控件
  • Android直播间刷礼物动画控件
  • Android自定义交易密码框
  • Android记录文件日志工具类
  • Android常用图片操作工具类
  • Android倒计时工具类
  • Android压缩解压工具

文章归档

  • 2021年一月
  • 2020年十二月
  • 2020年十一月
  • 2020年十月
  • 2020年九月
  • 2020年八月
  • 2020年七月
  • 2020年六月
  • 2020年五月
  • 2020年四月
  • 2020年三月
  • 2020年二月
  • 2020年一月
  • 2019年七月
  • 2019年六月
  • 2019年五月
  • 2019年四月
  • 2019年三月
  • 2019年二月
  • 2019年一月
  • 2018年十二月
  • 2018年七月
  • 2018年六月

分类目录

  • Android (30)
  • bug清单 (26)
  • Fuchsia (15)
  • php (1)
  • python (2)
  • 人工智能 (1)
  • 大数据开发 (119)
    • Elasticsearch (6)
    • Flink (3)
    • Hadoop (11)
    • Hbase (8)
    • Hive (1)
    • Java (27)
    • Kafka (1)
    • solr (1)
    • Spark (42)
    • spring (8)
    • 数据仓库 (1)
    • 数据挖掘 (5)
    • 运维 (4)
  • 小游戏代码 (1)
  • 小程序代码 (111)
    • O2O (15)
    • UI控件 (3)
    • 互联网类 (17)
    • 企业类 (5)
    • 地图定位 (9)
    • 多媒体 (5)
    • 工具类 (19)
    • 电商类 (18)
    • 社交 (5)
    • 行业软件 (7)
    • 资讯读书 (7)
  • 开发博客 (5)
  • 数据库 (2)
  • 未分类 (6)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.