gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
  • ( 页面7 )
Flink 3月 1,2021

Flink Job Pipeline程序

场景说明

场景说明

本样例中发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据。

数据规划

  1. 发布者Job使用自定义算子每秒钟产生10000条数据
  2. 数据包含两个属性:分别是Int和String类型
  3. 配置文件
    • nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:nettyconnector.registerserver.topic.storage: /flink/nettyconnector
    • nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:nettyconnector.sinkserver.port.range: 28444-28943
    • nettyconnector.sinkserver.subnet:设置网络所属域,例如:nettyconnector.sinkserver.subnet: 10.162.0.0/16
  4. 接口说明
    • 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */ void start(Configuration configuration) throws Exception; /** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */ void createTopicNode(String topic) throw Exception; /** *将信息注册到某个topic节点(目录)下 * @param topic 需要注册到的目录 * @param registerRecord 需要注册的信息 */ void register(String topic, RegisterRecord registerRecord) throws Exception; /** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception; /** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */ void unregister(String topic, int recordId) throws Exception; /** * 查寻信息 * @param 查询信息所在的topic *@recordId 查询信息的ID */ RegisterRecord query(String topic, int recordId) throws Exception; /** * 查询某个Topic是否存在 * @param topic */ Boolean isExist(String topic) throws Exception; /** *关闭注册服务器句柄 */ void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。
    • NettySink算子Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs)
      • name:为本NettySink的名称。
      • topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
      • registerServerHandler: 为注册服务器的句柄。
      • numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
    • NettySource算子Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler)
      • name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
      • topic:订阅的NettySink的topic。
      • registerServerHandler:为注册服务器的句柄。

说明:

NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。

开发思路

1. 一个Job作为发布者Job,其余两个作为订阅者Job

2. 发布者Job自己产生数据将其转化成byte[],分别向订阅者发送

3. 订阅者收到byte[]之后将其转化成String类型,并抽样打印输出

Java版代码:

  1. 发布Job自定义Source算子产生数据
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
import java.io.Serializable;
 
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
 
    private boolean isRunning = true;
 
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
 
    }
/**
    * 数据产生函数,每秒钟产生10000条数据
   */
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
 
        while(isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }
 
    public void close() {
        isRunning = false;
    }
 
    public void cancel() {
        isRunning = false;
    }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySink {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置job的并发度为2
        env.setBufferTimeout(2);
 
// 创建Zookeeper的注册服务器handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
// 添加自定义Source算子
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer,String>, byte[]>() {
                    //将发送信息转化成字节数组
@Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));//通过NettySink发送出去。
 
        env.execute();
 
    }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource1 {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置job的并发度为2        
env.setParallelism(2);
 
// 创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的消息
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                  // 将接收到的字节流转化成字符串  
    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource2 {
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置作业的并发度为2       
 env.setParallelism(2);
 
//创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的数据
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
          //将接收到的字节数组转化成字符串
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}

Scala样例代码

  1. 发送消息
package com.huawei.bigdata.flink.examples
 
case class Inforamtion(index: Int, content: String) {
 
  def this() = this(0, "")
}
  1. 发布者job自定义source算子产生数据
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{
 
  var isRunning = true
 
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
   
  }
 
// 每秒钟产生10000条数据
  override def run(sourceContext: SourceContext[Inforamtion]) = {
 
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(Inforamtion(i, "hello-" + i));
 
      }
      Thread.sleep(1000)
    }
  }
 
  override def close(): Unit = super.close()
 
  override def cancel() = {
    isRunning = false
  }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
object TestPipeline_NettySink {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置job的并发度为2    
env.setParallelism(2)
//设置Zookeeper为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加用户自定义算子产生数据    
env.addSource(new UserSource)
      .keyBy(0).map(x=>x.content.getBytes)//将发送数据转化成字节数组
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))//添加NettySink算子发送数据
 
    env.execute()
  }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource1 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置Job的并发度为2  
  env.setParallelism(2)
//设置Zookeeper作为注册服务器
val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收来自发布者的数据
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      .map(x => (1, new String(x)))//将接收到的字节流转化成字符串
      .filter(x => {
        Random.nextInt(50000) == 10
      })
      .print
 
    env.execute()
  }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource2 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置job的并发度为2   
 env.setParallelism(2)
//创建Zookeeper作为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收数据    
env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      .map(x=>(2, new String(x)))//将接收到的字节数组转化成字符串
      .filter(x=>{
        Random.nextInt(50000) == 10
      })
      .print()
 
    env.execute()
  }
}
作者 east
Flink 3月 1,2021

Flink异步Checkpoint机制程序

场景说明

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。

数据规划

  1. 使用自定义算子每秒钟产生大约10000条数据。
  2. 产生的数据为一个四元组(Long,String,String,Integer)。
  3. 数据经统计后,统计结果打印到终端输出。
  4. 打印输出的结果为Long类型的数据。

开发思路

  1. source算子每隔1秒钟发送10000条数据,并注入到Window算子中。
  2. window算子每隔1秒钟统计一次最近4秒钟内数据数量。
  3. 每隔1秒钟将统计结果打印到终端。具体查看方式请参考查看调测结果。
  4. 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。

Java样例代码

功能介绍

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性。

代码样例

  1. 快照数据 该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。import java.io.Seriablizale; // 该类作为快照的一部分,保存用户自定义状态 public class UDFState implements Serializable { private long count; // 初始化用户自定义状态 public UDFState() { count = 0L; } // 设置用户自定义状态 public void setState(long count) { this.count = count; } // 获取用户自定义状态 public long geState() { return this.count; } }
  2. 带checkpoint的数据源 source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.List; import java.util.Random; // 该类是带checkpoint的source算子 public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>> implements ListCheckpointed<UDFState> { private Long count = 0L; private boolean isRunning = true; private String alphabet = “abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321”; // 算子的主要逻辑,每秒钟向流图中注入10000个元组 public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception { Random random = new Random(); while(isRunning) { for (int i = 0; i < 10000; i++) { ctx.collect(Tuple4.of(random.nextLong(), “hello-” + count, alphabet, 1)) count++; } Thread.sleep(1000); } } // 任务取消时调用 public void cancel() { isRunning = false; } // 制作自定义快照 public List<UDFState> snapshotState(long l, long ll) throws Exception { UDFState udfState = new UDFState(); List<UDFState> listState = new ArrayList<UDFState>(); udfState.setState(count); listState.add(udfState); return listState; } // 从自定义快照中恢复数据 public void restoreState(List<UDFState> list) throws Exception { UDFState udfState = list.get(0); count = udfState.getState(); } }
  3. 带checkpoint的窗口定义 该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; // 该类是带checkpoint的window算子 public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> { private Long total = 0L; // window算子实现逻辑,统计window中元组的个数 void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input, Collector<Long> out) throws Exception { long count = 0L; for (Tuple4<Long, String, String, Integer> event : input) { count++; } total += count; out.collect(count); } // 制作自定义快照 public List<UDFState> snapshotState(Long l, Long ll) { List<UDFState> listState = new ArrayList<UDFState>(); UDFState udfState = new UDFState(); udfState.setState(total); listState.add(udfState); return listState; } // 从自定义快照中恢复状态 public void restoreState(List<UDFState> list) throws Exception { UDFState udfState = list.get(0); total = udfState.getState(); } }
  4. 应用代码 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了processing time。import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class FlinkProcessingTimeAPIChkMain { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置相关配置,并开启checkpoint功能 env.setStateBackend(new FsStateBackend(“hdfs://hacluster/flink-checkpoint/checkpoint/”)); env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig.setCheckpointInterval(6000); // 应用逻辑 env.addSource(new SEventSourceWithChk()) .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk()) .print() env.execute(); } }

Scala样例代码

功能介绍

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性。

代码样例

  1. 发送数据形式case class SEvent(id: Long, name: String, info: String, count: Int)
  2. 快照数据 该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。// 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count }
  3. 带checkpoint的数据源 source算子的代码,该段代码每发送10000条数据休息1秒钟,制作快照时将到目前为止已经发送的数据的条数保存在UDFState中;从快照中状态恢复时,读取UDFState中的数据条数并重新赋值给count变量。import java.util import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext // 该类是带有checkpoint的source算子 class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState]{ private var count = 0L private var isRunning = true private val alphabet = “abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321” // source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, “hello-“+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } // 任务取消时调用 override def cancel(): Unit = { isRunning = false; } override def close(): Unit = super.close() // 制作快照 override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(count) udfList.add(udfState) udfList } // 从快照中获取状态 override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) count = udfState.getState } }
  4. 带checkpoint的窗口定义 该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。import java.util import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.checkpoint.ListCheckpointed import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector // 该类是带checkpoint的window算子 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 制作自定义状态快照 override def snapshotState(l: Long, l1: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } // 从自定义快照中恢复状态 override def restoreState(list: util.List[UDFState]): Unit = { val udfState = list.get(0) total = udfState.getState } }
  5. 应用代码 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使用了event time。import com.hauwei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk} import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend(“hdfs://hacluster/flink-checkpoint/checkpoint/”)) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(2000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) // 应用逻辑 env.addSource(new SEventSourceWithChk) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }
作者 east
Flink 3月 1,2021

Flink向Kafka生产并消费数据程序

场景说明

假定某个Flink业务每秒就会收到1个消息记录。

基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

数据规划

Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。

  1. 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
  2. 创建Topic。 创建topic的命令格式: bin/kafka-topics.sh –create –zookeeper {zkQuorum}/kafka –partitions {partitionNum} –replication-factor {replicationNum} –topic {Topic}

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
  3. 在数据内容中增加前缀并进行打印。

java版代码:

Java样例代码





//producer代码 public class WriteIntoKafka { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); // 构造执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并发度 env.setParallelism(1); // 解析运行参数 ParameterTool paraTool = ParameterTool.fromArgs(args); // 构造流图,将自定义Source生成的数据写入Kafka DataStream<String> messageStream = env.addSource(new SimpleStringGenerator()); messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())); // 调用execute触发执行 env.execute(); } // 自定义Source,每隔1s持续产生消息 public static class SimpleStringGenerator implements SourceFunction<String> { private static final long serialVersionUID = 2174904787118597072L; boolean running = true; long i = 0; @Override public void run(SourceContext<String> ctx) throws Exception { while (running) { ctx.collect("element-" + (i++)); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } } //consumer代码 public class ReadFromKafka { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); // 构造执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并发度 env.setParallelism(1); // 解析运行参数 ParameterTool paraTool = ParameterTool.fromArgs(args); // 构造流图,从Kafka读取数据并换行打印 DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())); messageStream.rebalance().map(new MapFunction<String, String>() { @Override public String map(String s) throws Exception { return "Flink says " + s + System.getProperty("line.separator"); } }).print(); // 调用execute触发执行 env.execute(); } }

scala版本代码:

Scala样例代码





//producer代码 object WriteIntoKafka { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005") System.out.println("******************************************************************************************") System.out.println("<topic> is the kafka topic name") System.out.println("<bootstrap.servers> is the ip:port list of brokers") System.out.println("******************************************************************************************") // 构造执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置并发度 env.setParallelism(1) // 解析运行参数 val paraTool = ParameterTool.fromArgs(args) // 构造流图,将自定义Source生成的数据写入Kafka val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator) messageStream.addSink(new FlinkKafkaProducer010( paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) // 调用execute触发执行 env.execute } } // 自定义Source,每隔1s持续产生消息 class SimpleStringGenerator extends SourceFunction[String] { var running = true var i = 0 override def run(ctx: SourceContext[String]) { while (running) { ctx.collect("element-" + i) i += 1 Thread.sleep(1000) } } override def cancel() { running = false } } //consumer代码 object ReadFromKafka { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005") System.out.println("******************************************************************************************") System.out.println("<topic> is the kafka topic name") System.out.println("<bootstrap.servers> is the ip:port list of brokers") System.out.println("******************************************************************************************") // 构造执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置并发度 env.setParallelism(1) // 解析运行参数 val paraTool = ParameterTool.fromArgs(args) // 构造流图,从Kafka读取数据并换行打印 val messageStream = env.addSource(new FlinkKafkaConsumer010( paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) messageStream .map(s => "Flink says " + s + System.getProperty("line.separator")).print() // 调用execute触发执行 env.execute() } }
作者 east
Flink 3月 1,2021

Flink统计连续网购时间超过2个小时的女性网民信息例子

Java样例代码

场景说明

场景说明

假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能:

说明:

DataStream应用程序可以在Windows环境和Linux环境中运行。

  • 实时统计总计网购时间超过2个小时的女性网民信息。
  • 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • log2.txt:周日网民停留日志。LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

数据规划

DataStream样例工程的数据存储在文本中。

将log1.txt和log2.txt放置在某路径下,例如”/opt/log1.txt”和”/opt/log2.txt”。

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

主要分为四个部分:

  1. 读取文本数据,生成相应DataStream,解析数据生成UserRecord信息。
  2. 筛选女性网民上网时间数据信息。
  3. 按照姓名、性别进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
  4. 筛选连续上网时间超过阈值的用户,并获取结果。

功能介绍

统计连续网购时间超过2个小时的女性网民信息,将统计结果直接打印。

java版代码:

Java样例代码











// 参数解析: // <filePath>为文本读取路径,用逗号分隔。 // <windowTime>为统计数据的窗口跨度,时间单位都是分。 public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); // 读取文本路径信息,并使用逗号分隔 final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); // 构造执行环境,使用eventTime处理窗口数据 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 读取文本数据流 DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint java"); } // 构造keyBy的关键字作为分组依据 private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } // 解析文本行数据,构造UserRecord数据结构 private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } // UserRecord数据结构的定义,并重写了toString打印方法 public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { // add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } }

scala版本:

Scala样例代码











// 参数解析: // filePath为文本读取路径,用逗号分隔。 // windowTime;为统计数据的窗口跨度,时间单位都是分。 object FlinkStreamScalaExample { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2") System.out.println("******************************************************************************************") System.out.println("<filePath> is for text file to read data, use comma to separate") System.out.println("<windowTime> is the width of the window, time as minutes") System.out.println("******************************************************************************************") // 读取文本路径信息,并使用逗号分隔 val filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim) assert(filePaths.length > 0) // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 val windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2) // 构造执行环境,使用eventTime处理窗口数据 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 读取文本数据流 val unionStream = if (filePaths.length > 1) { val firstStream = env.readTextFile(filePaths.apply(0)) firstStream.union(filePaths.drop(1).map(it => env.readTextFile(it)): _*) } else { env.readTextFile(filePaths.apply(0)) } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(getRecord(_)) .assignTimestampsAndWatermarks(new Record2TimestampExtractor) .filter(_.sexy == "female") .keyBy("name", "sexy") .window(TumblingEventTimeWindows.of(Time.minutes(windowTime))) .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime)) .filter(_.shoppingTime > 120).print() // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint scala") } // 解析文本行数据,构造UserRecord数据结构 def getRecord(line: String): UserRecord = { val elems = line.split(",") assert(elems.length == 3) val name = elems(0) val sexy = elems(1) val time = elems(2).toInt UserRecord(name, sexy, time) } // UserRecord数据结构的定义 case class UserRecord(name: String, sexy: String, shoppingTime: Int) // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] { // add tag in the data of datastream elements override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = { System.currentTimeMillis() } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp - 1) } } }
作者 east
Flink 10月 26,2020

Flink 向Kafka生产并消费数据程序

场景说明

假定某个Flink业务每秒就会收到1个消息记录。

基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

数据规划

Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。

  1. 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
  2. 创建Topic。 创建topic的命令格式: bin/kafka-topics.sh –create –zookeeper {zkQuorum}/kafka –partitions {partitionNum} –replication-factor {replicationNum} –topic {Topic} 表1 参数说明 参数名 说明 {zkQuorum} ZooKeeper集群信息,格式为IP:port。 {partitionNum} topic的分区数。 {replicationNum} topic中每个partition数据的副本数。 {Topic} topic名称。 示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002,Topic名称为topic1的数据为例。bin/kafka-topics.sh –create –zookeeper 10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002/kafka –partitions 5 –replication-factor 1 –topic topic1

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
  3. 在数据内容中增加前缀并进行打印。

Java样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

下面列出producer和consumer主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka

//producer代码
public class WriteIntoKafka {
  public static void main(String[] args) throws Exception {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
        " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");

    // 构造执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并发度
    env.setParallelism(1);
    // 解析运行参数
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    // 构造流图,将自定义Source生成的数据写入Kafka
    DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
    messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
        new SimpleStringSchema(),
        paraTool.getProperties()));
    // 调用execute触发执行
    env.execute();
  }

  // 自定义Source,每隔1s持续产生消息
  public static class SimpleStringGenerator implements SourceFunction<String> {
    private static final long serialVersionUID = 2174904787118597072L;
    boolean running = true;
    long i = 0;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      while (running) {
        ctx.collect("element-" + (i++));
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }
}

//consumer代码
public class ReadFromKafka {
  public static void main(String[] args) throws Exception {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
        " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");

    // 构造执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并发度
    env.setParallelism(1);
    // 解析运行参数
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    // 构造流图,从Kafka读取数据并换行打印
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
        new SimpleStringSchema(),
        paraTool.getProperties()));
    messageStream.rebalance().map(new MapFunction<String, String>() {
      @Override
      public String map(String s) throws Exception {
        return "Flink says " + s + System.getProperty("line.separator");
      }
    }).print();
    // 调用execute触发执行
    env.execute();
  }
}

Scala样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

下面列出producer和consumer主要逻辑代码作为演示。 完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka

//producer代码
object WriteIntoKafka {
  def main(args: Array[String]) {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // 构造执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并发度
    env.setParallelism(1)
    // 解析运行参数
    val paraTool = ParameterTool.fromArgs(args)
    // 构造流图,将自定义Source生成的数据写入Kafka
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator)
    messageStream.addSink(new FlinkKafkaProducer010(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    // 调用execute触发执行
    env.execute
  }
}

// 自定义Source,每隔1s持续产生消息
class SimpleStringGenerator extends SourceFunction[String] {
  var running = true
  var i = 0

  override def run(ctx: SourceContext[String]) {
    while (running) {
      ctx.collect("element-" + i)
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() {
    running = false
  }
}

//consumer代码
object ReadFromKafka {
  def main(args: Array[String]) {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // 构造执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并发度
    env.setParallelism(1)
    // 解析运行参数
    val paraTool = ParameterTool.fromArgs(args)
    // 构造流图,从Kafka读取数据并换行打印
    val messageStream = env.addSource(new FlinkKafkaConsumer010(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    messageStream
      .map(s => "Flink says " + s + System.getProperty("line.separator")).print()
    // 调用execute触发执行
    env.execute()
  }
}
作者 east
Flink 10月 26,2020

Spark Streaming从Kafka读取数据再写入HBase 实例

Java样例代码

功能介绍

在Spark应用中,通过使用Spark Streaming调用Kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase

/**  * 运行Spark Streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表  */ public class SparkOnStreamingToHbase {   public static void main(String[] args) throws Exception {     if (args.length < 3) {       printUsage();     }     String checkPointDir = args[0];     String topics = args[1];     final String brokers = args[2];     Duration batchDuration = Durations.seconds(5);     SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);     // 设置Spark Streaming的CheckPoint目录     if (!"nocp".equals(checkPointDir)) {       jssc.checkpoint(checkPointDir);     }     final String columnFamily = "cf";     HashMap<String, String> kafkaParams = new HashMap<String, String>();     kafkaParams.put("metadata.broker.list", brokers);     String[] topicArr = topics.split(",");     Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));     // 通过brokers和topics直接创建kafka stream     // 接收Kafka中数据,生成相应DStream     JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,       StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(       new Function<Tuple2<String, String>, String>() {         public String call(Tuple2<String, String> tuple2) {           // map(_._1)是消息的key, map(_._2)是消息的value           return tuple2._2();         }       }     );     lines.foreachRDD(       new Function<JavaRDD<String>, Void>() {         public Void call(JavaRDD<String> rdd) throws Exception {           rdd.foreachPartition(             new VoidFunction<Iterator<String>>() {               public void call(Iterator<String> iterator) throws Exception {                 hBaseWriter(iterator, columnFamily);               }             }           );           return null;         }       }     );     jssc.start();     jssc.awaitTermination();   }   /**    * 在executor端写入数据    * @param iterator  消息    * @param columnFamily    */   private static void hBaseWriter(Iterator<String> iterator, String columnFamily) throws IOException {     Configuration conf = HBaseConfiguration.create();     Connection connection = null;     Table table = null;     try {       connection = ConnectionFactory.createConnection(conf);       table = connection.getTable(TableName.valueOf("table1"));       List<Get> rowList = new ArrayList<Get>();       while (iterator.hasNext()) {         Get get = new Get(iterator.next().getBytes());         rowList.add(get);       }       // 获取table1的数据       Result[] resultDataBuffer = table.get(rowList);       // 设置table1的数据       List<Put> putList = new ArrayList<Put>();       for (int i = 0; i < resultDataBuffer.length; i++) {         String row = new String(rowList.get(i).getRow());         Result resultData = resultDataBuffer[i];         if (!resultData.isEmpty()) {           // 根据列簇和列,获取旧值           String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));           Put put = new Put(Bytes.toBytes(row));           // 计算结果           int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid);           put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));           putList.add(put);         }       }       if (putList.size() > 0) {         table.put(putList);       }     } catch (IOException e) {       e.printStackTrace();     } finally {       if (table != null) {         try {           table.close();         } catch (IOException e) {           e.printStackTrace();         }       }       if (connection != null) {         try {           // 关闭Hbase连接.           connection.close();         } catch (IOException e) {           e.printStackTrace();         }       }     }   }     private static void printUsage() {     System.out.println("Usage: {checkPointDir} {topic} {brokerList}");     System.exit(1);   } }


Scala样例代码

功能介绍

在Spark应用中,通过使用Spark Streaming调用Kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase

/**
  * 运行Spark Streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表
  */
object SparkOnStreamingToHbase {
  def main(args: Array[String]) {
    if (args.length < 3) {
      printUsage
    }

    val Array(checkPointDir, topics, brokers) = args
    val sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置Spark Streaming的CheckPoint目录
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet
    // map(_._1)是消息的key, map(_._2)是消息的value
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      //partition运行在executor上
      rdd.foreachPartition(iterator => hBaseWriter(iterator, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  
  /**
   * 在executor端写入数据
   * @param iterator  消息
   * @param columnFamily
   */
  def hBaseWriter(iterator: Iterator[String], columnFamily: String): Unit = {
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf("table1"))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getBytes)
        rowList.add(get)
      }
      // 获取table1的数据
      val resultDataBuffer = table.get(rowList)
      // 设置table1的数据
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val row = iteratorArray(i)
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // 根据列簇和列,获取旧值
          val aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(Bytes.toBytes(row))
          // 计算结果
          val resultValue = row.toInt + aCid.toInt
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // 关闭Hbase连接.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
  

  private def printUsage {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList}")
    System.exit(1)
  }
}
作者 east
Flink 10月 26,2020

flink调优经验

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 调用rebalance操作,使数据分区均匀。

缓冲区超时设置

  • 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
  • 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下:env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
作者 east

上一 1 … 6 7

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (494)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (36)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.