Spark Streaming reading Kafka with the direct approach: an example

Spark Streaming can read from Kafka in two ways: the Receiver-based approach and the Direct approach. The Direct approach is more efficient, but the application has to track the consumed offsets itself; they can be saved in Redis or MySQL.
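The examples below use the 0.8 direct API (the org.apache.spark.streaming.kafka package). As a minimal sketch, assuming Spark 2.x on Scala 2.11 (the versions here are assumptions; match them to your cluster), the build dependencies look roughly like this:

// build.sbt -- a minimal dependency sketch; versions are assumptions
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming"           % "2.2.0" % "provided",
  // provides org.apache.spark.streaming.kafka.KafkaUtils (the 0.8 direct API)
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
)

The second example further down additionally needs spark-sql, fastjson, scalikejdbc (with scalikejdbc-config) and a MySQL JDBC driver on the classpath, as its imports show.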

 
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
object DirectKafka {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafka <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
        """.stripMargin)
      System.exit(1)
    }


    /* Start ZooKeeper:
    cd /root/kafka/kafka_2.11-1.0.0/bin
    ./zookeeper-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/zookeeper.properties

    Start Kafka:
    cd /root/kafka/kafka_2.11-1.0.0/bin
    ./kafka-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/server.properties

    Start a Kafka console producer:
    ./kafka-console-producer.sh --broker-list master:9092 --topic kafka_test

    Submit the job:
    spark-submit --class com.kafka.DirectKafka \
    --master spark://master:7077 \
    --deploy-mode client \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    /root/datafile/SparkStreamingKafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
    master:9092 kafka_test
    */

    val sparkConf = new SparkConf().setAppName("SparkStreaming-DirectKafka")
    val sc = new SparkContext(sparkConf)
    val Array(brokers, topics) = args
    val ssc = new StreamingContext(sc, Seconds(2))
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Create the direct stream; each record is a (key, value) pair
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)
    directKafkaStream.print()

    // Capture the offset ranges of each batch on the driver, then do a word count on the values
    var offsetRanges = Array.empty[OffsetRange]
    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(_._2)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        for (o <- offsetRanges) {
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        rdd.take(10).foreach(println)
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
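The example above only prints the offset ranges. The intro mentions Redis as one place to keep them; below is a minimal, hedged sketch (not part of the original post) of persisting each partition's untilOffset into a Redis hash with the Jedis client, assuming a Redis instance on localhost. The object name and key layout are illustrative.

import org.apache.spark.streaming.kafka.OffsetRange
import redis.clients.jedis.Jedis

object RedisOffsetStore {
  // Store each partition's untilOffset in a Redis hash keyed by topic
  def save(offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = new Jedis("localhost", 6379)   // assumed Redis address
    try {
      offsetRanges.foreach { o =>
        jedis.hset(s"kafka:offsets:${o.topic}", o.partition.toString, o.untilOffset.toString)
      }
    } finally {
      jedis.close()
    }
  }
}

The second example below takes the MySQL route instead: it saves each partition's untilOffset with scalikejdbc and writes the parsed results to MySQL through Spark SQL.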


import com.alibaba.fastjson.JSON
import main.scala.until.dataschema
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import scalikejdbc.config.{DBs, DBsWithEnv}
import scalikejdbc._
import main.scala.until.ParamsUtils
import main.scala.until.SparkUtils

object readkafka {
  def main(args: Array[String]): Unit = {
    if (args.length != 1){
      System.err.println("please input data args")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkStreaming-test")
      .setMaster("local[*]")
      .set("spark.testing.memory","2147480000")
    val sc = new SparkContext(sparkConf)

//topic : spark_example_topic , countly_event ,countly_imp
//broker : 172.31.2.6:9292,172.31.2.7:9292,172.31.2.8:9292

    val ssc = new StreamingContext(sc, Seconds(2))

    // Create the direct stream through the project's SparkUtils helper
    val messages = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)
    messages.print()

//-------------------------------------------------------------------------------

    // messages is the DStream read from Kafka; each batch arrives as an RDD
    messages.foreachRDD((rdd, batchTime) => {
      import org.apache.spark.streaming.kafka.HasOffsetRanges
      // Get the offset information for this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      /**
        * An OffsetRange wraps the topic name, partition id, fromOffset (start offset of the
        * current batch) and untilOffset (end offset of the current batch).
        */
      println("===========================> count: " + rdd.map(x => x + "1").count())
      // offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      DBs.setupAll()
      // Iterate over offsetRanges (one entry per partition) and save each partition's
      // untilOffset to MySQL
      for (offset <- offsetRanges) {
        println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
        val saveoffset = DB localTx { implicit session =>
          sql"DELETE FROM offsetinfo WHERE topic = ${offset.topic} AND partitionname = ${offset.partition}".update.apply()
          sql"INSERT INTO offsetinfo (topic, partitionname, untilOffset) VALUES (${offset.topic}, ${offset.partition}, ${offset.untilOffset})".update.apply()
        }
      }
    })

    // Parse the messages read from Kafka
    val parameter = messages.flatMap(line => {
      // Server-side event time, reqts_day
      val reqts_day = try {
        new DateTime(JSON.parseObject(line._2).getJSONObject("i").getLong("timestamp") * 1000).toDateTime.toString("yyyy-MM-dd HH:mm:ss")
      } catch {
        case ex: Exception => "(unknown)"
      }

      // Device id
      val cookieid = try {
        // Parse the JSON string into an object (.getString("kid") could be used here instead)
        JSON.parseObject(line._2).getJSONObject("d").get("d")
      } catch {
        case ex: Exception => "(unknown)"
      }

      // Combine the two fields into a single string
      val data = reqts_day + "##" + cookieid
      // Some always holds a value; Some.get returns it (only None would throw)
      Some(data)
    }).map(_.split("##")).map(x => (x(0), x(1)))

    println("------------------")

    parameter.foreachRDD { rdd =>
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      // Convert the RDD to a DataFrame
      val SaveParameter = rdd.map(w => dataschema(w._1.toString, w._2.toString)).toDF("data_date", "cookies_num")
      // Register a temporary view and append the result to MySQL
      SaveParameter.createOrReplaceTempView("dau_tmp_table")
      val insertsql = sqlContext.sql("select * from dau_tmp_table")
      insertsql.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/userprofile_test", "dau_tmp_table", ParamsUtils.mysql.mysqlProp)
    }

    messages.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
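The example above only saves offsets; after a restart the stream would still begin from the default position. Below is a hedged sketch of the missing half, assuming the same offsetinfo table and scalikejdbc configuration as above and the 0.8 direct API. The OffsetRecovery object and its method names are illustrative, not from the original post.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import scalikejdbc._
import scalikejdbc.config.DBs

object OffsetRecovery {
  // Read the last saved offset of every partition of the topic from MySQL
  def loadOffsets(topic: String): Map[TopicAndPartition, Long] = {
    DBs.setupAll()
    DB.readOnly { implicit session =>
      sql"SELECT partitionname, untilOffset FROM offsetinfo WHERE topic = ${topic}"
        .map(rs => TopicAndPartition(topic, rs.int("partitionname")) -> rs.long("untilOffset"))
        .list.apply().toMap
    }
  }

  // Create the direct stream from the saved offsets, or from the default position on first run
  def createStream(ssc: StreamingContext, brokers: String, topic: String): InputDStream[(String, String)] = {
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val fromOffsets = loadOffsets(topic)
    if (fromOffsets.isEmpty) {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
    } else {
      // untilOffset of the last finished batch is exactly where the next batch should start
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    }
  }
}

On startup, createStream would replace the plain getDirectStream/createDirectStream call, so each partition resumes from the untilOffset written during the previous run.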

