gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
  • ( 页面6 )
Flink 5月 8,2023

Flink CDC的日志解析

在开发Flink CDC时,可以看到类似下面的日志:

com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] – Binlog offset on checkpoint 83: {transaction_id=null, ts_sec=0, file=mysql_binary_log.000031, pos=488646219, kind=SPECIFIC, gtids=0ada2b25-c265-11e9-8a8d-fa163e713fa8:1-2781408, row=0, event=0, server_id=1}

根据日志可以做下面的解析:

  • 你的Flink任务是使用Flink CDC Connector来从MySQL读取数据,并且使用MySqlSourceReader来读取MySQL的binlog。
  • 你的Flink任务在checkpoint 83时,记录了当前的binlog偏移量,用于在故障恢复时重新定位数据源。
  • 你的binlog偏移量包含了以下几个字段:
    • transaction_id: 当前事务的ID,如果没有事务,则为null。
    • ts_sec: 当前事件的时间戳,单位为秒。
    • file: 当前binlog文件的名称。
    • pos: 当前binlog文件的位置,单位为字节。
    • kind: 当前事件的类型,可以是SPECIFIC(特定事件),ROW(行事件),DDL(数据定义语言事件)或DML(数据操作语言事件)。
    • gtids: 当前全局事务标识符集合,用于跨多个MySQL服务器标识事务。
    • row: 当前行事件的行号,从0开始。
    • event: 当前行事件的事件号,从0开始。
    • server_id: 当前MySQL服务器的ID。
作者 east
Flink 5月 8,2023

运行flink出错:Could not acquire the minimum required resources.

运行flink任务报下面错误:

Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) … 37 more Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

这个错误表示你的Flink集群没有足够的任务槽来满足你的作业需求。你需要增加每个TaskManager的槽数或者增加TaskManager的实例数12。Flink本身不能触发动态扩缩容,你只能手动启动更多的TaskManager或者修改TaskManager的配置并重启1。如果你的TaskManager在作业运行时挂掉了,你可以定义一个重启策略(注意你需要开启检查点)3。

作者 east
Flink 4月 28,2023

CDH上Flink执行sql-client命令提示[ERROR] Could not execute SQL statement. Reason: java.net.ConnectException: 拒绝连接

在cdh6.3.2上执行sql-client的命令,提示:

[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: 拒绝连接

这可能是由于以下原因:

  • 您没有启动flink集群,或者flink集群不可用。您可以检查flink集群的状态,或者尝试重新启动flink集群。
  • 您没有正确地配置flink sql-client的环境,或者您的配置文件有误。您可以检查您的sql-client-defaults.yaml文件,或者参考官方文档来设置flink sql-client的环境。
  • 您的网络有问题,或者您的防火墙阻止了flink sql-client的连接。您可以检查您的网络连接,或者允许flink sql-client通过防火墙。

原来是需要启动本地集群:

./start-cluster.sh
cd /opt/cloudera/parcels/FLINK-1.14.4-BIN-SCALA_2.12/lib/flink/bin

然后再次启动sql客户端即可执行成功。

作者 east
Flink 4月 25,2023

运行flink的word count提示: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation

写了一个flink简单word count的例子:


import org.apache.flink.api.scala._

object FlinkWordCount {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = List("hello", "world", "flink", "scala", "hello", "flink")

    // 为DataStream和DataSet的上下文提供隐式的类型信息
    implicit val typeInfo = createTypeInformation[WordCount]

    val wordDataSet = env.fromCollection(words)

    val wordCountDataSet = wordDataSet
      .map(word => WordCount(word, 1))
      .groupBy(0)
      .sum(1)

   
  }

  case class WordCount(word: String, count: Int)
}

提示could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation

各种修改后还是报错,后来想到scala版本问题。
Flink 1.10.0以上版本需要Scala 2.12.x,如果使用Scala 2.11.x版本,就会报错。请确认一下你的Scala版本是否为2.12.x,如果不是请升级Scala版本到2.12.x后再运行代码。

作者 east
Flink 4月 24,2023

Flink CDC获取mysql 主从分库,分库分表的binlog

Flink CDC可以获取MySQL主从分库,分库分表的binlog,但是需要注意以下几点:

  • Flink CDC需要配置MySQL的binlog模式为row,以及开启GTID(全局事务标识符),以便正确地识别和处理binlog事件
  • Flink CDC需要配置MySQL的主从复制关系,以及指定主库或从库的地址,以便正确地连接和读取binlog数据
  • Flink CDC需要配置MySQL的分库分表规则,以及指定分库分表的键,以便正确地合并和转换binlog数据

一个可能的Flink CDC获取MySQL主从分库,分库分表的binlog的示例如下:

-- 创建MySQL主从分库,分库分表的表对象
CREATE TABLE mysql_user (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'master-host', -- 主库地址
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'test', -- 分库名
  'table-name' = 'user_${0-9}', -- 分表名
  'server-id' = '5400', -- 从库ID
  'scan.startup.mode' = 'initial', -- 初始扫描模式
  'scan.split.size' = '8096', -- 扫描块大小
  'debezium.snapshot.locking.mode' = 'none', -- 快照锁定模式
  'debezium.gtid.source.includes' = '', -- GTID包含范围
  'debezium.gtid.source.excludes' = '' -- GTID排除范围
);

-- 使用Flink SQL或DataStream API将MySQL的增量数据写入到其他目标,如Hive、Doris等
INSERT INTO hive_user SELECT * FROM mysql_user;

-- DataStream API
DataStream<Row> stream = tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM mysql_user"), Row.class);
tEnv.createTemporaryView("stream", stream);
tEnv.executeSql("INSERT INTO doris_user SELECT * FROM stream");
作者 east
Flink 4月 24,2023

对比flink cdc和canal获取mysql binlog优缺点

Flink CDC和Canal都是用于获取MySQL binlog的工具,但是有以下几点优缺点对比:

  • Flink CDC是一个基于Flink的库,可以直接在Flink中使用,无需额外的组件或服务,而Canal是一个独立的服务,需要单独部署和运行,增加了系统的复杂度和成本
  • Flink CDC支持多种数据库的数据变化捕获,如MySQL、PostgreSQL、MongoDB等,而Canal只支持MySQL和MariaDB的数据变化捕获
  • Flink CDC支持Exactly-Once语义,保证数据的一致性和准确性,而Canal只支持At-Least-Once语义,可能会出现数据的重复或丢失
  • Flink CDC支持自动元数据更新,无需手动创建或维护表结构,而Canal需要手动创建或维护表结构,增加了开发和维护的工作量
  • Flink CDC支持Flink SQL和DataStream API的使用,方便进行数据处理和转换,而Canal只提供了Kafka、RocketMQ等消息队列的接口,需要额外的消费者进行数据处理和转换
作者 east
Flink, Spark 4月 19,2022

大数据实时流式处理:Apache Flink vs Apache

对更快数据处理的需求一直在增加,实时流数据处理是目前的解决方案。虽然 Apache Spark 仍在许多组织中用于大数据处理,但 Apache Flink 已经迅速成为替代方案。事实上,许多人认为它有可能取代 Apache Spark,因为它能够实时处理流数据。当然,Flink 能否取代 Spark 尚无定论,因为 Flink 还没有经过广泛的测试。但实时处理和低数据延迟是其两个决定性特征。同时,这需要考虑到 Apache Spark 可能不会失宠,因为它的批处理能力仍然很重要。

流式数据处理案例

对于基于批处理的所有优点,实时流数据处理似乎是一个强有力的案例。流式数据处理使快速设置和加载数据仓库成为可能。具有低数据延迟的流处理器可以快速提供对数据的更多见解。所以,你有更多的时间来了解发生了什么。除了更快的处理之外,还有另一个显着的好处:您有更多的时间来设计对事件的适当响应。例如,在异常检测的情况下,更低的延迟和更快的检测使您能够确定最佳响应,这是防止安全网站受到欺诈攻击或工业设备损坏等情况的关键。因此,您可以防止重大损失。

什么是 Apache Flink?

Apache Flink 是一种大数据处理工具,以在大规模分布式系统上以低数据延迟和高容错性快速处理大数据而著称。它的定义特征是它能够实时处理流数据。

Apache Flink 最初是一个学术开源项目,当时它被称为 Stratosphere。后来,它成为了 Apache 软件基金会孵化器的一部分。为避免与其他项目名称冲突,将名称更改为 Flink。 Flink 这个名字很合适,因为它意味着敏捷。即使选择的标志,松鼠也是合适的,因为松鼠代表了敏捷、敏捷和速度的美德。

自从加入 Apache 软件基金会后,它作为大数据处理工具迅速崛起,并在 8 个月内开始受到更广泛受众的关注。人们对 Flink 的兴趣日益浓厚,这反映在 2015 年的多次会议的参会人数上。2015 年 5 月在伦敦举行的 Strata 会议和 2015 年 6 月在圣何塞举行的 Hadoop 峰会上,有很多人参加了关于 Flink 的会议。 2015 年 8 月,超过 60 人参加了在圣何塞 MapR 总部举办的湾区 Apache Flink 聚会。

下图给出了 Flink 的 Lambda 架构。

Spark 和 Flink 的比较

虽然 Spark 和 Flink 之间有一些相似之处,例如它们的 API 和组件,但在数据处理方面,相似之处并不重要。 下面给出了 Flink 和 Spark 之间的比较。

数据处理

Spark 以批处理模式处理数据,而 Flink 实时处理流数据。 Spark 处理数据块,称为 RDD,而 Flink 可以实时处理一行一行的数据。 因此,虽然 Spark 始终存在最小数据延迟,但 Flink 并非如此。

迭代

Spark 支持批量数据迭代,但 Flink 可以使用其流式架构原生迭代其数据。 下图显示了迭代处理是如何发生的。

内存管理

Flink 可以自动适应不同的数据集,但 Spark 需要手动优化和调整其作业以适应单个数据集。 Spark 也进行手动分区和缓存。因此,预计处理会有所延迟。

数据流

Flink 能够在需要时为其数据处理提供中间结果。 Spark 遵循过程式编程系统,而 Flink 遵循分布式数据流方法。因此,当需要中间结果时,广播变量用于将预先计算的结果分发到所有工作节点。

数据可视化

Flink 提供了一个 Web 界面来提交和执行所有作业。 Spark 和 Flink 都与 Apache Zeppelin 集成,并提供数据摄取、数据分析、发现、协作和可视化。 Apache Zeppelin 还提供了多语言后端,允许您提交和执行 Flink 程序。

处理时间

以下段落提供了 Flink 和 Spark 在不同作业中所用时间的比较。

为了公平比较,Flink 和 Spark 都以机器规格和节点配置的形式获得了相同的资源。

Flink 处理速度更快,因为它的流水线执行。 处理数据,Spark 用了 2171 秒,而 Flink 用了 1490 秒。

当执行不同数据大小的 TeraSort 时,结果如下:

对于 10 GB 的数据,Flink 需要 157 秒,而 Spark 需要 387 秒。
对于 160 GB 的数据,Flink 需要 3127 秒,而 Spark 需要 4927 秒。
基于批处理或流式数据——哪个过程更好?

这两种工艺各有优势,适用于不同的情况。 尽管许多人声称基于批处理的工具正在失宠,但它不会很快发生。 要了解它们的相对优势,请参见以下比较:

在个别情况下,Flink 和 Spark批 处理都是有用的。以每天计算滚动月销售额的用例为例。在此活动中,需要计算每日销售总额,然后进行累计。在这样的用例中,可能不需要对数据进行流式处理。数据的批处理可以根据日期处理各个批次的销售数据,然后将它们添加。在这种情况下,即使存在一些数据延迟,也可以在稍后将该潜在数据添加到以后的批次中时弥补这些延迟。

有类似的用例需要流处理。以计算每个访问者在网站上花费的每月滚动时间的用例为例。在网站的情况下,访问次数可以每小时、每分钟甚至每天更新一次。但是这种情况下的问题是定义会话。定义会话的开始和结束可能很困难。此外,难以计算或识别不活动的时间段。因此,在这种情况下,定义会话甚至不活动时间段都没有合理的界限。在这种情况下,需要实时处理流数据。

概括

虽然 Spark 在批处理数据处理方面有很多优势,而且它仍然有很多使用场景,但 Flink 似乎正在迅速获得商业方面应用的青睐。 Flink 也可以进行批处理这一事实似乎对其有利。当然,这需要考虑到 Flink 的批处理能力可能与 Spark 不在一个级别。

作者 east
Flink, Spark 4月 13,2022

Flink和Spark的Transformation不同地方对比

1、合并输入流:

在spark有Union
返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。 具体例子可以参考Spark Streaming多个输入流

在Flink中更高级,除了有union合并多个输入流(
union()所连接的两个或多个数据流的数据类型必须一致 ),还有connect()。

①connect()只能连接两个数据流,union()可以连接多个数据流。

②connect()所连接的两个数据流的数据类型可以不一致,union()所连接的两个或多个数据流的数据类型必须一致。

③两个DataStream经过connect()之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且两个流之间可以共享状态。

2、求最大最小的操作

Flink有 max()、 maxBy() 对该字段求最大值 。
min()、minBy对某字段求最小值

作者 east
Flink 4月 8,2022

Flink面试题汇总

1、Flink如何保证精确一次性消费

Flink 保证精确一次性消费主要依赖于两种Flink机制

1、Checkpoint机制

2、二阶段提交机制

Checkpoint机制

主要是当Flink开启Checkpoint的时候,会往Source端插入一条barrir,然后这个barrir随着数据流向一直流动,当流入到一个算子的时候,这个算子就开始制作checkpoint,制作的是从barrir来到之前的时候当前算子的状态,将状态写入状态后端当中。然后将barrir往下流动,当流动到keyby 或者shuffle算子的时候,例如当一个算子的数据,依赖于多个流的时候,这个时候会有barrir对齐,也就是当所有的barrir都来到这个算子的时候进行制作checkpoint,依次进行流动,当流动到sink算子的时候,并且sink算子也制作完成checkpoint会向jobmanager 报告 checkpoint n 制作完成。

二阶段提交机制

Flink 提供了CheckpointedFunction与CheckpointListener这样两个接口,CheckpointedFunction中有snapshotState方法,每次checkpoint触发执行方法,通常会将缓存数据放入状态中,可以理解为一个hook,这个方法里面可以实现预提交,CheckpointListyener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,这里可以做一些额外的操作。例如FLinkKafkaConumerBase使用这个来完成Kafka offset的提交,在这个方法里面可以实现提交操作。在2PC中提到如果对应流程例如某个checkpoint失败的话,那么checkpoint就会回滚,不会影响数据一致性,那么如果在通知checkpoint成功的之后失败了,那么就会在initalizeSate方法中完成事务的提交,这样可以保证数据的一致性。最主要是根据checkpoint的状态文件来判断的。

2、flink和spark区别

flink是一个类似spark的“开源技术栈”,因为它也提供了批处理,流式计算,图计算,交互式查询,机器学习等。flink也是内存计算,比较类似spark,但是不一样的是,spark的计算模型基于RDD,将流式计算看成是特殊的批处理,他的DStream其实还是RDD。而flink吧批处理当成是特殊的流式计算,但是批处理和流式计算的层的引擎是两个,抽象了DataSet和DataStream。flink在性能上也表现的很好,流式计算延迟比spark少,能做到真正的流式计算,而spark只能是准流式计算。而且在批处理上,当迭代次数变多,flink的速度比spark还要快,所以如果flink早一点出来,或许比现在的Spark更火。

3、Flink的状态可以用来做什么?

Flink状态主要有两种使用方式:

  1. checkpoint的数据恢复
  2. 逻辑计算

4、Flink的waterMark机制,Flink watermark传递机制

Flink 中的watermark机制是用来处理乱序的,flink的时间必须是event time ,有一个简单的例子就是,假如窗口是5秒,watermark是2秒,那么 总共就是7秒,这个时候什么时候会触发计算呢,假设数据初始时间是1000,那么等到6999的时候会触发5999窗口的计算,那么下一个就是13999的时候触发10999的窗口

其实这个就是watermark的机制,在多并行度中,例如在kafka中会所有的分区都达到才会触发窗口

5、Flink的时间语义

Event Time 事件产生的时间

Ingestion time 事件进入Flink的时间

processing time 事件进入算子的时间

6、Flink window join

1、window join,即按照指定的字段和滚动滑动窗口和会话窗口进行 inner join

2、是coGoup 其实就是left join 和 right join,

3、interval join 也就是 在窗口中进行join 有一些问题,因为有些数据是真的会后到的,时间还很长,那么这个时候就有了interval join但是必须要是事件时间,并且还要指定watermark和水位以及获取事件时间戳。并且要设置 偏移区间,因为join 也不能一直等的。

7、flink窗口函数有哪些

Tumbing window

Silding window

Session window

Count winodw

8、keyedProcessFunction 是如何工作的。假如是event time的话

keyedProcessFunction 是有一个ontime 操作的,假如是 event时间的时候 那么 调用的时间就是查看,event的watermark 是否大于 trigger time 的时间,如果大于则进行计算,不大于就等着,如果是kafka的话,那么默认是分区键最小的时间来进行触发。

9、flink是怎么处理离线数据的例如和离线数据的关联?

1、async io

2、broadcast

3、async io + cache

4、open方法中读取,然后定时线程刷新,缓存更新是先删除,之后再来一条之后再负责写入缓存

10、flink支持的数据类型

DataSet Api 和 DataStream Api、Table Api

11、Flink出现数据倾斜怎么办

Flink数据倾斜如何查看:

在flink的web ui中可以看到数据倾斜的情况,就是每个subtask处理的数据量差距很大,例如有的只有一M 有的100M 这就是严重的数据倾斜了。

KafkaSource端发生的数据倾斜

例如上游kafka发送的时候指定的key出现了数据热点问题,那么就在接入之后,做一个负载均衡(前提下游不是keyby)。

聚合类算子数据倾斜

预聚合加全局聚合

12、flink 维表关联怎么做的

1、async io

2、broadcast

3、async io + cache

4、open方法中读取,然后定时线程刷新,缓存更新是先删除,之后再来一条之后再负责写入缓存

13、Flink checkpoint的超时问题 如何解决。

1、是否网络问题

2、是否是barrir问题

3、查看webui,是否有数据倾斜

4、有数据倾斜的话,那么解决数据倾斜后,会有改善,

14、flinkTopN与离线的TopN的区别

topn 无论是在离线还是在实时计算中都是比较常见的功能,不同于离线计算中的topn,实时数据是持续不断的,这样就给topn的计算带来很大的困难,因为要持续在内存中维持一个topn的数据结构,当有新数据来的时候,更新这个数据结构

15、sparkstreaming 和flink 里checkpoint的区别

sparkstreaming 的checkpoint会导致数据重复消费

但是flink的 checkpoint可以 保证精确一次性,同时可以进行增量,快速的checkpoint的,有三个状态后端,memery、rocksdb、hdfs

16、简单介绍一下cep状态编程

Complex Event Processing(CEP):

FLink Cep 是在FLink中实现的复杂时间处理库,CEP允许在无休止的时间流中检测事件模式,让我们有机会掌握数据中重要的部分,一个或多个由简单事件构成的时间流通过一定的规则匹配,然后输出用户想得到的数据,也就是满足规则的复杂事件。

17、 Flink cep连续事件的可选项有什么

18、如何通过flink的CEP来实现支付延迟提醒

19、Flink cep 你用过哪些业务场景

20、cep底层如何工作

21、cep怎么老化

22、cep性能调优

23、Flink的背压,介绍一下Flink的反压,你们是如何监控和发现的呢。

Flink 没有使用任何复杂的机制来解决反压问题,Flink 在数据传输过程中使用了分布式阻塞队列。我们知道在一个阻塞队列中,当队列满了以后发送者会被天然阻塞住,这种阻塞功能相当于给这个阻塞队列提供了反压的能力。

当你的任务出现反压时,如果你的上游是类似 Kafka 的消息系统,很明显的表现就是消费速度变慢,Kafka 消息出现堆积。

如果你的业务对数据延迟要求并不高,那么反压其实并没有很大的影响。但是对于规模很大的集群中的大作业,反压会造成严重的“并发症”。首先任务状态会变得很大,因为数据大规模堆积在系统中,这些暂时不被处理的数据同样会被放到“状态”中。另外,Flink 会因为数据堆积和处理速度变慢导致 checkpoint 超时,而 checkpoint 是 Flink 保证数据一致性的关键所在,最终会导致数据的不一致发生。

Flink Web UI

Flink 的后台页面是我们发现反压问题的第一选择。Flink 的后台页面可以直观、清晰地看到当前作业的运行状态。

Web UI,需要注意的是,只有用户在访问点击某一个作业时,才会触发反压状态的计算。在默认的设置下,Flink的TaskManager会每隔50ms触发一次反压状态监测,共监测100次,并将计算结果反馈给JobManager,最后由JobManager进行反压比例的计算,然后进行展示。

在生产环境中Flink任务有反压有三种OK、LOW、HIGH

OK正常

LOW一般

HIGH高负载

24、Flink的CBO,逻辑执行计划和物理执行计划

Flink的优化执行其实是借鉴的数据库的优化器来生成的执行计划。

CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Flink 的成本优化器也一样。Flink 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。

// TODO

25、Flink中数据聚合,不使用窗口怎么实现聚合

  • valueState 用于保存单个值
  • ListState 用于保存list元素
  • MapState 用于保存一组键值对
  • ReducingState 提供了和ListState相同的方法,返回一个ReducingFunction聚合后的值。
  • AggregatingState和 ReducingState类似,返回一个AggregatingState内部聚合后的值

26、Flink中state有哪几种存储方式

Memery、RocksDB、HDFS

27、Flink 异常数据怎么处理

异常数据在我们的场景中,一般分为缺失字段和异常值数据。

异常值: 例如宝宝的年龄的数据,例如对于母婴行业来讲,一个宝宝的年龄是一个至关重要的数据,可以说是最重要的,因为宝宝大于3岁几乎就不会在母婴上面购买物品。像我们的有当日、未知、以及很久的时间。这样都属于异常字段,这些数据我们会展示出来给店长和区域经理看,让他们知道多少个年龄是不准的。如果要处理的话,可以根据他购买的时间来进行实时矫正,例如孕妇服装、奶粉的段位、纸尿裤的大小,以及奶嘴啊一些能够区分年龄段的来进行处理。我们并没有实时处理这些数据,我们会有一个底层的策略任务夜维去跑,一个星期跑一次。

缺失字段: 例如有的字段真的缺失的很厉害,能修补就修补。不能修补就放弃,就像上家公司中的新闻推荐过滤器。

28、Flink 监控你们怎么做的

1、我们监控了Flink的任务是否停止

2、我们监控了Flink的Kafka的LAG

3、我们会进行实时数据对账,例如销售额。

29、Flink 有数据丢失的可能吗

Flink有三种数据消费语义:

  1. At Most Once 最多消费一次 发生故障有可能丢失
  2. At Least Once 最少一次 发生故障有可能重复
  3. Exactly-Once 精确一次 如果产生故障,也能保证数据不丢失不重复。

flink 新版本已经不提供 At-Most-Once 语义。

30、Flink interval join 你能简单的写一写吗

DataStream<T> keyed1 = ds1.keyBy(o -> o.getString("key"))
DataStream<T> keyed2 = ds2.keyBy(o -> o.getString("key"))
//右边时间戳-5s<=左边流时间戳<=右边时间戳-1s
keyed1.intervalJoin(keyed2).between(Time.milliseconds(-5), Time.milliseconds(5))

31、Flink 提交的时候 并行度如何制定,以及资源如何配置

并行度根据kafka topic的并行度,一个并行度3个G

32、Flink的boardcast join 的原理是什么

利用 broadcast State 将维度数据流广播到下游所有 task 中。这个 broadcast 的流可以与我们的事件流进行 connect,然后在后续的 process 算子中进行关联操作即可。

33、flink的source端断了,比如kafka出故障,没有数据发过来,怎么处理?

会有报警,监控的kafka偏移量也就是LAG。

34、flink有什么常用的流的API?

window join 啊 cogroup 啊 map flatmap,async io 等

35、flink的水位线,你了解吗,能简单介绍一下吗

Flink 的watermark是一种延迟触发的机制。

一般watermark是和window结合来进行处理乱序数据的,Watermark最根本就是一个时间机制,例如我设置最大乱序时间为2s,窗口时间为5秒,那么就是当事件时间大于7s的时候会触发窗口。当然假如有数据分区的情况下,例如kafka中接入watermake的话,那么watermake是会流动的,取的是所有分区中最小的watermake进行流动,因为只有最小的能够保证,之前的数据都已经来到了,可以触发计算了。

36、Flink怎么维护Checkpoint?在HDFS上存储的话会有小文件吗

默认情况下,如果设置了Checkpoint选项,Flink只保留最近成功生成的1个Checkpoint。当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活。Flink支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置指定最多需要保存Checkpoint的个数。

关于小文件问题可以参考代达罗斯之殇-大数据领域小文件问题解决攻略。

37、Spark和Flink的序列化,有什么区别吗?

Spark 默认使用的是 Java序列化机制,同时还有优化的机制,也就是kryo

Flink是自己实现的序列化机制,也就是TypeInformation

38、Flink是怎么处理迟到数据的?但是实际开发中不能有数据迟到,怎么做?

Flink 的watermark是一种延迟触发的机制。

一般watermark是和window结合来进行处理乱序数据的,Watermark最根本就是一个时间机制,例如我设置最大乱序时间为2s,窗口时间为5秒,那么就是当事件时间大于7s的时候会触发窗口。当然假如有数据分区的情况下,例如kafka中接入watermake的话,那么watermake是会流动的,取的是所有分区中最小的watermake进行流动,因为只有最小的能够保证,之前的数据都已经来到了,可以触发计算了。

39、画出flink执行时的流程图。

40、Flink分区分配策略

41、Flink关闭后状态端数据恢复得慢怎么办?

42、了解flink的savepoint吗?讲一下savepoint和checkpoint的不同和各有什么优势

43、flink的状态后端机制

Flink的状态后端是Flink在做checkpoint的时候将状态快照持久化,有三种状态后端 Memery、HDFS、RocksDB

44、flink中滑动窗口和滚动窗口的区别,实际应用的窗口是哪种?用的是窗口长度和滑动步长是多少?

45、用flink能替代spark的批处理功能吗

Flink 未来的目标是批处理和流处理一体化,因为批处理的数据集你可以理解为是一个有限的数据流。Flink 在批出理方面,尤其是在今年 Flink 1.9 Release 之后,合入大量在 Hive 方面的功能,你可以使用 Flink SQL 来读取 Hive 中的元数据和数据集,并且使用 Flink SQL 对其进行逻辑加工,不过目前 Flink 在批处理方面的性能,还是干不过 Spark的。

目前看来,Flink 在批处理方面还有很多内容要做,当然,如果是实时计算引擎的引入,Flink 当然是首选。

46、flink计算的UV你们是如何设置状态后端保存数据

可以使用布隆过滤器。

47、sparkstreaming和flink在执行任务上有啥区别,不是简单的流处理和微批,sparkstreaming提交任务是分解成stage,flink是转换graph,有啥区别?

48、flink把streamgraph转化成jobGraph是在哪个阶段?

49、Flink中的watermark除了处理乱序数据还有其他作用吗?

还有kafka数据顺序消费的处理。

50、flink你一般设置水位线设置多少

我们之前设置的水位线是6s

52、Flink任务提交流程

Flink任务提交后,Client向HDFS上传Flink的jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动
ApplicationMaster,ApplicationMaster启动后加载Flink的jar包和配置构建环境,然后启动JobManager;之后Application Master向ResourceManager申请资源启动TaskManager
,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在的节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动向JobManager发送心跳,并等待JobManager向其分配任务。

53、Flink技术架构图

54、flink如何实现在指定时间进行计算。

55、手写Flink topN

57、Flink的Join算子有哪些

一般join是发生在window上面的:

1、window join,即按照指定的字段和滚动滑动窗口和会话窗口进行 inner join

2、是coGoup 其实就是left join 和 right join,

3、interval join 也就是 在窗口中进行join 有一些问题,因为有些数据是真的会后到的,时间还很长,那么这个时候就有了interval join但是必须要是事件时间,并且还要指定watermark和水位以及获取事件时间戳。并且要设置 偏移区间,因为join 也不能一直等的。

58、Flink1.10 有什么新特性吗?

内存管理及配置优化

Flink 目前的 TaskExecutor 内存模型存在着一些缺陷,导致优化资源利用率比较困难,例如:

  • 流和批处理内存占用的配置模型不同
  • 流处理中的 RocksDB state backend 需要依赖用户进行复杂的配置

为了让内存配置变的对于用户更加清晰、直观,Flink 1.10 对 TaskExecutor 的内存模型和配置逻辑进行了较大的改动 (FLIP-49 [7])。这些改动使得 Flink 能够更好地适配所有部署环境(例如 Kubernetes, Yarn, Mesos),让用户能够更加严格的控制其内存开销。

Managed 内存扩展

Managed 内存的范围有所扩展,还涵盖了 RocksDB state backend 使用的内存。尽管批处理作业既可以使用堆内内存也可以使用堆外内存,使用 RocksDB state backend 的流处理作业却只能利用堆外内存。因此为了让用户执行流和批处理作业时无需更改集群的配置,我们规定从现在起 managed 内存只能在堆外。

简化 RocksDB 配置

此前,配置像 RocksDB 这样的堆外 state backend 需要进行大量的手动调试,例如减小 JVM 堆空间、设置 Flink 使用堆外内存等。现在,Flink 的开箱配置即可支持这一切,且只需要简单地改变 managed 内存的大小即可调整 RocksDB state backend 的内存预算。

另一个重要的优化是,Flink 现在可以限制 RocksDB 的 native 内存占用,以避免超过总的内存预算—这对于 Kubernetes 等容器化部署环境尤为重要。

统一的作业提交逻辑
在此之前,提交作业是由执行环境负责的,且与不同的部署目标(例如 Yarn, Kubernetes, Mesos)紧密相关。这导致用户需要针对不同环境保留多套配置,增加了管理的成本。

在 Flink 1.10 中,作业提交逻辑被抽象到了通用的 Executor 接口。新增加的 ExecutorCLI (引入了为任意执行目标指定配置参数的统一方法。此外,随着引入 JobClient负责获取 JobExecutionResult,获取作业执行结果的逻辑也得以与作业提交解耦。

原生 Kubernetes 集成(Beta)

对于想要在容器化环境中尝试 Flink 的用户来说,想要在 Kubernetes 上部署和管理一个 Flink standalone 集群,首先需要对容器、算子及像 kubectl 这样的环境工具有所了解。

在 Flink 1.10 中,我们推出了初步的支持 session 模式的主动 Kubernetes 集成(FLINK-9953)。其中,“主动”指 Flink ResourceManager (K8sResMngr) 原生地与 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一样按需申请 pod。用户可以利用 namespace,在多租户环境中以较少的资源开销启动 Flink。这需要用户提前配置好 RBAC 角色和有足够权限的服务账号。

Table API/SQL: 生产可用的 Hive 集成

Flink 1.9 推出了预览版的 Hive 集成。该版本允许用户使用 SQL DDL 将 Flink 特有的元数据持久化到 Hive Metastore、调用 Hive 中定义的 UDF 以及读、写 Hive 中的表。Flink 1.10 进一步开发和完善了这一特性,带来了全面兼容 Hive 主要版本的生产可用的 Hive 集成。

Batch SQL 原生分区支持

此前,Flink 只支持写入未分区的 Hive 表。在 Flink 1.10 中,Flink SQL 扩展支持了 INSERT OVERWRITE 和 PARTITION 的语法(FLIP-63 ),允许用户写入 Hive 中的静态和动态分区。

  • 写入静态分区
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
  • 写入动态分区
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

对分区表的全面支持,使得用户在读取数据时能够受益于分区剪枝,减少了需要扫描的数据量,从而大幅提升了这些操作的性能。

另外,除了分区剪枝,Flink 1.10 的 Hive 集成还引入了许多数据读取方面的优化,例如:

  • 投影下推:Flink 采用了投影下推技术,通过在扫描表时忽略不必要的域,最小化 Flink 和 Hive 表之间的数据传输量。这一优化在表的列数较多时尤为有效。
  • LIMIT 下推:对于包含 LIMIT 语句的查询,Flink 在所有可能的地方限制返回的数据条数,以降低通过网络传输的数据量。
  • 读取数据时的 ORC 向量化: 为了提高读取 ORC 文件的性能,对于 Hive 2.0.0 及以上版本以及非复合数据类型的列,Flink 现在默认使用原生的 ORC 向量化读取器。

59、Flink的重启策略

固定延迟重启策略

固定延迟重启策略是尝试给定次数重新启动作业。如果超过最大尝试次数,则作业失败。在两次连续重启尝试之间,会有一个固定的延迟等待时间。

故障率重启策略

故障率重启策略在故障后重新作业,当设置的故障率(failure rate)超过每个时间间隔的故障时,作业最终失败。在两次连续重启尝试之间,重启策略延迟等待一段时间。

无重启策略

作业直接失败,不尝试重启。

后备重启策略

使用群集定义的重新启动策略。这对于启用检查点的流式传输程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。

60、Flink什么时候用aggregate()或者process()

aggregate: 增量聚合

process: 全量聚合

当计算累加操作时候可以使用aggregate操作。

当计算窗口内全量数据的时候使用process,例如排序等操作。

61、Flink优化 你了解多少

62、Flink内存溢出怎么办

63、说说Flink中的keyState包含哪些数据结构

64、Flink shardGroup的概念

作者 east
Flink 3月 1,2021

Flink Stream SQL Join程序

场景说明

假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。

基于某些业务要求,开发的Flink应用程序实现功能:实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询。

数据规划

  • 业务1的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。Kafka配置参见样例数据规划章节。
  • 业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。
    • 使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。
    • 启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,构造Table1,保证topic与producer一致。
  3. 从soket中读取数据,构造Table2。
  4. 使用Flink SQL对Table1和Table2进行联合查询,并进行打印。

Java样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

用户在开发前需要使用对接安全模式的FusionInsight Kafka,则需要引入FusionInsight的kafka-client-0.11.x.x.jar,该jar包可在FusionInsight client目录下获取。

下面列出producer和consumer,以及Flink Stream SQL Join使用主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.SqlJoinWithSocket

Java样例代码

  1. 每秒钟往Kafka中生产一条用户信息,用户信息有姓名、年龄、性别组成。
//producer代码
public class WriteIntoKafka {

      public static void main(String[] args) throws Exception {

      // 打印出执行flink run的参考命令
        System.out.println("use command as: ");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");

        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +

           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");

        System.out.println("******************************************************************************************");

        System.out.println("<topic> is the kafka topic name");

        System.out.println("<bootstrap.servers> is the ip:port list of brokers");

        System.out.println("******************************************************************************************");
       
        // 构造执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并发度
        env.setParallelism(1);
        // 解析运行参数
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // 构造流图,将自定义Source生成的数据写入Kafka
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());

        FlinkKafkaProducer010 producer = new FlinkKafkaProducer010<>(new FlinkKafkaProducer010<>(paraTool.get("topic"),

           new SimpleStringSchema(),

           paraTool.getProperties()));

        messageStream.addSink(producer);

        // 调用execute触发执行
        env.execute();
     }

// 自定义Source,每隔1s持续产生消息
public static class SimpleStringGenerator implements SourceFunction<String> {
        static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};

        static final String[] SEX = {"MALE", "FEMALE"};

        static final int COUNT = NAME.length;   

        boolean running = true;

        Random rand = new Random(47);

       @Override
        //rand随机产生名字,性别,年龄的组合信息
         public void run(SourceContext<String> ctx) throws Exception {

            while (running) {

                int i = rand.nextInt(COUNT);

                int age = rand.nextInt(70);

                String sexy = SEX[rand.nextInt(2)];

                ctx.collect(NAME[i] + "," + age + "," + sexy);

                thread.sleep(1000);

            }

    }

       @Override

       public void cancel() {

         running = false;

       }

     }

   }

2.生成Table1和Table2,并使用Join对Table1和Table2进行联合查询,打印输出结果。

public class SqlJoinWithSocket {
    public static void main(String[] args) throws Exception{

        final String hostname;

        final int port;

        System.out.println("use command as: ");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                + "--hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");

        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        try {
            final ParameterTool params = ParameterTool.fromArgs(args);

            hostname = params.has("hostname") ? params.get("hostname") : "localhost";

            port = params.getInt("port");

        } catch (Exception e) {
            System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");

            System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                    "type the input text into the command line");

            return;
        }
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        //基于EventTime进行处理
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.setParallelism(1);

        ParameterTool paraTool = ParameterTool.fromArgs(args);

        //Stream1,从Kafka中读取数据
        DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
                new SimpleStringSchema(),
                paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                String[] word = s.split(",");

                return new Tuple3<>(word[0], word[1], word[2]);
            }
        });

        //将Stream1注册为Table1
        tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");

        //Stream2,从Socket中读取数据
        DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n").
                map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] words = s.split("\\s");
                        if (words.length < 2) {
                            return new Tuple2<>();
                        }

                        return new Tuple2<>(words[0], words[1]);
                    }
                });

        //将Stream2注册为Table2
        tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");

        //执行SQL Join进行联合查询
        Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                "FROM Table1 AS t1\n" +
                "JOIN Table2 AS t2\n" +
                "ON t1.name = t2.name\n" +
                "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");

        //将查询结果转换为Stream,并打印输出
        tableEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
作者 east
Flink 3月 1,2021

Flink 配置表与流JOIN程序

场景说明

场景说明

假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,基于某些业务要求,要求开发Flink的应用程序实现如下功能:

  • 实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息; 其中日志文本和csv格式表中的姓名字段可作为关键字,通过该值将两张表联合起来。
  • 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 data.txt:周末两天网民停留日志



LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 NotExist,female,200

  • configtable.csv:网民个人信息,第一列为姓名,第二列为年龄,第三列为公司,第四列为工作地点,第五列为学历,第六列为工作年数,第七列为手机号码,第八列为户籍所在地,第九列为毕业学校,csv标准格式,即分隔符为“,”


username,age,company,workLocation,educational,workYear,phone,nativeLocation,school LiuYang,25,Microsoft,hangzhou,college,5,13512345678,hangzhou zhejiang,wuhan university YuanJing,26,Oracle,shanghai,master,6,13512345679,shijiazhuang hebei,zhejiang university GuoYijun,27,Alibaba,beijing,college,7,13512345680,suzhou jiangsu,qinghua university CaiXuyu,28,Coca Cola,shenzheng,master,8,13512345681,hefei anhui,beijing university Liyuan,29,Tencent,chengdou,doctor,9,13512345682,nanchang jiangxi,nanjing university FangBo,30,Huawei,qingdao,doctor,10,13512345683,xiamen fujian,fudan university

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息,包含对应的个人详细信息。

主要分为七个部分:

  • 修改“import.properties”和“read.properties”配置,配置csv文件字段、Redis读取字段以及Redis的节点信息配置
  • 导入“configtable.csv”配置表进入Redis中存储起来。
  • 读取文本数据,生成相应DataStream,解析数据生成OriginalRecord信息。
  • 调用异步IO的函数,以OriginalRecord用户姓名字段为关键字在Redis中查询对应的个人信息,并转化为UserRecord。
  • 筛选出女性网民上网时间数据信息。
  • 按照姓名进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
  • 筛选连续上网时间超过阈值的用户,并获取结果。
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.utils.ParameterTool;

import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.io.File;
import java.io.FileReader;
import java.util.*;

/**
 * Read data from csv file and import to redis.
 */
public class RedisDataImport {
    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "java -cp /opt/FI-Client/Flink/flink/lib/*:/opt/FlinkConfigtableJavaExample.jar" +
                " com.huawei.bigdata.flink.examples.RedisDataImport --configPath <config filePath>" +
                "******************************************************************************************\n" +
                "<config filePath> is for configure file to load\n" +
                "you may write following content into config filePath: \n" +
                "CsvPath=config/configtable.csv\n" +
                "CsvHeaderExist=true\n" +
                "ColumnNames=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // read all configures
        final String configureFilePath = ParameterTool.fromArgs(args).get("configPath", "config/import.properties");
        final String csvFilePath = ParameterTool.fromPropertiesFile(configureFilePath).get("CsvPath", "config/configtable.csv");
        final boolean isHasHeaders = ParameterTool.fromPropertiesFile(configureFilePath).getBoolean("CsvHeaderExist", true);
        final String csvScheme = ParameterTool.fromPropertiesFile(configureFilePath).get("ColumnNames");
        final String redisIPPort = ParameterTool.fromPropertiesFile(configureFilePath).get("Redis_IP_Port");

        // init redis client
        Set<HostAndPort> hosts = new HashSet<HostAndPort>();
        for (String hostAndPort : redisIPPort.split(",")) {
            hosts.add(new HostAndPort(hostAndPort.split(":")[0], Integer.parseInt(hostAndPort.split(":")[1])));
        }
        final JedisCluster client = new JedisCluster(hosts, 15000);

        // get all files under csv file path
        ArrayList<File> files = getListFiles(csvFilePath);
        System.out.println("Read file or directory under  " + csvFilePath
                + ", total file num: " + files.size() + ", columns: " + csvScheme);

        // run read csv file and analyze it
        for (int index = 0; index < files.size(); index++) {
            readWithCsvBeanReader(files.get(index).getAbsolutePath(), csvScheme, isHasHeaders, client);
        }
        client.close();
        System.out.println("Data import finish!!!");
    }

    public static ArrayList<File> getListFiles(Object obj) {
        File directory = null;
        if (obj instanceof File) {
            directory = (File) obj;
        } else {
            directory = new File(obj.toString());
        }
        ArrayList<File> files = new ArrayList<File>();
        if (directory.isFile()) {
            files.add(directory);
            return files;
        } else if (directory.isDirectory()) {
            File[] fileArr = directory.listFiles();
            for (int i = 0; i < fileArr.length; i++) {
                File fileOne = fileArr[i];
                files.addAll(getListFiles(fileOne));
            }
        }
        return files;
    }

    /**
     * Sets up the processors used for read csv. There are 9 CSV columns. Empty
     * columns are read as null (hence the NotNull() for mandatory columns).
     *
     * @return the cell processors
     */
    private static CellProcessor[] getProcessors() {
        final CellProcessor[] processors = new CellProcessor[] {
                new NotNull(), // username
                new NotNull(), // age
                new NotNull(), // company
                new NotNull(), // workLocation
                new NotNull(), // educational
                new NotNull(), // workYear
                new NotNull(), // phone
                new NotNull(), // nativeLocation
                new NotNull(), // school
        };

        return processors;
    }

    private static void readWithCsvBeanReader(String path, String csvScheme, boolean isSkipHeader, JedisCluster client) throws Exception {
        ICsvBeanReader beanReader = null;
        try {
            beanReader = new CsvBeanReader(new FileReader(path), CsvPreference.STANDARD_PREFERENCE);

            // the header elements are used to map the values to the bean (names must match)
            final String[] header = isSkipHeader ? beanReader.getHeader(true) : csvScheme.split(",");
            final CellProcessor[] processors = getProcessors();

            UserInfo userinfo;
            while( (userinfo = beanReader.read(UserInfo.class, header, processors)) != null ) {
                System.out.println(String.format("lineNo=%s, rowNo=%s, userinfo=%s", beanReader.getLineNumber(),
                        beanReader.getRowNumber(), userinfo));

                // set redis key and value
                client.hmset(userinfo.getKeyValue(), userinfo.getMapInfo());
            }
        }
        finally {
            if( beanReader != null ) {
                beanReader.close();
            }
        }
    }



    // define the UserInfo structure
    public static class UserInfo {
        private String username;
        private String age;
        private String company;
        private String workLocation;
        private String educational;
        private String workYear;
        private String phone;
        private String nativeLocation;
        private String school;


        public UserInfo() {

        }

        public UserInfo(String nm, String a, String c, String w, String e, String wy, String p, String nl, String sc) {
            username = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
        }

        public String toString() {
            return "UserInfo-----[username: " + username + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school + "]";
        }

        // get key
        public String getKeyValue() {
            return username;
        }

        public Map<String, String> getMapInfo() {
            Map<String, String> info = new HashMap<String, String>();
            info.put("username", username);
            info.put("age", age);
            info.put("company", company);
            info.put("workLocation", workLocation);
            info.put("educational", educational);
            info.put("workYear", workYear);
            info.put("phone", phone);
            info.put("nativeLocation", nativeLocation);
            info.put("school", school);
            return info;
        }

        /**
         * @return the username
         */
        public String getUsername() {
            return username;
        }

        /**
         * @param username
         *            the username to set
         */
        public void setUsername(String username) {
            this.username = username;
        }

        /**
         * @return the age
         */
        public String getAge() {
            return age;
        }

        /**
         * @param age
         *            the age to set
         */
        public void setAge(String age) {
            this.age = age;
        }

        /**
         * @return the company
         */
        public String getCompany() {
            return company;
        }

        /**
         * @param company
         *            the company to set
         */
        public void setCompany(String company) {
            this.company = company;
        }

        /**
         * @return the workLocation
         */
        public String getWorkLocation() {
            return workLocation;
        }

        /**
         * @param workLocation
         *            the workLocation to set
         */
        public void setWorkLocation(String workLocation) {
            this.workLocation = workLocation;
        }

        /**
         * @return the educational
         */
        public String getEducational() {
            return educational;
        }

        /**
         * @param educational
         *            the educational to set
         */
        public void setEducational(String educational) {
            this.educational = educational;
        }

        /**
         * @return the workYear
         */
        public String getWorkYear() {
            return workYear;
        }

        /**
         * @param workYear
         *            the workYear to set
         */
        public void setWorkYear(String workYear) {
            this.workYear = workYear;
        }

        /**
         * @return the phone
         */
        public String getPhone() {
            return phone;
        }

        /**
         * @param phone
         *            the phone to set
         */
        public void setPhone(String phone) {
            this.phone = phone;
        }

        /**
         * @return the nativeLocation
         */
        public String getNativeLocation() {
            return nativeLocation;
        }

        /**
         * @param nativeLocation
         *            the nativeLocation to set
         */
        public void setNativeLocation(String nativeLocation) {
            this.nativeLocation = nativeLocation;
        }

        /**
         * @return the school
         */
        public String getSchool() {
            return school;
        }

        /**
         * @param school
         *            the school to set
         */
        public void setSchool(String school) {
            this.school = school;
        }
    }
}
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Read stream data and join from configure table from redis.
 */
public class FlinkConfigtableJavaExample {

    public static void main(String[] args) throws Exception {
        // print comment for command to use run flink
        System.out.println("use command as: \n" +
                "./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkConfigtableJavaExample" +
                " -m yarn-cluster -yt /opt/config -yn 3 -yjm 1024 -ytm 1024 " +
                "/opt/FlinkConfigtableJavaExample.jar --dataPath config/data.txt" +
                "******************************************************************************************\n" +
                "Especially you may write following content into config filePath, as in config/read.properties: \n" +
                "ReadFields=username,age,company,workLocation,educational,workYear,phone,nativeLocation,school\n" +
                "Redis_IP_Port=SZV1000064084:22400,SZV1000064082:22400,SZV1000064085:22400\n" +
                "******************************************************************************************");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // get configure and read data and transform to OriginalRecord
        final String dataPath = ParameterTool.fromArgs(args).get("dataPath", "config/data.txt");
        DataStream<OriginalRecord> originalStream = env.readTextFile(
                dataPath
        ).map(new MapFunction<String, OriginalRecord>() {
            @Override
            public OriginalRecord map(String value) throws Exception {
                return getRecord(value);
            }
        }).assignTimestampsAndWatermarks(
                new Record2TimestampExtractor()
        ).disableChaining();

        // read from redis and join to the whole user information
        AsyncFunction<OriginalRecord, UserRecord> function = new AsyncRedisRequest();
        // timeout set to 2 minutes, max parallel request num set to 5, you can modify this to optimize
        DataStream<UserRecord> result = AsyncDataStream.unorderedWait(
                originalStream,
                function,
                2,
                TimeUnit.MINUTES,
                5);

        // data transform
        result.filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.sexy.equals("female");
            }
        }).keyBy(
                new UserRecordSelector()
        ).window(
                TumblingEventTimeWindows.of(Time.seconds(30))
        ).reduce(new ReduceFunction<UserRecord>() {
            @Override
            public UserRecord reduce(UserRecord value1, UserRecord value2)
                    throws Exception {
                value1.shoppingTime += value2.shoppingTime;
                return value1;
            }
        }).filter(new FilterFunction<UserRecord>() {
            @Override
            public boolean filter(UserRecord value) throws Exception {
                return value.shoppingTime > 120;
            }
        }).print();

        // execute program
        env.execute("FlinkConfigtable java");
    }

    private static class UserRecordSelector implements KeySelector<UserRecord, String> {
        @Override
        public String getKey(UserRecord value) throws Exception {
            return value.name;
        }
    }

    // class to set watermark and timestamp
    private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<OriginalRecord> {

        // add tag in the data of datastream elements
        @Override
        public long extractTimestamp(OriginalRecord element, long previousTimestamp) {
            return System.currentTimeMillis();
        }

        // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready
        @Override
        public Watermark checkAndGetNextWatermark(OriginalRecord element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1);
        }
    }

    private static OriginalRecord getRecord(String line) {
        String[] elems = line.split(",");
        assert elems.length == 3;
        return new OriginalRecord(elems[0], elems[1], Integer.parseInt(elems[2]));
    }

    public static class OriginalRecord {
        private String name;
        private String sexy;
        private int shoppingTime;

        public OriginalRecord(String n, String s, int t) {
            name = n;
            sexy = s;
            shoppingTime = t;
        }
    }

    public static class UserRecord {
        private String name;
        private int age;
        private String company;
        private String workLocation;
        private String educational;
        private int workYear;
        private String phone;
        private String nativeLocation;
        private String school;
        private String sexy;
        private int shoppingTime;

        public UserRecord(String nm, int a, String c, String w, String e, int wy, String p, String nl, String sc, String sx, int st) {
            name = nm;
            age = a;
            company = c;
            workLocation = w;
            educational = e;
            workYear = wy;
            phone = p;
            nativeLocation = nl;
            school = sc;
            sexy = sx;
            shoppingTime = st;
        }

        public void setInput(String input_nm, String input_sx, int input_st) {
            name = input_nm;
            sexy = input_sx;
            shoppingTime = input_st;
        }

        public String toString() {
            return "UserRecord-----name: " + name + "  age: " + age + "  company: " + company
                    + "  workLocation: " + workLocation + "  educational: " + educational
                    + "  workYear: " + workYear + "  phone: " + phone + "  nativeLocation: " + nativeLocation + "  school: " + school
                    + "  sexy: " + sexy + "  shoppingTime: " + shoppingTime;
        }
    }

    public static class AsyncRedisRequest extends RichAsyncFunction<OriginalRecord, UserRecord>{
        private String fields = "";
        private transient JedisCluster client;
        private LoadingCache<String, UserRecord> cacheRecords;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // init cache builder
            cacheRecords = CacheBuilder.newBuilder()
                    .maximumSize(10000)
                    .expireAfterAccess(7, TimeUnit.DAYS)
                    .build(new CacheLoader<String, UserRecord>() {
                        public UserRecord load(String key) throws Exception {
                            //load from redis
                            return loadFromRedis(key);
                        }
                    });

            // get configure from config/read.properties, you must put this with commands:
            // ./bin/yarn-session.sh -t config -n 3 -jm 1024 -tm 1024 or
            // ./bin/flink run -m yarn-cluster -yt config -yn 3 -yjm 1024 -ytm 1024 /opt/test.jar
            String configPath = "config/read.properties";
            fields = ParameterTool.fromPropertiesFile(configPath).get("ReadFields");
            final String hostPort = ParameterTool.fromPropertiesFile(configPath).get("Redis_IP_Port");
            // create jedisCluster client
            Set<HostAndPort> hosts = new HashSet<HostAndPort>();
            for (String node : hostPort.split(",")) {
                hosts.add(new HostAndPort(node.split(":")[0], Integer.parseInt(node.split(":")[1])));
            }
            client = new JedisCluster(hosts, 60000);
            System.out.println("JedisCluster init, getClusterNodes: " + client.getClusterNodes().size());
        }

        @Override
        public void close() throws Exception {
            super.close();

            if (client != null) {
                System.out.println("JedisCluster close!!!");
                client.close();
            }
        }

        public UserRecord loadFromRedis(final String key) throws Exception {
            if (client.getClusterNodes().size() <= 0) {
                System.out.println("JedisCluster init failed, getClusterNodes: " + client.getClusterNodes().size());
            }
            if (!client.exists(key)) {
                System.out.println("test-------cannot find data to key:  " + key);
                return new UserRecord(
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        0,
                        "null",
                        "null",
                        "null",
                        "null",
                        0);
            } else {
                // get some fields
                List<String> values = client.hmget(key, fields.split(","));
                System.out.println("test-------key: " + key + "  get some fields:  " + values.toString());
                return new UserRecord(
                        values.get(0),
                        Integer.parseInt(values.get(1)),
                        values.get(2),
                        values.get(3),
                        values.get(4),
                        Integer.parseInt(values.get(5)),
                        values.get(6),
                        values.get(7),
                        values.get(8),
                        "null",
                        0);
            }
        }

        public void asyncInvoke(final OriginalRecord input, final AsyncCollector<UserRecord> collector) throws Exception {
            // set key string, if you key is more than one column, build your key string with columns
            String key = input.name;
            UserRecord info = cacheRecords.get(key);
            info.setInput(input.name, input.sexy, input.shoppingTime);
            collector.collect(Collections.singletonList(info));
        }
    }
}
作者 east
Flink 3月 1,2021

Flink Job Pipeline程序

场景说明

场景说明

本样例中发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据。

数据规划

  1. 发布者Job使用自定义算子每秒钟产生10000条数据
  2. 数据包含两个属性:分别是Int和String类型
  3. 配置文件
    • nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:nettyconnector.registerserver.topic.storage: /flink/nettyconnector
    • nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:nettyconnector.sinkserver.port.range: 28444-28943
    • nettyconnector.sinkserver.subnet:设置网络所属域,例如:nettyconnector.sinkserver.subnet: 10.162.0.0/16
  4. 接口说明
    • 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */ void start(Configuration configuration) throws Exception; /** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */ void createTopicNode(String topic) throw Exception; /** *将信息注册到某个topic节点(目录)下 * @param topic 需要注册到的目录 * @param registerRecord 需要注册的信息 */ void register(String topic, RegisterRecord registerRecord) throws Exception; /** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception; /** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */ void unregister(String topic, int recordId) throws Exception; /** * 查寻信息 * @param 查询信息所在的topic *@recordId 查询信息的ID */ RegisterRecord query(String topic, int recordId) throws Exception; /** * 查询某个Topic是否存在 * @param topic */ Boolean isExist(String topic) throws Exception; /** *关闭注册服务器句柄 */ void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。
    • NettySink算子Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs)
      • name:为本NettySink的名称。
      • topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
      • registerServerHandler: 为注册服务器的句柄。
      • numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
    • NettySource算子Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler)
      • name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
      • topic:订阅的NettySink的topic。
      • registerServerHandler:为注册服务器的句柄。

说明:

NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。

开发思路

1. 一个Job作为发布者Job,其余两个作为订阅者Job

2. 发布者Job自己产生数据将其转化成byte[],分别向订阅者发送

3. 订阅者收到byte[]之后将其转化成String类型,并抽样打印输出

Java版代码:

  1. 发布Job自定义Source算子产生数据
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
import java.io.Serializable;
 
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
 
    private boolean isRunning = true;
 
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
 
    }
/**
    * 数据产生函数,每秒钟产生10000条数据
   */
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
 
        while(isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }
 
    public void close() {
        isRunning = false;
    }
 
    public void cancel() {
        isRunning = false;
    }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySink {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置job的并发度为2
        env.setBufferTimeout(2);
 
// 创建Zookeeper的注册服务器handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
// 添加自定义Source算子
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer,String>, byte[]>() {
                    //将发送信息转化成字节数组
@Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));//通过NettySink发送出去。
 
        env.execute();
 
    }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource1 {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置job的并发度为2        
env.setParallelism(2);
 
// 创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的消息
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                  // 将接收到的字节流转化成字符串  
    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource2 {
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置作业的并发度为2       
 env.setParallelism(2);
 
//创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的数据
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
          //将接收到的字节数组转化成字符串
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}

Scala样例代码

  1. 发送消息
package com.huawei.bigdata.flink.examples
 
case class Inforamtion(index: Int, content: String) {
 
  def this() = this(0, "")
}
  1. 发布者job自定义source算子产生数据
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{
 
  var isRunning = true
 
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
   
  }
 
// 每秒钟产生10000条数据
  override def run(sourceContext: SourceContext[Inforamtion]) = {
 
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(Inforamtion(i, "hello-" + i));
 
      }
      Thread.sleep(1000)
    }
  }
 
  override def close(): Unit = super.close()
 
  override def cancel() = {
    isRunning = false
  }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
object TestPipeline_NettySink {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置job的并发度为2    
env.setParallelism(2)
//设置Zookeeper为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加用户自定义算子产生数据    
env.addSource(new UserSource)
      .keyBy(0).map(x=>x.content.getBytes)//将发送数据转化成字节数组
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))//添加NettySink算子发送数据
 
    env.execute()
  }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource1 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置Job的并发度为2  
  env.setParallelism(2)
//设置Zookeeper作为注册服务器
val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收来自发布者的数据
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      .map(x => (1, new String(x)))//将接收到的字节流转化成字符串
      .filter(x => {
        Random.nextInt(50000) == 10
      })
      .print
 
    env.execute()
  }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource2 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置job的并发度为2   
 env.setParallelism(2)
//创建Zookeeper作为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收数据    
env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      .map(x=>(2, new String(x)))//将接收到的字节数组转化成字符串
      .filter(x=>{
        Random.nextInt(50000) == 10
      })
      .print()
 
    env.execute()
  }
}
作者 east

上一 1 … 5 6 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.