Spark Example 6: Kafka Producer and Consumer
Maven configuration:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

Note that spark-streaming_2.11 must be declared explicitly: the Kafka integration artifact only marks it with provided scope, so without it the example will not compile.
Producer code:

import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Random

object OrderProducer {
  def main(args: Array[String]): Unit = {
    // Kafka settings. The new KafkaProducer API is used here: the legacy
    // kafka.producer.Producer Scala client is not on the classpath with the
    // 0-10 integration artifact declared above.
    val topic = "order"
    val brokers = "192.168.0.219:9092"

    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the producer
    val producer = new KafkaProducer[String, String](props)

    while (true) {
      // Random product ID below 10
      val id = Random.nextInt(10)

      // Build an order event
      val event = new JSONObject()
      event.put("id", id)                       // product ID
      event.put("price", Random.nextInt(10000)) // sale price

      // Send the message
      producer.send(new ProducerRecord[String, String](topic, event.toString))
      println("Message sent: " + event)

      // Pause for a random interval
      Thread.sleep(Random.nextInt(100))
    }
  }
}
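Each record value is a flat JSON string with the two fields used above. A minimal fastjson round trip (mirroring what the producer writes and what the consumer below will parse) looks like this:

import com.alibaba.fastjson.{JSON, JSONObject}

object EventRoundTrip {
  def main(args: Array[String]): Unit = {
    // Build an event the same way the producer does
    val event = new JSONObject()
    event.put("id", 3)
    event.put("price", 5000)
    val payload = event.toString // e.g. {"id":3,"price":5000} (field order may vary)

    // Parse it back the same way the consumer does
    val parsed = JSON.parseObject(payload)
    println(parsed.getString("id"))  // 3
    println(parsed.getLong("price")) // 5000
  }
}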
Consumer code:

import com.alibaba.fastjson.JSON
import kafka.click.RedisClient
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object OrderConsumer {
  // Redis configuration
  val dbIndex = 0
  // Total sales per product
  val orderTotalKey = "app::order::total"
  // Per-product sales for the last minute
  val oneMinTotalKey = "app::order::product"
  // Overall total sales
  val totalKey = "app::order::all"

  def main(args: Array[String]): Unit = {
    // Create a StreamingContext with a 1-second batch interval
    val conf = new SparkConf().setMaster("local").setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka configuration
    val topics = Set("order")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.0.219:9092,192.168.0.220:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Parse each record's value as JSON
    val events = kafkaStream.map(record => JSON.parseObject(record.value()))

    // Group by product ID; per batch, count orders and sum prices
    val orders = events.map(x => (x.getString("id"), x.getLong("price")))
      .groupByKey()
      .map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))

    // Write results to Redis, one connection per partition rather than per record
    orders.foreachRDD(rdd => rdd.foreachPartition { partition =>
      val jedis = RedisClient.pool.getResource
      jedis.auth("123456")
      jedis.select(dbIndex)
      partition.foreach { case (id, count, total) =>
        println("id=" + id + " count=" + count + " price=" + total)
        // Accumulate total sales per product
        jedis.hincrBy(orderTotalKey, id, total)
        // Per-product sales for the latest window
        jedis.hset(oneMinTotalKey, id, total.toString)
        // Accumulate the overall total
        jedis.incrBy(totalKey, total)
      }
      jedis.close() // returns the connection to the pool (replaces the deprecated returnResource)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
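The kafka.click.RedisClient imported above is a small connection-pool helper, not a library class, so it has to be supplied by the project. A minimal sketch, assuming Redis runs at 192.168.0.219:6379 (both the host and the pool settings are assumptions; adjust to your environment):

package kafka.click

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object RedisClient {
  // Assumed Redis location; adjust to your environment
  val redisHost = "192.168.0.219"
  val redisPort = 6379
  val redisTimeout = 30000

  // Lazily initialized shared pool
  lazy val pool = new JedisPool(new JedisPoolConfig(), redisHost, redisPort, redisTimeout)

  // Destroy the pool when the JVM exits
  sys.addShutdownHook(pool.destroy())
}

While the job runs, the accumulated values can be inspected with a few Jedis calls, for example:

val jedis = RedisClient.pool.getResource
jedis.auth("123456")
jedis.select(0)
println(jedis.hgetAll("app::order::total")) // running total per product
println(jedis.get("app::order::all"))       // overall total
jedis.close()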