gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
Flink 4月 19,2025

两种Flink 生成水印的方案的对比

在开发中用到flink下面2种生成水印方式,它们优劣和适用场景如何。

方案1:
.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[RunData](Duration.ofSeconds(5))
          .withIdleness(Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[RunData] {
            override def extractTimestamp(element: RunData, recordTimestamp: Long): Long = {
              Math.max(element.getTime.longValue(), recordTimestamp)            
            }
          })
      ),

方案2:
.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[RunData](Duration.ofSeconds(5))
          .withIdleness(Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[RunData] {
            override def extractTimestamp(element: RunData, recordTimestamp: Long): Long = {
              element.getTime.longValue()      
            }
          })
      )


✅ 方案区别概述:

方案时间戳提取逻辑
方案1使用 Math.max(element.getTime, recordTimestamp) 取当前数据时间戳与 recordTimestamp 的较大值
方案2直接返回 element.getTime,不考虑 recordTimestamp

🧠 方案分析


🔹方案1:使用 Math.max(element.getTime, recordTimestamp)

✅ 优点:

  1. 更稳健处理乱序数据:
    • 如果 recordTimestamp(通常为前一条数据的时间戳)大于当前数据的时间戳,那么直接使用 element.getTime 会导致时间回退,从而影响 Watermark 计算,进而导致窗口触发不正确。
    • 该逻辑防止了 “时间倒退” 的现象。
  2. 适合数据源存在时钟漂移或乱序程度较高的情况:
    • 尤其是传感器数据、日志数据等不按顺序到达场景。
    • 若数据有一定延迟,选最大值可防止系统过早推进 Watermark。

❌ 缺点:

  1. 可能高估时间戳,导致 Watermark 滞后:
    • 如果 recordTimestamp 一直偏大,会导致 Watermark = 当前最大时间戳 - 允许延迟 总是处于偏后的状态,进而 延迟窗口触发时间,影响下游计算的实时性。
  2. 不太适用于严格顺序数据或延迟容忍低的场景:
    • 因为该策略会让窗口的计算始终比实际事件时间慢。

✅ 适用场景:

  • 数据存在明显乱序或可能存在时间回退的场景。
  • IoT 场景、传感器采集、日志采集系统。
  • 对准确性高于实时性的业务逻辑(如:统计分析类任务)。

🔹方案2:使用 element.getTime 直接提取时间戳

✅ 优点:

  1. 时间戳准确,还原数据本身的时间:
    • 直接以 数据本身时间 为准,更加符合事件时间语义。
    • 如果数据是有序或近乎有序的,它能够 精准推进 Watermark,使窗口 尽早触发。
  2. 低延迟,适合对实时性要求高的场景:
    • 例如实时告警系统、实时监控、业务中台。

❌ 缺点:

  1. 存在时间回退风险:
    • 如果某条数据时间比前面数据的还小,可能导致 Watermark 被“拉回”,系统可能拒绝处理这条数据或错误触发窗口。
  2. 依赖数据源严格有序性或 Watermark 容错时间足够长:
    • 对乱序不友好,需要配合合理的 forBoundedOutOfOrderness 延迟容忍时间。

✅ 适用场景:

  • 数据基本有序或有轻微乱序(容忍在 Duration.ofSeconds(5) 内)。
  • 实时性要求高的业务场景(如金融交易监控、实时用户行为处理)。
  • 数据源可靠性高(Kafka 的 event-time 顺序性高)。

✅ 总结对比表

项目方案1(Math.max)方案2(直接提取)
容错能力⭐⭐⭐⭐(强)⭐⭐(弱)
实时性⭐⭐(延迟高)⭐⭐⭐⭐(延迟低)
复杂数据处理⭐⭐⭐⭐⭐⭐
精确性(恢复事件时间)⭐⭐⭐⭐⭐⭐⭐
是否适合乱序数据是否(需轻度乱序)
场景推荐传感器、日志、数据分析实时告警、交易监控、行为分析

💡 建议:

  • 如果你对 数据顺序不确定、业务允许延迟:选 方案1。
  • 如果你 清楚数据较有序、实时性优先:选 方案2。
  • 两者都设置了 .withIdleness(Duration.ofSeconds(5)),意味着在数据空闲 5 秒内,Flink 会认为该 source 闲置,避免整个 Watermark 卡住 —— 这是对空数据源的一种保护机制,适合多 source 场景。

如果 用 Math.min(eventTime, recordTimestamp) 的效果是啥?

理论上的优势:

优势说明
✅ 防止因“异常大时间戳”推高 Watermark有些数据可能时间戳未来太远(如某个设备出错时间为2099年),使用 min 可以防止 Watermark 被拉得太靠前,导致后续正常数据全是迟到数据
✅ 更严谨处理数据乱跳若数据到达时间波动大,但我们只取历史最小时间戳推进,可以更“保守”处理

❌ 实际上的问题和风险:

问题说明
❌ 时间倒退严重如果 recordTimestamp 比当前数据早,就永远使用旧的时间,窗口不会推进,Watermark卡死
❌ Watermark 不会正常前进因为时间戳总是被压制成“更早的”,所以 Watermark 永远低于真实事件时间
❌ 数据无法被触发处理Flink 的窗口系统等 Watermark 过去“窗口边界”才会触发计算,这种写法可能导致窗口永远不触发,任务“看起来没问题但没产出”!
作者 east
bug清单, Flink 3月 5,2025

解决Flink SQL:Exception in thread “main” org.apache.flink.table.api.ValidationException: Rowtime attribute ‘ptime’ must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type ‘BIGINT’.

在开发Flink SQL时报错:

在flink 1.16版本中执行报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: Rowtime attribute 'ptime' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type 'BIGINT'.

	at org.apache.flink.table.api.TableSchema.validateColumnsAndWatermarkSpecs(TableSchema.java:535)

	at org.apache.flink.table.api.TableSchema.access$100(TableSchema.java:73)

	at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:802)

	at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.build(MergeTableLikeUtil.java:534)

	at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:154)

	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:171)

	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:74)

	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:330)

	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)

	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)

	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:758)

	at com.chuneng.saas.doris.FlinkDorisExtremeValueCalculation.main(FlinkDorisExtremeValueCalculation.java:44)

原因分析

错误的核心在于:Flink 要求用于定义 WATERMARK 的字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型,但你的 ptime 字段被定义为 BIGINT 类型。尽管你在 WATERMARK 中尝试将 ptime 转换为 TIMESTAMP,但 Flink 的 WATERMARK 语法要求直接引用一个已经存在的 TIMESTAMP 类型字段,而不是在 WATERMARK 定义中动态转换类型。

解决方案

你需要通过 计算列(Computed Column) 将 ptime 的 BIGINT 类型转换为 TIMESTAMP 类型,然后将 WATERMARK 绑定到计算列上。

例如像下面这样:

String sourceDDL = "CREATE TABLE ods_t_iot_data (" +
        "pid STRING," +
        "pvalue DECIMAL(13,3)," +
        "ptime BIGINT," +
        "ds DATE," +
        // 定义计算列,将 BIGINT 类型的 ptime 转换为 TIMESTAMP
        "event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ptime / 1000))," + 
        // 绑定 WATERMARK 到计算列 event_time
        "WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '10.0.0.1:8030'," +
        "'table.identifier' = 'cnsaas.ods_t_iot'," +
        "'username' = 'root'," +
        "'password' = ''" +
        ")";
作者 east
bug清单, Flink 3月 5,2025

解决Flink SQL报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”

在开发Flink SQL时报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”

这个错误表明您的Flink作业涉及更新或删除操作(如CDC场景),而JDBC Sink表没有定义主键。以下是详细分析和解决方案:

错误原因分析

当Flink作业涉及UPDATE/DELETE操作时(常见于CDC场景或使用Retract流),JDBC Sink表必须显式声明主键,因为:

  1. 主键用于定位目标数据库中需要更新的记录
  2. 没有主键时,Flink无法确定如何执行更新/删除操作

解决方案

在表定义中添加 PRIMARY KEY 声明(需与目标数据库表主键一致),例如:

val sinkDDL =
  """
    |CREATE TABLE dws_iot_extreme_info_min (
    |  device_code STRING,
    |  `day` DATE,
    |  ptime BIGINT,
    |  station_sn STRING,
    |  unit STRING,
    |  cu_device_code STRING,
    |  max_u_value DECIMAL(5,3),
    |  min_u_value DECIMAL(5,3),
    |  max_t_value DECIMAL(5,2),
    |  min_t_value DECIMAL(5,2),
    |  max_soc_value DECIMAL(5,2),
    |  min_soc_value DECIMAL(5,2),
    |  max_u_time BIGINT,
    |  min_u_time BIGINT,
    |  max_t_time BIGINT,
    |  min_t_time BIGINT,
    |  max_soc_time BIGINT,
    |  min_soc_time BIGINT,
    |  dt DATE,
    |  PRIMARY KEY (device_code, `day`) NOT ENFORCED  -- 添加主键声明
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://10.0.2.2:3306/cnsaas',
    |  'table-name' = 'dws_bigdata_device_extreme_info_min',
    |  'driver' = 'com.mysql.cj.jdbc.Driver',
    |  'username' = 'root',
    |  'password' = '',
    |  'sink.buffer-flush.max-rows' = '1000',    
    |  'sink.buffer-flush.interval' = '1s',      
    |  'sink.max-retries' = '3'                 
    |)
    |""".stripMargin

关键修改点说明

  1. 主键声明:PRIMARY KEY (device_code, `day`) NOT ENFORCED
    • 主键字段需与目标数据库表的主键一致
    • NOT ENFORCED 表示Flink不会校验数据主键约束,由数据库负责
  2. 目标表要求:
    • MySQL数据库中 dws_iot_extreme_info_min 表必须有相同的主键定义
    • 可通过以下SQL确保主键存在:
    • ALTER TABLE dws_iot_extreme_info_min ADD PRIMARY KEY (device_code, day);
作者 east
Flink 3月 5,2025

Flink Checkpoint 详解

一、Checkpoint 的原理

  1. 核心概念
    Checkpoint 是 Flink 的容错机制,通过定期生成分布式快照,记录流处理应用的全局状态(算子状态、键控状态等)。当发生故障时,Flink 可从最近的 Checkpoint 恢复,保证 Exactly-Once 语义。
  2. 实现机制
    • Chandy-Lamport 算法:基于 Barrier 的分布式快照。
    • 流程:
      1. 触发:JobManager 定期触发 Checkpoint,向所有 Source 发送 Barrier。
      2. Barrier 传播:Source 插入 Barrier 到数据流,算子接收到 Barrier 后暂停处理新数据,将当前状态异步持久化。
      3. 状态存储:状态写入外部存储(如 HDFS、S3)。
      4. 确认机制:所有算子确认状态保存后,Checkpoint 完成。
  3. 一致性语义
    • EXACTLY_ONCE:精确一次,通过对齐 Barrier 确保状态与数据流严格一致。
    • AT_LEAST_ONCE:至少一次,可能重复处理数据。

二、Checkpoint 的使用方法

  1. 基础配置
    在 Flink 作业中启用 Checkpoint,设置间隔、存储路径和模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint,间隔 60 秒
env.enableCheckpointing(60000);
// 配置 Exactly-Once 语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置 Checkpoint 存储路径
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints/");
// 其他配置(超时时间、最小间隔等)
env.getCheckpointConfig().setCheckpointTimeout(30000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  1. 状态后端选择
    • FsStateBackend:状态存储在内存,快照存于文件系统(适合状态较小场景)。
    • RocksDBStateBackend:状态存储在 RocksDB,支持超大状态(需权衡性能)。
  2. 恢复作业
    通过命令行或 API 从指定 Checkpoint 恢复作业:bash复制./bin/flink run -s hdfs:///checkpoints/1234 …

三、Checkpoint 的应用场景

  1. 容错恢复
    节点故障、网络中断时,从 Checkpoint 恢复状态,避免数据丢失。
  2. 有状态计算
    • 窗口聚合(如每小时销售额统计)。
    • 复杂事件处理(CEP)中的模式状态。
    • 连接操作(如流-流 Join 的中间状态)。
  3. 作业升级与扩缩容
    通过 Savepoint(手动触发的 Checkpoint)暂停作业,修改并行度或代码后恢复。

四、Flink SQL 流式写入数据到表时为何需要 Checkpoint

  1. 保障 Exactly-Once 语义
    • 当使用 Flink SQL 写入 Kafka、JDBC 表等外部系统时,Checkpoint 通过**两阶段提交协议(2PC)**协调事务:
      1. 预提交阶段:数据写入外部系统,但未提交。
      2. 提交阶段:Checkpoint 完成后,所有事务统一提交。
    • 若故障发生,Flink 回滚到上一个 Checkpoint,确保数据不重复、不丢失。
  2. 维护内部状态一致性
    • 即使目标表不支持事务(如 HBase),Checkpoint 仍保障 Flink 内部状态(如去重状态、窗口状态)的正确恢复。
  3. 避免数据丢失
    • 未启用 Checkpoint 时,若作业崩溃,可能丢失未持久化的状态和数据,导致写入结果不完整。

五、示例:Flink SQL 写入 Kafka

-- 启用 Checkpoint(隐式通过 ExecutionConfig)
SET 'execution.checkpointing.interval' = '60s';

-- 定义 Kafka Sink 表
CREATE TABLE kafka_sink (
    user_id STRING,
    count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.transactional-id-prefix' = 'tx-' -- 启用 Kafka 事务
);

-- 流式写入
INSERT INTO kafka_sink 
SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id;
  • 依赖 Checkpoint:Kafka Sink 通过事务提交机制与 Checkpoint 绑定,确保每条数据仅写入一次。

总结

  • Checkpoint 原理:基于 Barrier 的分布式快照,保障状态一致性。
  • 使用场景:容错、有状态计算、作业维护。
  • Flink SQL 写入表:Checkpoint 是保证端到端 Exactly-Once 的核心机制,协调外部系统事务与内部状态恢复。
作者 east
Flink 1月 24,2025

解决flink报错:org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] cannot be applied to (com.chuneng.saas.dao.SinkToTDengine) iotStream.sinkTo(new SinkToTDengine)

在scala用下面的代码,

iotStream.sinkTo(new SinkToTDengine)

出现报错:

overloaded method value sinkTo with alternatives:
  (sink: org.apache.flink.api.connector.sink2.Sink[com.chuneng.saas.vo.IotData])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] <and>
  (sink: org.apache.flink.api.connector.sink.Sink[com.chuneng.saas.vo.IotData, _, _, _])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData]
 cannot be applied to (com.chuneng.saas.dao.SinkToTDengine)
    iotStream.sinkTo(new SinkToTDengine)

遇到的错误是由于 sinkTo 方法期望的参数类型与提供的 SinkToTDengine 类型不匹配。具体来说,sinkTo 方法期望的是一个实现了 Sink 接口的类型,而您的 SinkToTDengine 类继承自 RichSinkFunction,这导致类型不兼容。

错误原因分析

这表明 sinkTo 方法期望的是 Sink[IotData] 或 Sink[IotData, _, _, _] 类型,而您传递的是 SinkToTDengine,它继承自 RichSinkFunction<IotData>,因此类型不匹配。

解决方案

要解决这个问题,您需要将 SinkToTDengine 转换为 Flink 支持的 Sink 类型。

使用 addSink 方法

Flink 提供了 addSink 方法,可以直接接受实现了 SinkFunction 的自定义 Sink。这是最直接和常用的方式。

修改后的代码示例:

iotStream
  .map(rd => {
    val iotData = new IotData()
    iotData.setPid(rd.getDeviceCode.replaceAll(".", "-"))
    iotData.setVal(rd.getCompensationMW.toString)

    // 将13位时间戳转换为%Y-%m-%d %H:%M:%S.%f格式,毫秒保留3位小数
    val timestamp = rd.getTime // 假设rd.getTime()返回的是13位时间戳(毫秒级)
    val instant = Instant.ofEpochMilli(timestamp)
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
      .withLocale(Locale.CHINA)
      .withZone(ZoneId.systemDefault())
    val formattedTime = formatter.format(instant)

    iotData.setTs(formattedTime)
    iotData
  })
  .print()
  .addSink(new SinkToTDengine()) // 使用 addSink 方法
作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7通过Flink Doris Connector写入Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage

在flink 1.7项目,通过Flink Doris Connector,采用批处理读取Doris数据进行计算然后写入到doris的另外一个表。采用flink sql方式。

原来的代码进行脱敏后的代码如下:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

// 数据源配置
String sourceDDL = "CREATE TABLE <SOURCE_TABLE_NAME>(" +
        "pid STRING," +
        "pvalue decimal(39,3)," +
        "ptime TIMESTAMP(3)," +
        "ds DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'" +
        ")";

tableEnvironment.executeSql(sourceDDL);
// 获取当前时间戳
String timestamp = String.valueOf(System.currentTimeMillis());

// 目标 Doris 表 DDL
String sinkDDL = "CREATE TABLE <TARGET_TABLE_NAME> (" +
        "station_sn STRING," +
        "pid_system_code STRING," +
        "`day` STRING," +
        "`value` STRING," +
        "created_at TIMESTAMP(3)," +
        "dt DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'," +
        "'sink.label-prefix' = '<LABEL_PREFIX>_" + timestamp + "'" +
        ")";

执行报错如下:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
	at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
	at com.chuneng.saas.doris.FlinkBatchSql.main(FlinkBatchSql.java:68)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
	at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
	at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
	... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage@265569e2
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.getCheckpointCommittables(CommittableCollector.java:241)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)

这可能是 Flink 批处理模式下通常不需要 checkpoint,但 Doris Sink Connector 默认可能依赖 checkpoint 相关逻辑,从而导致 NullPointerException。

添加与 sink 行为相关的参数, 设置不用 checkpoint 。

修改后的sink如下:

String sinkDDL = “CREATE TABLE (” +
“station_sn STRING,” +
“pid_system_code STRING,” +
“day STRING,” +
“value STRING,” +
“created_at TIMESTAMP(3),” +
“dt DATE” +
“) WITH (” +
“‘connector’ = ‘doris’,” +
“‘fenodes’ = ‘:’,” +
“‘table.identifier’ = ‘.’,” +
“‘username’ = ”,” +
“‘password’ = ”,” +
“‘sink.label-prefix’ = ‘_” + timestamp + “‘,” +

"'doris.batch.size' = '1000'," +  // 批量写入大小
"'sink.enable-2pc' = 'false'" + // 禁用两阶段提交
")";
作者 east
Flink 1月 23,2025

flink 1.12用Flink SQL写入Doris的坑

在flink 1.12,用flink sql写入doris,相关pom配置如下:

   <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.12_2.11</artifactId>
      <version>1.0.3</version>
    </dependency>

to_date('2025-01-14')实际写入到doris变成了另外一个日期,非常坑。而且 
Flink Connector 24.0.0 版本之后支持使用Arrow Flight SQL 读取数据 ,速度提高非常快。

flink-doris-connector各版本兼容如下。

版本兼容​

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8–
1.3.01.161.0+8–
1.4.01.15,1.16,1.171.0+8–
1.5.21.15,1.16,1.17,1.181.0+8–
1.6.21.15,1.16,1.17,1.18,1.191.0+8–
24.0.11.15,1.16,1.17,1.18,1.19,1.201.0+8–

比较好选择是选择Flink 1.16以上(可以兼容hive语法90
%以上)。升级到flink 1.17后,to_date(‘2025-01-14’)返回结果果然正常了。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7 Exception in thread “main” org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table

问题分析

  1. 报错提示的主要内容
    • ValidationException: Unable to create a sink for writing table ...
    • Could not load service provider for factories 和 org.apache.flink.table.planner.delegation.DefaultExecutorFactory not a subtype。
    • 这些问题通常是因为 Flink 运行环境或依赖配置不正确。
  2. 可能原因
    • Flink 和 Doris 依赖版本不匹配:
      • 使用的 Flink Doris Connector 是 flink-doris-connector-1.17,其版本号为 24.0.1,需要确保它与当前 Flink 的版本(1.17.x)兼容。或者flink的jar包有的不是1.17.x版本,和上面的 link-doris-connector-1.17 不兼容。

解决方案

1. 检查 Flink 和 Doris Connector 的兼容性

  • 确认 Flink 和 Doris Connector 的版本兼容。
  • 当前使用的是 flink-doris-connector-1.17,对应 Flink 1.17.x。如果使用的是其他版本的 Flink(如 1.16 或 1.18),需要更换依赖:
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>

2. 添加 Flink Doris Connector 所需的运行时依赖

确保项目中包含以下依赖(建议手动检查 pom.xml 是否缺失),检查flink的jar是否都是1.17.x版本:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

确保 flink-table-planner_2.12 版本与 Flink 版本匹配。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 找不到org.apache.flink.table.descriptors.TableDescriptor

flink版本1.7的项目代码如下:

   StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // 设置 Flink SQL 环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

运行报错找不到org.apache.flink.table.descriptors.TableDescriptor。

问题分析

  1. 依赖冲突或缺失:
    • Flink 1.17.2 中 TableDescriptor 类已被废弃。Flink 1.13 开始引入了 TableDescriptor 的新概念,用于定义表源和表目标,而旧版依赖中的 org.apache.flink.table.descriptors 相关类在后续版本中被逐步移除。
    • 如果代码中还有引用 org.apache.flink.table.descriptors 包下的类(如连接器或格式描述符),可能导致运行时报错。
  2. API 版本不匹配:
    • 在 Flink 1.17.2 中,推荐使用 Table API 的新方式(TableDescriptor 不再使用)。这可能意味着您正在使用旧版本的 API,或者您的代码依赖了不兼容的旧包。

解决方案

1. 检查代码中是否仍在使用旧版 API

移除任何对 org.apache.flink.table.descriptors 的直接依赖。使用以下代码替换旧方法:

// 创建 Flink 流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 创建 Flink SQL 表执行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// 定义表源或目标时,使用 Table API 的新方式
TableDescriptor descriptor = TableDescriptor.forConnector("kafka") // 替换为实际使用的连接器
        .schema(Schema.newBuilder()
                .column("field1", DataTypes.STRING())
                .column("field2", DataTypes.INT())
                .build())
        .format("json") // 替换为实际使用的格式
        .option("property.key", "value") // 替换为实际连接器选项
        .build();

tableEnv.createTemporaryTable("my_table", descriptor);

2. 更新依赖

确保项目使用的依赖与 Flink 1.17.2 版本兼容。在 pom.xml 或 build.gradle 文件中明确声明以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
</dependency>

3. 清理旧依赖

如果仍需要使用 TableDescriptor 类,请确认没有混用老旧版本的连接器或额外库,例如 flink-connector-kafka 等。检查项目中是否存在以下老依赖,并替换为新版依赖:

旧版依赖示例:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.12.x</version>
</dependency>

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList

运行flink 1.7的项目,报错如下:

Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList at org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase.<init>(FlinkPreparingTableBase.java:92) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.<init>(ExpandingPreparingTable.java:42) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.<init>(QueryOperationCatalogViewTable.java:49) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.create(QueryOperationCatalogViewTable.java:58) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.convertQueryOperationView(FlinkCalciteCatalogReader.java:146) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.toPreparingTable(FlinkCalciteCatalogReader.java:110) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:91) at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:229) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:144) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:110) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2490) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) at com.chuneng.saas.doris.FlinkCuSohJdbcSqlAnalyze.main(FlinkCuSohJdbcSqlAnalyze.java:98) Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

  • 从报错信息 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList 和 Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 可以看出,程序在运行时无法找到 org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 这个类。
  • 这通常是因为相应的依赖库没有被正确地添加到项目的类路径中,导致 JVM 在运行时无法加载所需的类。

修改方案:

  1. 确认你是否在项目的构建文件(如 Maven 的 pom.xml 或 Gradle 的 build.gradle)中添加了 Apache Flink 相关的依赖。
  2. 确保使用的 Flink 版本是兼容的,并且其依赖的 Guava 版本是 flink-shaded-guava 的 18 版本。
  3. 对于 Maven 项目,检查 pom.xml 中是否有类似如下的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>18.0</version>
</dependency>
  1. 对于 Gradle 项目,检查 build.gradle 中是否有类似如下的依赖:

implementation 'org.apache.flink:flink-shaded-guava:18.0'
  1. 如果已经添加了依赖,可能是因为依赖冲突导致无法找到正确的类。可以使用 mvn dependency:tree(对于 Maven)或 gradle dependencies(对于 Gradle)命令查看依赖树,找出是否有多个版本的 Guava 被引入,然后通过排除冲突的依赖来解决。
作者 east
Flink 1月 22,2025

Flink1.7官方文档中文翻译:及时流处理

简介#

及时流处理是有状态流处理的一种扩展,其中时间在计算中发挥一定作用。例如,在进行时间序列分析、基于特定时间段(通常称为窗口)进行聚合,或者在处理事件时事件发生的时间很关键等情况时,都会涉及到及时流处理。
在接下来的章节中,我们将重点介绍在使用 Flink 进行及时流处理应用开发时,需要考虑的一些主题。
返回顶部

时间概念:事件时间与处理时间#

在流处理程序中提及时间(例如定义窗口时),可以涉及不同的时间概念:

  • 处理时间:处理时间指的是执行相应操作的机器的系统时间。

当一个流处理程序基于处理时间运行时,所有基于时间的操作(如时间窗口)将使用运行相应操作符的机器的系统时钟。一个按小时划分的处理时间窗口将包含在系统时钟显示整点之间到达特定操作符的所有记录。例如,如果一个应用程序在上午 9:15 开始运行,第一个按小时划分的处理时间窗口将包含上午 9:15 到 10:00 之间处理的事件,下一个窗口将包含上午 10:00 到 11:00 之间处理的事件,依此类推。
处理时间是最简单的时间概念,无需在流和机器之间进行协调。它能提供最佳性能和最低延迟。然而,在分布式和异步环境中,处理时间不具备确定性,因为它易受记录进入系统的速度(例如从消息队列进入)、记录在系统内操作符之间流动的速度以及中断(计划内或其他情况)的影响。

  • 事件时间:事件时间是每个事件在其产生设备上发生的时间。这个时间通常在记录进入 Flink 之前就嵌入其中,并且可以从每条记录中提取出事件时间戳。在事件时间中,时间的推进取决于数据,而非任何物理时钟。基于事件时间的程序必须指定如何生成事件时间水印,这是一种在事件时间中标记时间推进的机制。这种水印机制将在后续章节中介绍。

在理想情况下,无论事件何时到达或其顺序如何,基于事件时间的处理都能产生完全一致且确定的结果。然而,除非已知事件按时间戳顺序到达,否则事件时间处理在等待乱序事件时会产生一定延迟。由于只能等待有限的时间,这就限制了基于事件时间的应用程序的确定性程度。
假设所有数据都已到达,即使处理乱序或迟到的事件,或者重新处理历史数据,基于事件时间的操作也会按预期执行,并产生正确且一致的结果。例如,一个按小时划分的事件时间窗口将包含所有携带的事件时间戳属于该小时的记录,无论它们到达的顺序如何,也无论它们何时被处理。(有关更多信息,请参阅 “延迟” 部分。)
请注意,有时基于事件时间的程序在实时处理实时数据时,会使用一些基于处理时间的操作,以确保它们能够及时推进。
事件时间与处理时间

Event Time and Processing Time


事件时间与水印#

注意:Flink 实现了数据流模型中的许多技术。若要深入了解事件时间和水印,可查看以下文章。

  • Tyler Akidau 的《Streaming 101》
  • 《数据流模型》论文

一个支持事件时间的流处理器需要一种方式来衡量事件时间的推进。例如,一个构建按小时划分窗口的窗口操作符,需要在事件时间超过一小时结束时得到通知,以便该操作符能够关闭正在处理的窗口。
事件时间可以独立于处理时间(由物理时钟测量)推进。例如,在一个程序中,某个操作符的当前事件时间可能略落后于处理时间(考虑到接收事件的延迟),但两者以相同速度推进。另一方面,另一个流处理程序可能通过快速处理已经缓冲在 Kafka 主题(或其他消息队列)中的一些历史数据,在仅几秒钟的处理时间内推进数周的事件时间。
Flink 中衡量事件时间推进的机制是水印。水印作为数据流的一部分流动,并携带一个时间戳 t。Watermark (t) 声明在该流中事件时间已到达时间 t,这意味着该流中不应再有时间戳 t’ <= t 的元素(即时间戳早于或等于水印的事件)。
下图展示了带有(逻辑)时间戳的事件流以及同步流动的水印。在这个例子中,事件是按(时间戳)顺序排列的,这意味着水印只是流中的周期性标记。
有序事件流和水印
水印对于乱序流至关重要,如下图所示,其中事件并非按时间戳排序。一般来说,水印表明在流中的那个点,所有到某个时间戳的事件都应该已经到达。一旦水印到达一个操作符,该操作符可以将其内部事件时间时钟推进到水印的值。
无序事件流和水印
请注意,新创建的流元素的事件时间继承自产生它们的事件,或者触发这些元素创建的水印。

并行流中的水印#

水印在源函数处或紧随源函数之后生成。源函数的每个并行子任务通常独立生成其水印。这些水印定义了该特定并行源的事件时间。
随着水印在流处理程序中流动,它们会推进水印到达的操作符处的事件时间。每当一个操作符推进其事件时间时,它会为下游的后续操作符生成一个新的水印。
有些操作符会消费多个输入流,例如 union 操作符,或者在 keyBy (…) 或 partition (…) 函数之后的操作符。这样的操作符的当前事件时间是其输入流事件时间的最小值。随着其输入流更新它们的事件时间,该操作符的事件时间也会更新。
下图展示了事件和水印在并行流中流动,以及操作符跟踪事件时间的示例。
并行数据流、操作符与事件和水印

延迟#

有可能某些元素会违反水印条件,即即使在 Watermark (t) 出现之后,仍会出现更多时间戳 t’ <= t 的元素。实际上,在许多实际场景中,某些元素可能会被任意延迟,使得无法指定一个时间,保证在该时间之前具有特定事件时间戳的所有元素都已到达。此外,即使延迟可以界定,将水印延迟太多通常也不可取,因为这会导致事件时间窗口的评估出现过多延迟。
因此,流处理程序可能会明确预期一些延迟元素。延迟元素是指在系统的事件时间时钟(由水印指示)已经超过延迟元素的时间戳之后才到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参阅 “允许的延迟”。

窗口化#

对流中的事件进行聚合(例如计数、求和)与批处理中的方式不同。例如,不可能对流中的所有元素进行计数,因为流通常是无限的(无界的)。相反,对流的聚合(计数、求和等)是通过窗口来界定范围的,例如 “过去 5 分钟的计数” 或 “过去 100 个元素的求和”。
窗口可以由时间驱动(例如:每 30 秒)或由数据驱动(例如:每 100 个元素)。通常可以区分不同类型的窗口,例如滚动窗口(无重叠)、滑动窗口(有重叠)和会话窗口(由不活动间隙分隔)。
时间窗口和计数窗口
有关窗口的更多示例,请查看此博客文章,或者查看 DataStream API 的窗口文档。介绍一下Flink的时间概念如何在Flink中使用事件时间?Flink的窗口操作符有哪些?

作者 east
Flink 1月 22,2025

Flink1.7官方文档中文翻译:有状态流处理

什么是状态?#
虽然数据流中的许多操作通常一次仅处理单个事件(例如事件解析器),但有些操作会在多个事件间记住相关信息(例如窗口操作符)。这些操作被称为有状态操作。
有状态操作的一些示例:

  • 当应用程序搜索特定的事件模式时,状态会存储到目前为止遇到的事件序列。
  • 按分钟 / 小时 / 天聚合事件时,状态保存待处理的聚合结果。
  • 在一系列数据点上训练机器学习模型时,状态保存模型参数的当前版本。
  • 当需要管理历史数据时,状态允许高效访问过去发生的事件。

Flink 需要了解状态,以便使用检查点和保存点实现容错。
了解状态还有助于对 Flink 应用程序进行重新缩放,这意味着 Flink 会负责在并行实例间重新分配状态。
可查询状态允许你在运行时从 Flink 外部访问状态。
在处理状态时,了解 Flink 的状态后端可能也会有所帮助。Flink 提供了不同的状态后端,用于指定状态的存储方式和存储位置。

键控状态 #
键控状态维护在一个可视为嵌入式键值存储的结构中。状态与有状态操作符读取的流严格分区并一起分布。因此,仅在键控流上才能访问键值状态,即在进行键控 / 分区数据交换之后,并且只能访问与当前事件的键相关联的值。将流的键与状态的键对齐,可确保所有状态更新都是本地操作,无需事务开销即可保证一致性。这种对齐还使 Flink 能够透明地重新分配状态并调整流分区。

状态与分区

键控状态进一步组织为所谓的键组。键组是 Flink 重新分配键控状态的基本单元;键组的数量与定义的最大并行度完全相同。在执行过程中,键控操作符的每个并行实例处理一个或多个键组的键。

State and Partitioning

状态持久性#

Flink 通过流重放和检查点相结合的方式实现容错。一个检查点标记每个输入流中的特定点,以及每个操作符的相应状态。通过恢复操作符的状态并从检查点处重新播放记录,流数据流可以从检查点恢复,同时保持一致性(精确一次处理语义)。
检查点间隔是在执行期间容错开销与恢复时间(需要重新播放的记录数)之间进行权衡的一种方式。
容错机制持续对分布式流数据流进行快照。对于状态较小的流应用程序,这些快照非常轻量级,可以频繁进行,而对性能影响不大。流应用程序的状态存储在可配置的位置,通常是分布式文件系统中。
如果程序发生故障(由于机器、网络或软件故障),Flink 会停止分布式流数据流。然后系统重新启动操作符,并将它们重置到最近一次成功的检查点。输入流被重置到状态快照的位置。作为重新启动的并行数据流一部分处理的任何记录,都保证不会影响先前检查点的状态。
默认情况下,检查点功能是禁用的。有关如何启用和配置检查点的详细信息,请参阅 “检查点”。
为使此机制充分发挥其保证作用,数据流源(如消息队列或代理)需要能够将流倒回到最近定义的点。Apache Kafka 具备此能力,Flink 与 Kafka 的连接器利用了这一点。有关 Flink 连接器提供的保证的更多信息,请参阅 “数据源和接收器的容错保证”。
由于 Flink 的检查点是通过分布式快照实现的,我们可互换使用 “快照” 和 “检查点” 这两个词。通常我们也用 “快照” 一词来指代检查点或保存点。

检查点#

Flink 容错机制的核心部分是对分布式数据流和操作符状态进行一致性快照。这些快照作为一致性检查点,系统在发生故障时可以回退到这些检查点。Flink 进行这些快照的机制在《分布式数据流的轻量级异步快照》中有描述。它受标准的 Chandy – Lamport 分布式快照算法启发,并专门针对 Flink 的执行模型进行了定制。
请记住,与检查点相关的所有操作都可以异步完成。检查点屏障不会同步移动,操作可以异步对其状态进行快照。
自 Flink 1.11 起,检查点可以在有对齐或无对齐的情况下进行。在本节中,我们先描述对齐检查点。

屏障#

Flink 分布式快照中的一个核心元素是流屏障。这些屏障被注入到数据流中,并作为数据流的一部分与记录一起流动。屏障永远不会超过记录,它们严格按顺序流动。一个屏障将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录集。每个屏障携带它前面推送的快照的 ID。屏障不会中断流的流动,因此非常轻量级。来自不同快照的多个屏障可以同时存在于流中,这意味着各种快照可以并发发生。
数据流中的检查点屏障:流屏障在流源处被注入到并行数据流中。注入快照 n 的屏障的点(我们称之为 Sn)是源流中快照覆盖数据的位置。例如,在 Apache Kafka 中,这个位置将是分区中最后一条记录的偏移量。这个位置 Sn 会报告给检查点协调器(Flink 的 JobManager)。

Checkpoint barriers in data streams


然后屏障向下游流动。当一个中间操作符从其所有输入流接收到快照 n 的屏障时,它会向其所有输出流发送一个快照 n 的屏障。一旦一个接收器操作符(流 DAG 的末端)从其所有输入流接收到屏障 n,它就会向检查点协调器确认快照 n。在所有接收器都确认一个快照后,该快照被视为完成。

Aligning data streams at operators with multiple inputs


一旦快照 n 完成,作业将不再要求源提供 Sn 之前的记录,因为此时这些记录(及其衍生记录)将已经通过整个数据流拓扑。
在具有多个输入的操作符处对齐数据流:接收多个输入流的操作符需要在快照屏障上对齐输入流。上图说明了这一点:

  • 一旦操作符从传入流接收到快照屏障 n,在它也从其他输入接收到屏障 n 之前,它不能处理来自该流的任何更多记录。否则,它会将属于快照 n 的记录与属于快照 n + 1 的记录混合。
  • 一旦最后一个流接收到屏障 n,操作符会发出所有挂起的输出记录,然后自己发出快照 n 屏障。
  • 它对状态进行快照,并从所有输入流恢复处理记录,在处理来自流的记录之前先处理输入缓冲区中的记录。
  • 最后,操作符将状态异步写入状态后端。

请注意,所有具有多个输入的操作符以及在洗牌后消耗多个上游子任务输出流的操作符都需要进行对齐。

操作符状态快照#

当操作符包含任何形式的状态时,此状态也必须是快照的一部分。
操作符在从其输入流接收到所有快照屏障的时间点,并且在向其输出流发送屏障之前,对其状态进行快照。在该时间点,已经对屏障之前的记录进行了所有状态更新,并且尚未应用依赖于屏障之后记录的更新。由于快照的状态可能很大,它存储在可配置的状态后端中。默认情况下,这是 JobManager 的内存,但对于生产使用,应配置分布式可靠存储(如 HDFS)。在状态存储之后,操作符确认检查点,向输出流发送快照屏障,然后继续执行。
生成的快照现在包含:

  • 对于每个并行流数据源,启动快照时流中的偏移量 / 位置。
  • 对于每个操作符,指向作为快照一部分存储的状态的指针。

检查点机制图示

Illustration of the Checkpointing Mechanism

恢复#

在此机制下的恢复很简单:发生故障时,Flink 选择最新完成的检查点 k。然后系统重新部署整个分布式数据流,并为每个操作符提供作为检查点 k 一部分进行快照的状态。源被设置为从位置 Sk 开始读取流。例如在 Apache Kafka 中,这意味着告诉消费者从偏移量 Sk 开始获取数据。
如果状态是增量快照的,操作符从最新的完整快照状态开始,然后对该状态应用一系列增量快照更新。
有关更多信息,请参阅 “重启策略”。

非对齐检查点#

检查点也可以以非对齐方式执行。基本思想是,只要飞行中的数据成为操作符状态的一部分,检查点就可以超过所有飞行中的数据。
请注意,这种方法实际上更接近 Chandy – Lamport 算法,但 Flink 仍然在源中插入屏障,以避免使检查点协调器过载。
非对齐检查点:该图展示了一个操作符如何处理非对齐检查点屏障:

  • 操作符对存储在其输入缓冲区中的第一个屏障做出反应。
  • 它立即通过将屏障添加到输出缓冲区的末尾,将其转发到下游操作符。
  • 操作符标记所有被超过的记录以异步存储,并创建自己状态的快照。
  • 因此,操作符仅短暂停止输入处理以标记缓冲区、转发屏障并创建其他状态的快照。

非对齐检查点确保屏障尽快到达接收器。它特别适用于至少有一个缓慢移动数据路径的应用程序,在这种情况下对齐时间可能长达数小时。但是,由于它会增加额外的 I/O 压力,当到状态后端的 I/O 成为瓶颈时,它并无帮助。有关其他限制,请参阅操作中的更深入讨论。
请注意,保存点始终是对齐的。

非对齐恢复#

在非对齐检查点中,操作符在开始处理来自上游操作符的任何数据之前,首先恢复飞行中的数据。除此之外,它执行与对齐检查点恢复期间相同的步骤。

状态后端#

键 / 值索引存储的确切数据结构取决于所选的状态后端。一种状态后端将数据存储在内存哈希表中,另一种状态后端使用 RocksDB 作为键值存储。除了定义保存状态的数据结构之外,状态后端还实现了对键值状态进行时间点快照并将该快照作为检查点一部分存储的逻辑。可以在不更改应用程序逻辑的情况下配置状态后端。
检查点和快照

checkpoints and snapshots

保存点#

所有使用检查点的程序都可以从保存点恢复执行。保存点允许在不丢失任何状态的情况下更新程序和 Flink 集群。
保存点是手动触发的检查点,它对程序进行快照并将其写入状态后端。它们依赖于常规的检查点机制来实现这一点。
保存点与检查点类似,不同之处在于它们由用户触发,并且在新的检查点完成时不会自动过期。为了正确使用保存点,了解检查点与保存点之间的区别非常重要,“检查点与保存点” 中对此进行了描述。

精确一次与至少一次#

对齐步骤可能会给流程序增加延迟。通常,这种额外延迟在几毫秒量级,但我们也看到过一些异常值的延迟明显增加的情况。对于要求所有记录始终具有超低延迟(几毫秒)的应用程序,Flink 提供了一个开关,可在检查点期间跳过流对齐。一旦操作符从每个输入看到检查点屏障,仍会立即进行检查点快照。
当跳过对齐时,即使在检查点 n 的一些检查点屏障到达后,操作符仍会继续处理所有输入。这样,在为检查点 n 拍摄状态快照之前,操作符也会处理属于检查点 n + 1 的元素。在恢复时,这些记录将作为重复项出现,因为它们既包含在检查点 n 的状态快照中,又将作为检查点 n 之后的数据的一部分被重新播放。
仅对于具有多个前驱(连接)的操作符以及具有多个发送者(在流重新分区 / 洗牌之后)的操作符才会发生对齐。因此,仅包含易于并行的流操作(map ()、flatMap ()、filter () 等)的数据流实际上即使在至少一次模式下也能提供精确一次的保证。

批处理程序中的状态与容错#

Flink 将批处理程序作为流程序的一种特殊情况执行,其中流是有界的(元素数量有限)。DataSet 在内部被视为数据流。因此,上述概念同样适用于批处理程序,与适用于流程序的方式相同,但有一些小的例外:

  • 批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可行的,因为输入是有界的。这将成本更多地推向恢复阶段,但使常规处理更便宜,因为它避免了检查点。
  • DataSet API 中的有状态操作使用简化的内存 / 外存数据结构,而不是键 / 值索引。
  • DataSet API 引入了特殊的同步(基于超步)迭代,这仅在有界流上才可行。有关详细信息,请查看迭代文档。
作者 east

1 2 … 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.