gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面2 )
Flink 3月 5,2025

Flink Checkpoint 详解

一、Checkpoint 的原理

  1. 核心概念
    Checkpoint 是 Flink 的容错机制,通过定期生成分布式快照,记录流处理应用的全局状态(算子状态、键控状态等)。当发生故障时,Flink 可从最近的 Checkpoint 恢复,保证 Exactly-Once 语义。
  2. 实现机制
    • Chandy-Lamport 算法:基于 Barrier 的分布式快照。
    • 流程:
      1. 触发:JobManager 定期触发 Checkpoint,向所有 Source 发送 Barrier。
      2. Barrier 传播:Source 插入 Barrier 到数据流,算子接收到 Barrier 后暂停处理新数据,将当前状态异步持久化。
      3. 状态存储:状态写入外部存储(如 HDFS、S3)。
      4. 确认机制:所有算子确认状态保存后,Checkpoint 完成。
  3. 一致性语义
    • EXACTLY_ONCE:精确一次,通过对齐 Barrier 确保状态与数据流严格一致。
    • AT_LEAST_ONCE:至少一次,可能重复处理数据。

二、Checkpoint 的使用方法

  1. 基础配置
    在 Flink 作业中启用 Checkpoint,设置间隔、存储路径和模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint,间隔 60 秒
env.enableCheckpointing(60000);
// 配置 Exactly-Once 语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置 Checkpoint 存储路径
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints/");
// 其他配置(超时时间、最小间隔等)
env.getCheckpointConfig().setCheckpointTimeout(30000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  1. 状态后端选择
    • FsStateBackend:状态存储在内存,快照存于文件系统(适合状态较小场景)。
    • RocksDBStateBackend:状态存储在 RocksDB,支持超大状态(需权衡性能)。
  2. 恢复作业
    通过命令行或 API 从指定 Checkpoint 恢复作业:bash复制./bin/flink run -s hdfs:///checkpoints/1234 …

三、Checkpoint 的应用场景

  1. 容错恢复
    节点故障、网络中断时,从 Checkpoint 恢复状态,避免数据丢失。
  2. 有状态计算
    • 窗口聚合(如每小时销售额统计)。
    • 复杂事件处理(CEP)中的模式状态。
    • 连接操作(如流-流 Join 的中间状态)。
  3. 作业升级与扩缩容
    通过 Savepoint(手动触发的 Checkpoint)暂停作业,修改并行度或代码后恢复。

四、Flink SQL 流式写入数据到表时为何需要 Checkpoint

  1. 保障 Exactly-Once 语义
    • 当使用 Flink SQL 写入 Kafka、JDBC 表等外部系统时,Checkpoint 通过**两阶段提交协议(2PC)**协调事务:
      1. 预提交阶段:数据写入外部系统,但未提交。
      2. 提交阶段:Checkpoint 完成后,所有事务统一提交。
    • 若故障发生,Flink 回滚到上一个 Checkpoint,确保数据不重复、不丢失。
  2. 维护内部状态一致性
    • 即使目标表不支持事务(如 HBase),Checkpoint 仍保障 Flink 内部状态(如去重状态、窗口状态)的正确恢复。
  3. 避免数据丢失
    • 未启用 Checkpoint 时,若作业崩溃,可能丢失未持久化的状态和数据,导致写入结果不完整。

五、示例:Flink SQL 写入 Kafka

-- 启用 Checkpoint(隐式通过 ExecutionConfig)
SET 'execution.checkpointing.interval' = '60s';

-- 定义 Kafka Sink 表
CREATE TABLE kafka_sink (
    user_id STRING,
    count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'sink.transactional-id-prefix' = 'tx-' -- 启用 Kafka 事务
);

-- 流式写入
INSERT INTO kafka_sink 
SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id;
  • 依赖 Checkpoint:Kafka Sink 通过事务提交机制与 Checkpoint 绑定,确保每条数据仅写入一次。

总结

  • Checkpoint 原理:基于 Barrier 的分布式快照,保障状态一致性。
  • 使用场景:容错、有状态计算、作业维护。
  • Flink SQL 写入表:Checkpoint 是保证端到端 Exactly-Once 的核心机制,协调外部系统事务与内部状态恢复。
作者 east
doris 2月 7,2025

Doris更新某一列完整教程

通过临时表来更新
例如有下面的表结构

ods_t_iot (
pid varchar(255) NOT NULL,
ptime bigint NOT NULL,
pvalue decimal(38,9) NOT NULL,
ds varchar(30) NULL
) ENGINE=OLAP
UNIQUE KEY(pid, ptime, pvalue, ds)
需要把ds这一列更新为2025-02-07

  1. 创建一个新表并导入数据
    由于不能直接使用 UPDATE,一种常见的方法是创建一个新表,然后通过批量插入的方式将数据导入,并修改 ds 列的值。下面是详细步骤:

步骤 1: 创建一个新表
首先,我们需要创建一个新的表,结构与原表一致,唯一的区别是我们会把 ds 列的默认值设置为 ‘2025-02-07’,并确保与原表的列顺序、类型保持一致。

CREATE TABLE ods_t_iot_new like ods_t_iot;
步骤 2: 使用 INSERT INTO SELECT 语句从原表导入数据
接下来,我们将原表的数据导入到新表中,同时确保 ds 列的值设置为 ‘2025-02-07’。可以使用以下 SQL:

INSERT INTO ods_t_iot_new (pid, ptime, pvalue, ds) SELECT pid, ptime, pvalue, IFNULL(ds, ‘2025-02-07’) AS ds FROM ods_t_iot;
在这里,IFNULL(ds, ‘2025-02-07’) 会将原表中 ds 列为 NULL 的数据替换为 ‘2025-02-07’。

步骤 3: 删除原表并重命名新表
导入数据完成后,我们可以删除原表并将新表重命名为原表名。这样,所有数据都已经更新,ds 列也被设置为 ‘2025-02-07’。

DROP TABLE ods_t_iot;

ALTER TABLE ods_t_iot_new RENAME ods_t_iot;

作者 east
大数据开发, 面试 2月 3,2025

二本生的破局:从迷茫到上岸知名互联网大数据岗,我是这样做的

​

大学时的迷茫,直到如今回想起来,依然觉得有点“懵懂”。我本科就读于一所普通的二本院校,专业是计算机相关。对于未来的职业方向,我一直没有一个明确的目标。身边的师兄师姐们给出了很多建议。Java开发的岗位早已是“烂大街”的状态,几乎人人都能进入,但也难以脱颖而出。C++的就业方向则相对小众,竞争相对较小,但是行业需求并不大。大数据开发虽然岗位不多,但薪资可观,且未来的增长潜力巨大——这让我决心全力投入到大数据的学习和实践中。

刷题:编程能力的磨砺之路

提升编程能力,绝对是每个技术开发者必须经历的过程。对于我来说,刷题成为了提升编程能力的“必经之路”。无论是面试中的笔试,还是实际的编码工作,扎实的编程能力总是能起到决定性的作用。通过LeetCode,我挑战了200+道题。最初,很多题目都让我头大,甚至看半天也没有任何思路,感觉几乎要放弃。但我始终坚持每一道题,仔细分析每种解题方法,总结出最优的解法。

解决不了的问题,我会去寻找答案,去研究大神们是如何分析并解决问题的,理解他们的思路之后,我自己再亲手实现一遍。这个过程不仅是对算法的磨练,更是对思维方式的训练。

除了LeetCode,我还选择了《剑指 Offer》这本书。这本书的题目紧扣面试核心,尤其适合那些准备大数据或Java开发岗位的应聘者。书中不仅有面试中常见的题目,还特别注重对思路的培养。虽然有些题目一开始并不容易通过,但在面试现场,面试官更看重的是你的思考过程和解决方案,而不是你是否一次性通过了所有测试用例。

特别是大厂高频手撕面试题,更是要准备熟悉:

从上千份大厂面经呕心沥血整理:大厂高频手撕面试题(数据结构和算法篇 ,Java实现亲试可跑)


数据库:SQL技能是大数据开发的根基

SQL是大数据开发者必须掌握的一项技能,尤其是在与数据库打交道的过程中,它是操作数据的核心工具。对我来说,SQL不仅仅是日常工作中处理数据的工具,它更是提高自己业务理解和解决问题的“钥匙”。

我在学习SQL时,花了大量时间深入理解了它的高级用法,特别是窗口函数、子查询等复杂查询方法。为了更加深入地理解SQL的应用,我做了大量的练习,包括针对具体业务场景的SQL题目,如留存分析、在线时长统计、漏斗分析、连续登录天数计算等。这些练习不仅提升了我的SQL能力,还帮助我理解了如何通过SQL解决实际工作中的问题。面试中,我几乎没有遇到过SQL方面的难题,面试官通常会对我解决问题的方式给予高度评价。

大厂面试手撕SQL面试题(Hive实现:样例数据、详细思路、亲试可行的运行截图)_hive sql经典面试题-CSDN博客


理论学习:扎实的知识体系是进阶的基石

尽管编程和SQL能力非常重要,但大数据开发所涉及的理论知识同样不容忽视。这些理论知识不仅能帮助你解决实际问题,更能让你站在更高的角度去思考和分析问题。为了在大数据开发领域立足,我专注于构建一个完整的知识体系,包括数据仓库理论、操作系统原理、计算机网络、Java基础等方面的知识。

数据仓库与分层理论:
为了深入了解大数据在企业中的实际应用,我精读了《维度建模工具箱》和《大数据之路:阿里巴巴大数据实践》这两本书。《维度建模工具箱》详细介绍了如何进行数据建模,特别是维度建模,这对构建灵活且高效的数据仓库架构至关重要。而《大数据之路:阿里巴巴大数据实践》则通过阿里巴巴的真实案例,帮助我理解了大数据的应用场景以及发展趋势。

操作系统与计算机网络:
作为计算机专业的基础课程,操作系统和计算机网络对于大数据开发的深入理解同样重要。我花了两周时间复习了这些基础课程,结合项目中的实际应用加深理解。例如,在操作系统的进程管理与内存管理的复习过程中,我结合了大数据处理中的并发控制、资源分配等内容;在学习计算机网络的TCP/IP协议时,我思考这些知识如何在Hadoop等大数据组件的网络通信中得以应用。

Java编程与多线程:
Java是大数据开发中最常用的编程语言之一,我花了一个月时间复习Java的基础、源码、多线程以及JVM的相关内容。通过深入分析Java的类库和源码,我理解了Java的底层实现。多线程和JVM的内容相对复杂,但我通过阅读经典书籍并结合实际案例进行了深入学习,并最终在面试中得心应手。

MySQL优化:
MySQL作为常用的关系型数据库,优化MySQL的性能也是大数据开发者的必修课。我读了两遍《高性能MySQL》,这本书涵盖了MySQL的性能优化、索引优化、存储引擎等核心知识。在实际项目中,通过对MySQL的优化,我提升了数据存取效率,减少了响应时间,确保了数据库在大数据场景下的高效运行。


大数据组件:从基础到深入,掌握核心技术

在学习了编程语言和理论知识之后,我将重点放在了大数据开发的核心技术上。对于一个大数据开发者来说,掌握相关的大数据组件和框架是进入行业的必要条件。

Hadoop:
Hadoop作为大数据处理的基础框架,我通过反复研读《Hadoop权威指南》,深入了解了Hadoop的架构、HDFS(分布式文件系统)、MapReduce(分布式计算框架)和YARN(资源管理器)。我不仅仅停留在理论层面,还通过搭建Hadoop集群,进行了一些实际操作,积累了大量的实践经验。

Hive:
Hive是基于Hadoop的SQL查询工具,我不仅学习了HiveQL的基本语法,还深入研究了Hive的性能调优和配置优化。在项目中,我通过实践学会了如何使用Hive进行数据仓库建设和数据分析,同时优化了查询性能,提高了数据处理效率。

Spark与其他工具:
虽然Spark、Flink、Kafka等大数据工具的学习我并没有深入到实际项目中,但我确保了自己对这些工具的基本概念、应用场景和技术原理有足够的了解。对于每个工具的学习,我都花时间去了解它们的功能、优势以及如何在特定场景下应用。


精美简历:突出重点,简洁明了

简历是进入面试的第一步,而对于我来说,如何让简历“脱颖而出”是一项不容忽视的挑战。为了让面试官快速抓住我的核心竞争力,我在简历中只列出了自己真正掌握的技术,如Hadoop、Hive、Spark等。

在描述项目经验时,我尽量详细描述自己在项目中的职责、解决的关键问题和取得的成绩。例如,在数据仓库建设项目中,我详细阐述了自己负责的模块设计、遇到的难点以及如何克服这些问题;在报表开发中,我展示了如何优化查询流程、提高数据处理效率。通过这些真实的项目经验,我的简历更具说服力。


面试准备:知己知彼,百战不殆

除了理论学习与技能提升,面试技巧的磨练同样不可忽视。我查阅了大量大数据开发岗位的面试经验和面试题,整理成文档,并进行逐一分析。通过这些面经,我不仅了解了面试官最关心的技术点,还学习了如何在面试中展示自己的优势。

面试过程中,我始终保持自信,尽量展示出我的技术深度与广度。同时,我也通过模拟面试,提升自己的表达能力和应变能力。

这里汇集了上百家中大厂的大数据面经:

大数据最全面试题-Offer直通车


面试过程:从等待到收获

投递简历后,我并没有期待快速的反馈。前期的简历投递并没有让我立刻迎来面试机会,但我没有气馁。在不断优化简历、提升技术实力后,面试机会开始逐渐增多。经过几轮严格的笔试和面试,我最终成功拿到了某知名互联网企业的大数据开发岗位的Offer,薪资在同行中领先。

大数据大厂校招网申入口最全集合和2025年校园招聘时间线(持续更新)

这段求职历程不仅让我收获了职位,也让我深刻体会到,只有通过持续的学习和实践,才能在激烈的技术竞争中站稳脚跟。 ​

作者 east
Flink 1月 24,2025

解决flink报错:org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] cannot be applied to (com.chuneng.saas.dao.SinkToTDengine) iotStream.sinkTo(new SinkToTDengine)

在scala用下面的代码,

iotStream.sinkTo(new SinkToTDengine)

出现报错:

overloaded method value sinkTo with alternatives:
  (sink: org.apache.flink.api.connector.sink2.Sink[com.chuneng.saas.vo.IotData])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] <and>
  (sink: org.apache.flink.api.connector.sink.Sink[com.chuneng.saas.vo.IotData, _, _, _])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData]
 cannot be applied to (com.chuneng.saas.dao.SinkToTDengine)
    iotStream.sinkTo(new SinkToTDengine)

遇到的错误是由于 sinkTo 方法期望的参数类型与提供的 SinkToTDengine 类型不匹配。具体来说,sinkTo 方法期望的是一个实现了 Sink 接口的类型,而您的 SinkToTDengine 类继承自 RichSinkFunction,这导致类型不兼容。

错误原因分析

这表明 sinkTo 方法期望的是 Sink[IotData] 或 Sink[IotData, _, _, _] 类型,而您传递的是 SinkToTDengine,它继承自 RichSinkFunction<IotData>,因此类型不匹配。

解决方案

要解决这个问题,您需要将 SinkToTDengine 转换为 Flink 支持的 Sink 类型。

使用 addSink 方法

Flink 提供了 addSink 方法,可以直接接受实现了 SinkFunction 的自定义 Sink。这是最直接和常用的方式。

修改后的代码示例:

iotStream
  .map(rd => {
    val iotData = new IotData()
    iotData.setPid(rd.getDeviceCode.replaceAll(".", "-"))
    iotData.setVal(rd.getCompensationMW.toString)

    // 将13位时间戳转换为%Y-%m-%d %H:%M:%S.%f格式,毫秒保留3位小数
    val timestamp = rd.getTime // 假设rd.getTime()返回的是13位时间戳(毫秒级)
    val instant = Instant.ofEpochMilli(timestamp)
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
      .withLocale(Locale.CHINA)
      .withZone(ZoneId.systemDefault())
    val formattedTime = formatter.format(instant)

    iotData.setTs(formattedTime)
    iotData
  })
  .print()
  .addSink(new SinkToTDengine()) // 使用 addSink 方法
作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7通过Flink Doris Connector写入Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage

在flink 1.7项目,通过Flink Doris Connector,采用批处理读取Doris数据进行计算然后写入到doris的另外一个表。采用flink sql方式。

原来的代码进行脱敏后的代码如下:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

// 数据源配置
String sourceDDL = "CREATE TABLE <SOURCE_TABLE_NAME>(" +
        "pid STRING," +
        "pvalue decimal(39,3)," +
        "ptime TIMESTAMP(3)," +
        "ds DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'" +
        ")";

tableEnvironment.executeSql(sourceDDL);
// 获取当前时间戳
String timestamp = String.valueOf(System.currentTimeMillis());

// 目标 Doris 表 DDL
String sinkDDL = "CREATE TABLE <TARGET_TABLE_NAME> (" +
        "station_sn STRING," +
        "pid_system_code STRING," +
        "`day` STRING," +
        "`value` STRING," +
        "created_at TIMESTAMP(3)," +
        "dt DATE" +
        ") WITH (" +
        "'connector' = 'doris'," +
        "'fenodes' = '<IP_ADDRESS>:<PORT>'," +
        "'table.identifier' = '<DATABASE_NAME>.<TABLE_NAME>'," +
        "'username' = '<USERNAME>'," +
        "'password' = '<PASSWORD>'," +
        "'sink.label-prefix' = '<LABEL_PREFIX>_" + timestamp + "'" +
        ")";

执行报错如下:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
	at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
	at com.chuneng.saas.doris.FlinkBatchSql.main(FlinkBatchSql.java:68)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
	at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
	at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
	... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	... 4 more
Caused by: java.lang.NullPointerException: Unknown checkpoint for org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage@265569e2
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.getCheckpointCommittables(CommittableCollector.java:241)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:193)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)

这可能是 Flink 批处理模式下通常不需要 checkpoint,但 Doris Sink Connector 默认可能依赖 checkpoint 相关逻辑,从而导致 NullPointerException。

添加与 sink 行为相关的参数, 设置不用 checkpoint 。

修改后的sink如下:

String sinkDDL = “CREATE TABLE (” +
“station_sn STRING,” +
“pid_system_code STRING,” +
“day STRING,” +
“value STRING,” +
“created_at TIMESTAMP(3),” +
“dt DATE” +
“) WITH (” +
“‘connector’ = ‘doris’,” +
“‘fenodes’ = ‘:’,” +
“‘table.identifier’ = ‘.’,” +
“‘username’ = ”,” +
“‘password’ = ”,” +
“‘sink.label-prefix’ = ‘_” + timestamp + “‘,” +

"'doris.batch.size' = '1000'," +  // 批量写入大小
"'sink.enable-2pc' = 'false'" + // 禁用两阶段提交
")";
作者 east
Flink 1月 23,2025

flink 1.12用Flink SQL写入Doris的坑

在flink 1.12,用flink sql写入doris,相关pom配置如下:

   <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.12_2.11</artifactId>
      <version>1.0.3</version>
    </dependency>

to_date('2025-01-14')实际写入到doris变成了另外一个日期,非常坑。而且 
Flink Connector 24.0.0 版本之后支持使用Arrow Flight SQL 读取数据 ,速度提高非常快。

flink-doris-connector各版本兼容如下。

版本兼容​

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8–
1.3.01.161.0+8–
1.4.01.15,1.16,1.171.0+8–
1.5.21.15,1.16,1.17,1.181.0+8–
1.6.21.15,1.16,1.17,1.18,1.191.0+8–
24.0.11.15,1.16,1.17,1.18,1.19,1.201.0+8–

比较好选择是选择Flink 1.16以上(可以兼容hive语法90
%以上)。升级到flink 1.17后,to_date(‘2025-01-14’)返回结果果然正常了。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7 Exception in thread “main” org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table

问题分析

  1. 报错提示的主要内容
    • ValidationException: Unable to create a sink for writing table ...
    • Could not load service provider for factories 和 org.apache.flink.table.planner.delegation.DefaultExecutorFactory not a subtype。
    • 这些问题通常是因为 Flink 运行环境或依赖配置不正确。
  2. 可能原因
    • Flink 和 Doris 依赖版本不匹配:
      • 使用的 Flink Doris Connector 是 flink-doris-connector-1.17,其版本号为 24.0.1,需要确保它与当前 Flink 的版本(1.17.x)兼容。或者flink的jar包有的不是1.17.x版本,和上面的 link-doris-connector-1.17 不兼容。

解决方案

1. 检查 Flink 和 Doris Connector 的兼容性

  • 确认 Flink 和 Doris Connector 的版本兼容。
  • 当前使用的是 flink-doris-connector-1.17,对应 Flink 1.17.x。如果使用的是其他版本的 Flink(如 1.16 或 1.18),需要更换依赖:
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>

2. 添加 Flink Doris Connector 所需的运行时依赖

确保项目中包含以下依赖(建议手动检查 pom.xml 是否缺失),检查flink的jar是否都是1.17.x版本:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>24.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

确保 flink-table-planner_2.12 版本与 Flink 版本匹配。

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 找不到org.apache.flink.table.descriptors.TableDescriptor

flink版本1.7的项目代码如下:

   StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        // 设置 Flink SQL 环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

运行报错找不到org.apache.flink.table.descriptors.TableDescriptor。

问题分析

  1. 依赖冲突或缺失:
    • Flink 1.17.2 中 TableDescriptor 类已被废弃。Flink 1.13 开始引入了 TableDescriptor 的新概念,用于定义表源和表目标,而旧版依赖中的 org.apache.flink.table.descriptors 相关类在后续版本中被逐步移除。
    • 如果代码中还有引用 org.apache.flink.table.descriptors 包下的类(如连接器或格式描述符),可能导致运行时报错。
  2. API 版本不匹配:
    • 在 Flink 1.17.2 中,推荐使用 Table API 的新方式(TableDescriptor 不再使用)。这可能意味着您正在使用旧版本的 API,或者您的代码依赖了不兼容的旧包。

解决方案

1. 检查代码中是否仍在使用旧版 API

移除任何对 org.apache.flink.table.descriptors 的直接依赖。使用以下代码替换旧方法:

// 创建 Flink 流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 创建 Flink SQL 表执行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// 定义表源或目标时,使用 Table API 的新方式
TableDescriptor descriptor = TableDescriptor.forConnector("kafka") // 替换为实际使用的连接器
        .schema(Schema.newBuilder()
                .column("field1", DataTypes.STRING())
                .column("field2", DataTypes.INT())
                .build())
        .format("json") // 替换为实际使用的格式
        .option("property.key", "value") // 替换为实际连接器选项
        .build();

tableEnv.createTemporaryTable("my_table", descriptor);

2. 更新依赖

确保项目使用的依赖与 Flink 1.17.2 版本兼容。在 pom.xml 或 build.gradle 文件中明确声明以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.17.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
</dependency>

3. 清理旧依赖

如果仍需要使用 TableDescriptor 类,请确认没有混用老旧版本的连接器或额外库,例如 flink-connector-kafka 等。检查项目中是否存在以下老依赖,并替换为新版依赖:

旧版依赖示例:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.12.x</version>
</dependency>

作者 east
bug清单, Flink 1月 23,2025

解决flink 1.7项目 java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList

运行flink 1.7的项目,报错如下:

Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList at org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase.<init>(FlinkPreparingTableBase.java:92) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.<init>(ExpandingPreparingTable.java:42) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.<init>(QueryOperationCatalogViewTable.java:49) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.create(QueryOperationCatalogViewTable.java:58) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.convertQueryOperationView(FlinkCalciteCatalogReader.java:146) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.toPreparingTable(FlinkCalciteCatalogReader.java:110) at org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:91) at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:229) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:144) at org.apache.calcite.sql.validate.SqlValidatorUtil.getRelOptTable(SqlValidatorUtil.java:110) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2490) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660) at com.chuneng.saas.doris.FlinkCuSohJdbcSqlAnalyze.main(FlinkCuSohJdbcSqlAnalyze.java:98) Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

  • 从报错信息 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/ImmutableList 和 Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 可以看出,程序在运行时无法找到 org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList 这个类。
  • 这通常是因为相应的依赖库没有被正确地添加到项目的类路径中,导致 JVM 在运行时无法加载所需的类。

修改方案:

  1. 确认你是否在项目的构建文件(如 Maven 的 pom.xml 或 Gradle 的 build.gradle)中添加了 Apache Flink 相关的依赖。
  2. 确保使用的 Flink 版本是兼容的,并且其依赖的 Guava 版本是 flink-shaded-guava 的 18 版本。
  3. 对于 Maven 项目,检查 pom.xml 中是否有类似如下的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>18.0</version>
</dependency>
  1. 对于 Gradle 项目,检查 build.gradle 中是否有类似如下的依赖:

implementation 'org.apache.flink:flink-shaded-guava:18.0'
  1. 如果已经添加了依赖,可能是因为依赖冲突导致无法找到正确的类。可以使用 mvn dependency:tree(对于 Maven)或 gradle dependencies(对于 Gradle)命令查看依赖树,找出是否有多个版本的 Guava 被引入,然后通过排除冲突的依赖来解决。
作者 east
Java, 大数据开发 1月 22,2025

手撕代码刷题秘籍,小白到Offer收割机的进阶之路

​

 要准备面试,无非就是要准备熟背八股文,做有深度的项目,好好学习数据结构和算法,刷题准备手撕面试题。

其中手撕面试题的准备时间是比较长的。八股文和大厂手撕面试题可以看下面专栏,花了半个月时间从上千份大厂面经分类整理出面试题及手撕面试题。

嵌入式最全面试题-Offer直通车

大数据最全面试题-Offer直通车

Java开发及前端最全面试题-Offer直通车

Android最全面试题-Offer直通车

C++后端开发最全面试题-从入门到Offer

线上笔试

不少公司面试的第一关就是线上笔试题📃。咱得自己在家对着电脑做,就像考试一样,限定时间内完成题目。这种时候,环境相对熟悉,压力可能没那么大,但也得注意别因为在家就放松警惕,一定要严格按照考试要求来,提前调试好设备,保证网络稳定,别到时候因为这些小问题影响发挥。

现场手撕代码

等过了笔试,到了现场面试,这可就刺激了🤯。面试官会给你一台电脑,让你当场敲代码,有时候甚至直接给你一张纸,让你手写代码。这种情况对咱的心理素质和编程能力要求更高。在面试官眼皮子底下写代码,紧张是肯定的,但越这时候越得稳住,千万别慌了神。咱平时刷题练的就是这时候的底气💪。

入门指南:选对资料,开启刷题第一步

刚接触手撕代码刷题,选对入门资料太重要了📚,除了学校的数据结构和算法教科书,公认是推荐《剑指 Offer》。这本书对数据结构和算法的讲解特别详细,还有对应的 Java 代码示例,很适合新手。你可以先从基础的数据结构,像数组、链表这些开始看,把基础打牢。网上也有不少相关的在线教程,比如慕课网、网易云课堂上都有优质课程,跟着视频一步步学,理解起来更容易。

手撕面试题很多,下面这些是大厂高频的手撕面试题:

从上千份大厂面经呕心沥血整理:大厂高频手撕面试题(数据结构和算法篇 ,Java实现亲试可跑)

 从上千份大厂面经呕心沥血整理:大厂高频手撕面试题(数据结构和算法篇 ,C++实现亲试可跑)

循序渐进:从易到难,稳步提升

刷题就像爬山,得一步一个脚印👣。先从简单的题目入手,比如求数组的和、判断一个数是否为素数这类基础题。把这些简单题做熟了,不仅能建立自信,还能让咱熟悉编程的基本语法和逻辑。等简单题得心应手了,再慢慢增加难度,比如做一些涉及排序算法优化、查找算法应用的题目。面对难题别害怕,就像拆解乐高积木一样,把问题拆分成一个个小问题,逐个击破。每次成功解决一道难题,你会发现自己的编程能力又上了一个台阶。

实战演练:参加竞赛,提升实战能力

如果是在校生,参加算法竞赛对提升大数据刷题能力简直太有帮助了🎉。像 ACM 国际大学生程序设计竞赛、蓝桥杯这些,都是很不错的平台。在竞赛中,你会遇到来自各地的高手,和他们同场竞技,能让你见识到各种巧妙的解题思路和编程技巧。而且竞赛的时间压力很大,能锻炼你在紧张环境下快速思考和编写代码的能力。就像我认识一个学长,参加了几次 ACM 竞赛后,再去面试大数据岗位,那些手撕代码的题目对他来说轻松多了。

合理规划:把握刷题节奏和时间

刷题可不是一蹴而就的事儿,得合理安排时间和节奏🕙。每天刷几道题,这个得根据自己的情况来。要是你时间比较充裕,每天刷 3 – 5 道题也没问题;要是平时学业或者工作忙,每天保证 1 – 2 道题的练习量。别一开始就猛刷,把自己累到了,后面反而坚持不下去。一般来说,先把基础的算法和数据结构题目刷完,再去刷一些综合应用的题目。刷完一本书或者一个阶段的题目后,可以去力扣、牛客网这些平台上找一些大数据专项题目来巩固,刷个 80 – 150 道,基本就差不多了。

效果检验:判断刷题能力是否提升

怎么知道自己刷题有没有效果呢🧐?首先就是看刷题的数量,量变引起质变,刷的题多了,自然会有感觉。但光数量可不够,还得看质量。比如你能不能用多种方法解决同一道题,这说明你对知识点理解得很透彻。还有就是尝试挑战一些难度更高的题目,如果能顺利解决,那能力肯定提升了。另外,刷题平台一般都会给出代码的时间复杂度和空间复杂度分析,看看自己的代码效率有没有提高,这也是检验能力的重要标准。 ​

作者 east
Flink 1月 22,2025

Flink1.7官方文档中文翻译:及时流处理

简介#

及时流处理是有状态流处理的一种扩展,其中时间在计算中发挥一定作用。例如,在进行时间序列分析、基于特定时间段(通常称为窗口)进行聚合,或者在处理事件时事件发生的时间很关键等情况时,都会涉及到及时流处理。
在接下来的章节中,我们将重点介绍在使用 Flink 进行及时流处理应用开发时,需要考虑的一些主题。
返回顶部

时间概念:事件时间与处理时间#

在流处理程序中提及时间(例如定义窗口时),可以涉及不同的时间概念:

  • 处理时间:处理时间指的是执行相应操作的机器的系统时间。

当一个流处理程序基于处理时间运行时,所有基于时间的操作(如时间窗口)将使用运行相应操作符的机器的系统时钟。一个按小时划分的处理时间窗口将包含在系统时钟显示整点之间到达特定操作符的所有记录。例如,如果一个应用程序在上午 9:15 开始运行,第一个按小时划分的处理时间窗口将包含上午 9:15 到 10:00 之间处理的事件,下一个窗口将包含上午 10:00 到 11:00 之间处理的事件,依此类推。
处理时间是最简单的时间概念,无需在流和机器之间进行协调。它能提供最佳性能和最低延迟。然而,在分布式和异步环境中,处理时间不具备确定性,因为它易受记录进入系统的速度(例如从消息队列进入)、记录在系统内操作符之间流动的速度以及中断(计划内或其他情况)的影响。

  • 事件时间:事件时间是每个事件在其产生设备上发生的时间。这个时间通常在记录进入 Flink 之前就嵌入其中,并且可以从每条记录中提取出事件时间戳。在事件时间中,时间的推进取决于数据,而非任何物理时钟。基于事件时间的程序必须指定如何生成事件时间水印,这是一种在事件时间中标记时间推进的机制。这种水印机制将在后续章节中介绍。

在理想情况下,无论事件何时到达或其顺序如何,基于事件时间的处理都能产生完全一致且确定的结果。然而,除非已知事件按时间戳顺序到达,否则事件时间处理在等待乱序事件时会产生一定延迟。由于只能等待有限的时间,这就限制了基于事件时间的应用程序的确定性程度。
假设所有数据都已到达,即使处理乱序或迟到的事件,或者重新处理历史数据,基于事件时间的操作也会按预期执行,并产生正确且一致的结果。例如,一个按小时划分的事件时间窗口将包含所有携带的事件时间戳属于该小时的记录,无论它们到达的顺序如何,也无论它们何时被处理。(有关更多信息,请参阅 “延迟” 部分。)
请注意,有时基于事件时间的程序在实时处理实时数据时,会使用一些基于处理时间的操作,以确保它们能够及时推进。
事件时间与处理时间

Event Time and Processing Time


事件时间与水印#

注意:Flink 实现了数据流模型中的许多技术。若要深入了解事件时间和水印,可查看以下文章。

  • Tyler Akidau 的《Streaming 101》
  • 《数据流模型》论文

一个支持事件时间的流处理器需要一种方式来衡量事件时间的推进。例如,一个构建按小时划分窗口的窗口操作符,需要在事件时间超过一小时结束时得到通知,以便该操作符能够关闭正在处理的窗口。
事件时间可以独立于处理时间(由物理时钟测量)推进。例如,在一个程序中,某个操作符的当前事件时间可能略落后于处理时间(考虑到接收事件的延迟),但两者以相同速度推进。另一方面,另一个流处理程序可能通过快速处理已经缓冲在 Kafka 主题(或其他消息队列)中的一些历史数据,在仅几秒钟的处理时间内推进数周的事件时间。
Flink 中衡量事件时间推进的机制是水印。水印作为数据流的一部分流动,并携带一个时间戳 t。Watermark (t) 声明在该流中事件时间已到达时间 t,这意味着该流中不应再有时间戳 t’ <= t 的元素(即时间戳早于或等于水印的事件)。
下图展示了带有(逻辑)时间戳的事件流以及同步流动的水印。在这个例子中,事件是按(时间戳)顺序排列的,这意味着水印只是流中的周期性标记。
有序事件流和水印
水印对于乱序流至关重要,如下图所示,其中事件并非按时间戳排序。一般来说,水印表明在流中的那个点,所有到某个时间戳的事件都应该已经到达。一旦水印到达一个操作符,该操作符可以将其内部事件时间时钟推进到水印的值。
无序事件流和水印
请注意,新创建的流元素的事件时间继承自产生它们的事件,或者触发这些元素创建的水印。

并行流中的水印#

水印在源函数处或紧随源函数之后生成。源函数的每个并行子任务通常独立生成其水印。这些水印定义了该特定并行源的事件时间。
随着水印在流处理程序中流动,它们会推进水印到达的操作符处的事件时间。每当一个操作符推进其事件时间时,它会为下游的后续操作符生成一个新的水印。
有些操作符会消费多个输入流,例如 union 操作符,或者在 keyBy (…) 或 partition (…) 函数之后的操作符。这样的操作符的当前事件时间是其输入流事件时间的最小值。随着其输入流更新它们的事件时间,该操作符的事件时间也会更新。
下图展示了事件和水印在并行流中流动,以及操作符跟踪事件时间的示例。
并行数据流、操作符与事件和水印

延迟#

有可能某些元素会违反水印条件,即即使在 Watermark (t) 出现之后,仍会出现更多时间戳 t’ <= t 的元素。实际上,在许多实际场景中,某些元素可能会被任意延迟,使得无法指定一个时间,保证在该时间之前具有特定事件时间戳的所有元素都已到达。此外,即使延迟可以界定,将水印延迟太多通常也不可取,因为这会导致事件时间窗口的评估出现过多延迟。
因此,流处理程序可能会明确预期一些延迟元素。延迟元素是指在系统的事件时间时钟(由水印指示)已经超过延迟元素的时间戳之后才到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参阅 “允许的延迟”。

窗口化#

对流中的事件进行聚合(例如计数、求和)与批处理中的方式不同。例如,不可能对流中的所有元素进行计数,因为流通常是无限的(无界的)。相反,对流的聚合(计数、求和等)是通过窗口来界定范围的,例如 “过去 5 分钟的计数” 或 “过去 100 个元素的求和”。
窗口可以由时间驱动(例如:每 30 秒)或由数据驱动(例如:每 100 个元素)。通常可以区分不同类型的窗口,例如滚动窗口(无重叠)、滑动窗口(有重叠)和会话窗口(由不活动间隙分隔)。
时间窗口和计数窗口
有关窗口的更多示例,请查看此博客文章,或者查看 DataStream API 的窗口文档。介绍一下Flink的时间概念如何在Flink中使用事件时间?Flink的窗口操作符有哪些?

作者 east
Flink 1月 22,2025

Flink1.7官方文档中文翻译:有状态流处理

什么是状态?#
虽然数据流中的许多操作通常一次仅处理单个事件(例如事件解析器),但有些操作会在多个事件间记住相关信息(例如窗口操作符)。这些操作被称为有状态操作。
有状态操作的一些示例:

  • 当应用程序搜索特定的事件模式时,状态会存储到目前为止遇到的事件序列。
  • 按分钟 / 小时 / 天聚合事件时,状态保存待处理的聚合结果。
  • 在一系列数据点上训练机器学习模型时,状态保存模型参数的当前版本。
  • 当需要管理历史数据时,状态允许高效访问过去发生的事件。

Flink 需要了解状态,以便使用检查点和保存点实现容错。
了解状态还有助于对 Flink 应用程序进行重新缩放,这意味着 Flink 会负责在并行实例间重新分配状态。
可查询状态允许你在运行时从 Flink 外部访问状态。
在处理状态时,了解 Flink 的状态后端可能也会有所帮助。Flink 提供了不同的状态后端,用于指定状态的存储方式和存储位置。

键控状态 #
键控状态维护在一个可视为嵌入式键值存储的结构中。状态与有状态操作符读取的流严格分区并一起分布。因此,仅在键控流上才能访问键值状态,即在进行键控 / 分区数据交换之后,并且只能访问与当前事件的键相关联的值。将流的键与状态的键对齐,可确保所有状态更新都是本地操作,无需事务开销即可保证一致性。这种对齐还使 Flink 能够透明地重新分配状态并调整流分区。

状态与分区

键控状态进一步组织为所谓的键组。键组是 Flink 重新分配键控状态的基本单元;键组的数量与定义的最大并行度完全相同。在执行过程中,键控操作符的每个并行实例处理一个或多个键组的键。

State and Partitioning

状态持久性#

Flink 通过流重放和检查点相结合的方式实现容错。一个检查点标记每个输入流中的特定点,以及每个操作符的相应状态。通过恢复操作符的状态并从检查点处重新播放记录,流数据流可以从检查点恢复,同时保持一致性(精确一次处理语义)。
检查点间隔是在执行期间容错开销与恢复时间(需要重新播放的记录数)之间进行权衡的一种方式。
容错机制持续对分布式流数据流进行快照。对于状态较小的流应用程序,这些快照非常轻量级,可以频繁进行,而对性能影响不大。流应用程序的状态存储在可配置的位置,通常是分布式文件系统中。
如果程序发生故障(由于机器、网络或软件故障),Flink 会停止分布式流数据流。然后系统重新启动操作符,并将它们重置到最近一次成功的检查点。输入流被重置到状态快照的位置。作为重新启动的并行数据流一部分处理的任何记录,都保证不会影响先前检查点的状态。
默认情况下,检查点功能是禁用的。有关如何启用和配置检查点的详细信息,请参阅 “检查点”。
为使此机制充分发挥其保证作用,数据流源(如消息队列或代理)需要能够将流倒回到最近定义的点。Apache Kafka 具备此能力,Flink 与 Kafka 的连接器利用了这一点。有关 Flink 连接器提供的保证的更多信息,请参阅 “数据源和接收器的容错保证”。
由于 Flink 的检查点是通过分布式快照实现的,我们可互换使用 “快照” 和 “检查点” 这两个词。通常我们也用 “快照” 一词来指代检查点或保存点。

检查点#

Flink 容错机制的核心部分是对分布式数据流和操作符状态进行一致性快照。这些快照作为一致性检查点,系统在发生故障时可以回退到这些检查点。Flink 进行这些快照的机制在《分布式数据流的轻量级异步快照》中有描述。它受标准的 Chandy – Lamport 分布式快照算法启发,并专门针对 Flink 的执行模型进行了定制。
请记住,与检查点相关的所有操作都可以异步完成。检查点屏障不会同步移动,操作可以异步对其状态进行快照。
自 Flink 1.11 起,检查点可以在有对齐或无对齐的情况下进行。在本节中,我们先描述对齐检查点。

屏障#

Flink 分布式快照中的一个核心元素是流屏障。这些屏障被注入到数据流中,并作为数据流的一部分与记录一起流动。屏障永远不会超过记录,它们严格按顺序流动。一个屏障将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录集。每个屏障携带它前面推送的快照的 ID。屏障不会中断流的流动,因此非常轻量级。来自不同快照的多个屏障可以同时存在于流中,这意味着各种快照可以并发发生。
数据流中的检查点屏障:流屏障在流源处被注入到并行数据流中。注入快照 n 的屏障的点(我们称之为 Sn)是源流中快照覆盖数据的位置。例如,在 Apache Kafka 中,这个位置将是分区中最后一条记录的偏移量。这个位置 Sn 会报告给检查点协调器(Flink 的 JobManager)。

Checkpoint barriers in data streams


然后屏障向下游流动。当一个中间操作符从其所有输入流接收到快照 n 的屏障时,它会向其所有输出流发送一个快照 n 的屏障。一旦一个接收器操作符(流 DAG 的末端)从其所有输入流接收到屏障 n,它就会向检查点协调器确认快照 n。在所有接收器都确认一个快照后,该快照被视为完成。

Aligning data streams at operators with multiple inputs


一旦快照 n 完成,作业将不再要求源提供 Sn 之前的记录,因为此时这些记录(及其衍生记录)将已经通过整个数据流拓扑。
在具有多个输入的操作符处对齐数据流:接收多个输入流的操作符需要在快照屏障上对齐输入流。上图说明了这一点:

  • 一旦操作符从传入流接收到快照屏障 n,在它也从其他输入接收到屏障 n 之前,它不能处理来自该流的任何更多记录。否则,它会将属于快照 n 的记录与属于快照 n + 1 的记录混合。
  • 一旦最后一个流接收到屏障 n,操作符会发出所有挂起的输出记录,然后自己发出快照 n 屏障。
  • 它对状态进行快照,并从所有输入流恢复处理记录,在处理来自流的记录之前先处理输入缓冲区中的记录。
  • 最后,操作符将状态异步写入状态后端。

请注意,所有具有多个输入的操作符以及在洗牌后消耗多个上游子任务输出流的操作符都需要进行对齐。

操作符状态快照#

当操作符包含任何形式的状态时,此状态也必须是快照的一部分。
操作符在从其输入流接收到所有快照屏障的时间点,并且在向其输出流发送屏障之前,对其状态进行快照。在该时间点,已经对屏障之前的记录进行了所有状态更新,并且尚未应用依赖于屏障之后记录的更新。由于快照的状态可能很大,它存储在可配置的状态后端中。默认情况下,这是 JobManager 的内存,但对于生产使用,应配置分布式可靠存储(如 HDFS)。在状态存储之后,操作符确认检查点,向输出流发送快照屏障,然后继续执行。
生成的快照现在包含:

  • 对于每个并行流数据源,启动快照时流中的偏移量 / 位置。
  • 对于每个操作符,指向作为快照一部分存储的状态的指针。

检查点机制图示

Illustration of the Checkpointing Mechanism

恢复#

在此机制下的恢复很简单:发生故障时,Flink 选择最新完成的检查点 k。然后系统重新部署整个分布式数据流,并为每个操作符提供作为检查点 k 一部分进行快照的状态。源被设置为从位置 Sk 开始读取流。例如在 Apache Kafka 中,这意味着告诉消费者从偏移量 Sk 开始获取数据。
如果状态是增量快照的,操作符从最新的完整快照状态开始,然后对该状态应用一系列增量快照更新。
有关更多信息,请参阅 “重启策略”。

非对齐检查点#

检查点也可以以非对齐方式执行。基本思想是,只要飞行中的数据成为操作符状态的一部分,检查点就可以超过所有飞行中的数据。
请注意,这种方法实际上更接近 Chandy – Lamport 算法,但 Flink 仍然在源中插入屏障,以避免使检查点协调器过载。
非对齐检查点:该图展示了一个操作符如何处理非对齐检查点屏障:

  • 操作符对存储在其输入缓冲区中的第一个屏障做出反应。
  • 它立即通过将屏障添加到输出缓冲区的末尾,将其转发到下游操作符。
  • 操作符标记所有被超过的记录以异步存储,并创建自己状态的快照。
  • 因此,操作符仅短暂停止输入处理以标记缓冲区、转发屏障并创建其他状态的快照。

非对齐检查点确保屏障尽快到达接收器。它特别适用于至少有一个缓慢移动数据路径的应用程序,在这种情况下对齐时间可能长达数小时。但是,由于它会增加额外的 I/O 压力,当到状态后端的 I/O 成为瓶颈时,它并无帮助。有关其他限制,请参阅操作中的更深入讨论。
请注意,保存点始终是对齐的。

非对齐恢复#

在非对齐检查点中,操作符在开始处理来自上游操作符的任何数据之前,首先恢复飞行中的数据。除此之外,它执行与对齐检查点恢复期间相同的步骤。

状态后端#

键 / 值索引存储的确切数据结构取决于所选的状态后端。一种状态后端将数据存储在内存哈希表中,另一种状态后端使用 RocksDB 作为键值存储。除了定义保存状态的数据结构之外,状态后端还实现了对键值状态进行时间点快照并将该快照作为检查点一部分存储的逻辑。可以在不更改应用程序逻辑的情况下配置状态后端。
检查点和快照

checkpoints and snapshots

保存点#

所有使用检查点的程序都可以从保存点恢复执行。保存点允许在不丢失任何状态的情况下更新程序和 Flink 集群。
保存点是手动触发的检查点,它对程序进行快照并将其写入状态后端。它们依赖于常规的检查点机制来实现这一点。
保存点与检查点类似,不同之处在于它们由用户触发,并且在新的检查点完成时不会自动过期。为了正确使用保存点,了解检查点与保存点之间的区别非常重要,“检查点与保存点” 中对此进行了描述。

精确一次与至少一次#

对齐步骤可能会给流程序增加延迟。通常,这种额外延迟在几毫秒量级,但我们也看到过一些异常值的延迟明显增加的情况。对于要求所有记录始终具有超低延迟(几毫秒)的应用程序,Flink 提供了一个开关,可在检查点期间跳过流对齐。一旦操作符从每个输入看到检查点屏障,仍会立即进行检查点快照。
当跳过对齐时,即使在检查点 n 的一些检查点屏障到达后,操作符仍会继续处理所有输入。这样,在为检查点 n 拍摄状态快照之前,操作符也会处理属于检查点 n + 1 的元素。在恢复时,这些记录将作为重复项出现,因为它们既包含在检查点 n 的状态快照中,又将作为检查点 n 之后的数据的一部分被重新播放。
仅对于具有多个前驱(连接)的操作符以及具有多个发送者(在流重新分区 / 洗牌之后)的操作符才会发生对齐。因此,仅包含易于并行的流操作(map ()、flatMap ()、filter () 等)的数据流实际上即使在至少一次模式下也能提供精确一次的保证。

批处理程序中的状态与容错#

Flink 将批处理程序作为流程序的一种特殊情况执行,其中流是有界的(元素数量有限)。DataSet 在内部被视为数据流。因此,上述概念同样适用于批处理程序,与适用于流程序的方式相同,但有一些小的例外:

  • 批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可行的,因为输入是有界的。这将成本更多地推向恢复阶段,但使常规处理更便宜,因为它避免了检查点。
  • DataSet API 中的有状态操作使用简化的内存 / 外存数据结构,而不是键 / 值索引。
  • DataSet API 引入了特殊的同步(基于超步)迭代,这仅在有界流上才可行。有关详细信息,请查看迭代文档。
作者 east

上一 1 2 3 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.