Flink-Kafka连接器的流模式

介绍

这篇博文将介绍 Flink Table API 中提供的 Kafka 连接器。读完这篇博文,您将更好地了解哪种连接器更适合特定的应用程序。

Flink DataStream API 中的 Kafka 连接器

Flink DataStream API 提供了一个 Kafka 连接器,它工作在附加模式下,可以被您用 Scala/Java API 编写的 Flink 程序使用。除了这个,Flink 的 Table API 还提供了两种 Kafka 连接器:

  • Kafka-unboundedsource,对sink使用“append 模式”
  • Upsert Kafka-unboundedsource,对sink使用“upsert 模式”

这篇博文将专注于用于 Table API 的 Kafka 连接器。我还将尝试回答何时使用 Kafka 连接器(追加)或选择 Upsert Kafka 连接器的问题。

简单的 Kafka 连接器 – 追加模式

以下示例是将数据从内存数据流复制到输出 Kafka 主题。在生产场景中,输入数据可以丰富或聚合,但我们将保持这个示例简单,以展示 Flink 在使用第一个 Kafka 连接器时的行为。

首先,创建一个表,其中包含订单作为流数据的来源,这些数据是由数据生成连接器提供的:

CREATE TABLE `orders` (
`id` INT,
`bid` DOUBLE,
`order_time` AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND() * -3 + 5) * -1 AS INTEGER), CURRENT_TIMESTAMP)
)
WITH (
'connector' = 'datagen',
'fields.id.kind' = 'random',
'fields.id.max' = '100',
'fields.id.min' = '1',
'每秒行数' = '100'
);

然后,使用 Kafka 连接器创建一个输出表作为接收器来存储输入数据:

CREATE TABLE `orders_sink_append` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3)
)
WITH (
'connector' = 'kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_sink_append',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'order-receiver-append',
'值.格式' = 'csv'
);

要运行本文中的所有 Flink 代码示例,您需要使用 Ververica Platform (VVP),它可以在任何 Kubernetes 集群上轻松安装:

  • VVP 文档:安装在 Google Kubernetes Engine 上开始使用 Ververica Platform
  • 在 Azure Kubernetes 服务上开始使用 Ververica Platform
  • 在 AWS EKS 上开始使用 Ververica Platform

执行上述表 DDL 以在 VVP 的内置目录中注册新表。这可以通过打开 VVP -> SQL -> 编辑器窗口来完成。然后选择每个“CREATE TABLE … ;”单独声明并单击右侧的“运行选择”。

现在我们可以使用以下 SQL 脚本在 VVP 中创建并启动 Flink SQL 部署。它将生成的数据流连续存储到带有 Kafka 连接器的 Kafka 主题中,即以追加模式运行。

选择 SQL 查询并单击“运行选择”来运行下面的 SQL 查询:

INSERT INTO `orders_sink_append` SELECT * FROM `orders`;

VVP 将引导您完成新的 VVP 部署过程。只需遵循它并单击“开始”按钮即可。

以下是从上面的 SQL 查询创建的 VVP 部署的概述:

Kafka 连接器 – Upsert 模式

让我们看看另一个连接器以及它的不同之处。输入表的定义保持不变,但接收器连接器设置为“upsert-kafka”。为了清楚起见,让我们使用“upsert-kafka”连接器创建一个克隆表。

CREATE TABLE `orders_sink_upserts` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3),
`PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_upserts',
'properties.group.id' = 'order-upserts-consumers',
'值.格式' = 'csv'
);

与上一节类似,我们创建另一个 VVP 部署将数据存储到表 orders_sink_upserts 中,使用“upsert-kafka”连接器和以下 SQL 语句:

INSERT INTO `orders_sink_upserts` SELECT * FROM `orders`;

VVP 部署的概述和作业图看起来与以前一样:

Flink Job 图的拓扑保持不变:

让我们检查 orders_sink_upserts 主题/表的输出:

SELECT * FROM `orders_sink_upserts`;

您可以看到 VVP SQL 编辑器会话 i 显示 100 个插入 (-I),然后其余更改是更新 (+U、-U)。datagen 中配置了 100 个唯一的订单 ID。这就是为什么仅在此处获取 100 条插入的原因,其余所有都是对这 100 个唯一订单的更新。

当您使用 Kafka 支持的 SQL 表时,这是两种流模式“append”和“upsert”之间的主要区别。Upsert 模式可以轻松获取最新更改或了解流数据是否是新的或是否应视为更新或删除。当特定键的任何值为 NULL 时,就会检测到删除。

“upsert-kafka”如何检测 upsert?

首先,任何使用“upsert-kafka”连接器的表都必须有一个主键。在上面的示例中,它是:

PRIMARY KEY (`id`) NOT ENFORCED

您还可以看到,Flink 在使用“upsert-kafka”表中的数据时又注入了一个运算符“ChangeLogNormalize”。注入的运算符聚合输入数据并返回特定主键的最新记录。

下面是另一个 VVP 部署来展示这一点。它将 upsert 表中的数据打印到标准输出:

CREATE TEMPORARY TABLE SinkTable WITH (‘connector’ = ‘print’) LIKE orders_sink_upserts (EXCLUDING OPTIONS);

INSERT INTO `SinkTable` SELECT * FROM `orders_sink_upserts`;

相反,如果从使用 append 模式工作的 orders_sink_append 读取数据,Flink 不会将 ChangelogNormalize 操作符注入到作业图中:

CREATE TEMPORARY TABLE `SinkTable`
WITH ('connector' = 'print')
LIKE `orders_sink_append`
(EXCLUDING OPTIONS);
INSERT INTO `SinkTable` SELECT * FROM `orders_sink_append`;

连接表:upsert 与追加模式

当多个表连接在一起时,两种不同的流模式会产生很大的差异。这种差异可能会导致数据重复。以下示例展示了如何在连接由 Kafka 主题支持的两个 Flink 表时避免数据重复。

以下是一个关于出租车运行的 Flink SQL 作业示例。我们有一个汽车注册表,每个汽车在第一个表中都有一个“蓝色”或“黑色”类别。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627