Scenario Description
In this sample, the publisher job generates 10,000 records per second and sends them downstream through its NettySink operator. Two other jobs act as subscribers, each subscribing to one copy of the data.
Data Planning
- The publisher job uses a custom operator to generate 10,000 records per second.
- Each record contains two fields, of types Int and String respectively.
- Configuration file
  - nettyconnector.registerserver.topic.storage: the path on the third-party register server under which the NettySink IP address, port, and parallelism information is stored (mandatory), for example: nettyconnector.registerserver.topic.storage: /flink/nettyconnector
  - nettyconnector.sinkserver.port.range: the port range for NettySink (mandatory), for example: nettyconnector.sinkserver.port.range: 28444-28943
  - nettyconnector.sinkserver.subnet: the subnet the network belongs to, for example: nettyconnector.sinkserver.subnet: 10.162.0.0/16
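Put together, the three settings above form a configuration fragment like the following. The values are simply the examples from the list; adjust the register path, port range, and subnet to your own environment:

```yaml
# Path on the third-party register server (e.g. ZooKeeper) storing NettySink metadata (mandatory)
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
# Port range that NettySink may bind to (mandatory)
nettyconnector.sinkserver.port.range: 28444-28943
# Subnet used for the NettySink/NettySource connections
nettyconnector.sinkserver.subnet: 10.162.0.0/16
```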
- Interface Description
  - Register server interface
    The register server stores the IP address, port, and parallelism information of NettySink so that NettySource can connect to it. It provides the following interface for users:

    public interface RegisterServerHandler {
        /**
         * Start the register server.
         * @param configuration Flink Configuration
         */
        void start(Configuration configuration) throws Exception;
        /**
         * Create a topic node (directory) on the register server.
         * @param topic name of the topic node
         */
        void createTopicNode(String topic) throws Exception;
        /**
         * Register information under a topic node (directory).
         * @param topic directory to register under
         * @param registerRecord information to register
         */
        void register(String topic, RegisterRecord registerRecord) throws Exception;
        /**
         * Delete a topic node.
         * @param topic topic to delete
         */
        void deleteTopicNode(String topic) throws Exception;
        /**
         * Remove registered information.
         * @param topic topic the registered information belongs to
         * @param recordId ID of the registered information to remove
         */
        void unregister(String topic, int recordId) throws Exception;
        /**
         * Query information.
         * @param topic topic the queried information belongs to
         * @param recordId ID of the queried information
         */
        RegisterRecord query(String topic, int recordId) throws Exception;
        /**
         * Check whether a topic exists.
         * @param topic topic to check
         */
        Boolean isExist(String topic) throws Exception;
        /**
         * Shut down the register server handle.
         */
        void shutdown() throws Exception;
    }

    Based on this interface, the project provides ZookeeperRegisterHandler for users.
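To illustrate the contract above, here is a minimal in-memory sketch of a register server handler. It is hypothetical: the real implementation is ZookeeperRegisterHandler, and the connector's Flink Configuration and RegisterRecord types are replaced by a plain Map and a stand-in record class so the sketch is self-contained:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the connector's RegisterRecord: one sink subtask's address.
class RegisterRecord {
    final int id;
    final String address;
    RegisterRecord(int id, String address) {
        this.id = id;
        this.address = address;
    }
}

// In-memory sketch of a register server handler; topics map to registered records by ID.
public class InMemoryRegisterServerHandler {

    private final Map<String, Map<Integer, RegisterRecord>> topics = new ConcurrentHashMap<>();

    public void start(Map<String, String> configuration) throws Exception {
        // Nothing to start for the in-memory variant.
    }

    public void createTopicNode(String topic) throws Exception {
        topics.putIfAbsent(topic, new ConcurrentHashMap<>());
    }

    public void register(String topic, RegisterRecord registerRecord) throws Exception {
        topics.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
              .put(registerRecord.id, registerRecord);
    }

    public void deleteTopicNode(String topic) throws Exception {
        topics.remove(topic);
    }

    public void unregister(String topic, int recordId) throws Exception {
        Map<Integer, RegisterRecord> records = topics.get(topic);
        if (records != null) {
            records.remove(recordId);
        }
    }

    public RegisterRecord query(String topic, int recordId) throws Exception {
        Map<Integer, RegisterRecord> records = topics.get(topic);
        return records == null ? null : records.get(recordId);
    }

    public Boolean isExist(String topic) throws Exception {
        return topics.containsKey(topic);
    }

    public void shutdown() throws Exception {
        topics.clear();
    }

    public static void main(String[] args) throws Exception {
        InMemoryRegisterServerHandler handler = new InMemoryRegisterServerHandler();
        handler.createTopicNode("TOPIC-2");
        handler.register("TOPIC-2", new RegisterRecord(0, "10.162.0.1:28444"));
        System.out.println(handler.query("TOPIC-2", 0).address);
        handler.unregister("TOPIC-2", 0);
        handler.shutdown();
    }
}
```

The lifecycle mirrors how the connector uses the handler: NettySink calls createTopicNode and register on startup, NettySource calls query to discover the sink addresses, and both call unregister/shutdown on teardown.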
- NettySink operator

    Class NettySink(String name,
                    String topic,
                    RegisterServerHandler registerServerHandler,
                    int numberOfSubscribedJobs)

  - name: name of this NettySink.
  - topic: topic under which this NettySink publishes data. Distinct NettySink instances (parallel subtasks aside) must use different topics; otherwise subscriptions get mixed up and data cannot be distributed correctly.
  - registerServerHandler: handle of the register server.
  - numberOfSubscribedJobs: number of jobs subscribing to this NettySink. This number must be exact; NettySink starts sending data only after all subscribers have connected to it.
- NettySource operator

    Class NettySource(String name,
                      String topic,
                      RegisterServerHandler registerServerHandler)

  - name: name of this NettySource. It must be unique (parallel subtasks aside); otherwise conflicts occur when connecting to NettySink and the connection cannot be established.
  - topic: topic of the subscribed NettySink.
  - registerServerHandler: handle of the register server.

  Note:
  The parallelism of NettySource must equal the parallelism of the subscribed NettySink; otherwise the connections cannot be created properly.
Development Approach
1. One job acts as the publisher; the other two act as subscribers.
2. The publisher job generates its own data, converts each record to byte[], and sends a copy to each subscriber.
3. After receiving the byte[], each subscriber converts it back to String and prints a sampled subset.
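Steps 2 and 3 rely on a lossless String-to-byte[] round trip. The sketch below (hypothetical helper names; it uses an explicit UTF-8 charset, whereas the samples use the platform default) demonstrates the round trip together with the sampled printing used by the Scala subscribers:

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class RoundTripDemo {

    // Publisher side: what the byte[]-producing MapFunction does, with an explicit charset.
    static byte[] encode(String content) {
        return content.getBytes(StandardCharsets.UTF_8);
    }

    // Subscriber side: decode the received bytes back into a String.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 10000; i++) {
            String message = "hello-" + i;
            // The round trip is lossless: decode(encode(s)) equals s.
            String roundTripped = decode(encode(message));
            // Sampled print, mirroring the Random.nextInt(50000) == 10 filter
            // used by the Scala subscribers below.
            if (random.nextInt(50000) == 10) {
                System.out.println(roundTripped);
            }
        }
    }
}
```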
Java sample code:
- Custom Source operator in the publisher job generates data
    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

    import java.io.Serializable;

    public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {

        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
        }

        /**
         * Data generation function: produces 10000 records per second.
         */
        @Override
        public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
            while (isRunning) {
                for (int i = 0; i < 10000; i++) {
                    ctx.collect(Tuple2.of(i, "hello-" + i));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void close() {
            isRunning = false;
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
- Publisher code

    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.sink.NettySink;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

    public class TestPipeline_NettySink {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the job parallelism to 2
            env.setParallelism(2);
            // Create the ZooKeeper register server handler
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
            // Add the custom Source operator
            env.addSource(new UserSource())
                    .keyBy(0)
                    .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                        // Convert the outgoing record to a byte array
                        @Override
                        public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                            return integerStringTuple2.f1.getBytes();
                        }
                    })
                    // Send the data out through NettySink
                    .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));
            env.execute();
        }
    }
- First subscriber

    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

    public class TestPipeline_NettySource1 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the job parallelism to 2
            env.setParallelism(2);
            // Create the ZooKeeper register server handler
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
            // Add the NettySource operator to receive messages from the publisher
            env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                    .map(new MapFunction<byte[], String>() {
                        // Convert the received bytes back to a String
                        @Override
                        public String map(byte[] b) {
                            return new String(b);
                        }
                    })
                    .print();
            env.execute();
        }
    }
- Second subscriber

    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

    public class TestPipeline_NettySource2 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the job parallelism to 2
            env.setParallelism(2);
            // Create the ZooKeeper register server handler
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
            // Add the NettySource operator to receive data from the publisher
            env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                    .map(new MapFunction<byte[], String>() {
                        // Convert the received byte array back to a String
                        @Override
                        public String map(byte[] b) {
                            return new String(b);
                        }
                    })
                    .print();
            env.execute();
        }
    }
Scala sample code
- Message definition

    package com.huawei.bigdata.flink.examples

    case class Information(index: Int, content: String) {
      def this() = this(0, "")
    }

- Custom source operator in the publisher job generates data

    package com.huawei.bigdata.flink.examples

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    class UserSource extends RichParallelSourceFunction[Information] with Serializable {

      @volatile var isRunning = true

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
      }

      // Produce 10000 records per second
      override def run(sourceContext: SourceContext[Information]): Unit = {
        while (isRunning) {
          for (i <- 0 until 10000) {
            sourceContext.collect(Information(i, "hello-" + i))
          }
          Thread.sleep(1000)
        }
      }

      override def close(): Unit = super.close()

      override def cancel(): Unit = {
        isRunning = false
      }
    }
- Publisher code

    package com.huawei.bigdata.flink.examples

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.sink.NettySink
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._

    object TestPipeline_NettySink {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the job parallelism to 2
        env.setParallelism(2)
        // Use ZooKeeper as the register server
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
        // Add the custom operator that generates data
        env.addSource(new UserSource)
          .keyBy(0)
          // Convert the outgoing data to a byte array
          .map(x => x.content.getBytes)
          // Add the NettySink operator to send the data
          .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))
        env.execute()
      }
    }
- First subscriber

    package com.huawei.bigdata.flink.examples

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.source.NettySource
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._

    import scala.util.Random

    object TestPipeline_NettySource1 {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the job parallelism to 2
        env.setParallelism(2)
        // Use ZooKeeper as the register server
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
        // Add the NettySource operator to receive data from the publisher
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
          // Convert the received bytes back to a String
          .map(x => (1, new String(x)))
          // Sample the stream before printing
          .filter(x => Random.nextInt(50000) == 10)
          .print()
        env.execute()
      }
    }
- Second subscriber

    package com.huawei.bigdata.flink.examples

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.source.NettySource
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._

    import scala.util.Random

    object TestPipeline_NettySource2 {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the job parallelism to 2
        env.setParallelism(2)
        // Use ZooKeeper as the register server
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
        // Add the NettySource operator to receive data
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
          // Convert the received byte array back to a String
          .map(x => (2, new String(x)))
          // Sample the stream before printing
          .filter(x => Random.nextInt(50000) == 10)
          .print()
        env.execute()
      }
    }