gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Flink

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Flink"
  • ( 页面4 )
Flink 8月 19,2023

Apache Flink 部署模式概述

Apache Flink 集群与在其上运行的 Flink 作业之间的关系可能相当灵活。Apache Flink 支持 Flink 作业的不同部署模式,使开发人员可以根据自己的需求和作业特定要求专注于使用适当的模式。

应用程序模式

在 Flink 1.11 中,社区在 Apache Flink 中引入了一种新的部署模式,即“应用程序模式”。Application Mode 是一种优化,旨在让 Flink 作业提交过程变得更加轻量级,特别是针对需要频繁提交多个 Flink 应用程序的情况。这种部署模式的主要目标是减少与本地下载应用程序依赖项相关的步骤和必要的带宽,执行 main() 方法来提取 Flink 运行时可以理解的应用程序表示(即 JobGraph)并传送依赖项和 JobGraph(s) 到集群。Application Mode 提供与 Per-Job 模式相同级别的隔离保证,建议用于生产环境。

Application Mode 为每个提交的应用程序创建一个集群,但这一次,应用程序的 main() 方法在 JobManager 上执行。虽然这种部署看起来与 Per-Job 模式(稍后描述)相对相似,但 Application Mode 允许更加灵活和轻量级的作业执行顺序,因为这不受部署模式的影响,而是受到用于启动应用程序的调用的影响。正在部署的作业(或作业包)。

有关 Apache Flink 中 Application Mode 的详细概述,可以参考这里的这篇博文。

会话模式

会话模式可能是 Flink 应用程序最简单的部署模式。会话模式下的集群是长期存在的,这意味着会话模式下的 Flink 作业将假设正在运行的集群已经存在,并将使用该集群的资源来执行任何提交的应用程序。在会话模式下,同一个集群执行多个作业,这意味着资源之间不存在隔离,因为集群中的所有任务管理器都是或可以共享的。使用会话模式,开发人员无需担心启动任务管理器的额外开销。为提交的 Flink 应用程序创建新集群,因为作业使用现有集群资源。

然而,在会话模式下,由于所有 Flink 应用程序共享同一集群的资源,因此行为不当的作业可能会导致整个集群瘫痪,并可能影响不相关的 Apache Flink 部署。出于同样的原因,在确保部署之间可靠的安全凭证隔离时,会话模式可能会带来额外的挑战。因此,我们建议会话模式最适合具有(相对)可预测行为的相对简单、较短的作业(例如执行简单的 FlinkSQL 查询)。

Per-Job 模式

最后一种模式是 Per-Job 模式。顾名思义,通过 Per-Job 模式,每个 Flink 应用程序都会获得一个隔离的集群,并在集群中保留资源。当 Flink 应用程序以 Per-Job 模式提交时,它将使用底层资源管理框架为每个提交的作业启动一个新集群。当 Flink 部署完成后,集群将变得不可用,并且所有资源或文件都将从集群中删除。

在 Per-Job 模式下,JobManager 监督单个作业的执行,而任何任务管理器进程都是专门专用的执行单个 .jar 文件。由于所有这些原因,Per-Job 模式提供了比会话模式(如上所述)更好的资源隔离保证。然而,与 Application 模式相比,Per-Job 模式在客户端非常繁重,可能会导致巨大的资源成本。因此,目前,Per-Job 模式唯一推荐的用例是当集群无法访问构建作业的依赖项而只有“客户端”可以时。

借助 Apache Flink 中提供的不同部署模式,开发人员可以灵活地以灵活的方式使用其底层资源管理框架(例如 YARN 或 Kubernetes),根据他们的需求和要求进行定制。有关 Apache Flink 可用部署模式的更多信息,您可以参考 Apache Flink® 官方文档。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 19,2023

使用 Flink SQL 进行实时性能监控:AdTech 用例

背景

广告技术(Ad Tech)是一种统称,描述了用于管理和分析程序化广告活动的系统和工具。数字广告的目标是尽可能覆盖尽可能多的相关受众。因此,广告技术本质上与处理大量数据相关。

在这篇博文中,我们将研究如何关联两个事件流——广告投放(所谓的展示次数)和点击次数,并计算一个重要的广告技术指标——点击率(CTR)。我们的计算将使用 Apache Flink 的水平可扩展执行引擎根据运行中的数据进行。我们将专注于获得结果,而不用 Java 或 Scala 编写任何代码,而是完全依赖 SQL。

典型场景

在典型场景中,广告的投放是通过称为实时出价的机制执行的。从本质上讲,实时出价是一种拍卖,众多参与者竞相向特定最终用户展示横幅或视频(统称为创意)。在此过程中,需求方平台 (DSP) 获得向用户展示广告的服务,通过用户的设备 ID 进行识别并回复他们的投注。

 

虽然投放广告的过程在很大程度上是自动化的,但广告活动经理和业务分析师通常仍然采用很大程度的手动控制。通常,活动的定义和受众选择器(例如人口统计数据、原籍国以及活动的绩效标准)都是手动定义的。密切监视活动的表现并调整某些参数可能是必要的,特别是在发布后的早期阶段 – 验证假设的时间。

为什么选择流处理?

传统上,通过以下方式解决洞察大量数据的任务:利用批处理。这种方法与数字广告业务的高度动态性质相矛盾。实时获取洞察至关重要 – 等待一个小时或更长时间来完成定期批处理作业来完成原始数据的处理,同时由于活动的初始参数错误而耗尽预算,这是非常不可取的。此外,对于任何依赖于关联两个后续事件的指标,批处理不会为位于批处理“截止”相反两侧的事件提供正确的结果,因此会由两个不同的批处理作业进行处理。

为什么选择 Flink SQL?

监视活动的任务通常由数据分析师或业务分析师执行。由于业务的动态性质,可以预期与新数据源的潜在临时集成、向现有数据流添加新维度以及其他类似的调整。在这种情况下,希望消除数据分析师在执行日常任务时对数据工程师的依赖。为了实现这一目标,需要一个采用门槛较低的灵活工具集。 SQL 是数据分析的通用语言,其知识非常广泛。在 Flink 中运行 SQL 语句可以让您利用 Flink 水平可扩展流处理引擎的强大功能,而无需成为 Java 或 Scala 开发人员。它可以轻松地利用大量原始飞行数据,并以自助服务方式促进交互式自定义仪表板的创建。

实践

在我们的示例中,我们将使用两个数据流。首先,通过定义其架构和表选项,将这些流注册为表。第一个流是印象流。这些事件中的每一个都表明实时竞价拍卖的胜利以及成功向用户展示创意。它包含广告素材的维度、国家/地区代码和广告活动 ID 等详细信息。

创建临时表“印象数”

CREATE TEMPORARY TABLE impressions (
  bid_id VARCHAR NOT NULL,
  `timestamp` VARCHAR,
  serve_time AS TO_TIMESTAMP(`timestamp`, 'EEE MMM dd HH:mm:ss zzz yyyy'),
  campaign_id INT,
  creative_dimensions VARCHAR,
  country_code VARCHAR(2),
  serve_time AS serve_time - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka.svc:9092',
  'properties.group.id' = 'impressions

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 19,2023

英特尔在 Flink Forward 上展示分布式模型推理平台

Flink Forward 全球虚拟会议 2020 将于下个月拉开帷幕,届时 Flink 社区将探讨流处理和 Apache Flink 的未来。此次会议免费参加,时间为 2020 年 10 月 21 日至 22 日。另外,组织者还将于 10 月 19 日至 20 日举办六场由讲师指导的实践培训课程,内容涵盖了 Apache Flink 的多个方面,具体包括:

  • Flink 开发(2 天)
  • SQL 开发(2 天)
  • 运行时和操作(1 天)
  • 有状态函数(1 天)
  • 调优和故障排除(简介和高级,各 1 天)

我们有幸能够展示英特尔团队如何在 Intel Analytics Zoo 内的 Cluster Serving 服务中利用 Apache Flink。请仔细阅读我们的 Flink Forward 会议,并了解集群服务:在 Analytics Zoo 中使用 Apache Flink 进行分布式模型推理,时间为 2020 年 10 月 21 日。如果您尚未注册,请务必在 10 月 1 日之前完成注册,确保您的席位,并准备好聆听来自不同行业、公司规模和地点的公司代表就最新技术发展和 Flink 用例进行的一些有趣演讲!

集群服务:在 Analytics Zoo 中使用 Apache Flink 进行分布式模型推理

随着深度学习项目从实验发展到生产阶段,部署深度学习模型以进行大规模实时分布式推理的需求日益增长。尽管有许多可用工具可用于不同任务,如模型优化、模型服务、集群调度、集群管理等,但深度学习工程师和科学家仍然面临部署和管理分布式推理工作流程的挑战性过程,这些工作流程可以扩展到以直观和透明的方式实现大型集群。

在我们的会议中,我们将讨论以下内容:我们将介绍成功交付模型服务的两个主要领域,即大数据与机器学习的结合。一旦模型训练完成,为模型提供直观的服务成为构建机器学习管道的关键任务。在模型服务场景中,我们关注两个主要基准,即延迟和吞吐量。为满足机器学习管道中对极低延迟模型服务的需求,我们开发了 Cluster Serving:Intel Analytics Zoo 内的自动分布式服务解决方案。Cluster Serving 利用 Flink 的流处理运行时及其低延迟的连续处理引擎。此外,为满足高吞吐量需求,Cluster Serving 在 Flink 的统一数据处理引擎中实现了批处理。此外,集群服务还支持多种深度学习模型,包括 TensorFlow、PyTorch、Caffe、BigDL 和 OpenVINO。我们的模型服务解决方案提供了简单的发布-订阅(pub/sub)API,允许用户使用简单的 Python 或 HTTP API 轻松将推理请求发送到输入队列。

在我们的会议中,我们将介绍集群服务及其架构设计给 Flink Forward 的观众,并讨论将深度学习模型部署和管理到分布式、大数据和统一数据处理引擎中的底层设计模式和权衡。我们还将展示一些已经采用 Cluster Serving 来开发和部署分布式推理工作流程的用户实际用例、经验和示例。

您将在我们的会议中学到:

  • Cluster Serving 在 Apache Flink 的实现设计及其核心特性
  • 并行化昂贵操作
  • 使用消息队列等数据管道作为并行源
  • 最大限度地减少机器学习管道中的数据传输

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 16,2023

Flink SQL:重复数据删除

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。我们已经看到 Flink SQL 有很多用例,我们很高兴看到什么你将用它来建造。在这篇博文中,我们将解释什么是重复数据删除,并展示如何使用 Flink SQL 来实现这一目标。

什么是流处理中的重复数据删除?重复数据删除是一个从数据集中删除重复数据的过程。这样做通常是为了提高数据的质量。在流处理中,重复数据删除是一个非常重要的过程,因为它可以帮助提高系统的性能。重复数据删除的工作原理是识别并删除数据流中的重复记录。这通常是通过将流中的数据与参考数据集进行比较来完成的。当发现重复记录时,会将其从流中删除。 重复数据删除的好处 对数据进行重复数据删除有很多好处,

包括:

提高性能 – 通过删除重复数据,可以减少需要处理的数据量,从而可以提高性能性能降低存储要求 – 重复数据占用不必要的空间,因此删除它可以释放宝贵的存储空间更高的准确性 – 重复数据可能导致结果不准确,因此删除它可以提高数据分析的准确性提高效率 – 重复数据删除可以使数据处理更加高效通过减少需要处理的数据量来提高效率如何使用 Flink SQL 删除重复数据重复事件可能以多种方式最终出现在数据源中,从人为错误到应用程序错误。无论来源如何,不干净的数据都会对结果的质量(和正确性)产生真正的影响。在某些情况下,数据生产者会为流数据更改生成具有相同 ID 的记录。这些记录可能包括插入、更新和删除记录,并且在与其他流聚合或联接之前,它们可能需要作为管道中业务逻辑的一部分进行重复数据删除。在这种情况下,重复数据删除的目的是确保只处理唯一的记录,并避免重复数据可能引起的任何问题。假设您的订单系统偶尔会生成具有相同order_id的重复事件,但您只想保留用于下游处理的最新事件。第一步,您可以结合使用 COUNT 函数和 HAVING 子句来检查是否以及哪些订单具有多个事件,然后使用 ROW_NUMBER() 过滤掉这些事件。实际上,重复数据删除是 Top-N 聚合的一种特殊情况,其中 N 为 1 (rownum = 1),排序列是事件的处理时间或事件时间。在下面的示例查询中,源表顺序由内置的indatagen连接器,它在内存中不断生成行。

CREATE TABLE orders (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);

--Check for duplicates in the `orders` table
SELECT id AS order_id,
       COUNT(*) AS order_cnt
FROM orders o
GROUP BY id
HAVING COUNT(*) > 1;

--Use deduplication to keep only the latest record for each `order_id`
SELECT
  order_id,
  order_time
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
  FROM orders
     )
WHERE rownum = 1;

摘要在本文中,您了解了数据的重复数据删除。您还了解了如何使用 Flink SQL 为此类问题编写查询。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 15,2023

Flink 的测试工具揭秘

使用 Flink 的测试工具测试用户定义函数 (UDF)

使用 Apache Flink 时,开发人员在测试利用状态和计时器的用户定义函数 (UDF) 时经常面临挑战。在本文中,我们将回答一个问题“如何使用 Flink 的测试工具来测试用户定义函数 (UDF)”。

使用 Flink 的测试工具测试用户定义函数 (UDF)

测试使用 Flink 状态和计时器的用户定义函数 (UDF) 可能会很复杂,尤其是在使用诸如 KeyedProcessFunction 之类的函数时。 Flink 包含一组专门为简化此任务而设计的测试工具。这些测试工具是在 Flink 1.15 中引入的,并且被认为是实验性的。

测试工具的配置

要使用 Flink 的测试工具,您需要添加一些依赖项到您的项目。要测试 DataStream 作业,您可以在 Maven 项目的 pom.xml 的依赖项块中添加以下内容:

XML
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>
Use code with caution. Learn more

要测试 Table/SQL 作业,除了前面提到的 flink-test-utils 之外,您还可以在 Maven 项目的 pom.xml 的依赖项块中添加以下内容:

XML
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>
Use code with caution. Learn more

通过运算符测试用户函数

通过运算符测试用户函数可能是一种有用的方法,因为它允许您测试函数的功能。需要注意的是,Flink 只为算子提供测试工具,因此您需要手动实例化适当的算子才能使用这些测试工具。这意味着您需要创建运算符的实例并使用必要的输入和参数对其进行配置。完成此操作后,您可以使用测试工具来验证用户功能的正确性。值得注意的是,这种方法可能比使用其他测试方法更复杂,因为它需要您手动设置操作员并使用适当的输入对其进行配置。

Java
@Test
public void testMyProcessFunction() throws Exception {
    KeyedProcessOperator<String, String, String> operator =
        new KeyedProcessOperator<>(new MyKeyedProcessFunction());

    // 设置测试工具
    // 推送数据
    // 验证结果
}
 

测试计时器行为

在 Flink 中测试用户函数时,必须测试其行为的各个方面。测试的一件常见事情是计时器的创建和触发。您可以使用 TestHarness 通过操作员发送测试数据并验证是否创建了计时器。然后,您可以推进水印并验证计时器是否已触发。除了测试计时器之外,您可能还想测试处理元素是否会创建状态并验证此处理的结果。假设我们希望在 20 毫秒内为我们的测试目标触发计时器。下面的示例代码演示了如何使用测试工具对其进行测试。

Java
@Test
public void testTimelyOperator() throws Exception {
    // 设置初始条件
    testHarness.processWatermark(0L);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // 发送一些数据
    testHarness.processElement(3L, 100L);

    // 验证定时器
    assertThat(testHarness.numEventTimeTimers(), is(1));

    // 将时间提前到 20 毫秒以触发计时器。
    testHarness.processWatermark(20);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // 验证结果
    assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
    assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0));

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink, mysql 8月 15,2023

操作指南:使用 Flink CDC 同步 MySQL 分库分表

线事务处理(OLTP)系统中,为了解决单表数据量大的问题,通常采用分库分表的方法对单张大表进行拆分,以提高系统的吞吐量。但为了方便数据分析,在同步到数据仓库或数据湖时,一般需要将分库分表的数据合并成一张大表。本教程将向您展示如何使用 Flink CDC 为上述场景构建实时数据湖。本文中的示例将全部基于 Docker 并使用 Flink SQL。无需一行 Java/Scala 代码或安装 IDE。本指南的全部内容包含 docker-compose 文件。整个过程将通过从 MySQL 同步数据到 Iceberg 来展示,如下图所示。

步骤1:创建 docker-compose.yml 文件 创建一个 Docker Compose 文件(docker-compose.yml),内容如下:

Version: ‘2.1’

Services:

sql-client: user: flink

image: yuxialuo/flink-sql-client:1.13.2.v1

depends_on:

– jobmanager

– mysql

environment: FLINK_JOBMANAGER_HOST: jobmanager MYSQL_HOST: mysql volumes: – shared tmpfs:/tmp/iceberg jobmanager: user: flink image: flink:1.13.2-scala_2.11 ports: – “8081:8081” command: jobmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: – shared tmpfs:/tmp/iceberg taskmanager: user: flink image: flink:1.13.2-scala_2.11 depends_on: – jobmanager command: taskmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 2 volumes: – shared tmpfs:/tmp/iceberg mysql: image: debezium/example-mysql:1.1 ports: – “3306:3306” environment: – MYSQL_ROOT_PASSWORD=123456 – MYSQL_USER=mysql用户 – MYSQL_PASSWORD=mysqlpw volumes: – shared tmpfs driver-options: type: “tmpfs” device: “tmpfs”

这个 docker-compose 文件中的容器包括:

  • SQL-Client:Flink SQL Client,用于提交 SQL 查询和查看 SQL 执行结果
  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用于执行 Flink SQL
  • MySQL:作为数据源分库分表,存储用户表

注意:如果您想在自己的 Flink 环境中运行本指南,您需要下载下面列出的包并将其放在 Flink 目录的 lib 目录中,即 FLINK_HOME/lib/。

  • flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
  • iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

步骤 2:准备 MySQL 数据库中的数据 进入 MySQL 容器,执行以下命令:

shell复制代码
docker-compose exec mysql mysql -uroot -p123456

然后在 MySQL 中创建数据、表,并填充数据:

sql复制代码
CREATE DATABASE db_1;
USE db_1;
CREATE TABLE user_1 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234","user_110@foo.com");

CREATE TABLE user_2 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (120,"user_120","上海","123567891234","user_120@foo.com");

CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234", NULL);

CREATE TABLE user_2 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","上海","123567891234","user_220@foo.com");

步骤3:使用 Flink DDL 和 Flink SQL CLI 创建表 进入 Flink SQL CLI 容器,执行以下命令:

shell复制代码
docker-compose exec sql-client ./sql-client

在 Flink SQL CLI 中,执行以下命令:

sql复制代码
-- Flink SQL
SET execution.checkpointing.interval = 3s;

-- 创建源表 user_source 来捕获 MySQL 中所有数据库和表的数据并使用正则表达式来匹配这些数据库和表的配置项中使用的表。
-- 而且表还定义了一个元数据列来区分数据来自哪个数据库和表。
CREATE TABLE user_source(
  database_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,
  `id` DECIMAL(20, 0) NOT NULL,
  name STRING,
  address STRING,
  phone STRING,
  email STRING,
  PRIMARY KEY (`id`)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'db_[0-9]+',
  'table-name' = 'user_[0-9]+'
);

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 15,2023

如何测试您的 Flink SQL 应用程序

测试 Apache Flink SQL 代码

测试 Apache Flink SQL 代码是确保应用程序顺利运行并提供预期结果的关键步骤。 Flink SQL 应用程序用于广泛的数据处理任务,从复杂的分析到简单的 SQL 作业。全面的测试流程可以帮助您在开发过程的早期发现潜在问题,并确保您的应用程序按预期工作。

这篇文章将介绍 Flink SQL 应用程序的几种测试可能性。

手动测试和自动测试

使用 SQL 客户端是快速测试的有效方法。并轻松测试您的 Flink SQL 代码。 SQL 客户端旨在提供一个交互式环境,您可以在其中运行 SQL 查询并查看结果。这使得您可以轻松测试代码并快速进行更改。但是,您大多只能使用 SQL 客户端执行手动测试。为了进行更全面的测试,您应该使用自动化测试工具。

自动化测试工具可以提供一种使用多个数据集和各种场景测试代码的方法。这可以帮助识别手动测试中可能无法立即看到的问题。自动化测试包括单元测试和集成测试。单元测试用于测试应用程序的各个组件,而集成测试用于测试不同组件之间的集成。它们都有助于识别与数据处理相关的问题,例如不正确的 SQL 语法或不正确的数据转换。

通过单元和集成测试进行测试

此测试方法涉及使用单元和集成测试来验证代码的行为。它很容易实现自动化,这使得它成为测试较小单元(例如查询或函数的一部分)的便捷选择。此外,它是高度可定制的,允许您使用特定的输入数据定义单独的测试场景,而无需更改日志流并控制行时间属性。这种方法的另一个好处是,它可以在开发用户定义的函数时实现快速周转时间。这可以帮助您在开发过程的早期识别类型推断问题和其他问题。但是,这种测试方法涉及使用非 SQL 代码,这可能需要一些 Java 或 Scala 知识才能有效地使用它。这可能包括理解这些编程语言的语法和基本概念,以及与它们一起使用的任何专用库或框架。虽然这可能需要一些前期学习投资,但对于那些熟悉这些语言并希望自动化某些流程或任务的人来说,这可能是一个很好的方法。

Flink 1.12 或更低版本中的单元和集成测试

如果您需要提供输入数据出于测试目的,一种选择是使用 TableEnvironment.fromValues() 方法。但是,需要注意的是,此方法仅支持插入,不允许更改日志流。此外,它不提供对数据顺序的任何控制,这在某些情况下可能是必要的。此外,没有可用的行时间或水印选项。另一种方法是将值连接器与 TestValuesTableFactory 结合使用。这允许您使用表 DDL 中可用的全部选项来定义输入表。这包括指定行时间和水印属性,以及表 DDL 的任何其他特性。但是,需要注意的是,这是一个非公共 API,这意味着它不适合在生产环境中使用,并且可能不会得到供应商的完全支持。

Flink 1.13 或更高版本中的单元和集成测试

如果您需要在 Flink 1.13(或更高版本)中提供输入数据用于测试目的,一种选择是使用 TableEnvironment.fromValues() 方法。不过,需要注意的是,该方法与之前版本的 Flink(1.12 或更早版本)具有相同的限制。另一种方法是使用 StreamTableEnvironment.fromChangelogStream() 方法,该方法允许您将输入定义为 aDataStreamorDataStream 具有 RowKind 属性。此方法提供自动类型转换并在操作之间保留事件时间和水印。此外,它允许您像在表 DDL 中一样定义自定义架构定义,从而提供更大的灵活性和对输入数据的控制。总的来说,在 Flink 1.13(或更高版本)中,使用 fromChangelogStream() 方法可以是一种更强大、更通用的方法来提供用于测试的输入数据。

在表中收集查询或操作的结果

测试 Apache Flink SQL 代码的注意事项

以下是测试 Apache Flink SQL 代码时需要注意的一些事情:

  • 使用正确的测试工具。 有许多不同的测试工具可用于测试 Apache Flink SQL 代码。选择正确的工具取决于您的特定需求。例如,如果您需要测试复杂的应用程序,则需要使用功能更强大的测试工具。
  • 编写可重复的测试。 您的测试应该是可重复的,这意味着您应该能够在不同的环境中运行它们并获得相同的结果。这将有助于您发现代码中的错误并验证您的更改。
  • 编写可维护的测试。 您的测试应该是可维护的,这意味着您应该能够轻松地添加新测试或更改现有的测试。这将有助于您随着代码的更改保持您的测试库的最新状态。
  • 编写可读的测试。 您的测试应该是可读的,这意味着您应该能够轻松地理解它们的目的和输出。这将有助于您在需要时更轻松地调试您的测试。
  • 编写全面的测试。 您的测试应该全面,这意味着您应该测试代码的所有方面。这将有助于您确保您的代码是正确的和可靠的。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 13,2023

Flink SQL:查询、窗口和时间 – 第 2 部分

如何创建时间窗口

在上一篇文章中,我们讨论了创建时间窗口的必要性以及如何选择时间窗口的长度。在本文中,我们将更深入地了解如何创建时间窗口。

时间窗口 是收集数据的一段时间段。时间窗口的长度将取决于所收集的数据类型和研究的目的。例如,如果您有兴趣研究新产品对消费者行为的影响,则需要收集与有兴趣研究新营销活动对销售影响的消费者行为相比更长的时间段的数据。

在选择适合您案例的时间窗口时,您应该考虑以下属性:

  • 数据的频率:如果每天、每周、每月等收集数据。
  • 数据的季节性:如果数据受季节性影响(例如,销售数据通常在假期期间更高),您需要在选择时间窗口时考虑这一点。
  • 数据的稳定性:如果数据不稳定(例如,股票价格),您需要在选择时间窗口时考虑这一点。
  • 时间窗口的长度:时间窗口的长度将取决于所收集的数据类型和研究的目的。

一旦您考虑了数据的属性,您就可以为您的案例选择最合适的时间窗口。

如何创建时间窗口

有几种不同方法可以创建时间窗口。最常见的方法是使用滚动窗口、滑动窗口 或固定窗口。

  • 滚动窗口:滚动窗口是一组连续的时间段。每个时间段都会随着新数据的到来而向前移动。
  • 滑动窗口:滑动窗口是一组重叠的时间段。每个时间段都会随着新数据的到来而向前移动,但不会覆盖之前的时间段。
  • 固定窗口:固定窗口是一组大小相同的时间段。这些窗口不会随着新数据的到来而移动。

哪种方法最适合您将取决于您的特定需求。例如,如果您想跟踪一段时间内的趋势,则滚动窗口可能是最好的选择。如果您想跟踪一段时间内的变化,则滑动窗口可能是最好的选择。如果您想跟踪一段时间内的固定间隔的数据,则固定窗口可能是最好的选择。

如何使用时间窗口

一旦您创建了时间窗口,您就可以使用它来聚合数据或运行查询。例如,您可以使用时间窗口来计算一段时间内的平均值、最大值或最小值。您还可以使用时间窗口来运行查询以查找特定时间段内的数据。

使用时间窗口的示例

以下是使用时间窗口的示例:

  • 计算一段时间内的平均值:您可以使用时间窗口来计算一段时间内的平均值。例如,您可以使用滚动窗口来计算过去 1 分钟内的平均温度。
  • 查找特定时间段内的数据:您可以使用时间窗口来查找特定时间段内的数据。例如,您可以使用滑动窗口来查找过去 1 小时内发生的所有交易。

结论

时间窗口是用于聚合数据或运行查询的强大工具。了解如何创建和使用时间窗口可以帮助您更好地分析您的数据。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 13,2023

Flink-Kafka连接器的流模式

介绍

这篇博文将介绍 Flink Table API 中提供的 Kafka 连接器。读完这篇博文,您将更好地了解哪种连接器更适合特定的应用程序。

Flink DataStream API 中的 Kafka 连接器

Flink DataStream API 提供了一个 Kafka 连接器,它工作在附加模式下,可以被您用 Scala/Java API 编写的 Flink 程序使用。除了这个,Flink 的 Table API 还提供了两种 Kafka 连接器:

  • Kafka-unboundedsource,对sink使用“append 模式”
  • Upsert Kafka-unboundedsource,对sink使用“upsert 模式”

这篇博文将专注于用于 Table API 的 Kafka 连接器。我还将尝试回答何时使用 Kafka 连接器(追加)或选择 Upsert Kafka 连接器的问题。

简单的 Kafka 连接器 – 追加模式

以下示例是将数据从内存数据流复制到输出 Kafka 主题。在生产场景中,输入数据可以丰富或聚合,但我们将保持这个示例简单,以展示 Flink 在使用第一个 Kafka 连接器时的行为。

首先,创建一个表,其中包含订单作为流数据的来源,这些数据是由数据生成连接器提供的:

CREATE TABLE `orders` (
`id` INT,
`bid` DOUBLE,
`order_time` AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND() * -3 + 5) * -1 AS INTEGER), CURRENT_TIMESTAMP)
)
WITH (
'connector' = 'datagen',
'fields.id.kind' = 'random',
'fields.id.max' = '100',
'fields.id.min' = '1',
'每秒行数' = '100'
);

然后,使用 Kafka 连接器创建一个输出表作为接收器来存储输入数据:

CREATE TABLE `orders_sink_append` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3)
)
WITH (
'connector' = 'kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_sink_append',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'order-receiver-append',
'值.格式' = 'csv'
);

要运行本文中的所有 Flink 代码示例,您需要使用 Ververica Platform (VVP),它可以在任何 Kubernetes 集群上轻松安装:

  • VVP 文档:安装在 Google Kubernetes Engine 上开始使用 Ververica Platform
  • 在 Azure Kubernetes 服务上开始使用 Ververica Platform
  • 在 AWS EKS 上开始使用 Ververica Platform

执行上述表 DDL 以在 VVP 的内置目录中注册新表。这可以通过打开 VVP -> SQL -> 编辑器窗口来完成。然后选择每个“CREATE TABLE … ;”单独声明并单击右侧的“运行选择”。

现在我们可以使用以下 SQL 脚本在 VVP 中创建并启动 Flink SQL 部署。它将生成的数据流连续存储到带有 Kafka 连接器的 Kafka 主题中,即以追加模式运行。

选择 SQL 查询并单击“运行选择”来运行下面的 SQL 查询:

INSERT INTO `orders_sink_append` SELECT * FROM `orders`;

VVP 将引导您完成新的 VVP 部署过程。只需遵循它并单击“开始”按钮即可。

以下是从上面的 SQL 查询创建的 VVP 部署的概述:

Kafka 连接器 – Upsert 模式

让我们看看另一个连接器以及它的不同之处。输入表的定义保持不变,但接收器连接器设置为“upsert-kafka”。为了清楚起见,让我们使用“upsert-kafka”连接器创建一个克隆表。

CREATE TABLE `orders_sink_upserts` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3),
`PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_upserts',
'properties.group.id' = 'order-upserts-consumers',
'值.格式' = 'csv'
);

与上一节类似,我们创建另一个 VVP 部署将数据存储到表 orders_sink_upserts 中,使用“upsert-kafka”连接器和以下 SQL 语句:

INSERT INTO `orders_sink_upserts` SELECT * FROM `orders`;

VVP 部署的概述和作业图看起来与以前一样:

Flink Job 图的拓扑保持不变:

让我们检查 orders_sink_upserts 主题/表的输出:

SELECT * FROM `orders_sink_upserts`;

您可以看到 VVP SQL 编辑器会话 i 显示 100 个插入 (-I),然后其余更改是更新 (+U、-U)。datagen 中配置了 100 个唯一的订单 ID。这就是为什么仅在此处获取 100 条插入的原因,其余所有都是对这 100 个唯一订单的更新。

当您使用 Kafka 支持的 SQL 表时,这是两种流模式“append”和“upsert”之间的主要区别。Upsert 模式可以轻松获取最新更改或了解流数据是否是新的或是否应视为更新或删除。当特定键的任何值为 NULL 时,就会检测到删除。

“upsert-kafka”如何检测 upsert?

首先,任何使用“upsert-kafka”连接器的表都必须有一个主键。在上面的示例中,它是:

PRIMARY KEY (`id`) NOT ENFORCED

您还可以看到,Flink 在使用“upsert-kafka”表中的数据时又注入了一个运算符“ChangeLogNormalize”。注入的运算符聚合输入数据并返回特定主键的最新记录。

下面是另一个 VVP 部署来展示这一点。它将 upsert 表中的数据打印到标准输出:

CREATE TEMPORARY TABLE SinkTable WITH (‘connector’ = ‘print’) LIKE orders_sink_upserts (EXCLUDING OPTIONS);

INSERT INTO `SinkTable` SELECT * FROM `orders_sink_upserts`;

相反,如果从使用 append 模式工作的 orders_sink_append 读取数据,Flink 不会将 ChangelogNormalize 操作符注入到作业图中:

CREATE TEMPORARY TABLE `SinkTable`
WITH ('connector' = 'print')
LIKE `orders_sink_append`
(EXCLUDING OPTIONS);
INSERT INTO `SinkTable` SELECT * FROM `orders_sink_append`;

连接表:upsert 与追加模式

当多个表连接在一起时,两种不同的流模式会产生很大的差异。这种差异可能会导致数据重复。以下示例展示了如何在连接由 Kafka 主题支持的两个 Flink 表时避免数据重复。

以下是一个关于出租车运行的 Flink SQL 作业示例。我们有一个汽车注册表,每个汽车在第一个表中都有一个“蓝色”或“黑色”类别。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 12,2023

在 Flink SQL 中加入高度倾斜的流

Flink SQL 是一种功能强大的工具,可用于批处理和流式处理。它提供低代码数据分析,并符合 SQL 标准。在生产系统中,我们的客户发现,随着工作负载的扩展,以前运行良好的 SQL 作业可能会显著减慢,甚至失败。数据倾斜是一个常见且重要的原因。

数据倾斜是指变量的概率分布关于其均值的不对称性。换句话说,数据在某些属性上分布不均匀。本文讨论并分析了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。如果您是该领域的新手,或者有兴趣了解更多有关 Flink 或 Flink SQL 的信息,请查看末尾的相关信息。

加入具有键偏差的流

考虑以下场景:我们有一个 Users 表,其中包含有关某些业务应用程序的用户的信息。我们有一个 GenOrders 表,其中包含有关订单(买/卖东西)的信息。我们想知道 Users 表中每个用户的订单数。

(简化的)查询可能如下所示:

SQL
SELECT o.uid, COUNT(o.oid)
FROM GenOrders o
JOIN Users u ON o.uid = u.uid
GROUP BY o.uid;

在流连接的上下文中,了解两个表代表连续的信息流非常重要。在 Flink 中,聚合(例如 COUNT 和 SUM)是由聚合算子执行的。聚合运算符是有状态的,它将中间聚合结果存储在其状态中。默认情况下,流聚合运算符会逐条处理输入记录。当一条记录进来时,操作员会执行以下步骤:

  1. 从状态中检索累加器。
  2. 将记录累加/收回到累加器。
  3. 将累加器存储回状态。

每次读取状态的/写入是有一定成本的。

数据倾斜

在大规模的 Flink 应用中,流通常会根据特定的 key 来划分,并分发到多个任务中进行并行处理。我们将这样的 key 称为“分组 key”。如果记录分布不均匀,则某些任务会比其他任务重。而且较重的任务需要更长的时间才能完成,并且可能成为数据管道的瓶颈。当这种情况发生时,我们说数据出现了偏差。此外,如果数据是基于某些键分布的,我们将其称为“键倾斜”。

对于上述用例,我们可以按用户 ID 将记录分布到聚合任务以进行并行处理。由于我们要查找每个用户的订单数,因此按用户 ID 对数据进行分组是有意义的。下图使用颜色来指示不同用户的数据记录。我们可以看到,Red 用户有 8 条记录,比其他用户多得多。在这种情况下,我们称数据在用户 ID 上存在偏差。如果我们按照用户 ID 分配数据,那么处理红色记录的顶级聚合任务会处理更多的数据,并且可能会比其他任务花费更长的时间。

如何处理它?

MiniBatch 聚合

MiniBatch 聚合将输入记录放入缓冲区并在缓冲区满后或一段时间后执行聚合操作。这样,缓冲区中具有相同 key 值(键值)的记录会一起处理,因此每批每个键值只有一次状态写入。小批量聚合提高了吞吐量,因为聚合运算符的状态访问次数较少,并且输出较少的记录,特别是当有撤回消息且下游算子的性能不佳时。下图演示了这一点。

我们可以使用以下选项来实现:

Properties
table.exec.mini-batch.enabled: true # 启用 mini-批
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000 # [可选] MiniBatch 可以缓冲的最大输入记录数

  • 增量聚合状态

由于明显的键偏差,部分聚合可能会变得很大。一种解决方案是采用三个阶段的聚合。首先,每个上游任务对每个不同的键值进行本地聚合。不维护本地聚合的状态。这些结果按照分组键和桶键进行划分,并发送到第二阶段聚合(称为增量聚合)。增量聚合仅将不同的键存储在其状态中。增量聚合的结果按照分组键进行划分,并发送到第三阶段聚合(最终聚合),从而保持聚合函数值的状态。这种技术称为增量聚合。

我们可以使用以下选项可利用增量聚合以及小批量聚合、本地/全局聚合和拆分不同聚合:

Properties
table.exec.mini-batch.enabled: true # 启用小批量
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000
table.optimizer.agg-phase-strategy: TWO_PHASE/AUTO # 启用本地/全局聚合
table.optimizer.distinct-agg.split.enabled: true # 启用拆分不同聚合
table.optimizer.distinct-agg.split.bucket-num: 1024 # 桶数
table.optimizer.incremental-agg-enabled: true # 启用增量聚合,默认为true

注意:查询也必须支持 MiniBatch 聚合、Local/Global 聚合和 Split Distinct 聚合。

  • 结论

在本文中,我们讨论了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。我们讨论了三种不同的解决方案:MiniBatch 聚合、本地/全局聚合和增量聚合。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 7,2023

Flink SQL:查询、窗口和时间 – 第 1 部分

时间是流处理中的一个关键因素,因为数据在到达时就被处理,并且必须快速处理以避免延迟。流处理中时间的普遍性意味着数据处理的设计必须考虑到时间因素。基于时间的窗口是流处理中常用的技术,用于确保数据得到及时处理。流处理可用于各种应用程序,例如监控系统、欺诈检测以及任何想要提供实时数据的应用程序。 -时间洞察。流处理中普遍存在的时间给数据处理带来了挑战和机遇。通过正确的设计,流处理可用于提供对数据流的实时洞察。在这篇文章中,我们将了解在使用 Flink SQL 时如何考虑时间。

时间戳和查询

在流处理中,时间戳是用于记录事件发生的时间。此信息可用于确定处理事件所需的时间,或监视流处理系统的性能。时间戳还可以用于对同时发生的事件进行排序。

示例:

  • 用户交互:点击
  • 应用程序日志:应用程序
  • 机器事务:信用卡、广告服务
  • 传感器:手机、汽车、物联网

流处理中涉及时间的查询通常是用于分析一段时间内的数据。这可能涉及查找数据中的趋势或模式,或比较不同时间段的数据。流处理系统通常提供对数据加窗的方法,以便仅考虑特定时间段的数据。这使得可以对传入的数据进行实时分析,或者对历史数据进行分析。

示例:

  • 最后一分钟的平均值
  • 使用最新汇率加入
  • 在 5 分钟内尝试 3 次失败后发出警报

时间属性

流处理中有多种不同的时间属性。它们是事件时间、处理时间和摄取时间。

  • 事件时间是事件发生时的时间戳。
  • 处理时间是处理事件的时间戳。
  • 摄取时间是事件被摄取到系统中的时间戳。

事件时间是唯一完全由用户控制的时间属性。所有其他时间属性均由系统控制。

事件时间允许用户控制事件发生的时间,这在某些情况下非常重要。处理时间可能会受到系统速度的影响。在某些情况下,系统可能会很慢并且处理时间可能会延迟。在其他情况下,系统可能很快,处理时间可能比预期早。

摄取时间完全不受用户控制。提取时间由系统控制,取决于系统的速度。

事件时间属性

事件时间属性是带有关联水印的 TIMESTAMP 或 TIMESTAMP_LTZ。水印使用有界无序水印策略。

  • TIMESTAMP 是一种记录精确到小数秒的日期和时间的数据类型,而 TIMESTAMP_LTZ(本地时区)是一种存储日期、时间和本地时区的数据类型。

访问 Flink 官方文档网站了解更多信息。

CREATE TABLE clicks (
  user STRING,
  url STRING,
  cTime TIMESTAMP_LTZ(3),
  cTime WITH WATERMARK AS cTime - INTERVAL '2' MINUTES
)

处理时间属性

处理时间属性是一个计算列,不保存数据;每当访问该属性时都会查询本地计算机时间。处理时间属性可以像常规 TIMESTAMP_LTZ 一样使用。

CREATE TABLE clicks (
  user STRING,
  url STRING,
  cTime AS PROCTIME()
)

随着时间推移,状态可能会过期,例如,在长达一小时的窗口中对每个用户的点击进行计数。

  • 事件时间 windows:clicks 将计入其发生的小时。水印触发关闭窗口并丢弃其状态。
  • 处理时间 windows:clicks 将计入当前时间处理时的小时。本地系统时钟触发关闭窗口并丢弃其状态。

如果输入表中没有时间属性,窗口操作员将不知道窗口何时完成。

时间戳与时间属性

在流处理中有两种表示时间的常见方法:时间戳和时间属性。时间戳是与事件关联的时间点,而时间属性可以存在于每个表模式中。时间戳更精确,但时间属性更灵活,可用于表示复杂的时间关系。

时态运算符

时态运算符是流处理中处理基于时间的数据的一种方法。有几种不同类型的时态运算符:

  • 窗口:窗口是数据的集合,在特定的时间范围内被处理。窗口可以是滚动的、跳跃的,或者会话的。
  • 聚合:聚合是对数据的集合进行操作,以产生单个值。聚合可以是平均值、计数或总和等。
  • 连接:连接是将两个数据集合结合起来,以便可以根据它们的时间属性进行比较。连接可以是基于事件时间、处理时间或摄取时间的。
  • 模式匹配:模式匹配是对数据的集合进行操作,以查找满足特定模式的数据。模式可以是简单的,如连续的数字,也可以是复杂的,如识别欺诈交易的模式。

时态运算符及时跟踪进度,以确定输入何时完成。它们发出无法更新的最终结果行,并且能够丢弃不再需要的状态(记录和结果)。

窗口示例

窗口是流处理中处理基于时间的数据的一种常见方法。有几种不同类型的窗口,每个窗口都有其优点和缺点。

  • 滚动窗口:滚动窗口是数据的集合,在特定的时间范围内被处理。滚动窗口随着时间的推移而移动,因此每次窗口中的数据都不同。滚动窗口适用于需要实时分析的数据。
  • 跳跃窗口:跳跃窗口是数据的集合,在特定的时间间隔内被处理。跳跃窗口不随着时间的推移而移动,因此每次窗口中的数据相同。跳跃窗口适用于需要每隔一段时间分析的数据。
  • 会话窗口:会话窗口是数据的集合,在没有活动的情况下保持打开状态。会话窗口适用于需要分析数据流中的活动的数据。

窗口表值函数

窗口表值函数 (WTVF) 是一种特殊函数,可用于从窗口中返回数据。WTVF 可以用于聚合窗口数据、计算窗口统计信息或查找模式。

结论

在流处理中,时间是一个关键因素。时间属性和时态运算符是流处理中处理基于时间的数据的强大工具。通过正确的设计,可以使用这些工具来提供对数据流的实时洞察。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 7,2023

Flink SQL:Join 系列 3(Lateral Joins、LAG 聚合函数)

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。

什么是横向连接?

横向连接是一种 SQL 连接类型,允许您在 FROM 子句中指定子查询。然后针对外部查询中的每一行执行该子查询。横向联接可以通过减少表扫描次数来提高 SQL 查询的性能。换句话说,您可以将横向联接视为 SQL 中的 foreach 循环,它迭代集合,在每次迭代上应用一些转换,并且产生输出。横向联接在处理以分层或嵌套格式存储的数据时非常有用。

如何执行横向表联接

此示例将展示如何使用横向联接关联事件。给定一个包含人员地址的表,您需要找到每个州有两个人口最多的城市,并随着人们的流动而不断更新这些排名。

首先,使用连续聚合来计算每个城市的人口。虽然这很简单,但当人们移动时,Flink SQL 的真正威力就会显现出来。通过使用重复数据删除,当一个人搬家时,Flink 会自动为他们的旧城市发出撤回请求。因此,如果约翰从纽约搬到洛杉矶,纽约的人口将自动减少 1。这为我们提供了变更数据捕获的能力,而无需投资于设置它的实际基础设施!

有了这种动态手头有填充表后,您就可以使用 LATERAL 表连接来解决原始问题。与普通联接不同,横向联接允许子查询与 FROM 子句中其他参数的列相关联。与常规子查询不同,作为联接,横向可以返回多行。

sql复制代码
CREATE TABLE People (
    id INTEGER,
    city STRING,
    state STRING,
    arrival_time TIMESTAMP(3),
    arrival_watermark AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'fake',
    'fields.id.expression' = '#{number.numberBetween ''1'',''100''}',
    'fields.city.expression' = '#{regexify ''(New York|Newport|Port|Shoesfort|Springfield){1}''}',
    'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
    'fields.arrival_time.expression' = '#{date.past ''15'',''seconds''}',
    'rows-per-second' = '10'
);
sql复制代码
CREATE TEMPORARY VIEW current_population AS
SELECT
    city,
    state,
    COUNT(*) AS population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
) WHERE rownum = 1
GROUP BY city, state;
sql复制代码
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM current_population) states,
    LATERAL (
        SELECT city, population
        FROM current_population
        WHERE state = states.state
        ORDER BY population DESC
        LIMIT 2
    );

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

上一 1 … 3 4 5 … 7 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.