Flink 向Kafka生产并消费数据程序
场景说明
假定某个Flink业务每秒就会收到1个消息记录。
基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。
数据规划
Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。
- 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
- 创建Topic。 创建topic的命令格式: bin/kafka-topics.sh –create –zookeeper {zkQuorum}/kafka –partitions {partitionNum} –replication-factor {replicationNum} –topic {Topic} 表1 参数说明 参数名 说明 {zkQuorum} ZooKeeper集群信息,格式为IP:port。 {partitionNum} topic的分区数。 {replicationNum} topic中每个partition数据的副本数。 {Topic} topic名称。 示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002,Topic名称为topic1的数据为例。bin/kafka-topics.sh –create –zookeeper 10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002/kafka –partitions 5 –replication-factor 1 –topic topic1
开发思路
- 启动Flink Kafka Producer应用向Kafka发送数据。
- 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
- 在数据内容中增加前缀并进行打印。
Java样例代码
功能介绍
在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。
代码样例
下面列出producer和consumer主要逻辑代码作为演示。
完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka
//producer代码
public class WriteIntoKafka {
public static void main(String[] args) throws Exception {
// 打印出执行flink run的参考命令
System.out.println("use command as: ");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
" /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005");
System.out.println("******************************************************************************************");
System.out.println("<topic> is the kafka topic name");
System.out.println("<bootstrap.servers> is the ip:port list of brokers");
System.out.println("******************************************************************************************");
// 构造执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并发度
env.setParallelism(1);
// 解析运行参数
ParameterTool paraTool = ParameterTool.fromArgs(args);
// 构造流图,将自定义Source生成的数据写入Kafka
DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
new SimpleStringSchema(),
paraTool.getProperties()));
// 调用execute触发执行
env.execute();
}
// 自定义Source,每隔1s持续产生消息
public static class SimpleStringGenerator implements SourceFunction<String> {
private static final long serialVersionUID = 2174904787118597072L;
boolean running = true;
long i = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (running) {
ctx.collect("element-" + (i++));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
//consumer代码
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// 打印出执行flink run的参考命令
System.out.println("use command as: ");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
" /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
System.out.println("******************************************************************************************");
System.out.println("<topic> is the kafka topic name");
System.out.println("<bootstrap.servers> is the ip:port list of brokers");
System.out.println("******************************************************************************************");
// 构造执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并发度
env.setParallelism(1);
// 解析运行参数
ParameterTool paraTool = ParameterTool.fromArgs(args);
// 构造流图,从Kafka读取数据并换行打印
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
new SimpleStringSchema(),
paraTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return "Flink says " + s + System.getProperty("line.separator");
}
}).print();
// 调用execute触发执行
env.execute();
}
}
Scala样例代码
功能介绍
在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。
代码样例
下面列出producer和consumer主要逻辑代码作为演示。 完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka
//producer代码
object WriteIntoKafka {
def main(args: Array[String]) {
// 打印出执行flink run的参考命令
System.out.println("use command as: ")
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
" /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005")
System.out.println("******************************************************************************************")
System.out.println("<topic> is the kafka topic name")
System.out.println("<bootstrap.servers> is the ip:port list of brokers")
System.out.println("******************************************************************************************")
// 构造执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置并发度
env.setParallelism(1)
// 解析运行参数
val paraTool = ParameterTool.fromArgs(args)
// 构造流图,将自定义Source生成的数据写入Kafka
val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator)
messageStream.addSink(new FlinkKafkaProducer010(
paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
// 调用execute触发执行
env.execute
}
}
// 自定义Source,每隔1s持续产生消息
class SimpleStringGenerator extends SourceFunction[String] {
var running = true
var i = 0
override def run(ctx: SourceContext[String]) {
while (running) {
ctx.collect("element-" + i)
i += 1
Thread.sleep(1000)
}
}
override def cancel() {
running = false
}
}
//consumer代码
object ReadFromKafka {
def main(args: Array[String]) {
// 打印出执行flink run的参考命令
System.out.println("use command as: ")
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
" /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
System.out.println("******************************************************************************************")
System.out.println("<topic> is the kafka topic name")
System.out.println("<bootstrap.servers> is the ip:port list of brokers")
System.out.println("******************************************************************************************")
// 构造执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置并发度
env.setParallelism(1)
// 解析运行参数
val paraTool = ParameterTool.fromArgs(args)
// 构造流图,从Kafka读取数据并换行打印
val messageStream = env.addSource(new FlinkKafkaConsumer010(
paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
messageStream
.map(s => "Flink says " + s + System.getProperty("line.separator")).print()
// 调用execute触发执行
env.execute()
}
}
