Flink Job Pipeline Program
Scenario Description
In this sample, the publisher job itself generates 10000 records per second and sends them downstream through its NettySink operator. Two other jobs act as subscribers, each subscribing to one copy of the data.
Data Planning
- The publisher job uses a custom source operator to generate 10000 records per second.
- Each record contains two fields, of types Int and String respectively.
- Configuration file (the three properties are shown together in a snippet after this list)
  - nettyconnector.registerserver.topic.storage: the path on the third-party register server under which the NettySink's IP, port, and parallelism information is stored (mandatory). For example: nettyconnector.registerserver.topic.storage: /flink/nettyconnector
  - nettyconnector.sinkserver.port.range: the port range available to the NettySink (mandatory). For example: nettyconnector.sinkserver.port.range: 28444-28943
  - nettyconnector.sinkserver.subnet: the subnet of the network to use. For example: nettyconnector.sinkserver.subnet: 10.162.0.0/16
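A minimal sketch of how these three properties might look together in the job's configuration file (the values are the example values from the list above, not recommendations):

```yaml
# Path on the third-party register server where NettySink stores its
# IP, port, and parallelism information (mandatory)
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
# Port range the NettySink server may bind to (mandatory)
nettyconnector.sinkserver.port.range: 28444-28943
# Subnet the Netty connection should use
nettyconnector.sinkserver.subnet: 10.162.0.0/16
```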
- API description
- Register server API. The register server stores the NettySink's IP, port, and parallelism information so that NettySource instances can connect to it. It provides the following interface to users (a toy in-memory implementation follows this item):

```java
public interface RegisterServerHandler {
    /**
     * Start the register server.
     * @param configuration Flink Configuration
     */
    void start(Configuration configuration) throws Exception;

    /**
     * Create a topic node (directory) on the register server.
     * @param topic name of the topic node
     */
    void createTopicNode(String topic) throws Exception;

    /**
     * Register information under a topic node (directory).
     * @param topic the topic to register under
     * @param registerRecord the information to register
     */
    void register(String topic, RegisterRecord registerRecord) throws Exception;

    /**
     * Delete a topic node.
     * @param topic the topic to delete
     */
    void deleteTopicNode(String topic) throws Exception;

    /**
     * Remove previously registered information.
     * @param topic the topic the registered information belongs to
     * @param recordId ID of the registered information to remove
     */
    void unregister(String topic, int recordId) throws Exception;

    /**
     * Query information.
     * @param topic the topic the information belongs to
     * @param recordId ID of the information to query
     */
    RegisterRecord query(String topic, int recordId) throws Exception;

    /**
     * Check whether a topic exists.
     * @param topic the topic to check
     */
    Boolean isExist(String topic) throws Exception;

    /**
     * Shut down the register server handle.
     */
    void shutdown() throws Exception;
}
```

Based on this interface, the project provides ZookeeperRegisterServerHandler for users.
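For illustration only, here is a minimal in-memory implementation of the interface. It is a sketch for single-JVM experiments, not part of the sample project; it assumes RegisterRecord exposes its ID through a getRecordId() accessor (an assumption, as the actual class is not shown here), and the imports for RegisterServerHandler and RegisterRecord depend on the project's package layout. Real jobs should use the provided ZookeeperRegisterServerHandler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;

// Sketch: keeps all registration state in local maps, so it only works when
// publisher and subscribers run in the same JVM. RegisterServerHandler and
// RegisterRecord are assumed to be on the classpath.
public class InMemoryRegisterServerHandler implements RegisterServerHandler {
    private final Map<String, Map<Integer, RegisterRecord>> topics = new ConcurrentHashMap<>();

    @Override
    public void start(Configuration configuration) throws Exception {
        // Nothing to start: no external server is involved.
    }

    @Override
    public void createTopicNode(String topic) throws Exception {
        topics.putIfAbsent(topic, new ConcurrentHashMap<>());
    }

    @Override
    public void register(String topic, RegisterRecord registerRecord) throws Exception {
        // Assumption: RegisterRecord carries its own ID via getRecordId().
        topics.get(topic).put(registerRecord.getRecordId(), registerRecord);
    }

    @Override
    public void deleteTopicNode(String topic) throws Exception {
        topics.remove(topic);
    }

    @Override
    public void unregister(String topic, int recordId) throws Exception {
        Map<Integer, RegisterRecord> records = topics.get(topic);
        if (records != null) {
            records.remove(recordId);
        }
    }

    @Override
    public RegisterRecord query(String topic, int recordId) throws Exception {
        Map<Integer, RegisterRecord> records = topics.get(topic);
        return records == null ? null : records.get(recordId);
    }

    @Override
    public Boolean isExist(String topic) throws Exception {
        return topics.containsKey(topic);
    }

    @Override
    public void shutdown() throws Exception {
        topics.clear();
    }
}
```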
- NettySink operator:

```java
Class NettySink(String name,
                String topic,
                RegisterServerHandler registerServerHandler,
                int numberOfSubscribedJobs)
```

  - name: the name of this NettySink.
  - topic: the topic under which this NettySink publishes data. Each distinct NettySink (parallel instances of the same sink aside) must use a different topic; otherwise subscriptions become confused and data cannot be distributed correctly.
  - registerServerHandler: handle of the register server.
  - numberOfSubscribedJobs: the number of jobs that subscribe to this NettySink. This number must be exact: NettySink starts sending data only after all subscribers have connected to it.
- NettySource operator:

```java
Class NettySource(String name,
                  String topic,
                  RegisterServerHandler registerServerHandler)
```

  - name: the name of this NettySource. It must be unique (parallel instances of the same source aside); otherwise a conflict occurs when connecting to the NettySink and the connection fails.
  - topic: the topic of the NettySink to subscribe to.
  - registerServerHandler: handle of the register server.

Note:
The parallelism of a NettySource must be the same as the parallelism of the NettySink it subscribes to; otherwise the connection cannot be established.
Development Approach
1. One job acts as the publisher; the other two act as subscribers.
2. The publisher job generates its own data, converts each record to a byte[], and sends it to the subscribers.
3. Each subscriber converts the received byte[] back to a String and prints a sampled subset of the records.
Java sample code:
- Custom source operator of the publisher job that generates data
```java
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.io.Serializable;

public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>>
        implements Serializable {

    private boolean isRunning = true;

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    /**
     * Data generation function: produces 10000 records per second.
     */
    @Override
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
        while (isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void close() {
        isRunning = false;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
- Publisher job code
```java
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the job to 2
        env.setParallelism(2);

        // Create the ZooKeeper register server handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the custom source operator
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                    // Convert the record to a byte array before sending
                    @Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                })
                // Send the data through NettySink
                .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));

        env.execute();
    }
}
```
- First subscriber
```java
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySource1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the job to 2
        env.setParallelism(2);

        // Create the ZooKeeper register server handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the NettySource operator to receive messages from the publisher
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte array back to a string
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                })
                .print();

        env.execute();
    }
}
```
- Second subscriber
```java
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySource2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the job to 2
        env.setParallelism(2);

        // Create the ZooKeeper register server handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the NettySource operator to receive data from the publisher
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte array back to a string
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                })
                .print();

        env.execute();
    }
}
```
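The Java subscribers above print every record. To print only a sampled subset, as the Scala subscribers below do, a random filter can be added before print(). A minimal sketch (the class name TestPipeline_NettySourceSampled and the use of ThreadLocalRandom are this sketch's choices, not part of the sample project; the 1-in-50000 rate mirrors the Scala examples):

```java
package com.huawei.bigdata.flink.examples;

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySourceSampled {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                })
                // Keep roughly 1 record in 50000 before printing
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) {
                        return ThreadLocalRandom.current().nextInt(50000) == 10;
                    }
                })
                .print();

        env.execute();
    }
}
```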
Scala sample code
- Definition of the message to send
```scala
package com.huawei.bigdata.flink.examples

case class Information(index: Int, content: String) {
  def this() = this(0, "")
}
```
- Custom source operator of the publisher job that generates data
```scala
package com.huawei.bigdata.flink.examples

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class UserSource extends RichParallelSourceFunction[Information] with Serializable {

  var isRunning = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
  }

  // Generate 10000 records per second
  override def run(sourceContext: SourceContext[Information]): Unit = {
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(Information(i, "hello-" + i))
      }
      Thread.sleep(1000)
    }
  }

  override def close(): Unit = super.close()

  override def cancel(): Unit = {
    isRunning = false
  }
}
```
- Publisher job code
```scala
package com.huawei.bigdata.flink.examples

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler

object TestPipeline_NettySink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism of the job to 2
    env.setParallelism(2)

    // Use ZooKeeper as the register server
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // Add the custom source operator that generates data
    env.addSource(new UserSource)
      .keyBy(0)
      // Convert the record to a byte array before sending
      .map(x => x.content.getBytes)
      // Add the NettySink operator to send the data
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))

    env.execute()
  }
}
```
- First subscriber
```scala
package com.huawei.bigdata.flink.examples

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler

import scala.util.Random

object TestPipeline_NettySource1 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism of the job to 2
    env.setParallelism(2)

    // Use ZooKeeper as the register server
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // Add the NettySource operator to receive data from the publisher
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      // Convert the received byte array back to a string
      .map(x => (1, new String(x)))
      // Keep roughly 1 record in 50000
      .filter(x => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}
```
- Second subscriber
```scala
package com.huawei.bigdata.flink.examples

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler

import scala.util.Random

object TestPipeline_NettySource2 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism of the job to 2
    env.setParallelism(2)

    // Use ZooKeeper as the register server
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // Add the NettySource operator to receive data from the publisher
    env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      // Convert the received byte array back to a string
      .map(x => (2, new String(x)))
      // Keep roughly 1 record in 50000
      .filter(x => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}
```