spark实例6:kafka生产者和消费者实例

mvn配置:

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.0</version>
  </dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>


生产者代码:
import java.util.Properties
import com.alibaba.fastjson.JSONObject
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

object OrderProducer {
def main(args: Array[String]): Unit = {

//Kafka参数设置
val topic = "order"
val brokers = "192.168.0.219:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val kafkaConfig = new ProducerConfig(props)
//创建生产者
val producer = new Producer[String, String](kafkaConfig)

while (true) {
//随机生成10以内ID
val id = Random.nextInt(10)
//创建订单成交事件
val event = new JSONObject();
//商品ID
event.put("id", id)
//商品成交价格
event.put("price", Random.nextInt(10000))

//发送信息
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)
//随机暂停一段时间
Thread.sleep(Random.nextInt(100))
}
}
}


消费者代码:

import com.alibaba.fastjson.JSON
import kafka.click.RedisClient
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by Administrator on 2017/8/30.
  */
object OrderConsumer {
  //Redis配置
  val dbIndex = 0
  //每件商品总销售额
  val orderTotalKey = "app::order::total"
  //每件商品每分钟销售额
  val oneMinTotalKey = "app::order::product"
  //总销售额
  val totalKey = "app::order::all"


  def main(args: Array[String]): Unit = {

    // 创建 StreamingContext 时间片为1秒
    val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka 配置
    val topics = Set("order")
    val brokers = "192.168.0.219:9092"
    /*val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")
    */

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.219:9092,192.168.0.220:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

   // val topics = Array("topicA", "topicB")
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // 创建一个 direct stream
 //   val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topics)

    //解析JSON
  //  val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line._2)))
  val events = kafkaStream.flatMap(line => Some(JSON.parseObject(line.value())))

    // 按ID分组统计个数与价格总合
    val orders = events.map(x => (x.getString("id"), x.getLong("price"))).groupByKey().map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))

    //输出
    orders.foreachRDD(x =>
      x.foreachPartition(partition =>
        partition.foreach(x => {

          println("id=" + x._1 + " count=" + x._2 + " price=" + x._3)

          //保存到Redis中
          val jedis = RedisClient.pool.getResource
          jedis.auth("123456")
          jedis.select(dbIndex)
          //每个商品销售额累加
          jedis.hincrBy(orderTotalKey, x._1, x._3)
          //上一分钟第每个商品销售额
          jedis.hset(oneMinTotalKey, x._1.toString, x._3.toString)
          //总销售额累加
          jedis.incrBy(totalKey, x._3)
          RedisClient.pool.returnResource(jedis)


        })
      ))


    ssc.start()
    ssc.awaitTermination()
  }

}


关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注