gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
  • ( 页面2 )
Flink 1月 22,2025

Flink1.7官方文档中文翻译:有状态流处理

什么是状态?#
虽然数据流中的许多操作通常一次仅处理单个事件(例如事件解析器),但有些操作会在多个事件间记住相关信息(例如窗口操作符)。这些操作被称为有状态操作。
有状态操作的一些示例:

  • 当应用程序搜索特定的事件模式时,状态会存储到目前为止遇到的事件序列。
  • 按分钟 / 小时 / 天聚合事件时,状态保存待处理的聚合结果。
  • 在一系列数据点上训练机器学习模型时,状态保存模型参数的当前版本。
  • 当需要管理历史数据时,状态允许高效访问过去发生的事件。

Flink 需要了解状态,以便使用检查点和保存点实现容错。
了解状态还有助于对 Flink 应用程序进行重新缩放,这意味着 Flink 会负责在并行实例间重新分配状态。
可查询状态允许你在运行时从 Flink 外部访问状态。
在处理状态时,了解 Flink 的状态后端可能也会有所帮助。Flink 提供了不同的状态后端,用于指定状态的存储方式和存储位置。

键控状态 #
键控状态维护在一个可视为嵌入式键值存储的结构中。状态与有状态操作符读取的流严格分区并一起分布。因此,仅在键控流上才能访问键值状态,即在进行键控 / 分区数据交换之后,并且只能访问与当前事件的键相关联的值。将流的键与状态的键对齐,可确保所有状态更新都是本地操作,无需事务开销即可保证一致性。这种对齐还使 Flink 能够透明地重新分配状态并调整流分区。

状态与分区

键控状态进一步组织为所谓的键组。键组是 Flink 重新分配键控状态的基本单元;键组的数量与定义的最大并行度完全相同。在执行过程中,键控操作符的每个并行实例处理一个或多个键组的键。

State and Partitioning

状态持久性#

Flink 通过流重放和检查点相结合的方式实现容错。一个检查点标记每个输入流中的特定点,以及每个操作符的相应状态。通过恢复操作符的状态并从检查点处重新播放记录,流数据流可以从检查点恢复,同时保持一致性(精确一次处理语义)。
检查点间隔是在执行期间容错开销与恢复时间(需要重新播放的记录数)之间进行权衡的一种方式。
容错机制持续对分布式流数据流进行快照。对于状态较小的流应用程序,这些快照非常轻量级,可以频繁进行,而对性能影响不大。流应用程序的状态存储在可配置的位置,通常是分布式文件系统中。
如果程序发生故障(由于机器、网络或软件故障),Flink 会停止分布式流数据流。然后系统重新启动操作符,并将它们重置到最近一次成功的检查点。输入流被重置到状态快照的位置。作为重新启动的并行数据流一部分处理的任何记录,都保证不会影响先前检查点的状态。
默认情况下,检查点功能是禁用的。有关如何启用和配置检查点的详细信息,请参阅 “检查点”。
为使此机制充分发挥其保证作用,数据流源(如消息队列或代理)需要能够将流倒回到最近定义的点。Apache Kafka 具备此能力,Flink 与 Kafka 的连接器利用了这一点。有关 Flink 连接器提供的保证的更多信息,请参阅 “数据源和接收器的容错保证”。
由于 Flink 的检查点是通过分布式快照实现的,我们可互换使用 “快照” 和 “检查点” 这两个词。通常我们也用 “快照” 一词来指代检查点或保存点。

检查点#

Flink 容错机制的核心部分是对分布式数据流和操作符状态进行一致性快照。这些快照作为一致性检查点,系统在发生故障时可以回退到这些检查点。Flink 进行这些快照的机制在《分布式数据流的轻量级异步快照》中有描述。它受标准的 Chandy – Lamport 分布式快照算法启发,并专门针对 Flink 的执行模型进行了定制。
请记住,与检查点相关的所有操作都可以异步完成。检查点屏障不会同步移动,操作可以异步对其状态进行快照。
自 Flink 1.11 起,检查点可以在有对齐或无对齐的情况下进行。在本节中,我们先描述对齐检查点。

屏障#

Flink 分布式快照中的一个核心元素是流屏障。这些屏障被注入到数据流中,并作为数据流的一部分与记录一起流动。屏障永远不会超过记录,它们严格按顺序流动。一个屏障将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录集。每个屏障携带它前面推送的快照的 ID。屏障不会中断流的流动,因此非常轻量级。来自不同快照的多个屏障可以同时存在于流中,这意味着各种快照可以并发发生。
数据流中的检查点屏障:流屏障在流源处被注入到并行数据流中。注入快照 n 的屏障的点(我们称之为 Sn)是源流中快照覆盖数据的位置。例如,在 Apache Kafka 中,这个位置将是分区中最后一条记录的偏移量。这个位置 Sn 会报告给检查点协调器(Flink 的 JobManager)。

Checkpoint barriers in data streams


然后屏障向下游流动。当一个中间操作符从其所有输入流接收到快照 n 的屏障时,它会向其所有输出流发送一个快照 n 的屏障。一旦一个接收器操作符(流 DAG 的末端)从其所有输入流接收到屏障 n,它就会向检查点协调器确认快照 n。在所有接收器都确认一个快照后,该快照被视为完成。

Aligning data streams at operators with multiple inputs


一旦快照 n 完成,作业将不再要求源提供 Sn 之前的记录,因为此时这些记录(及其衍生记录)将已经通过整个数据流拓扑。
在具有多个输入的操作符处对齐数据流:接收多个输入流的操作符需要在快照屏障上对齐输入流。上图说明了这一点:

  • 一旦操作符从传入流接收到快照屏障 n,在它也从其他输入接收到屏障 n 之前,它不能处理来自该流的任何更多记录。否则,它会将属于快照 n 的记录与属于快照 n + 1 的记录混合。
  • 一旦最后一个流接收到屏障 n,操作符会发出所有挂起的输出记录,然后自己发出快照 n 屏障。
  • 它对状态进行快照,并从所有输入流恢复处理记录,在处理来自流的记录之前先处理输入缓冲区中的记录。
  • 最后,操作符将状态异步写入状态后端。

请注意,所有具有多个输入的操作符以及在洗牌后消耗多个上游子任务输出流的操作符都需要进行对齐。

操作符状态快照#

当操作符包含任何形式的状态时,此状态也必须是快照的一部分。
操作符在从其输入流接收到所有快照屏障的时间点,并且在向其输出流发送屏障之前,对其状态进行快照。在该时间点,已经对屏障之前的记录进行了所有状态更新,并且尚未应用依赖于屏障之后记录的更新。由于快照的状态可能很大,它存储在可配置的状态后端中。默认情况下,这是 JobManager 的内存,但对于生产使用,应配置分布式可靠存储(如 HDFS)。在状态存储之后,操作符确认检查点,向输出流发送快照屏障,然后继续执行。
生成的快照现在包含:

  • 对于每个并行流数据源,启动快照时流中的偏移量 / 位置。
  • 对于每个操作符,指向作为快照一部分存储的状态的指针。

检查点机制图示

Illustration of the Checkpointing Mechanism

恢复#

在此机制下的恢复很简单:发生故障时,Flink 选择最新完成的检查点 k。然后系统重新部署整个分布式数据流,并为每个操作符提供作为检查点 k 一部分进行快照的状态。源被设置为从位置 Sk 开始读取流。例如在 Apache Kafka 中,这意味着告诉消费者从偏移量 Sk 开始获取数据。
如果状态是增量快照的,操作符从最新的完整快照状态开始,然后对该状态应用一系列增量快照更新。
有关更多信息,请参阅 “重启策略”。

非对齐检查点#

检查点也可以以非对齐方式执行。基本思想是,只要飞行中的数据成为操作符状态的一部分,检查点就可以超过所有飞行中的数据。
请注意,这种方法实际上更接近 Chandy – Lamport 算法,但 Flink 仍然在源中插入屏障,以避免使检查点协调器过载。
非对齐检查点:该图展示了一个操作符如何处理非对齐检查点屏障:

  • 操作符对存储在其输入缓冲区中的第一个屏障做出反应。
  • 它立即通过将屏障添加到输出缓冲区的末尾,将其转发到下游操作符。
  • 操作符标记所有被超过的记录以异步存储,并创建自己状态的快照。
  • 因此,操作符仅短暂停止输入处理以标记缓冲区、转发屏障并创建其他状态的快照。

非对齐检查点确保屏障尽快到达接收器。它特别适用于至少有一个缓慢移动数据路径的应用程序,在这种情况下对齐时间可能长达数小时。但是,由于它会增加额外的 I/O 压力,当到状态后端的 I/O 成为瓶颈时,它并无帮助。有关其他限制,请参阅操作中的更深入讨论。
请注意,保存点始终是对齐的。

非对齐恢复#

在非对齐检查点中,操作符在开始处理来自上游操作符的任何数据之前,首先恢复飞行中的数据。除此之外,它执行与对齐检查点恢复期间相同的步骤。

状态后端#

键 / 值索引存储的确切数据结构取决于所选的状态后端。一种状态后端将数据存储在内存哈希表中,另一种状态后端使用 RocksDB 作为键值存储。除了定义保存状态的数据结构之外,状态后端还实现了对键值状态进行时间点快照并将该快照作为检查点一部分存储的逻辑。可以在不更改应用程序逻辑的情况下配置状态后端。
检查点和快照

checkpoints and snapshots

保存点#

所有使用检查点的程序都可以从保存点恢复执行。保存点允许在不丢失任何状态的情况下更新程序和 Flink 集群。
保存点是手动触发的检查点,它对程序进行快照并将其写入状态后端。它们依赖于常规的检查点机制来实现这一点。
保存点与检查点类似,不同之处在于它们由用户触发,并且在新的检查点完成时不会自动过期。为了正确使用保存点,了解检查点与保存点之间的区别非常重要,“检查点与保存点” 中对此进行了描述。

精确一次与至少一次#

对齐步骤可能会给流程序增加延迟。通常,这种额外延迟在几毫秒量级,但我们也看到过一些异常值的延迟明显增加的情况。对于要求所有记录始终具有超低延迟(几毫秒)的应用程序,Flink 提供了一个开关,可在检查点期间跳过流对齐。一旦操作符从每个输入看到检查点屏障,仍会立即进行检查点快照。
当跳过对齐时,即使在检查点 n 的一些检查点屏障到达后,操作符仍会继续处理所有输入。这样,在为检查点 n 拍摄状态快照之前,操作符也会处理属于检查点 n + 1 的元素。在恢复时,这些记录将作为重复项出现,因为它们既包含在检查点 n 的状态快照中,又将作为检查点 n 之后的数据的一部分被重新播放。
仅对于具有多个前驱(连接)的操作符以及具有多个发送者(在流重新分区 / 洗牌之后)的操作符才会发生对齐。因此,仅包含易于并行的流操作(map ()、flatMap ()、filter () 等)的数据流实际上即使在至少一次模式下也能提供精确一次的保证。

批处理程序中的状态与容错#

Flink 将批处理程序作为流程序的一种特殊情况执行,其中流是有界的(元素数量有限)。DataSet 在内部被视为数据流。因此,上述概念同样适用于批处理程序,与适用于流程序的方式相同,但有一些小的例外:

  • 批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可行的,因为输入是有界的。这将成本更多地推向恢复阶段,但使常规处理更便宜,因为它避免了检查点。
  • DataSet API 中的有状态操作使用简化的内存 / 外存数据结构,而不是键 / 值索引。
  • DataSet API 引入了特殊的同步(基于超步)迭代,这仅在有界流上才可行。有关详细信息,请查看迭代文档。
作者 east
Flink 1月 22,2025

Flink1.7文档 时间表函数

时间表函数提供了在特定时间点访问时间表版本的功能。为了访问时间表中的数据,必须传递一个时间属性,该属性确定返回的表的版本。Flink 使用表函数的 SQL 语法来提供这种访问方式。

与版本化表不同,时间表函数只能在追加-only 流上定义——它不支持变更日志输入。此外,时间表函数不能通过纯 SQL DDL 来定义。

定义时间表函数

时间表函数可以使用 Table API 在追加-only 流上定义。表会注册一个或多个键列,以及用于版本控制的时间属性。

假设我们有一个追加-only 的货币汇率表,我们希望将其注册为时间表函数。

SELECT * FROM currency_rates; 
update_timecurrencyrate
09:00:00Yen102
09:00:00Euro114
09:00:00USD1
11:15:00Euro119
11:49:00Pounds108

使用 Table API,我们可以使用 currency 作为键,并将 update_time 作为版本时间属性来注册该流。

Java 示例:

TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");
 
tEnv.createTemporarySystemFunction("rates", rates);      

时间表函数连接

定义时间表函数后,它可以作为标准表函数使用。追加-only 表(左输入/探测方)可以与时间表(右输入/构建方)连接,即一个随着时间变化并跟踪其变化的表,用于在特定时间点获取某个键的值。

考虑一个追加-only 表 orders,它跟踪客户的订单并使用不同的货币。

SELECT * FROM orders; 
order_timeamountcurrency
10:152Euro
10:301USD
10:3250Yen
10:523Euro
11:045USD

给定这些表,我们希望将订单转换为一种统一的货币——美元(USD)。

SQL 查询:

SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
作者 east
Flink 1月 8,2025

解决flink Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor

运行flink代码报错:

运行报错:Exception in thread "main" org.apache.flink.table.api.TableException: Unexpected error when trying to load service provider.
	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:826)
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:525)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:295)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:266)
	at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
	at com.xxx.a_jobs.FlinkBatchHiveJob$.main(FlinkBatchHiveJob.scala:35)
	at com.xxx.a_jobs.FlinkBatchHiveJob.main(FlinkBatchHiveJob.scala)
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.table.planner.delegation.DefaultDialectFactory could not be instantiated
	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:817)
	... 6 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor
	at java.lang.Class.getDeclaredConstructors0(Native Method)
	at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
	at java.lang.Class.getConstructor0(Class.java:3075)
	at java.lang.Class.newInstance(Class.java:412)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
	... 9 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.delegation.ExtendedOperationExecutor

报错的核心问题是在加载 Flink 的服务提供程序时出现了异常。具体的错误信息是 java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor 和 java.lang.ClassNotFoundException: org.apache.flink.table.delegation.ExtendedOperationExecutor。这通常是由于类缺失或者缺少相应的依赖库导致的。

解析错误信息

  1. 错误描述:Copy CodeCaused by: java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor 这是因为 org.apache.flink.table.delegation.ExtendedOperationExecutor 类未找到。Flink 在尝试加载这个类时,发现它不存在。这通常意味着某个必需的类库没有被正确包含在项目的依赖中。
  2. 原因分析:
    • Flink 依赖的类库中缺少了某些必需的 JAR 文件,或者你的项目中缺少某些必要的依赖。
    • 可能是因为缺少了 Hive 相关的依赖,或者使用的 Flink 版本与 Hive 连接的依赖不兼容。
    • 另外,org.apache.flink.table.planner.delegation.DefaultDialectFactory 类在创建时也无法实例化,这表明整个 Flink SQL 引擎的配置或者依赖加载出现了问题。

解决方案

1. 检查依赖:

确保你的项目中包含了 Flink 和 Hive 的相关依赖,特别是你使用的 Flink 版本与 Hive 相关的 JAR 文件。

对于 Flink 1.17.1 和 Hive,你应该确保以下依赖被正确添加到项目中:

  • Flink和hive 依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.17.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.17.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hive_2.11</artifactId>
    <version>1.17.1</version>
</dependency>

2. 检查兼容性:

确保你所使用的 Flink 版本与 Hive 连接器的版本兼容。在 Flink 1.17.x 中,某些老旧的 Hive 连接器可能会出现兼容性问题。你可以查阅 Flink 官方文档 查看兼容的版本。

作者 east
Flink 1月 6,2025

解决flink toAppendStream doesn’t support consuming update changes which is produced by node GroupAggregate

下面的flink代码:

String sqlQuery = “SELECT MAX(val) AS max_val, MIN(val) AS min_val FROM dataT GROUP BY pid”;

Table resultTable = tableEnv.sqlQuery(sqlQuery);

DataStream resultStream = tableEnv.toAppendStream(resultTable, Row.class);

运行报错:

org.apache.flink.table.api.TableException: toAppendStream doesn’t support consuming update changes which is produced by node GroupAggregate(groupBy=[pid], select=[pid, MAX(val) AS max_val, MIN(val) AS min_val])

原因分析

报错信息提示 toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate,这意味着在当前代码中使用 toAppendStream 方法去转换结果表为 DataStream 时出现了不兼容的情况。
在 Flink 中,当执行包含聚合操作(比如这里的 GROUP BY 以及 MAX、MIN 聚合函数计算)的 SQL 查询时,查询结果可能会产生更新(update)类型的变更,而 toAppendStream 方法只适用于那种仅追加(append-only)类型的结果,也就是结果表中数据只会新增而不会有更新、删除等变更的情况。这里的聚合操作导致了结果存在更新变化,所以调用 toAppendStream 就抛出了异常,它无法处理这种带有更新的数据变更情况。

正确代码修改思路及示例

要解决这个问题,可以使用 toRetractStream 方法来替代 toAppendStream 方法,toRetractStream 方法可以处理包含更新、删除等多种变更类型的数据,它返回的 DataStream 中元素是包含了一个布尔值标志(表示是新增还是撤回操作)以及实际的数据行(对应查询结果行)的二元组形式。
以下是修改后的代码示例:



        // 2. 添加 Source
        DataStream<RunData> dataSource = env.addSource(new TDengineSourceFunction(jdbcUrl, user, password, query));

        // 3. 注册临时表
        tableEnv.createTemporaryView("rundata", dataSource, "pid, val"); // 根据实际字段调整

        // 4. 执行 SQL 查询以计算最大值和最小值
        String sqlQuery = "SELECT MAX(val) AS max_val, MIN(val) AS min_val FROM dataT GROUP BY pid";
        Table resultTable = tableEnv.sqlQuery(sqlQuery);

        // 5. 将结果转换为 DataStream 并打印,这里使用 toRetractStream 替代 toAppendStream
        DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
        resultStream.print();

        // 6. 触发执行
        env.execute("Flink SQL Max and Min Calculation");
    }
}
作者 east
Flink, tdengine 1月 3,2025

Flink读取TDEngine数据实例,解决com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields错误

用flink读取TDEngine,运行报错:
com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields

这意味着 com.taosdata.jdbc.rs.RestfulDatabaseMetaData 类的对象无法被序列化,而 Flink 的作业中涉及到的某些操作需要将对象传递到不同的任务中,这就要求对象是可序列化的(即实现了 Serializable 接口)。在 Flink 中,所有要在分布式环境中传输或持久化的对象都必须是可序列化的。

  • RestfulDatabaseMetaData 是 TDengine JDBC 驱动中的一个类,它可能没有实现 Serializable 接口,因此在需要将该类对象传输到其他机器时,Flink 无法进行序列化。

解决方法是

使用 transient 关键字避免对不可序列化对象进行传递。

通过标记 connection、preparedStatement 和 resultSet 为 transient,这些对象不会被 Flink 传递到 Task Manager。

完整可执行代码如下:


import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TDengineSourceFunction extends RichParallelSourceFunction<RunData> {

    private transient Connection connection;        // 使用 transient 避免序列化
    private transient PreparedStatement preparedStatement;
    private transient ResultSet resultSet;
    private String query;
    private volatile boolean isRunning = true;

    private String jdbcUrl;
    private String user;

    private String password;


    public TDengineSourceFunction(String jdbcUrl, String user, String password, String query) {
        this.query = query;
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;

        // JDBC连接参数在open()方法中初始化
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
        // 在这里初始化数据库连接
        this.connection = DriverManager.getConnection(jdbcUrl, user, password);
        // 准备SQL查询语句
        this.preparedStatement = connection.prepareStatement(query);
        this.resultSet = preparedStatement.executeQuery();
    }

    @Override
    public void run(SourceContext<RunData> sourceContext) throws Exception {
        while (isRunning && resultSet.next()) {
            // 从ResultSet中提取数据并转换为RunData对象
            RunData data = convertResultSetToData(resultSet);
            // 将数据发送到Flink的处理流中
            if (data != null) {
                sourceContext.collect(data);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        // 关闭资源
        try {
            if (resultSet != null) resultSet.close();
            if (preparedStatement != null) preparedStatement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            // 处理关闭资源时的异常
            e.printStackTrace();
        }
    }

    private RunData convertResultSetToData(ResultSet resultSet) throws SQLException {
        // 提取单行数据
      
        // 将数据转换为 RunData 对象


      //  return new RunData(......);
        return null;
    }
}
作者 east
Flink 1月 3,2025

解决flink读取TDEngine的数据Could not initialize class com.taosdata.jdbc.TSDBJNIConnector

需要用flink读取TDEngine的数据,用jdbc方式连接,运行报错:Could not initialize class com.taosdata.jdbc.TSDBJNIConnector

JDBC-JNI的方式需要 TDengine-client(使用JDBC-JNI时必须,使用JDBC-RESTful时非必须),所以采用JDBC-RESTful 的方式,原因是一开始想用
JDBC-JNI 的方式,想改用 JDBC-RESTful 代码没改干净。



通过指定URL获取连接,如下所示:

Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata";Connection conn = DriverManager.getConnection(jdbcUrl);

以上示例,使用 JDBC-RESTful 的 driver,建立了到 hostname 为 taosdemo.com,端口为 6041,数据库名为 test 的连接。这个 URL 中指定用户名(user)为 root,密码(password)为 taosdata。

使用 JDBC-RESTful 接口,不需要依赖本地函数库。与 JDBC-JNI 相比,仅需要:

  1. driverClass 指定为“com.taosdata.jdbc.rs.RestfulDriver”;
  2. jdbcUrl 以“jdbc:TAOS-RS://”开头;
  3. 使用 6041 作为连接端口。

按上面的方式修改,果然没有再报上面的错误。

作者 east
Flink 1月 3,2025

flink运行报错:java.lang.IllegalStateException: No ExecutorFactory found to execute the application

在本地运行flink代码,报错“
java.lang.IllegalStateException: No ExecutorFactory found to execute the application ”

通常是由于缺少必要的 Flink 依赖项导致的。具体来说,Flink 需要特定的执行器工厂来运行应用程序,而这些依赖项可能未正确包含在您的项目中。

原因分析

  1. 缺少 Flink 运行时依赖:
    • 您的代码片段看起来是基于 Flink 的流处理 API 编写的。如果项目缺少 Flink 运行时的相关依赖(例如 flink-java, flink-streaming-java, 和 flink-clients),Flink 将无法找到执行器工厂来启动作业。
  2. 依赖版本不匹配:
    • 如果您使用的 Flink 版本与代码不兼容,也可能导致类似的问题。确保所有 Flink 相关依赖的版本一致。
  3. 缺少必要的插件或扩展:
    • 某些情况下,特定的 Flink 插件或扩展可能缺失,导致执行器工厂无法加载。

解决方案

确保您的项目中包含了所有必要的 Flink 依赖项。以下是使用 Maven 的示例 pom.xml 配置,确保包含了 Flink 的核心和流处理依赖:

<dependencies>
    <!-- Flink 核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.16.2</version> <!-- 请根据需要替换为合适的版本 -->
    </dependency>

    <!-- Flink 流处理依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.16.2</version> <!-- 版本需与 flink-java 一致 -->
    </dependency>

    <!-- Flink 客户端依赖(如果需要远程提交作业) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.16.2</version>
    </dependency>



    <!-- 如果使用自定义 Source 和 Sink,确保它们所在的依赖已添加 -->
</dependencies>
作者 east
Flink 9月 23,2024

Flink Lookup Join在性能优化方面都提供了哪些机制?

Apache Flink的Lookup Join是一种特殊类型的连接操作,它允许流表与外部数据库中的维表进行实时关联。在性能优化方面,Flink提供了以下几种机制:

  1. 同步和异步查询机制:Flink支持通过查询提示开启同步和异步查询模式。在异步模式下,可以并发地向维表发送多个数据项的查询请求,这样可以减少等待单个查询响应的时间,从而提高整体的查询性能。 
  2. Ordered和Unordered查询机制:在异步查询模式中,Flink还提供了Ordered和Unordered两种输出模式。Ordered模式需要等待所有数据查询完成并对数据进行排序后才能发送给下游,而Unordered模式不对输出顺序做要求,可以在查询到结果后立即发送,这样可以进一步提升性能。 
  3. Cache机制:Flink支持使用Cache机制来优化Lookup Join的性能。通过将维表数据缓存到内存中,可以减少对数据库的直接查询次数,加快数据检索速度。Flink提供了Full Caching(将所有数据缓存)和Partial Caching(使用LRU算法缓存最近使用的数据)两种缓存策略。 
  4. 状态管理优化:Flink的状态后端(State Backends)和Checkpoint机制也对Lookup Join的性能有影响。选择合适的状态后端和调整Checkpoint间隔可以优化状态的存储和恢复,从而间接提升Lookup Join的性能。
  5. 查询提示(Query Hints):Flink允许用户通过查询提示来指导优化器如何执行Lookup Join,这可以帮助优化器生成更高效的执行计划。 
作者 east
Flink 9月 23,2024

Flink Lookup Join 相比传统Join有哪些优势?

Flink Lookup Join 的优势

Flink Lookup Join 相比传统的 Join 操作,在实时数据处理方面具有明显的优势:

  1. 低延迟查询:Lookup Join 能够根据键值动态查找外部维表中的数据,这对于需要快速响应的实时数据流处理场景非常适用。 
  2. 异步查询:Flink 支持异步查找机制,这有助于提高查询性能和吞吐量,尤其是在处理大规模数据时。 
  3. 高效的维度查询:通过将维表数据加载到内存中并进行索引,Lookup Join 提供了快速查找和关联维度数据的能力,减少了对外部存储系统的访问次数。 
  4. 实时维度关联:Lookup Join 允许实时流数据与维表进行关联,这对于获取最新的维度信息至关重要,如在电商领域中将订单流与商品维表关联。 
  5. 动态数据关联:适用于维表数据动态变化的场景,确保流数据处理过程中能够及时获取最新的维度信息。 
  6. 事件时间关联:支持基于事件时间的关联操作,这在处理时间敏感的数据时尤为重要,如用户行为分析等。 
作者 east
doris, Flink 9月 18,2024

流批一体化,Flink+Doris取代Spark+Hive来开发离线数仓

Flink + Doris 可以作为替代方案,用于开发离线数仓,替代 Spark + Hive 组合。这个架构能够实现更高的性能和实时性,同时简化了架构的复杂性。让我们详细分析一下 Flink + Doris 取代 Spark + Hive 的优点及适用场景:

1. 实时性与流处理能力

  • Flink 的优势:Flink 是一个分布式流处理框架,支持事件驱动和批处理模式,特别擅长处理实时数据流和复杂事件处理(CEP)。Flink 能够以低延迟处理海量数据,确保数据能够迅速进入下游系统。
  • Doris 的实时写入和分析:Doris 支持实时数据加载,能够高效处理来自 Flink 的流式数据,提供秒级查询的响应能力,适合实时数据分析场景。

Spark + Hive 的组合主要用于批处理,Hive 本身不擅长处理实时数据,Spark 的流处理能力也不如 Flink 专注且强大。因此,Flink + Doris 更适合有实时数据需求的场景,尤其是在需要低延迟的业务数据分析和展示时。

2. 查询性能和并发处理

  • Doris 的高效查询:Doris 作为一个专为分析设计的 MPP 数据库,擅长处理大规模数据查询,支持高并发的 SQL 查询并提供低延迟响应,适合在线分析型应用(OLAP)。
  • 列式存储和向量化执行:Doris 使用列式存储和向量化执行引擎,在处理复杂查询时效率更高,能够大幅缩短查询时间。相比之下,Hive 在执行复杂查询时较慢,尤其是基于 HDFS 的查询,依赖磁盘 I/O,性能较差。

Spark + Hive 组合通常需要大量的计算资源来执行批量查询任务,且在查询复杂度上性能不如 Doris。

3. 简化架构与运维成本

  • Flink 和 Doris 的简洁架构:Flink 与 Doris 都具备较高的集成度和易用性,且不依赖于像 Hadoop 这样的复杂生态系统。Flink + Doris 的组合能够简化系统架构,减少数据流动中的延迟,且维护成本相对较低。
  • 轻量级部署:Doris 作为轻量级 OLAP 数据库,部署和运维都较为简单,不需要 Hadoop 的支持。相比之下,Hive 依赖于 Hadoop 生态系统,需要更多的组件(如 HDFS、YARN 等)来保证其工作,这增加了架构的复杂度和运维难度。

Spark + Hive 架构复杂,需要更多的组件支持,部署和维护较为繁琐,Flink + Doris 在这一方面更具优势。

4. 批处理和流处理的统一

  • Flink 批流一体化:Flink 提供了统一的编程模型,能够同时处理批处理和流处理任务,使得离线数仓架构能够更灵活地应对各种数据处理需求。
  • 批处理能力:尽管 Flink 主要以流处理见长,但它在批处理方面的表现也相当出色。通过批流一体化架构,开发人员可以更高效地处理历史数据和实时数据,极大简化了数据处理的开发和运维。

Spark + Hive 则主要关注批处理任务,对于流数据的处理能力相对弱一些。Spark 的流处理框架(Spark Streaming)比 Flink 在复杂流处理上的能力有限。

5. 灵活的数据集成

  • 数据集成与传输:Flink 可以轻松集成各种数据源,包括 Kafka、文件系统、数据库等。它可以将流式和批量数据统一处理后,通过 Doris 实现实时分析和查询。
  • Doris 的多种导入方式:Doris 支持多种数据导入方式,能够高效地处理 Flink 输出的数据流(例如通过 HTTP、Broker、Stream Load 等方式),这使得两者之间的集成非常顺畅。

Spark + Hive 在数据集成的实时性上不如 Flink + Doris。Spark 处理数据后通常还需要依赖 Hive 进行存储和管理,数据查询和更新的延迟较高。

6. 高可扩展性

  • 分布式处理:Flink 作为流处理框架具备出色的可扩展性,能够处理大规模的数据流。Doris 也是一个分布式架构,能够扩展到数百个节点,适应大规模数仓需求。
  • 集群管理:Flink 和 Doris 都支持分布式集群管理,能够根据业务需求动态扩展计算和存储能力。

Spark + Hive 也具有可扩展性,但其扩展性受 Hadoop 生态的限制,复杂性更高。

适用场景

Flink + Doris 适用于需要实时数据处理、高性能查询以及复杂多维分析的场景,如:

  • 实时数据流分析(用户行为分析、监控告警系统)
  • 实时数据仓库(T+0 数据仓库)
  • 多维度的在线查询(报表系统、BI 工具)
  • 需要兼顾批处理和流处理的场景

Spark + Hive 更适用于需要处理大规模离线批量数据且对实时性要求不高的传统数仓场景。

作者 east
Flink 9月 14,2024

Flink中的窗口与传统数据库中的窗函数有何不同?

Flink中的窗口与传统数据库中的窗函数主要有以下几个方面的不同:

  1. 实时处理与批处理的差异:Flink是专为实时数据流处理设计的,其窗口机制能够处理无限数据流,并支持事件时间和处理时间的概念。相比之下,传统数据库中的窗函数通常用于批处理,处理的是有界数据集。 
  2. 窗口类型的多样性:Flink提供了多种窗口类型,包括滚动窗口、滑动窗口、会话窗口和全局窗口,这些窗口可以根据时间或计数来定义。而传统数据库中的窗函数通常较为基础,主要是基于时间的窗口聚合。 
  3. 窗口函数的实现:Flink中的窗口函数不仅支持全量聚合,还支持增量聚合,后者在性能上更为优越,特别是在处理大规模数据流时。此外,Flink的窗口函数可以与触发器结合使用,以控制窗口的计算时机。 
  4. 时间语义的明确性:Flink的窗口函数在处理事件时间时提供了明确的时间语义,这对于确保实时数据分析的准确性至关重要。而传统数据库中的窗函数通常不涉及事件时间的概念。 
  5. 窗口的动态创建:Flink中的窗口是动态创建的,只有当窗口内的数据到达时才会创建相应的窗口,这有助于优化内存使用和计算资源。 
作者 east
Flink 6月 19,2024

Apache Flink处理IoT复杂数据流程案例

使用Apache Flink处理IoT复杂数据是一项涉及多个步骤和组件的任务,包括数据接入、数据清洗、实时处理、状态管理、窗口计算、以及结果输出等。以下是一个全面且详细的Flink流处理框架,结合理论和实际应用,以处理IoT数据为主线。

1. 引入依赖和设置环境

首先,需要在你的项目中引入Flink所需的依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
xmlCopy Code<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

2. 创建Flink执行环境

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}
javaCopy Codeimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}

3. 数据接入

通常,IoT数据会通过Kafka或其他消息队列接入。假设使用Kafka作为数据源:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        // 进一步处理...
    }
}




4. 数据清洗和解析

实际的IoT数据通常是JSON格式的字符串,需要进行解析和清洗:

import org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}
javaCopy Codeimport org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}

5. 定义时间窗口和处理函数

import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer and parsing setup...

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        // 进一步处理...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }
    }
}

6. 输出结果到外部系统

处理后的数据通常需要写到数据库、文件系统或者其他外部系统。以写入到Kafka为例:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}
javaCopy Codeimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}

7. 完整代码示例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;

public class IoTDataProcessing {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;

        // constructor, getters, setters...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }

        @Override
        public String toString() {
            return "AggregatedResult{" +
                    "deviceId='" + deviceId + '\'' +
                    ", windowStart=" + windowStart +
                    ", windowEnd=" + windowEnd +
                    ", avgTemperature=" + avgTemperature +
                    ", avgHumidity=" + avgHumidity +
                    '}';
        }
    }
}
作者 east

上一 1 2 3 … 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (494)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (36)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.