gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
Flink 8月 14,2025

Flink Checkpoint时间设置太小有什么影响

在面试时被问到这个问题,当时有点蒙圈没答好,现在记录下来好以后复习:

在 Flink 里,如果 checkpoint 时间间隔(interval)设置得太小,会带来一系列性能和稳定性方面的影响,主要包括:


1. 计算资源被 Checkpoint 占用过多

  • 原理:每次 checkpoint 都会触发状态快照(state snapshot)写入持久化存储(如 HDFS、S3、RocksDB 本地+远程),这是一个 IO 密集操作,还会涉及数据序列化、网络传输。
  • 影响:
    • CPU 用于业务逻辑计算的时间减少
    • IO 带宽被 checkpoint 数据占用
    • 延迟增加,因为算子线程要等待 barrier 对齐(barrier alignment)

2. Barrier 对齐等待时间增加

  • 如果上游多个分区流速不同,Flink 在做 checkpoint 时需要等待所有输入流的 barrier 对齐。
  • 频繁 checkpoint 会导致算子更频繁进入 barrier 对齐状态,导致吞吐下降。

3. 状态后端压力过大

  • 以 RocksDB StateBackend 为例:
    • 每次 checkpoint 都会触发 RocksDB 的 SST 文件刷写(flush)和元数据更新
    • 频繁刷写会增加磁盘 IO 竞争和后台 compaction 压力
    • 如果状态数据较大,频繁 checkpoint 会让 RocksDB compaction 线程长期占用 CPU/IO

4. 反而可能导致 checkpoint 堆积 / 失败

  • 原因:
    • Checkpoint 需要在指定超时时间内完成,如果间隔太小,上一个 checkpoint 还没完成,下一个就开始了,可能导致多个 checkpoint 并发执行(如果 maxConcurrentCheckpoints > 1),进而造成存储、网络压力飙升。
    • 在极端情况下,会触发 checkpoint 超时、失败,甚至导致作业频繁重启。

5. 延迟敏感任务反而变慢

  • 在低延迟实时任务中,业务逻辑和 checkpoint 操作会竞争资源,如果 checkpoint 太频繁,反而可能 吞吐下降、延迟上升。

6. 容错收益递减

  • 理论上,checkpoint 间隔越短,失败恢复时丢失的数据就越少。
  • 但在实际生产中,checkpoint 间隔小于 5 秒往往收益很低,而性能损耗却非常明显(尤其是状态数据大的任务)。
  • 如果作业不是超高可用的金融级场景,过于频繁 checkpoint 意义不大。
作者 east
Flink 4月 19,2025

两种Flink 生成水印的方案的对比

在开发中用到flink下面2种生成水印方式,它们优劣和适用场景如何。

方案1:
.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[RunData](Duration.ofSeconds(5))
          .withIdleness(Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[RunData] {
            override def extractTimestamp(element: RunData, recordTimestamp: Long): Long = {
              Math.max(element.getTime.longValue(), recordTimestamp)            
            }
          })
      ),

方案2:
.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[RunData](Duration.ofSeconds(5))
          .withIdleness(Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[RunData] {
            override def extractTimestamp(element: RunData, recordTimestamp: Long): Long = {
              element.getTime.longValue()      
            }
          })
      )


✅ 方案区别概述:

方案时间戳提取逻辑
方案1使用 Math.max(element.getTime, recordTimestamp) 取当前数据时间戳与 recordTimestamp 的较大值
方案2直接返回 element.getTime,不考虑 recordTimestamp

🧠 方案分析


🔹方案1:使用 Math.max(element.getTime, recordTimestamp)

✅ 优点:

  1. 更稳健处理乱序数据:
    • 如果 recordTimestamp(通常为前一条数据的时间戳)大于当前数据的时间戳,那么直接使用 element.getTime 会导致时间回退,从而影响 Watermark 计算,进而导致窗口触发不正确。
    • 该逻辑防止了 “时间倒退” 的现象。
  2. 适合数据源存在时钟漂移或乱序程度较高的情况:
    • 尤其是传感器数据、日志数据等不按顺序到达场景。
    • 若数据有一定延迟,选最大值可防止系统过早推进 Watermark。

❌ 缺点:

  1. 可能高估时间戳,导致 Watermark 滞后:
    • 如果 recordTimestamp 一直偏大,会导致 Watermark = 当前最大时间戳 - 允许延迟 总是处于偏后的状态,进而 延迟窗口触发时间,影响下游计算的实时性。
  2. 不太适用于严格顺序数据或延迟容忍低的场景:
    • 因为该策略会让窗口的计算始终比实际事件时间慢。

✅ 适用场景:

  • 数据存在明显乱序或可能存在时间回退的场景。
  • IoT 场景、传感器采集、日志采集系统。
  • 对准确性高于实时性的业务逻辑(如:统计分析类任务)。

🔹方案2:使用 element.getTime 直接提取时间戳

✅ 优点:

  1. 时间戳准确,还原数据本身的时间:
    • 直接以 数据本身时间 为准,更加符合事件时间语义。
    • 如果数据是有序或近乎有序的,它能够 精准推进 Watermark,使窗口 尽早触发。
  2. 低延迟,适合对实时性要求高的场景:
    • 例如实时告警系统、实时监控、业务中台。

❌ 缺点:

  1. 存在时间回退风险:
    • 如果某条数据时间比前面数据的还小,可能导致 Watermark 被“拉回”,系统可能拒绝处理这条数据或错误触发窗口。
  2. 依赖数据源严格有序性或 Watermark 容错时间足够长:
    • 对乱序不友好,需要配合合理的 forBoundedOutOfOrderness 延迟容忍时间。

✅ 适用场景:

  • 数据基本有序或有轻微乱序(容忍在 Duration.ofSeconds(5) 内)。
  • 实时性要求高的业务场景(如金融交易监控、实时用户行为处理)。
  • 数据源可靠性高(Kafka 的 event-time 顺序性高)。

✅ 总结对比表

项目方案1(Math.max)方案2(直接提取)
容错能力⭐⭐⭐⭐(强)⭐⭐(弱)
实时性⭐⭐(延迟高)⭐⭐⭐⭐(延迟低)
复杂数据处理⭐⭐⭐⭐⭐⭐
精确性(恢复事件时间)⭐⭐⭐⭐⭐⭐⭐
是否适合乱序数据是否(需轻度乱序)
场景推荐传感器、日志、数据分析实时告警、交易监控、行为分析

💡 建议:

  • 如果你对 数据顺序不确定、业务允许延迟:选 方案1。
  • 如果你 清楚数据较有序、实时性优先:选 方案2。
  • 两者都设置了 .withIdleness(Duration.ofSeconds(5)),意味着在数据空闲 5 秒内,Flink 会认为该 source 闲置,避免整个 Watermark 卡住 —— 这是对空数据源的一种保护机制,适合多 source 场景。

如果 用 Math.min(eventTime, recordTimestamp) 的效果是啥?

理论上的优势:

优势说明
✅ 防止因“异常大时间戳”推高 Watermark有些数据可能时间戳未来太远(如某个设备出错时间为2099年),使用 min 可以防止 Watermark 被拉得太靠前,导致后续正常数据全是迟到数据
✅ 更严谨处理数据乱跳若数据到达时间波动大,但我们只取历史最小时间戳推进,可以更“保守”处理

❌ 实际上的问题和风险:

问题说明
❌ 时间倒退严重如果 recordTimestamp 比当前数据早,就永远使用旧的时间,窗口不会推进,Watermark卡死
❌ Watermark 不会正常前进因为时间戳总是被压制成“更早的”,所以 Watermark 永远低于真实事件时间
❌ 数据无法被触发处理Flink 的窗口系统等 Watermark 过去“窗口边界”才会触发计算,这种写法可能导致窗口永远不触发,任务“看起来没问题但没产出”!
作者 east
bug清单, Flink 3月 5,2025

解决Flink SQL:Exception in thread “main” org.apache.flink.table.api.ValidationException: Rowtime attribute ‘ptime’ must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type ‘BIGINT’.

在开发Flink SQL时报错:

在flink 1.16版本中执行报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: Rowtime attribute 'ptime' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type 'BIGINT'.

	at org.apache.flink.table.api.TableSchema.validateColumnsAndWatermarkSpecs(TableSchema.java:535)

	at org.apache.flink.table.api.TableSchema.access$100(TableSchema.java:73)

	at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:802)

	at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.build(MergeTableLikeUtil.java:534)

	at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:154)

	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:171)

	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:74)

	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:330)

	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)

	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)

	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:758)

	at com.chuneng.saas.doris.FlinkDorisExtremeValueCalculation.main(FlinkDorisExtremeValueCalculation.java:44)

原因分析

错误的核心在于:Flink 要求用于定义 WATERMARK 的字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型,但你的 ptime 字段被定义为 BIGINT 类型。尽管你在 WATERMARK 中尝试将 ptime 转换为 TIMESTAMP,但 Flink 的 WATERMARK 语法要求直接引用一个已经存在的 TIMESTAMP 类型字段,而不是在 WATERMARK 定义中动态转换类型。

解决方案

你需要通过 计算列(Computed Column) 将 ptime 的 BIGINT 类型转换为 TIMESTAMP 类型,然后将 WATERMARK 绑定到计算列上。

例如像下面这样:

String sourceDDL = "CREATE TABLE ods_t_iot_data (" +
        "pid STRING," +
        "pvalue DECIMAL(13,3)," +
        "ptime BIGINT," +
        "ds DATE," +
        // 定义计算列,将 BIGINT 类型的 ptime 转换为 TIMESTAMP
        "event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ptime / 1000))," + 
        // 绑定 WATERMARK 到计算列 event_time
        "WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '10.0.0.1:8030'," +
        "'table.identifier' = 'cnsaas.ods_t_iot'," +
        "'username' = 'root'," +
        "'password' = ''" +
        ")";
作者 east
bug清单, Flink 3月 5,2025

解决Flink SQL报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”

在开发Flink SQL时报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”

这个错误表明您的Flink作业涉及更新或删除操作(如CDC场景),而JDBC Sink表没有定义主键。以下是详细分析和解决方案:

错误原因分析

当Flink作业涉及UPDATE/DELETE操作时(常见于CDC场景或使用Retract流),JDBC Sink表必须显式声明主键,因为:

  1. 主键用于定位目标数据库中需要更新的记录
  2. 没有主键时,Flink无法确定如何执行更新/删除操作

解决方案

在表定义中添加 PRIMARY KEY 声明(需与目标数据库表主键一致),例如:

val sinkDDL =
  """
    |CREATE TABLE dws_iot_extreme_info_min (
    |  device_code STRING,
    |  `day` DATE,
    |  ptime BIGINT,
    |  station_sn STRING,
    |  unit STRING,
    |  cu_device_code STRING,
    |  max_u_value DECIMAL(5,3),
    |  min_u_value DECIMAL(5,3),
    |  max_t_value DECIMAL(5,2),
    |  min_t_value DECIMAL(5,2),
    |  max_soc_value DECIMAL(5,2),
    |  min_soc_value DECIMAL(5,2),
    |  max_u_time BIGINT,
    |  min_u_time BIGINT,
    |  max_t_time BIGINT,
    |  min_t_time BIGINT,
    |  max_soc_time BIGINT,
    |  min_soc_time BIGINT,
    |  dt DATE,
    |  PRIMARY KEY (device_code, `day`) NOT ENFORCED  -- 添加主键声明
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://10.0.2.2:3306/cnsaas',
    |  'table-name' = 'dws_bigdata_device_extreme_info_min',
    |  'driver' = 'com.mysql.cj.jdbc.Driver',
    |  'username' = 'root',
    |  'password' = '',
    |  'sink.buffer-flush.max-rows' = '1000',    
    |  'sink.buffer-flush.interval' = '1s',      
    |  'sink.max-retries' = '3'                 
    |)
    |""".stripMargin

关键修改点说明

  1. 主键声明:PRIMARY KEY (device_code, `day`) NOT ENFORCED
    • 主键字段需与目标数据库表的主键一致
    • NOT ENFORCED 表示Flink不会校验数据主键约束,由数据库负责
  2. 目标表要求:
    • MySQL数据库中 dws_iot_extreme_info_min 表必须有相同的主键定义
    • 可通过以下SQL确保主键存在:
    • ALTER TABLE dws_iot_extreme_info_min ADD PRIMARY KEY (device_code, day);
作者 east
Flink 3月 5,2025

Flink Checkpoint 详解

一、Checkpoint 的原理

  1. 核心概念
    Checkpoint 是 Flink 的容错机制,通过定期生成分布式快照,记录流处理应用的全局状态(算子状态、键控状态等)。当发生故障时,Flink 可从最近的 Checkpoint 恢复,保证 Exactly-Once 语义。
  2. 实现机制
    • Chandy-Lamport 算法:基于 Barrier 的分布式快照。
    • 流程:
      1. 触发:JobManager 定期触发 Checkpoint,向所有 Source 发送 Barrier。
      2. Barrier 传播:Source 插入 Barrier 到数据流,算子接收到 Barrier 后暂停处理新数据,将当前状态异步持久化。
      3. 状态存储:状态写入外部存储(如 HDFS、S3)。
      4. 确认机制:所有算子确认状态保存后,Checkpoint 完成。
  3. 一致性语义
    • EXACTLY_ONCE:精确一次,通过对齐 Barrier 确保状态与数据流严格一致。
    • AT_LEAST_ONCE:至少一次,可能重复处理数据。

二、Checkpoint 的使用方法

  1. 基础配置
    在 Flink 作业中启用 Checkpoint,设置间隔、存储路径和模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint,间隔 60 秒
env.enableCheckpointing(60000);
// 配置 Exactly-Once 语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置 Checkpoint 存储路径
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints/");
// 其他配置(超时时间、最小间隔等)
env.getCheckpointConfig().setCheckpointTimeout(30000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  1. 状态后端选择
    • FsStateBackend:状态存储在内存,快照存于文件系统(适合状态较小场景)。
    • RocksDBStateBackend:状态存储在 RocksDB,支持超大状态(需权衡性能)。
  2. 恢复作业
    通过命令行或 API 从指定 Checkpoint 恢复作业:bash复制./bin/flink run -s hdfs:///checkpoints/1234 …

三、Checkpoint 的应用场景

  1. 容错恢复
    节点故障、网络中断时,从 Checkpoint 恢复状态,避免数据丢失。
  2. 有状态计算
    • 窗口聚合(如每小时销售额统计)。
    • 复杂事件处理(CEP)中的模式状态。
    • 连接操作(如流-流 Join 的中间状态)。
  3. 作业升级与扩缩容
    通过 Savepoint(手动触发的 Checkpoint)暂停作业,修改并行度或代码后恢复。

四、Flink SQL 流式写入数据到表时为何需要 Checkpoint

  1. 保障 Exactly-Once 语义
    • 当使用 Flink SQL 写入 Kafka、JDBC 表等外部系统时,Checkpoint 通过**两阶段提交协议(2PC)**协调事务:
      1. 预提交阶段:数据写入外部系统,但未提交。
      2. 提交阶段:Checkpoint 完成后,所有事务统一提交。
    • 若故障发生,Flink 回滚到上一个 Checkpoint,确保数据不重复、不丢失。
  2. 维护内部状态一致性
    • 即使目标表不支持事务(如 HBase),Checkpoint 仍保障 Flink 内部状态(如去重状态、窗口状态)的正确恢复。
  3. 避免数据丢失
    • 未启用 Checkpoint 时,若作业崩溃,可能丢失未持久化的状态和数据,导致写入结果不完整。

五、示例:Flink SQL 写入 Kafka

-- 启用 Checkpoint(隐式通过 ExecutionConfig)
SET 'execution.checkpointing.interval' = '60s';

-- 定义 Kafka Sink 表
CREATE TABLE kafka_sink (
    user_id STRING,
    count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.transactional-id-prefix' = 'tx-' -- 启用 Kafka 事务
);

-- 流式写入
INSERT INTO kafka_sink 
SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id;
  • 依赖 Checkpoint:Kafka Sink 通过事务提交机制与 Checkpoint 绑定,确保每条数据仅写入一次。

总结

  • Checkpoint 原理:基于 Barrier 的分布式快照,保障状态一致性。
  • 使用场景:容错、有状态计算、作业维护。
  • Flink SQL 写入表:Checkpoint 是保证端到端 Exactly-Once 的核心机制,协调外部系统事务与内部状态恢复。
作者 east
Flink 1月 24,2025

解决flink报错:org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] cannot be applied to (com.chuneng.saas.dao.SinkToTDengine) iotStream.sinkTo(new SinkToTDengine)

在scala用下面的代码,

iotStream.sinkTo(new SinkToTDengine)

出现报错:

overloaded method value sinkTo with alternatives:
  (sink: org.apache.flink.api.connector.sink2.Sink[com.chuneng.saas.vo.IotData])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] <and>
  (sink: org.apache.flink.api.connector.sink.Sink[com.chuneng.saas.vo.IotData, _, _, _])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData]
 cannot be applied to (com.chuneng.saas.dao.SinkToTDengine)
    iotStream.sinkTo(new SinkToTDengine)

遇到的错误是由于 sinkTo 方法期望的参数类型与提供的 SinkToTDengine 类型不匹配。具体来说,sinkTo 方法期望的是一个实现了 Sink 接口的类型,而您的 SinkToTDengine 类继承自 RichSinkFunction,这导致类型不兼容。

错误原因分析

这表明 sinkTo 方法期望的是 Sink[IotData] 或 Sink[IotData, _, _, _] 类型,而您传递的是 SinkToTDengine,它继承自 RichSinkFunction<IotData>,因此类型不匹配。

解决方案

要解决这个问题,您需要将 SinkToTDengine 转换为 Flink 支持的 Sink 类型。

使用 addSink 方法

Flink 提供了 addSink 方法,可以直接接受实现了 SinkFunction 的自定义 Sink。这是最直接和常用的方式。

修改后的代码示例:

iotStream
  .map(rd => {
    val iotData = new IotData()
    iotData.setPid(rd.getDeviceCode.replaceAll(".", "-"))
    iotData.setVal(rd.getCompensationMW.toString)

    // 将13位时间戳转换为%Y-%m-%d %H:%M:%S.%f格式,毫秒保留3位小数
    val timestamp = rd.getTime // 假设rd.getTime()返回的是13位时间戳(毫秒级)
    val instant = Instant.ofEpochMilli(timestamp)
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
      .withLocale(Locale.CHINA)
      .withZone(ZoneId.systemDefault())
    val formattedTime = formatter.format(instant)

    iotData.setTs(formattedTime)
    iotData
  })
  .print()
  .addSink(new SinkToTDengine()) // 使用 addSink 方法
作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7通过Flink Doris Connector写入Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage

在flink 1.7项目,通过Flink Doris Connector,采用批处理读取Doris数据进行计算然后写入到doris的另外一个表。采用flink sql方式。

原来的代码进行脱敏后的代码如下:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

// 数据源配置
String sourceDDL = "CREATE TABLE <SOURCE_TABLE_NAME>(" +
        "pid STRING," +
        "pvalue decimal(39,3)," +
        "ptime TIMESTAMP(3)," +
        "ds DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'" +
        ")";

tableEnvironment.executeSql(sourceDDL);
// 获取当前时间戳
String timestamp = String.valueOf(System.currentTimeMillis());

// 目标 Doris 表 DDL
String sinkDDL = "CREATE TABLE <TARGET_TABLE_NAME> (" +
        "station_sn STRING," +
        "pid_system_code STRING," +
        "`day` STRING," +
        "`value` STRING," +
        "created_at TIMESTAMP(3)," +
        "dt DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'," +
        "'sink.label-prefix' = '<LABEL_PREFIX>_" + timestamp + "'" +
        ")";

执行报错如下:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
	at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
	at com.chuneng.saas.doris.FlinkBatchSql.main(FlinkBatchSql.java:68)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
	at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
	at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
	... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage@265569e2
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.getCheckpointCommittables(CommittableCollector.java:241)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)

这可能是 Flink 批处理模式下通常不需要 checkpoint,但 Doris Sink Connector 默认可能依赖 checkpoint 相关逻辑,从而导致 NullPointerException。

添加与 sink 行为相关的参数, 设置不用 checkpoint 。

修改后的sink如下:

String sinkDDL = “CREATE TABLE (” +
“station_sn STRING,” +
“pid_system_code STRING,” +
“day STRING,” +
“value STRING,” +
“created_at TIMESTAMP(3),” +
“dt DATE” +
“) WITH (” +
“‘connector’ = ‘doris’,” +
“‘fenodes’ = ‘:’,” +
“‘table.identifier’ = ‘.’,” +
“‘username’ = ”,” +
“‘password’ = ”,” +
“‘sink.label-prefix’ = ‘_” + timestamp + “‘,” +

"'doris.batch.size' = '1000'," +  // 批量写入大小
"'sink.enable-2pc' = 'false'" + // 禁用两阶段提交
")";
作者 east
Flink 1月 23,2025

flink 1.12用Flink SQL写入Doris的坑

在flink 1.12,用flink sql写入doris,相关pom配置如下:

   <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.12_2.11</artifactId>
      <version>1.0.3</version>
    </dependency>

to_date('2025-01-14')实际写入到doris变成了另外一个日期,非常坑。而且 
Flink Connector 24.0.0 版本之后支持使用Arrow Flight SQL 读取数据 ,速度提高非常快。

flink-doris-connector各版本兼容如下。

版本兼容​

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8–
1.3.01.161.0+8–
1.4.01.15,1.16,1.171.0+8–
1.5.21.15,1.16,1.17,1.181.0+8–
1.6.21.15,1.16,1.17,1.18,1.191.0+8–
24.0.11.15,1.16,1.17,1.18,1.19,1.201.0+8–

比较好选择是选择Flink 1.16以上(可以兼容hive语法90
%以上)。升级到flink 1.17后,to_date(‘2025-01-14’)返回结果果然正常了。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7 Exception in thread “main” org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table

问题分析

  1. 报错提示的主要内容
    • ValidationException: Unable to create a sink for writing table ...
    • Could not load service provider for factories 和 org.apache.flink.table.planner.delegation.DefaultExecutorFactory not a subtype。
    • 这些问题通常是因为 Flink 运行环境或依赖配置不正确。
  2. 可能原因
    • Flink 和 Doris 依赖版本不匹配:
      • 使用的 Flink Doris Connector 是 flink-doris-connector-1.17,其版本号为 24.0.1,需要确保它与当前 Flink 的版本(1.17.x)兼容。或者flink的jar包有的不是1.17.x版本,和上面的 link-doris-connector-1.17 不兼容。

解决方案

1. 检查 Flink 和 Doris Connector 的兼容性

  • 确认 Flink 和 Doris Connector 的版本兼容。
  • 当前使用的是 flink-doris-connector-1.17,对应 Flink 1.17.x。如果使用的是其他版本的 Flink(如 1.16 或 1.18),需要更换依赖:
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>

2. 添加 Flink Doris Connector 所需的运行时依赖

确保项目中包含以下依赖(建议手动检查 pom.xml 是否缺失),检查flink的jar是否都是1.17.x版本:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

确保 flink-table-planner_2.12 版本与 Flink 版本匹配。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 找不到org.apache.flink.table.descriptors.TableDescriptor

flink版本1.7的项目代码如下:

   StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // 设置 Flink SQL 环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

运行报错找不到org.apache.flink.table.descriptors.TableDescriptor。

问题分析

  1. 依赖冲突或缺失:
    • Flink 1.17.2 中 TableDescriptor 类已被废弃。Flink 1.13 开始引入了 TableDescriptor 的新概念,用于定义表源和表目标,而旧版依赖中的 org.apache.flink.table.descriptors 相关类在后续版本中被逐步移除。
    • 如果代码中还有引用 org.apache.flink.table.descriptors 包下的类(如连接器或格式描述符),可能导致运行时报错。
  2. API 版本不匹配:
    • 在 Flink 1.17.2 中,推荐使用 Table API 的新方式(TableDescriptor 不再使用)。这可能意味着您正在使用旧版本的 API,或者您的代码依赖了不兼容的旧包。

解决方案

1. 检查代码中是否仍在使用旧版 API

移除任何对 org.apache.flink.table.descriptors 的直接依赖。使用以下代码替换旧方法:

// 创建 Flink 流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 创建 Flink SQL 表执行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// 定义表源或目标时,使用 Table API 的新方式
TableDescriptor descriptor = TableDescriptor.forConnector("kafka") // 替换为实际使用的连接器
        .schema(Schema.newBuilder()
                .column("field1", DataTypes.STRING())
                .column("field2", DataTypes.INT())
                .build())
        .format("json") // 替换为实际使用的格式
        .option("property.key", "value") // 替换为实际连接器选项
        .build();

tableEnv.createTemporaryTable("my_table", descriptor);

2. 更新依赖

确保项目使用的依赖与 Flink 1.17.2 版本兼容。在 pom.xml 或 build.gradle 文件中明确声明以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
</dependency>

3. 清理旧依赖

如果仍需要使用 TableDescriptor 类,请确认没有混用老旧版本的连接器或额外库,例如 flink-connector-kafka 等。检查项目中是否存在以下老依赖,并替换为新版依赖:

旧版依赖示例:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.12.x</version>
</dependency>

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList

运行flink 1.7的项目,报错如下:

Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList at org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase.<init>(FlinkPreparingTableBase.java:92) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.<init>(ExpandingPreparingTable.java:42) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.<init>(QueryOperationCatalogViewTable.java:49) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.create(QueryOperationCatalogViewTable.java:58) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.convertQueryOperationView(FlinkCalciteCatalogReader.java:146) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.toPreparingTable(FlinkCalciteCatalogReader.java:110) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:91) at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:229) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:144) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:110) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2490) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) at com.chuneng.saas.doris.FlinkCuSohJdbcSqlAnalyze.main(FlinkCuSohJdbcSqlAnalyze.java:98) Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

  • 从报错信息 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList 和 Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 可以看出,程序在运行时无法找到 org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 这个类。
  • 这通常是因为相应的依赖库没有被正确地添加到项目的类路径中,导致 JVM 在运行时无法加载所需的类。

修改方案:

  1. 确认你是否在项目的构建文件(如 Maven 的 pom.xml 或 Gradle 的 build.gradle)中添加了 Apache Flink 相关的依赖。
  2. 确保使用的 Flink 版本是兼容的,并且其依赖的 Guava 版本是 flink-shaded-guava 的 18 版本。
  3. 对于 Maven 项目,检查 pom.xml 中是否有类似如下的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>18.0</version>
</dependency>
  1. 对于 Gradle 项目,检查 build.gradle 中是否有类似如下的依赖:

implementation 'org.apache.flink:flink-shaded-guava:18.0'
  1. 如果已经添加了依赖,可能是因为依赖冲突导致无法找到正确的类。可以使用 mvn dependency:tree(对于 Maven)或 gradle dependencies(对于 Gradle)命令查看依赖树,找出是否有多个版本的 Guava 被引入,然后通过排除冲突的依赖来解决。
作者 east
Flink 1月 22,2025

Flink1.7官方文档中文翻译:及时流处理

简介#

及时流处理是有状态流处理的一种扩展,其中时间在计算中发挥一定作用。例如,在进行时间序列分析、基于特定时间段(通常称为窗口)进行聚合,或者在处理事件时事件发生的时间很关键等情况时,都会涉及到及时流处理。
在接下来的章节中,我们将重点介绍在使用 Flink 进行及时流处理应用开发时,需要考虑的一些主题。
返回顶部

时间概念:事件时间与处理时间#

在流处理程序中提及时间(例如定义窗口时),可以涉及不同的时间概念:

  • 处理时间:处理时间指的是执行相应操作的机器的系统时间。

当一个流处理程序基于处理时间运行时,所有基于时间的操作(如时间窗口)将使用运行相应操作符的机器的系统时钟。一个按小时划分的处理时间窗口将包含在系统时钟显示整点之间到达特定操作符的所有记录。例如,如果一个应用程序在上午 9:15 开始运行,第一个按小时划分的处理时间窗口将包含上午 9:15 到 10:00 之间处理的事件,下一个窗口将包含上午 10:00 到 11:00 之间处理的事件,依此类推。
处理时间是最简单的时间概念,无需在流和机器之间进行协调。它能提供最佳性能和最低延迟。然而,在分布式和异步环境中,处理时间不具备确定性,因为它易受记录进入系统的速度(例如从消息队列进入)、记录在系统内操作符之间流动的速度以及中断(计划内或其他情况)的影响。

  • 事件时间:事件时间是每个事件在其产生设备上发生的时间。这个时间通常在记录进入 Flink 之前就嵌入其中,并且可以从每条记录中提取出事件时间戳。在事件时间中,时间的推进取决于数据,而非任何物理时钟。基于事件时间的程序必须指定如何生成事件时间水印,这是一种在事件时间中标记时间推进的机制。这种水印机制将在后续章节中介绍。

在理想情况下,无论事件何时到达或其顺序如何,基于事件时间的处理都能产生完全一致且确定的结果。然而,除非已知事件按时间戳顺序到达,否则事件时间处理在等待乱序事件时会产生一定延迟。由于只能等待有限的时间,这就限制了基于事件时间的应用程序的确定性程度。
假设所有数据都已到达,即使处理乱序或迟到的事件,或者重新处理历史数据,基于事件时间的操作也会按预期执行,并产生正确且一致的结果。例如,一个按小时划分的事件时间窗口将包含所有携带的事件时间戳属于该小时的记录,无论它们到达的顺序如何,也无论它们何时被处理。(有关更多信息,请参阅 “延迟” 部分。)
请注意,有时基于事件时间的程序在实时处理实时数据时,会使用一些基于处理时间的操作,以确保它们能够及时推进。
事件时间与处理时间

Event Time and Processing Time


事件时间与水印#

注意:Flink 实现了数据流模型中的许多技术。若要深入了解事件时间和水印,可查看以下文章。

  • Tyler Akidau 的《Streaming 101》
  • 《数据流模型》论文

一个支持事件时间的流处理器需要一种方式来衡量事件时间的推进。例如,一个构建按小时划分窗口的窗口操作符,需要在事件时间超过一小时结束时得到通知,以便该操作符能够关闭正在处理的窗口。
事件时间可以独立于处理时间(由物理时钟测量)推进。例如,在一个程序中,某个操作符的当前事件时间可能略落后于处理时间(考虑到接收事件的延迟),但两者以相同速度推进。另一方面,另一个流处理程序可能通过快速处理已经缓冲在 Kafka 主题(或其他消息队列)中的一些历史数据,在仅几秒钟的处理时间内推进数周的事件时间。
Flink 中衡量事件时间推进的机制是水印。水印作为数据流的一部分流动,并携带一个时间戳 t。Watermark (t) 声明在该流中事件时间已到达时间 t,这意味着该流中不应再有时间戳 t’ <= t 的元素(即时间戳早于或等于水印的事件)。
下图展示了带有(逻辑)时间戳的事件流以及同步流动的水印。在这个例子中,事件是按(时间戳)顺序排列的,这意味着水印只是流中的周期性标记。
有序事件流和水印
水印对于乱序流至关重要,如下图所示,其中事件并非按时间戳排序。一般来说,水印表明在流中的那个点,所有到某个时间戳的事件都应该已经到达。一旦水印到达一个操作符,该操作符可以将其内部事件时间时钟推进到水印的值。
无序事件流和水印
请注意,新创建的流元素的事件时间继承自产生它们的事件,或者触发这些元素创建的水印。

并行流中的水印#

水印在源函数处或紧随源函数之后生成。源函数的每个并行子任务通常独立生成其水印。这些水印定义了该特定并行源的事件时间。
随着水印在流处理程序中流动,它们会推进水印到达的操作符处的事件时间。每当一个操作符推进其事件时间时,它会为下游的后续操作符生成一个新的水印。
有些操作符会消费多个输入流,例如 union 操作符,或者在 keyBy (…) 或 partition (…) 函数之后的操作符。这样的操作符的当前事件时间是其输入流事件时间的最小值。随着其输入流更新它们的事件时间,该操作符的事件时间也会更新。
下图展示了事件和水印在并行流中流动,以及操作符跟踪事件时间的示例。
并行数据流、操作符与事件和水印

延迟#

有可能某些元素会违反水印条件,即即使在 Watermark (t) 出现之后,仍会出现更多时间戳 t’ <= t 的元素。实际上,在许多实际场景中,某些元素可能会被任意延迟,使得无法指定一个时间,保证在该时间之前具有特定事件时间戳的所有元素都已到达。此外,即使延迟可以界定,将水印延迟太多通常也不可取,因为这会导致事件时间窗口的评估出现过多延迟。
因此,流处理程序可能会明确预期一些延迟元素。延迟元素是指在系统的事件时间时钟(由水印指示)已经超过延迟元素的时间戳之后才到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参阅 “允许的延迟”。

窗口化#

对流中的事件进行聚合(例如计数、求和)与批处理中的方式不同。例如,不可能对流中的所有元素进行计数,因为流通常是无限的(无界的)。相反,对流的聚合(计数、求和等)是通过窗口来界定范围的,例如 “过去 5 分钟的计数” 或 “过去 100 个元素的求和”。
窗口可以由时间驱动(例如:每 30 秒)或由数据驱动(例如:每 100 个元素)。通常可以区分不同类型的窗口,例如滚动窗口(无重叠)、滑动窗口(有重叠)和会话窗口(由不活动间隙分隔)。
时间窗口和计数窗口
有关窗口的更多示例,请查看此博客文章,或者查看 DataStream API 的窗口文档。介绍一下Flink的时间概念如何在Flink中使用事件时间?Flink的窗口操作符有哪些?

作者 east

1 2 … 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (493)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (35)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.