Spark example 6: a Kafka producer and consumer
Maven configuration:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.54</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
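Note that spark-streaming-kafka-0-10 declares spark-streaming as a provided dependency, so to compile and run these examples locally you will likely also need Spark itself on the classpath, for example:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>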
Producer code:
import java.util.Properties
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random

object OrderProducer {
  def main(args: Array[String]): Unit = {
    // Kafka settings
    val topic = "order"
    val brokers = "192.168.0.219:9092"
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the producer
    val producer = new KafkaProducer[String, String](props)
    while (true) {
      // Random product ID in [0, 10)
      val id = Random.nextInt(10)
      // Build an order event
      val event = new JSONObject()
      // Product ID
      event.put("id", id)
      // Sale price
      event.put("price", Random.nextInt(10000))
      // Send the message
      producer.send(new ProducerRecord[String, String](topic, event.toString))
      println("Message sent: " + event)
      // Pause for a random interval
      Thread.sleep(Random.nextInt(100))
    }
  }
}
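Each record's value is a plain JSON string such as {"id":3,"price":4956} (illustrative values; both fields are random). The loop never exits, so the producer is stopped with Ctrl+C, and any records still buffered in the client at that point may be lost.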
Consumer code:
import com.alibaba.fastjson.JSON
import kafka.click.RedisClient
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object OrderConsumer {
  // Redis configuration
  val dbIndex = 0
  // Hash: lifetime sales per product
  val orderTotalKey = "app::order::total"
  // Hash: per-product sales in the most recent batch
  val oneMinTotalKey = "app::order::product"
  // String: overall sales total
  val totalKey = "app::order::all"
  def main(args: Array[String]): Unit = {
    // Create a StreamingContext with a 1-second batch interval;
    // use at least two local threads when running Spark Streaming locally
    val conf = new SparkConf().setMaster("local[2]").setAppName("OrderConsumer")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Kafka configuration
    val topics = Set("order")
    val brokers = "192.168.0.219:9092,192.168.0.220:9092"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Create a direct stream subscribed to the "order" topic
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Parse the JSON payload of each record
    val events = kafkaStream.flatMap(record => Some(JSON.parseObject(record.value())))

    // Group by product ID: order count and total price per batch
    val orders = events
      .map(x => (x.getString("id"), x.getLong("price")))
      .groupByKey()
      .map(x => (x._1, x._2.size, x._2.reduceLeft(_ + _)))
    // Print each aggregate and persist it to Redis
    orders.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach { case (id, count, price) =>
          println("id=" + id + " count=" + count + " price=" + price)
          // Save to Redis
          val jedis = RedisClient.pool.getResource
          jedis.auth("123456")
          jedis.select(dbIndex)
          // Accumulate lifetime sales per product
          jedis.hincrBy(orderTotalKey, id, price)
          // Overwrite the latest batch's sales per product
          jedis.hset(oneMinTotalKey, id, price.toString)
          // Accumulate the overall sales total
          jedis.incrBy(totalKey, price)
          // close() returns a pooled connection to the pool
          jedis.close()
        }
      )
    )

    ssc.start()
    ssc.awaitTermination()
  }
}
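The consumer imports kafka.click.RedisClient, a connection-pool helper that isn't shown above. A minimal sketch (the package and object name follow the import; the host, port, and pool settings are assumptions):

package kafka.click

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object RedisClient {
  // Assumed Redis endpoint; adjust to your deployment
  private val redisHost = "192.168.0.219"
  private val redisPort = 6379
  private val redisTimeout = 30000

  // Shared connection pool used by the streaming job above
  lazy val pool = new JedisPool(new JedisPoolConfig(), redisHost, redisPort, redisTimeout)
}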

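One caveat: with enable.auto.commit set to false, nothing in this job ever commits offsets to Kafka, so a restarted consumer resumes from auto.offset.reset ("latest") and can skip records. A sketch of the commit pattern from the spark-streaming-kafka-0-10 documentation; it operates on the original kafkaStream, because only its RDDs carry offset ranges:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaStream.foreachRDD { rdd =>
  // Capture this batch's offset ranges before processing
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... aggregate and write to Redis as in the listing above ...
  // Commit offsets back to Kafka once the batch's output has succeeded
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}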