gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
  • ( 页面2 )
Flink 1月 22,2025

Flink1.7文档 时间表函数

时间表函数提供了在特定时间点访问时间表版本的功能。为了访问时间表中的数据,必须传递一个时间属性,该属性确定返回的表的版本。Flink 使用表函数的 SQL 语法来提供这种访问方式。

与版本化表不同,时间表函数只能在追加-only 流上定义——它不支持变更日志输入。此外,时间表函数不能通过纯 SQL DDL 来定义。

定义时间表函数

时间表函数可以使用 Table API 在追加-only 流上定义。表会注册一个或多个键列,以及用于版本控制的时间属性。

假设我们有一个追加-only 的货币汇率表,我们希望将其注册为时间表函数。

SELECT * FROM currency_rates; 
update_timecurrencyrate
09:00:00Yen102
09:00:00Euro114
09:00:00USD1
11:15:00Euro119
11:49:00Pounds108

使用 Table API,我们可以使用 currency 作为键,并将 update_time 作为版本时间属性来注册该流。

Java 示例:

TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");
 
tEnv.createTemporarySystemFunction("rates", rates);      

时间表函数连接

定义时间表函数后,它可以作为标准表函数使用。追加-only 表(左输入/探测方)可以与时间表(右输入/构建方)连接,即一个随着时间变化并跟踪其变化的表,用于在特定时间点获取某个键的值。

考虑一个追加-only 表 orders,它跟踪客户的订单并使用不同的货币。

SELECT * FROM orders; 
order_timeamountcurrency
10:152Euro
10:301USD
10:3250Yen
10:523Euro
11:045USD

给定这些表,我们希望将订单转换为一种统一的货币——美元(USD)。

SQL 查询:

SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
作者 east
Flink 1月 8,2025

解决flink Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor

运行flink代码报错:

运行报错:Exception in thread "main" org.apache.flink.table.api.TableException: Unexpected error when trying to load service provider.
	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:826)
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:525)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:295)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:266)
	at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
	at com.xxx.a_jobs.FlinkBatchHiveJob$.main(FlinkBatchHiveJob.scala:35)
	at com.xxx.a_jobs.FlinkBatchHiveJob.main(FlinkBatchHiveJob.scala)
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.table.planner.delegation.DefaultDialectFactory could not be instantiated
	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:817)
	... 6 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor
	at java.lang.Class.getDeclaredConstructors0(Native Method)
	at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
	at java.lang.Class.getConstructor0(Class.java:3075)
	at java.lang.Class.newInstance(Class.java:412)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
	... 9 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.delegation.ExtendedOperationExecutor

报错的核心问题是在加载 Flink 的服务提供程序时出现了异常。具体的错误信息是 java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor 和 java.lang.ClassNotFoundException: org.apache.flink.table.delegation.ExtendedOperationExecutor。这通常是由于类缺失或者缺少相应的依赖库导致的。

解析错误信息

  1. 错误描述:Copy CodeCaused by: java.lang.NoClassDefFoundError: org/apache/flink/table/delegation/ExtendedOperationExecutor 这是因为 org.apache.flink.table.delegation.ExtendedOperationExecutor 类未找到。Flink 在尝试加载这个类时,发现它不存在。这通常意味着某个必需的类库没有被正确包含在项目的依赖中。
  2. 原因分析:
    • Flink 依赖的类库中缺少了某些必需的 JAR 文件,或者你的项目中缺少某些必要的依赖。
    • 可能是因为缺少了 Hive 相关的依赖,或者使用的 Flink 版本与 Hive 连接的依赖不兼容。
    • 另外,org.apache.flink.table.planner.delegation.DefaultDialectFactory 类在创建时也无法实例化,这表明整个 Flink SQL 引擎的配置或者依赖加载出现了问题。

解决方案

1. 检查依赖:

确保你的项目中包含了 Flink 和 Hive 的相关依赖,特别是你使用的 Flink 版本与 Hive 相关的 JAR 文件。

对于 Flink 1.17.1 和 Hive,你应该确保以下依赖被正确添加到项目中:

  • Flink和hive 依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.17.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.17.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hive_2.11</artifactId>
    <version>1.17.1</version>
</dependency>

2. 检查兼容性:

确保你所使用的 Flink 版本与 Hive 连接器的版本兼容。在 Flink 1.17.x 中,某些老旧的 Hive 连接器可能会出现兼容性问题。你可以查阅 Flink 官方文档 查看兼容的版本。

作者 east
Flink 1月 6,2025

解决flink toAppendStream doesn’t support consuming update changes which is produced by node GroupAggregate

下面的flink代码:

String sqlQuery = “SELECT MAX(val) AS max_val, MIN(val) AS min_val FROM dataT GROUP BY pid”;

Table resultTable = tableEnv.sqlQuery(sqlQuery);

DataStream resultStream = tableEnv.toAppendStream(resultTable, Row.class);

运行报错:

org.apache.flink.table.api.TableException: toAppendStream doesn’t support consuming update changes which is produced by node GroupAggregate(groupBy=[pid], select=[pid, MAX(val) AS max_val, MIN(val) AS min_val])

原因分析

报错信息提示 toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate,这意味着在当前代码中使用 toAppendStream 方法去转换结果表为 DataStream 时出现了不兼容的情况。
在 Flink 中,当执行包含聚合操作(比如这里的 GROUP BY 以及 MAX、MIN 聚合函数计算)的 SQL 查询时,查询结果可能会产生更新(update)类型的变更,而 toAppendStream 方法只适用于那种仅追加(append-only)类型的结果,也就是结果表中数据只会新增而不会有更新、删除等变更的情况。这里的聚合操作导致了结果存在更新变化,所以调用 toAppendStream 就抛出了异常,它无法处理这种带有更新的数据变更情况。

正确代码修改思路及示例

要解决这个问题,可以使用 toRetractStream 方法来替代 toAppendStream 方法,toRetractStream 方法可以处理包含更新、删除等多种变更类型的数据,它返回的 DataStream 中元素是包含了一个布尔值标志(表示是新增还是撤回操作)以及实际的数据行(对应查询结果行)的二元组形式。
以下是修改后的代码示例:



        // 2. 添加 Source
        DataStream<RunData> dataSource = env.addSource(new TDengineSourceFunction(jdbcUrl, user, password, query));

        // 3. 注册临时表
        tableEnv.createTemporaryView("rundata", dataSource, "pid, val"); // 根据实际字段调整

        // 4. 执行 SQL 查询以计算最大值和最小值
        String sqlQuery = "SELECT MAX(val) AS max_val, MIN(val) AS min_val FROM dataT GROUP BY pid";
        Table resultTable = tableEnv.sqlQuery(sqlQuery);

        // 5. 将结果转换为 DataStream 并打印,这里使用 toRetractStream 替代 toAppendStream
        DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
        resultStream.print();

        // 6. 触发执行
        env.execute("Flink SQL Max and Min Calculation");
    }
}
作者 east
Flink, tdengine 1月 3,2025

Flink读取TDEngine数据实例,解决com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields错误

用flink读取TDEngine,运行报错:
com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields

这意味着 com.taosdata.jdbc.rs.RestfulDatabaseMetaData 类的对象无法被序列化,而 Flink 的作业中涉及到的某些操作需要将对象传递到不同的任务中,这就要求对象是可序列化的(即实现了 Serializable 接口)。在 Flink 中,所有要在分布式环境中传输或持久化的对象都必须是可序列化的。

  • RestfulDatabaseMetaData 是 TDengine JDBC 驱动中的一个类,它可能没有实现 Serializable 接口,因此在需要将该类对象传输到其他机器时,Flink 无法进行序列化。

解决方法是

使用 transient 关键字避免对不可序列化对象进行传递。

通过标记 connection、preparedStatement 和 resultSet 为 transient,这些对象不会被 Flink 传递到 Task Manager。

完整可执行代码如下:


import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TDengineSourceFunction extends RichParallelSourceFunction<RunData> {

    private transient Connection connection;        // 使用 transient 避免序列化
    private transient PreparedStatement preparedStatement;
    private transient ResultSet resultSet;
    private String query;
    private volatile boolean isRunning = true;

    private String jdbcUrl;
    private String user;

    private String password;


    public TDengineSourceFunction(String jdbcUrl, String user, String password, String query) {
        this.query = query;
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;

        // JDBC连接参数在open()方法中初始化
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
        // 在这里初始化数据库连接
        this.connection = DriverManager.getConnection(jdbcUrl, user, password);
        // 准备SQL查询语句
        this.preparedStatement = connection.prepareStatement(query);
        this.resultSet = preparedStatement.executeQuery();
    }

    @Override
    public void run(SourceContext<RunData> sourceContext) throws Exception {
        while (isRunning && resultSet.next()) {
            // 从ResultSet中提取数据并转换为RunData对象
            RunData data = convertResultSetToData(resultSet);
            // 将数据发送到Flink的处理流中
            if (data != null) {
                sourceContext.collect(data);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        // 关闭资源
        try {
            if (resultSet != null) resultSet.close();
            if (preparedStatement != null) preparedStatement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            // 处理关闭资源时的异常
            e.printStackTrace();
        }
    }

    private RunData convertResultSetToData(ResultSet resultSet) throws SQLException {
        // 提取单行数据
      
        // 将数据转换为 RunData 对象


      //  return new RunData(......);
        return null;
    }
}
作者 east
Flink 1月 3,2025

解决flink读取TDEngine的数据Could not initialize class com.taosdata.jdbc.TSDBJNIConnector

需要用flink读取TDEngine的数据,用jdbc方式连接,运行报错:Could not initialize class com.taosdata.jdbc.TSDBJNIConnector

JDBC-JNI的方式需要 TDengine-client(使用JDBC-JNI时必须,使用JDBC-RESTful时非必须),所以采用JDBC-RESTful 的方式,原因是一开始想用
JDBC-JNI 的方式,想改用 JDBC-RESTful 代码没改干净。



通过指定URL获取连接,如下所示:

Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata";Connection conn = DriverManager.getConnection(jdbcUrl);

以上示例,使用 JDBC-RESTful 的 driver,建立了到 hostname 为 taosdemo.com,端口为 6041,数据库名为 test 的连接。这个 URL 中指定用户名(user)为 root,密码(password)为 taosdata。

使用 JDBC-RESTful 接口,不需要依赖本地函数库。与 JDBC-JNI 相比,仅需要:

  1. driverClass 指定为“com.taosdata.jdbc.rs.RestfulDriver”;
  2. jdbcUrl 以“jdbc:TAOS-RS://”开头;
  3. 使用 6041 作为连接端口。

按上面的方式修改,果然没有再报上面的错误。

作者 east
Flink 1月 3,2025

flink运行报错:java.lang.IllegalStateException: No ExecutorFactory found to execute the application

在本地运行flink代码,报错“
java.lang.IllegalStateException: No ExecutorFactory found to execute the application ”

通常是由于缺少必要的 Flink 依赖项导致的。具体来说,Flink 需要特定的执行器工厂来运行应用程序,而这些依赖项可能未正确包含在您的项目中。

原因分析

  1. 缺少 Flink 运行时依赖:
    • 您的代码片段看起来是基于 Flink 的流处理 API 编写的。如果项目缺少 Flink 运行时的相关依赖(例如 flink-java, flink-streaming-java, 和 flink-clients),Flink 将无法找到执行器工厂来启动作业。
  2. 依赖版本不匹配:
    • 如果您使用的 Flink 版本与代码不兼容,也可能导致类似的问题。确保所有 Flink 相关依赖的版本一致。
  3. 缺少必要的插件或扩展:
    • 某些情况下,特定的 Flink 插件或扩展可能缺失,导致执行器工厂无法加载。

解决方案

确保您的项目中包含了所有必要的 Flink 依赖项。以下是使用 Maven 的示例 pom.xml 配置,确保包含了 Flink 的核心和流处理依赖:

<dependencies>
    <!-- Flink 核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.16.2</version> <!-- 请根据需要替换为合适的版本 -->
    </dependency>

    <!-- Flink 流处理依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.16.2</version> <!-- 版本需与 flink-java 一致 -->
    </dependency>

    <!-- Flink 客户端依赖(如果需要远程提交作业) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.16.2</version>
    </dependency>



    <!-- 如果使用自定义 Source 和 Sink,确保它们所在的依赖已添加 -->
</dependencies>
作者 east
Flink 9月 23,2024

Flink Lookup Join在性能优化方面都提供了哪些机制?

Apache Flink的Lookup Join是一种特殊类型的连接操作,它允许流表与外部数据库中的维表进行实时关联。在性能优化方面,Flink提供了以下几种机制:

  1. 同步和异步查询机制:Flink支持通过查询提示开启同步和异步查询模式。在异步模式下,可以并发地向维表发送多个数据项的查询请求,这样可以减少等待单个查询响应的时间,从而提高整体的查询性能。 
  2. Ordered和Unordered查询机制:在异步查询模式中,Flink还提供了Ordered和Unordered两种输出模式。Ordered模式需要等待所有数据查询完成并对数据进行排序后才能发送给下游,而Unordered模式不对输出顺序做要求,可以在查询到结果后立即发送,这样可以进一步提升性能。 
  3. Cache机制:Flink支持使用Cache机制来优化Lookup Join的性能。通过将维表数据缓存到内存中,可以减少对数据库的直接查询次数,加快数据检索速度。Flink提供了Full Caching(将所有数据缓存)和Partial Caching(使用LRU算法缓存最近使用的数据)两种缓存策略。 
  4. 状态管理优化:Flink的状态后端(State Backends)和Checkpoint机制也对Lookup Join的性能有影响。选择合适的状态后端和调整Checkpoint间隔可以优化状态的存储和恢复,从而间接提升Lookup Join的性能。
  5. 查询提示(Query Hints):Flink允许用户通过查询提示来指导优化器如何执行Lookup Join,这可以帮助优化器生成更高效的执行计划。 
作者 east
Flink 9月 23,2024

Flink Lookup Join 相比传统Join有哪些优势?

Flink Lookup Join 的优势

Flink Lookup Join 相比传统的 Join 操作,在实时数据处理方面具有明显的优势:

  1. 低延迟查询:Lookup Join 能够根据键值动态查找外部维表中的数据,这对于需要快速响应的实时数据流处理场景非常适用。 
  2. 异步查询:Flink 支持异步查找机制,这有助于提高查询性能和吞吐量,尤其是在处理大规模数据时。 
  3. 高效的维度查询:通过将维表数据加载到内存中并进行索引,Lookup Join 提供了快速查找和关联维度数据的能力,减少了对外部存储系统的访问次数。 
  4. 实时维度关联:Lookup Join 允许实时流数据与维表进行关联,这对于获取最新的维度信息至关重要,如在电商领域中将订单流与商品维表关联。 
  5. 动态数据关联:适用于维表数据动态变化的场景,确保流数据处理过程中能够及时获取最新的维度信息。 
  6. 事件时间关联:支持基于事件时间的关联操作,这在处理时间敏感的数据时尤为重要,如用户行为分析等。 
作者 east
doris, Flink 9月 18,2024

流批一体化,Flink+Doris取代Spark+Hive来开发离线数仓

Flink + Doris 可以作为替代方案,用于开发离线数仓,替代 Spark + Hive 组合。这个架构能够实现更高的性能和实时性,同时简化了架构的复杂性。让我们详细分析一下 Flink + Doris 取代 Spark + Hive 的优点及适用场景:

1. 实时性与流处理能力

  • Flink 的优势:Flink 是一个分布式流处理框架,支持事件驱动和批处理模式,特别擅长处理实时数据流和复杂事件处理(CEP)。Flink 能够以低延迟处理海量数据,确保数据能够迅速进入下游系统。
  • Doris 的实时写入和分析:Doris 支持实时数据加载,能够高效处理来自 Flink 的流式数据,提供秒级查询的响应能力,适合实时数据分析场景。

Spark + Hive 的组合主要用于批处理,Hive 本身不擅长处理实时数据,Spark 的流处理能力也不如 Flink 专注且强大。因此,Flink + Doris 更适合有实时数据需求的场景,尤其是在需要低延迟的业务数据分析和展示时。

2. 查询性能和并发处理

  • Doris 的高效查询:Doris 作为一个专为分析设计的 MPP 数据库,擅长处理大规模数据查询,支持高并发的 SQL 查询并提供低延迟响应,适合在线分析型应用(OLAP)。
  • 列式存储和向量化执行:Doris 使用列式存储和向量化执行引擎,在处理复杂查询时效率更高,能够大幅缩短查询时间。相比之下,Hive 在执行复杂查询时较慢,尤其是基于 HDFS 的查询,依赖磁盘 I/O,性能较差。

Spark + Hive 组合通常需要大量的计算资源来执行批量查询任务,且在查询复杂度上性能不如 Doris。

3. 简化架构与运维成本

  • Flink 和 Doris 的简洁架构:Flink 与 Doris 都具备较高的集成度和易用性,且不依赖于像 Hadoop 这样的复杂生态系统。Flink + Doris 的组合能够简化系统架构,减少数据流动中的延迟,且维护成本相对较低。
  • 轻量级部署:Doris 作为轻量级 OLAP 数据库,部署和运维都较为简单,不需要 Hadoop 的支持。相比之下,Hive 依赖于 Hadoop 生态系统,需要更多的组件(如 HDFS、YARN 等)来保证其工作,这增加了架构的复杂度和运维难度。

Spark + Hive 架构复杂,需要更多的组件支持,部署和维护较为繁琐,Flink + Doris 在这一方面更具优势。

4. 批处理和流处理的统一

  • Flink 批流一体化:Flink 提供了统一的编程模型,能够同时处理批处理和流处理任务,使得离线数仓架构能够更灵活地应对各种数据处理需求。
  • 批处理能力:尽管 Flink 主要以流处理见长,但它在批处理方面的表现也相当出色。通过批流一体化架构,开发人员可以更高效地处理历史数据和实时数据,极大简化了数据处理的开发和运维。

Spark + Hive 则主要关注批处理任务,对于流数据的处理能力相对弱一些。Spark 的流处理框架(Spark Streaming)比 Flink 在复杂流处理上的能力有限。

5. 灵活的数据集成

  • 数据集成与传输:Flink 可以轻松集成各种数据源,包括 Kafka、文件系统、数据库等。它可以将流式和批量数据统一处理后,通过 Doris 实现实时分析和查询。
  • Doris 的多种导入方式:Doris 支持多种数据导入方式,能够高效地处理 Flink 输出的数据流(例如通过 HTTP、Broker、Stream Load 等方式),这使得两者之间的集成非常顺畅。

Spark + Hive 在数据集成的实时性上不如 Flink + Doris。Spark 处理数据后通常还需要依赖 Hive 进行存储和管理,数据查询和更新的延迟较高。

6. 高可扩展性

  • 分布式处理:Flink 作为流处理框架具备出色的可扩展性,能够处理大规模的数据流。Doris 也是一个分布式架构,能够扩展到数百个节点,适应大规模数仓需求。
  • 集群管理:Flink 和 Doris 都支持分布式集群管理,能够根据业务需求动态扩展计算和存储能力。

Spark + Hive 也具有可扩展性,但其扩展性受 Hadoop 生态的限制,复杂性更高。

适用场景

Flink + Doris 适用于需要实时数据处理、高性能查询以及复杂多维分析的场景,如:

  • 实时数据流分析(用户行为分析、监控告警系统)
  • 实时数据仓库(T+0 数据仓库)
  • 多维度的在线查询(报表系统、BI 工具)
  • 需要兼顾批处理和流处理的场景

Spark + Hive 更适用于需要处理大规模离线批量数据且对实时性要求不高的传统数仓场景。

作者 east
Flink 9月 14,2024

Flink中的窗口与传统数据库中的窗函数有何不同?

Flink中的窗口与传统数据库中的窗函数主要有以下几个方面的不同:

  1. 实时处理与批处理的差异:Flink是专为实时数据流处理设计的,其窗口机制能够处理无限数据流,并支持事件时间和处理时间的概念。相比之下,传统数据库中的窗函数通常用于批处理,处理的是有界数据集。 
  2. 窗口类型的多样性:Flink提供了多种窗口类型,包括滚动窗口、滑动窗口、会话窗口和全局窗口,这些窗口可以根据时间或计数来定义。而传统数据库中的窗函数通常较为基础,主要是基于时间的窗口聚合。 
  3. 窗口函数的实现:Flink中的窗口函数不仅支持全量聚合,还支持增量聚合,后者在性能上更为优越,特别是在处理大规模数据流时。此外,Flink的窗口函数可以与触发器结合使用,以控制窗口的计算时机。 
  4. 时间语义的明确性:Flink的窗口函数在处理事件时间时提供了明确的时间语义,这对于确保实时数据分析的准确性至关重要。而传统数据库中的窗函数通常不涉及事件时间的概念。 
  5. 窗口的动态创建:Flink中的窗口是动态创建的,只有当窗口内的数据到达时才会创建相应的窗口,这有助于优化内存使用和计算资源。 
作者 east
Flink 6月 19,2024

Apache Flink处理IoT复杂数据流程案例

使用Apache Flink处理IoT复杂数据是一项涉及多个步骤和组件的任务,包括数据接入、数据清洗、实时处理、状态管理、窗口计算、以及结果输出等。以下是一个全面且详细的Flink流处理框架,结合理论和实际应用,以处理IoT数据为主线。

1. 引入依赖和设置环境

首先,需要在你的项目中引入Flink所需的依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
xmlCopy Code<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

2. 创建Flink执行环境

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}
javaCopy Codeimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(4);

        // 其他环境配置...
    }
}

3. 数据接入

通常,IoT数据会通过Kafka或其他消息队列接入。假设使用Kafka作为数据源:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        // 进一步处理...
    }
}




4. 数据清洗和解析

实际的IoT数据通常是JSON格式的字符串,需要进行解析和清洗:

import org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}
javaCopy Codeimport org.apache.flink.api.common.functions.MapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer setup...

        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        // 进一步处理...
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;
        // 其他字段和构造方法...
    }
}

5. 定义时间窗口和处理函数

import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.util.Collector;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer and parsing setup...

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        // 进一步处理...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }
    }
}

6. 输出结果到外部系统

处理后的数据通常需要写到数据库、文件系统或者其他外部系统。以写入到Kafka为例:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}
javaCopy Codeimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class IoTDataProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Kafka consumer, parsing, and processing setup...

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }
}

7. 完整代码示例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Properties;

public class IoTDataProcessing {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("iot-topic", new SimpleStringSchema(), properties);
        DataStream<String> input = env.addSource(kafkaConsumer);

        DataStream<IoTEvent> parsedStream = input.map(new MapFunction<String, IoTEvent>() {
            private ObjectMapper mapper = new ObjectMapper();

            @Override
            public IoTEvent map(String value) throws Exception {
                return mapper.readValue(value, IoTEvent.class);
            }
        });

        DataStream<AggregatedResult> resultStream = parsedStream
            .keyBy(event -> event.deviceId)
            .timeWindow(Time.minutes(1))
            .process(new AggregateTemperatureHumidity());

        resultStream.map(result -> result.toString())
            .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

        env.execute("IoT Data Processing with Flink");
    }

    public static class IoTEvent {
        public String deviceId;
        public long timestamp;
        public double temperature;
        public double humidity;

        // constructor, getters, setters...
    }

    public static class AggregateTemperatureHumidity extends ProcessWindowFunction<IoTEvent, AggregatedResult, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<IoTEvent> elements, Collector<AggregatedResult> out) {
            double sumTemp = 0;
            double sumHumidity = 0;
            int count = 0;

            for (IoTEvent event : elements) {
                sumTemp += event.temperature;
                sumHumidity += event.humidity;
                count++;
            }

            double avgTemp = sumTemp / count;
            double avgHumidity = sumHumidity / count;

            out.collect(new AggregatedResult(key, context.window().getStart(), context.window().getEnd(), avgTemp, avgHumidity));
        }
    }

    public static class AggregatedResult {
        public String deviceId;
        public long windowStart;
        public long windowEnd;
        public double avgTemperature;
        public double avgHumidity;

        public AggregatedResult(String deviceId, long windowStart, long windowEnd, double avgTemperature, double avgHumidity) {
            this.deviceId = deviceId;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.avgTemperature = avgTemperature;
            this.avgHumidity = avgHumidity;
        }

        @Override
        public String toString() {
            return "AggregatedResult{" +
                    "deviceId='" + deviceId + '\'' +
                    ", windowStart=" + windowStart +
                    ", windowEnd=" + windowEnd +
                    ", avgTemperature=" + avgTemperature +
                    ", avgHumidity=" + avgHumidity +
                    '}';
        }
    }
}
作者 east
Flink 6月 14,2024

Flink实时开发添加水印的案例分析

在Flink中,处理时间序列数据时,通常需要考虑事件时间和水印(watermarks)的处理。以下是修改前后的代码对比分析:

修改前的代码:

val systemDS = unitDS.map(dp => {
  dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))
  dp
}).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. unitDS 经过一个 map 操作,将每个元素的 deviceCode 转换为系统设备码。
  2. 使用 keyBy(_.getDeviceCode) 对转换后的设备码进行分组。
  3. 定义了一个基于事件时间的滚动窗口,窗口大小为60秒。
  4. 使用 process 操作应用自定义的窗口函数 HPageSystemWinF 来处理每个窗口中的数据。

注意:修改前的代码没有显示地处理水印(watermarks),这可能导致在处理乱序数据或延迟数据时出现问题。

修改后的代码:

val systemDS = unitDS.map(dp => {
  dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))
  dp
}).keyBy(_.getDeviceCode)
.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .<boundedOutOfOrdernessDaysPower>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower
    .withIdleness(Duration.ofSeconds(5))
    .withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] {
      override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long = {
        Math.max(element.getEventTime, recordTimestamp)
      }
    })
).keyBy(_.getDeviceCode)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.process(new MySystemWinF)
  1. 与修改前相同的部分:map, keyBy, 和 window 操作。
  2. 添加了 assignTimestampsAndWatermarks 方法来处理事件时间和水印:
    • 使用 WatermarkStrategy.forBoundedOutOfOrderness 允许一定程度的乱序数据(这里是5秒)。
    • .withIdleness(Duration.ofSeconds(5)) 设置了空闲超时时间为5秒,用于处理不活跃的键。
    • 使用 withTimestampAssigner 自定义了时间戳分配器,确保使用的事件时间是元素中的 eventTime 和记录的 recordTimestamp 中的较大值。

不同点和适用场景:

  • 事件时间和水印处理:修改后的代码显式地处理了事件时间和水印,这对于处理乱序数据、延迟数据以及确保正确的时间窗口计算是非常重要的。如果您的数据流中存在乱序或延迟数据,或者您希望更严格地保证处理时间窗口的正确性,那么应该使用修改后的代码。
  • 空闲超时:通过设置空闲超时,可以处理那些长时间不活跃的键,避免因为某些键长时间没有新数据而导致整个程序挂起。
  • 延迟数据处理:如果数据有可能晚到,但仍然需要被纳入正确的窗口进行计算,水印可以帮助界定数据的“迟到”界限。
    精确的时间窗口分析:对于需要基于事件实际发生时间而非数据处理时间进行分析的场景,如实时监控、金融交易分析等,事件时间模型是必须的。

作者 east

上一 1 2 3 … 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.