gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面2 )
doris 3月 6,2025

Apache Doris内存与超时参数配置详解

以下是根据你的需求整理的Apache Doris内存与超时参数配置详解,结合了相关技术文档和实践经验:

一、查询任务内存限制调整

1. ​默认内存限制与问题定位

Apache Doris默认限制单个BE节点上的查询任务内存使用不超过2GB(即exec_mem_limit=2147483648字节)。当复杂查询或大规模数据操作超过此限制时,会触发Memory limit exceeded错误。通过以下命令可查看当前配置:

SHOW VARIABLES LIKE "%mem_limit%";

输出示例:

+----------------+------------+
| Variable_name  | Value      |
+----------------+------------+
| exec_mem_limit | 2147483648 |  -- 默认2GB
| load_mem_limit | 0          |  -- 导入任务内存限制(0表示无限制)
+----------------+------------+

2. ​调整内存限制的方法

  • ​临时调整(当前会话生效)​​
    通过SET命令修改会话级参数,适用于单次查询优化:sqlSET exec_mem_limit = 8589934592; -- 设置为8GB
  • ​永久生效(全局配置)​​
    添加GLOBAL参数以全局生效,需在FE节点配置中持久化:sqlSET GLOBAL exec_mem_limit = 8589934592;​
  • 注意事项:
    • 若集群资源有限,需结合BE节点的总内存(通过mem_limit参数控制)合理分配,避免单任务占用过高导致节点OOM6。
    • 高并发场景建议通过资源标签(Resource Label)隔离关键任务,防止资源争用6。

二、查询超时时间优化

1. ​默认超时机制与配置查询

Doris默认查询超时时间为300秒(5分钟),可通过以下命令查看:

sqlSHOW VARIABLES LIKE "%query_timeout%";

输出示例:

+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| query_timeout | 300   |
+---------------+-------+

2. ​延长超时时间的操作

  • ​临时调整(当前会话生效)​​sqlSET query_timeout = 600; -- 设置为600秒(10分钟)
  • ​永久生效(全局配置)​​sqlSET GLOBAL query_timeout = 600;​动态生效特性:与内存参数不同,超时参数修改后通常无需重启集群即可生效7。

三、关联参数与调优实践

1. ​内存管理联动配置

  • ​BE节点总内存限制(mem_limit)​​
    控制单个BE进程的最大内存使用,默认根据物理内存自动计算(如物理内存的90%或物理内存-6.4GB)。建议在be.conf中显式设置,避免资源竞争4。bashmem_limit = 80% # 或具体值如32G
  • ​Compaction内存限制(compaction_memory_bytes_limit)​​
    控制数据合并任务的内存使用上限,默认值根据系统配置动态调整。若频繁因Compaction导致查询内存不足,可适当调低此参数5。

2. ​高并发场景优化策略

  • ​并行度与资源分配​
    提升查询并行度(parallel_degree)可加速处理,但需平衡CPU和内存消耗。例如:sqlSET GLOBAL parallel_degree = 16; -- 根据CPU核心数调整
  • ​物化视图与分区优化​
    对高频查询创建物化视图或合理分区,减少单次查询的数据扫描量,间接降低内存需求6。

四、操作验证与监控

  1. ​验证参数生效​
    修改后通过SHOW VARIABLES确认新值是否生效,并执行测试查询观察内存使用情况。
  2. ​日志与监控工具​
    • ​FE日志:检查fe.log中内存超限或超时任务记录3。
    • ​BE监控:通过curl http://BE_IP:8040/api/compaction/show?tablet_id=XXX查看Tablet状态3。
    • ​系统视图:使用SHOW PROC '/admin/stats'实时监控资源使用6
作者 east
bug清单, Flink 3月 5,2025

解决Flink SQL:Exception in thread “main” org.apache.flink.table.api.ValidationException: Rowtime attribute ‘ptime’ must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type ‘BIGINT’.

在开发Flink SQL时报错:

在flink 1.16版本中执行报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: Rowtime attribute 'ptime' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type 'BIGINT'.

	at org.apache.flink.table.api.TableSchema.validateColumnsAndWatermarkSpecs(TableSchema.java:535)

	at org.apache.flink.table.api.TableSchema.access$100(TableSchema.java:73)

	at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:802)

	at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.build(MergeTableLikeUtil.java:534)

	at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:154)

	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:171)

	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:74)

	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:330)

	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)

	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)

	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:758)

	at com.chuneng.saas.doris.FlinkDorisExtremeValueCalculation.main(FlinkDorisExtremeValueCalculation.java:44)

原因分析

错误的核心在于:Flink 要求用于定义 WATERMARK 的字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型,但你的 ptime 字段被定义为 BIGINT 类型。尽管你在 WATERMARK 中尝试将 ptime 转换为 TIMESTAMP,但 Flink 的 WATERMARK 语法要求直接引用一个已经存在的 TIMESTAMP 类型字段,而不是在 WATERMARK 定义中动态转换类型。

解决方案

你需要通过 计算列(Computed Column) 将 ptime 的 BIGINT 类型转换为 TIMESTAMP 类型,然后将 WATERMARK 绑定到计算列上。

例如像下面这样:

String sourceDDL = "CREATE TABLE ods_t_iot_data (" +
        "pid STRING," +
        "pvalue DECIMAL(13,3)," +
        "ptime BIGINT," +
        "ds DATE," +
        // 定义计算列,将 BIGINT 类型的 ptime 转换为 TIMESTAMP
        "event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ptime / 1000))," + 
        // 绑定 WATERMARK 到计算列 event_time
        "WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '10.0.0.1:8030'," +
        "'table.identifier' = 'cnsaas.ods_t_iot'," +
        "'username' = 'root'," +
        "'password' = ''" +
        ")";
作者 east
bug清单, Flink 3月 5,2025

解决Flink SQL报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”

在开发Flink SQL时报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”

这个错误表明您的Flink作业涉及更新或删除操作(如CDC场景),而JDBC Sink表没有定义主键。以下是详细分析和解决方案:

错误原因分析

当Flink作业涉及UPDATE/DELETE操作时(常见于CDC场景或使用Retract流),JDBC Sink表必须显式声明主键,因为:

  1. 主键用于定位目标数据库中需要更新的记录
  2. 没有主键时,Flink无法确定如何执行更新/删除操作

解决方案

在表定义中添加 PRIMARY KEY 声明(需与目标数据库表主键一致),例如:

val sinkDDL =
  """
    |CREATE TABLE dws_iot_extreme_info_min (
    |  device_code STRING,
    |  `day` DATE,
    |  ptime BIGINT,
    |  station_sn STRING,
    |  unit STRING,
    |  cu_device_code STRING,
    |  max_u_value DECIMAL(5,3),
    |  min_u_value DECIMAL(5,3),
    |  max_t_value DECIMAL(5,2),
    |  min_t_value DECIMAL(5,2),
    |  max_soc_value DECIMAL(5,2),
    |  min_soc_value DECIMAL(5,2),
    |  max_u_time BIGINT,
    |  min_u_time BIGINT,
    |  max_t_time BIGINT,
    |  min_t_time BIGINT,
    |  max_soc_time BIGINT,
    |  min_soc_time BIGINT,
    |  dt DATE,
    |  PRIMARY KEY (device_code, `day`) NOT ENFORCED  -- 添加主键声明
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://10.0.2.2:3306/cnsaas',
    |  'table-name' = 'dws_bigdata_device_extreme_info_min',
    |  'driver' = 'com.mysql.cj.jdbc.Driver',
    |  'username' = 'root',
    |  'password' = '',
    |  'sink.buffer-flush.max-rows' = '1000',    
    |  'sink.buffer-flush.interval' = '1s',      
    |  'sink.max-retries' = '3'                 
    |)
    |""".stripMargin

关键修改点说明

  1. 主键声明:PRIMARY KEY (device_code, `day`) NOT ENFORCED
    • 主键字段需与目标数据库表的主键一致
    • NOT ENFORCED 表示Flink不会校验数据主键约束,由数据库负责
  2. 目标表要求:
    • MySQL数据库中 dws_iot_extreme_info_min 表必须有相同的主键定义
    • 可通过以下SQL确保主键存在:
    • ALTER TABLE dws_iot_extreme_info_min ADD PRIMARY KEY (device_code, day);
作者 east
Flink 3月 5,2025

Flink Checkpoint 详解

一、Checkpoint 的原理

  1. 核心概念
    Checkpoint 是 Flink 的容错机制,通过定期生成分布式快照,记录流处理应用的全局状态(算子状态、键控状态等)。当发生故障时,Flink 可从最近的 Checkpoint 恢复,保证 Exactly-Once 语义。
  2. 实现机制
    • Chandy-Lamport 算法:基于 Barrier 的分布式快照。
    • 流程:
      1. 触发:JobManager 定期触发 Checkpoint,向所有 Source 发送 Barrier。
      2. Barrier 传播:Source 插入 Barrier 到数据流,算子接收到 Barrier 后暂停处理新数据,将当前状态异步持久化。
      3. 状态存储:状态写入外部存储(如 HDFS、S3)。
      4. 确认机制:所有算子确认状态保存后,Checkpoint 完成。
  3. 一致性语义
    • EXACTLY_ONCE:精确一次,通过对齐 Barrier 确保状态与数据流严格一致。
    • AT_LEAST_ONCE:至少一次,可能重复处理数据。

二、Checkpoint 的使用方法

  1. 基础配置
    在 Flink 作业中启用 Checkpoint,设置间隔、存储路径和模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint,间隔 60 秒
env.enableCheckpointing(60000);
// 配置 Exactly-Once 语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置 Checkpoint 存储路径
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints/");
// 其他配置(超时时间、最小间隔等)
env.getCheckpointConfig().setCheckpointTimeout(30000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  1. 状态后端选择
    • FsStateBackend:状态存储在内存,快照存于文件系统(适合状态较小场景)。
    • RocksDBStateBackend:状态存储在 RocksDB,支持超大状态(需权衡性能)。
  2. 恢复作业
    通过命令行或 API 从指定 Checkpoint 恢复作业:bash复制./bin/flink run -s hdfs:///checkpoints/1234 …

三、Checkpoint 的应用场景

  1. 容错恢复
    节点故障、网络中断时,从 Checkpoint 恢复状态,避免数据丢失。
  2. 有状态计算
    • 窗口聚合(如每小时销售额统计)。
    • 复杂事件处理(CEP)中的模式状态。
    • 连接操作(如流-流 Join 的中间状态)。
  3. 作业升级与扩缩容
    通过 Savepoint(手动触发的 Checkpoint)暂停作业,修改并行度或代码后恢复。

四、Flink SQL 流式写入数据到表时为何需要 Checkpoint

  1. 保障 Exactly-Once 语义
    • 当使用 Flink SQL 写入 Kafka、JDBC 表等外部系统时,Checkpoint 通过**两阶段提交协议(2PC)**协调事务:
      1. 预提交阶段:数据写入外部系统,但未提交。
      2. 提交阶段:Checkpoint 完成后,所有事务统一提交。
    • 若故障发生,Flink 回滚到上一个 Checkpoint,确保数据不重复、不丢失。
  2. 维护内部状态一致性
    • 即使目标表不支持事务(如 HBase),Checkpoint 仍保障 Flink 内部状态(如去重状态、窗口状态)的正确恢复。
  3. 避免数据丢失
    • 未启用 Checkpoint 时,若作业崩溃,可能丢失未持久化的状态和数据,导致写入结果不完整。

五、示例:Flink SQL 写入 Kafka

-- 启用 Checkpoint(隐式通过 ExecutionConfig)
SET 'execution.checkpointing.interval' = '60s';

-- 定义 Kafka Sink 表
CREATE TABLE kafka_sink (
    user_id STRING,
    count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.transactional-id-prefix' = 'tx-' -- 启用 Kafka 事务
);

-- 流式写入
INSERT INTO kafka_sink 
SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id;
  • 依赖 Checkpoint:Kafka Sink 通过事务提交机制与 Checkpoint 绑定,确保每条数据仅写入一次。

总结

  • Checkpoint 原理:基于 Barrier 的分布式快照,保障状态一致性。
  • 使用场景:容错、有状态计算、作业维护。
  • Flink SQL 写入表:Checkpoint 是保证端到端 Exactly-Once 的核心机制,协调外部系统事务与内部状态恢复。
作者 east
doris 2月 7,2025

Doris更新某一列完整教程

通过临时表来更新
例如有下面的表结构

ods_t_iot (
pid varchar(255) NOT NULL,
ptime bigint NOT NULL,
pvalue decimal(38,9) NOT NULL,
ds varchar(30) NULL
) ENGINE=OLAP
UNIQUE KEY(pid, ptime, pvalue, ds)
需要把ds这一列更新为2025-02-07

  1. 创建一个新表并导入数据
    由于不能直接使用 UPDATE,一种常见的方法是创建一个新表,然后通过批量插入的方式将数据导入,并修改 ds 列的值。下面是详细步骤:

步骤 1: 创建一个新表
首先,我们需要创建一个新的表,结构与原表一致,唯一的区别是我们会把 ds 列的默认值设置为 ‘2025-02-07’,并确保与原表的列顺序、类型保持一致。

CREATE TABLE ods_t_iot_new like ods_t_iot;
步骤 2: 使用 INSERT INTO SELECT 语句从原表导入数据
接下来,我们将原表的数据导入到新表中,同时确保 ds 列的值设置为 ‘2025-02-07’。可以使用以下 SQL:

INSERT INTO ods_t_iot_new (pid, ptime, pvalue, ds) SELECT pid, ptime, pvalue, IFNULL(ds, ‘2025-02-07’) AS ds FROM ods_t_iot;
在这里,IFNULL(ds, ‘2025-02-07’) 会将原表中 ds 列为 NULL 的数据替换为 ‘2025-02-07’。

步骤 3: 删除原表并重命名新表
导入数据完成后,我们可以删除原表并将新表重命名为原表名。这样,所有数据都已经更新,ds 列也被设置为 ‘2025-02-07’。

DROP TABLE ods_t_iot;

ALTER TABLE ods_t_iot_new RENAME ods_t_iot;

作者 east
大数据开发, 面试 2月 3,2025

二本生的破局:从迷茫到上岸知名互联网大数据岗,我是这样做的

​

大学时的迷茫,直到如今回想起来,依然觉得有点“懵懂”。我本科就读于一所普通的二本院校,专业是计算机相关。对于未来的职业方向,我一直没有一个明确的目标。身边的师兄师姐们给出了很多建议。Java开发的岗位早已是“烂大街”的状态,几乎人人都能进入,但也难以脱颖而出。C++的就业方向则相对小众,竞争相对较小,但是行业需求并不大。大数据开发虽然岗位不多,但薪资可观,且未来的增长潜力巨大——这让我决心全力投入到大数据的学习和实践中。

刷题:编程能力的磨砺之路

提升编程能力,绝对是每个技术开发者必须经历的过程。对于我来说,刷题成为了提升编程能力的“必经之路”。无论是面试中的笔试,还是实际的编码工作,扎实的编程能力总是能起到决定性的作用。通过LeetCode,我挑战了200+道题。最初,很多题目都让我头大,甚至看半天也没有任何思路,感觉几乎要放弃。但我始终坚持每一道题,仔细分析每种解题方法,总结出最优的解法。

解决不了的问题,我会去寻找答案,去研究大神们是如何分析并解决问题的,理解他们的思路之后,我自己再亲手实现一遍。这个过程不仅是对算法的磨练,更是对思维方式的训练。

除了LeetCode,我还选择了《剑指 Offer》这本书。这本书的题目紧扣面试核心,尤其适合那些准备大数据或Java开发岗位的应聘者。书中不仅有面试中常见的题目,还特别注重对思路的培养。虽然有些题目一开始并不容易通过,但在面试现场,面试官更看重的是你的思考过程和解决方案,而不是你是否一次性通过了所有测试用例。

特别是大厂高频手撕面试题,更是要准备熟悉:

从上千份大厂面经呕心沥血整理:大厂高频手撕面试题(数据结构和算法篇 ,Java实现亲试可跑)


数据库:SQL技能是大数据开发的根基

SQL是大数据开发者必须掌握的一项技能,尤其是在与数据库打交道的过程中,它是操作数据的核心工具。对我来说,SQL不仅仅是日常工作中处理数据的工具,它更是提高自己业务理解和解决问题的“钥匙”。

我在学习SQL时,花了大量时间深入理解了它的高级用法,特别是窗口函数、子查询等复杂查询方法。为了更加深入地理解SQL的应用,我做了大量的练习,包括针对具体业务场景的SQL题目,如留存分析、在线时长统计、漏斗分析、连续登录天数计算等。这些练习不仅提升了我的SQL能力,还帮助我理解了如何通过SQL解决实际工作中的问题。面试中,我几乎没有遇到过SQL方面的难题,面试官通常会对我解决问题的方式给予高度评价。

大厂面试手撕SQL面试题(Hive实现:样例数据、详细思路、亲试可行的运行截图)_hive sql经典面试题-CSDN博客


理论学习:扎实的知识体系是进阶的基石

尽管编程和SQL能力非常重要,但大数据开发所涉及的理论知识同样不容忽视。这些理论知识不仅能帮助你解决实际问题,更能让你站在更高的角度去思考和分析问题。为了在大数据开发领域立足,我专注于构建一个完整的知识体系,包括数据仓库理论、操作系统原理、计算机网络、Java基础等方面的知识。

数据仓库与分层理论:
为了深入了解大数据在企业中的实际应用,我精读了《维度建模工具箱》和《大数据之路:阿里巴巴大数据实践》这两本书。《维度建模工具箱》详细介绍了如何进行数据建模,特别是维度建模,这对构建灵活且高效的数据仓库架构至关重要。而《大数据之路:阿里巴巴大数据实践》则通过阿里巴巴的真实案例,帮助我理解了大数据的应用场景以及发展趋势。

操作系统与计算机网络:
作为计算机专业的基础课程,操作系统和计算机网络对于大数据开发的深入理解同样重要。我花了两周时间复习了这些基础课程,结合项目中的实际应用加深理解。例如,在操作系统的进程管理与内存管理的复习过程中,我结合了大数据处理中的并发控制、资源分配等内容;在学习计算机网络的TCP/IP协议时,我思考这些知识如何在Hadoop等大数据组件的网络通信中得以应用。

Java编程与多线程:
Java是大数据开发中最常用的编程语言之一,我花了一个月时间复习Java的基础、源码、多线程以及JVM的相关内容。通过深入分析Java的类库和源码,我理解了Java的底层实现。多线程和JVM的内容相对复杂,但我通过阅读经典书籍并结合实际案例进行了深入学习,并最终在面试中得心应手。

MySQL优化:
MySQL作为常用的关系型数据库,优化MySQL的性能也是大数据开发者的必修课。我读了两遍《高性能MySQL》,这本书涵盖了MySQL的性能优化、索引优化、存储引擎等核心知识。在实际项目中,通过对MySQL的优化,我提升了数据存取效率,减少了响应时间,确保了数据库在大数据场景下的高效运行。


大数据组件:从基础到深入,掌握核心技术

在学习了编程语言和理论知识之后,我将重点放在了大数据开发的核心技术上。对于一个大数据开发者来说,掌握相关的大数据组件和框架是进入行业的必要条件。

Hadoop:
Hadoop作为大数据处理的基础框架,我通过反复研读《Hadoop权威指南》,深入了解了Hadoop的架构、HDFS(分布式文件系统)、MapReduce(分布式计算框架)和YARN(资源管理器)。我不仅仅停留在理论层面,还通过搭建Hadoop集群,进行了一些实际操作,积累了大量的实践经验。

Hive:
Hive是基于Hadoop的SQL查询工具,我不仅学习了HiveQL的基本语法,还深入研究了Hive的性能调优和配置优化。在项目中,我通过实践学会了如何使用Hive进行数据仓库建设和数据分析,同时优化了查询性能,提高了数据处理效率。

Spark与其他工具:
虽然Spark、Flink、Kafka等大数据工具的学习我并没有深入到实际项目中,但我确保了自己对这些工具的基本概念、应用场景和技术原理有足够的了解。对于每个工具的学习,我都花时间去了解它们的功能、优势以及如何在特定场景下应用。


精美简历:突出重点,简洁明了

简历是进入面试的第一步,而对于我来说,如何让简历“脱颖而出”是一项不容忽视的挑战。为了让面试官快速抓住我的核心竞争力,我在简历中只列出了自己真正掌握的技术,如Hadoop、Hive、Spark等。

在描述项目经验时,我尽量详细描述自己在项目中的职责、解决的关键问题和取得的成绩。例如,在数据仓库建设项目中,我详细阐述了自己负责的模块设计、遇到的难点以及如何克服这些问题;在报表开发中,我展示了如何优化查询流程、提高数据处理效率。通过这些真实的项目经验,我的简历更具说服力。


面试准备:知己知彼,百战不殆

除了理论学习与技能提升,面试技巧的磨练同样不可忽视。我查阅了大量大数据开发岗位的面试经验和面试题,整理成文档,并进行逐一分析。通过这些面经,我不仅了解了面试官最关心的技术点,还学习了如何在面试中展示自己的优势。

面试过程中,我始终保持自信,尽量展示出我的技术深度与广度。同时,我也通过模拟面试,提升自己的表达能力和应变能力。

这里汇集了上百家中大厂的大数据面经:

大数据最全面试题-Offer直通车


面试过程:从等待到收获

投递简历后,我并没有期待快速的反馈。前期的简历投递并没有让我立刻迎来面试机会,但我没有气馁。在不断优化简历、提升技术实力后,面试机会开始逐渐增多。经过几轮严格的笔试和面试,我最终成功拿到了某知名互联网企业的大数据开发岗位的Offer,薪资在同行中领先。

大数据大厂校招网申入口最全集合和2025年校园招聘时间线(持续更新)

这段求职历程不仅让我收获了职位,也让我深刻体会到,只有通过持续的学习和实践,才能在激烈的技术竞争中站稳脚跟。 ​

作者 east
Flink 1月 24,2025

解决flink报错:org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] cannot be applied to (com.chuneng.saas.dao.SinkToTDengine) iotStream.sinkTo(new SinkToTDengine)

在scala用下面的代码,

iotStream.sinkTo(new SinkToTDengine)

出现报错:

overloaded method value sinkTo with alternatives:
  (sink: org.apache.flink.api.connector.sink2.Sink[com.chuneng.saas.vo.IotData])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] <and>
  (sink: org.apache.flink.api.connector.sink.Sink[com.chuneng.saas.vo.IotData, _, _, _])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData]
 cannot be applied to (com.chuneng.saas.dao.SinkToTDengine)
    iotStream.sinkTo(new SinkToTDengine)

遇到的错误是由于 sinkTo 方法期望的参数类型与提供的 SinkToTDengine 类型不匹配。具体来说,sinkTo 方法期望的是一个实现了 Sink 接口的类型,而您的 SinkToTDengine 类继承自 RichSinkFunction,这导致类型不兼容。

错误原因分析

这表明 sinkTo 方法期望的是 Sink[IotData] 或 Sink[IotData, _, _, _] 类型,而您传递的是 SinkToTDengine,它继承自 RichSinkFunction<IotData>,因此类型不匹配。

解决方案

要解决这个问题,您需要将 SinkToTDengine 转换为 Flink 支持的 Sink 类型。

使用 addSink 方法

Flink 提供了 addSink 方法,可以直接接受实现了 SinkFunction 的自定义 Sink。这是最直接和常用的方式。

修改后的代码示例:

iotStream
  .map(rd => {
    val iotData = new IotData()
    iotData.setPid(rd.getDeviceCode.replaceAll(".", "-"))
    iotData.setVal(rd.getCompensationMW.toString)

    // 将13位时间戳转换为%Y-%m-%d %H:%M:%S.%f格式,毫秒保留3位小数
    val timestamp = rd.getTime // 假设rd.getTime()返回的是13位时间戳(毫秒级)
    val instant = Instant.ofEpochMilli(timestamp)
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
      .withLocale(Locale.CHINA)
      .withZone(ZoneId.systemDefault())
    val formattedTime = formatter.format(instant)

    iotData.setTs(formattedTime)
    iotData
  })
  .print()
  .addSink(new SinkToTDengine()) // 使用 addSink 方法
作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7通过Flink Doris Connector写入Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage

在flink 1.7项目,通过Flink Doris Connector,采用批处理读取Doris数据进行计算然后写入到doris的另外一个表。采用flink sql方式。

原来的代码进行脱敏后的代码如下:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

// 数据源配置
String sourceDDL = "CREATE TABLE <SOURCE_TABLE_NAME>(" +
        "pid STRING," +
        "pvalue decimal(39,3)," +
        "ptime TIMESTAMP(3)," +
        "ds DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'" +
        ")";

tableEnvironment.executeSql(sourceDDL);
// 获取当前时间戳
String timestamp = String.valueOf(System.currentTimeMillis());

// 目标 Doris 表 DDL
String sinkDDL = "CREATE TABLE <TARGET_TABLE_NAME> (" +
        "station_sn STRING," +
        "pid_system_code STRING," +
        "`day` STRING," +
        "`value` STRING," +
        "created_at TIMESTAMP(3)," +
        "dt DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'," +
        "'sink.label-prefix' = '<LABEL_PREFIX>_" + timestamp + "'" +
        ")";

执行报错如下:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
	at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
	at com.chuneng.saas.doris.FlinkBatchSql.main(FlinkBatchSql.java:68)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
	at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
	at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
	... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage@265569e2
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.getCheckpointCommittables(CommittableCollector.java:241)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)

这可能是 Flink 批处理模式下通常不需要 checkpoint,但 Doris Sink Connector 默认可能依赖 checkpoint 相关逻辑,从而导致 NullPointerException。

添加与 sink 行为相关的参数, 设置不用 checkpoint 。

修改后的sink如下:

String sinkDDL = “CREATE TABLE (” +
“station_sn STRING,” +
“pid_system_code STRING,” +
“day STRING,” +
“value STRING,” +
“created_at TIMESTAMP(3),” +
“dt DATE” +
“) WITH (” +
“‘connector’ = ‘doris’,” +
“‘fenodes’ = ‘:’,” +
“‘table.identifier’ = ‘.’,” +
“‘username’ = ”,” +
“‘password’ = ”,” +
“‘sink.label-prefix’ = ‘_” + timestamp + “‘,” +

"'doris.batch.size' = '1000'," +  // 批量写入大小
"'sink.enable-2pc' = 'false'" + // 禁用两阶段提交
")";
作者 east
Flink 1月 23,2025

flink 1.12用Flink SQL写入Doris的坑

在flink 1.12,用flink sql写入doris,相关pom配置如下:

   <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.12_2.11</artifactId>
      <version>1.0.3</version>
    </dependency>

to_date('2025-01-14')实际写入到doris变成了另外一个日期,非常坑。而且 
Flink Connector 24.0.0 版本之后支持使用Arrow Flight SQL 读取数据 ,速度提高非常快。

flink-doris-connector各版本兼容如下。

版本兼容​

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8–
1.3.01.161.0+8–
1.4.01.15,1.16,1.171.0+8–
1.5.21.15,1.16,1.17,1.181.0+8–
1.6.21.15,1.16,1.17,1.18,1.191.0+8–
24.0.11.15,1.16,1.17,1.18,1.19,1.201.0+8–

比较好选择是选择Flink 1.16以上(可以兼容hive语法90
%以上)。升级到flink 1.17后,to_date(‘2025-01-14’)返回结果果然正常了。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7 Exception in thread “main” org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table

问题分析

  1. 报错提示的主要内容
    • ValidationException: Unable to create a sink for writing table ...
    • Could not load service provider for factories 和 org.apache.flink.table.planner.delegation.DefaultExecutorFactory not a subtype。
    • 这些问题通常是因为 Flink 运行环境或依赖配置不正确。
  2. 可能原因
    • Flink 和 Doris 依赖版本不匹配:
      • 使用的 Flink Doris Connector 是 flink-doris-connector-1.17,其版本号为 24.0.1,需要确保它与当前 Flink 的版本(1.17.x)兼容。或者flink的jar包有的不是1.17.x版本,和上面的 link-doris-connector-1.17 不兼容。

解决方案

1. 检查 Flink 和 Doris Connector 的兼容性

  • 确认 Flink 和 Doris Connector 的版本兼容。
  • 当前使用的是 flink-doris-connector-1.17,对应 Flink 1.17.x。如果使用的是其他版本的 Flink(如 1.16 或 1.18),需要更换依赖:
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>

2. 添加 Flink Doris Connector 所需的运行时依赖

确保项目中包含以下依赖(建议手动检查 pom.xml 是否缺失),检查flink的jar是否都是1.17.x版本:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

确保 flink-table-planner_2.12 版本与 Flink 版本匹配。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 找不到org.apache.flink.table.descriptors.TableDescriptor

flink版本1.7的项目代码如下:

   StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // 设置 Flink SQL 环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

运行报错找不到org.apache.flink.table.descriptors.TableDescriptor。

问题分析

  1. 依赖冲突或缺失:
    • Flink 1.17.2 中 TableDescriptor 类已被废弃。Flink 1.13 开始引入了 TableDescriptor 的新概念,用于定义表源和表目标,而旧版依赖中的 org.apache.flink.table.descriptors 相关类在后续版本中被逐步移除。
    • 如果代码中还有引用 org.apache.flink.table.descriptors 包下的类(如连接器或格式描述符),可能导致运行时报错。
  2. API 版本不匹配:
    • 在 Flink 1.17.2 中,推荐使用 Table API 的新方式(TableDescriptor 不再使用)。这可能意味着您正在使用旧版本的 API,或者您的代码依赖了不兼容的旧包。

解决方案

1. 检查代码中是否仍在使用旧版 API

移除任何对 org.apache.flink.table.descriptors 的直接依赖。使用以下代码替换旧方法:

// 创建 Flink 流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 创建 Flink SQL 表执行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// 定义表源或目标时,使用 Table API 的新方式
TableDescriptor descriptor = TableDescriptor.forConnector("kafka") // 替换为实际使用的连接器
        .schema(Schema.newBuilder()
                .column("field1", DataTypes.STRING())
                .column("field2", DataTypes.INT())
                .build())
        .format("json") // 替换为实际使用的格式
        .option("property.key", "value") // 替换为实际连接器选项
        .build();

tableEnv.createTemporaryTable("my_table", descriptor);

2. 更新依赖

确保项目使用的依赖与 Flink 1.17.2 版本兼容。在 pom.xml 或 build.gradle 文件中明确声明以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
</dependency>

3. 清理旧依赖

如果仍需要使用 TableDescriptor 类,请确认没有混用老旧版本的连接器或额外库,例如 flink-connector-kafka 等。检查项目中是否存在以下老依赖,并替换为新版依赖:

旧版依赖示例:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.12.x</version>
</dependency>

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList

运行flink 1.7的项目,报错如下:

Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList at org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase.<init>(FlinkPreparingTableBase.java:92) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.<init>(ExpandingPreparingTable.java:42) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.<init>(QueryOperationCatalogViewTable.java:49) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.create(QueryOperationCatalogViewTable.java:58) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.convertQueryOperationView(FlinkCalciteCatalogReader.java:146) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.toPreparingTable(FlinkCalciteCatalogReader.java:110) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:91) at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:229) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:144) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:110) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2490) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) at com.chuneng.saas.doris.FlinkCuSohJdbcSqlAnalyze.main(FlinkCuSohJdbcSqlAnalyze.java:98) Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

  • 从报错信息 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList 和 Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 可以看出,程序在运行时无法找到 org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 这个类。
  • 这通常是因为相应的依赖库没有被正确地添加到项目的类路径中,导致 JVM 在运行时无法加载所需的类。

修改方案:

  1. 确认你是否在项目的构建文件(如 Maven 的 pom.xml 或 Gradle 的 build.gradle)中添加了 Apache Flink 相关的依赖。
  2. 确保使用的 Flink 版本是兼容的,并且其依赖的 Guava 版本是 flink-shaded-guava 的 18 版本。
  3. 对于 Maven 项目,检查 pom.xml 中是否有类似如下的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>18.0</version>
</dependency>
  1. 对于 Gradle 项目,检查 build.gradle 中是否有类似如下的依赖:

implementation 'org.apache.flink:flink-shaded-guava:18.0'
  1. 如果已经添加了依赖,可能是因为依赖冲突导致无法找到正确的类。可以使用 mvn dependency:tree(对于 Maven)或 gradle dependencies(对于 Gradle)命令查看依赖树,找出是否有多个版本的 Guava 被引入,然后通过排除冲突的依赖来解决。
作者 east

上一 1 2 3 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.