Flink SQL 连接 – 第 2 部分

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。我们已经看到 Flink SQL 有很多用例,我们很高兴看到什么你将用它来建造。在这个由三部分组成的博客文章系列中,我们将向您展示 Flink SQL 中不同类型的联接以及如何使用它们以多种方式处理数据。

在本文中,您将学习:什么是非压缩和压缩 Kafka 主题什么是时态表如何执行非压缩和压缩 Kafka 主题之间的时态表连接什么是实时星型模式反规范化如何执行实时星型模式反规范化时态表连接与非压缩和压缩 Kafka 主题Apache Kafka 最基本的组织单元是主题,它类似于关系数据库中的表。 Kafka 主题可以是压缩的,也可以是非压缩的。压缩主题会在特定时间段内保留所有消息,即使它们已被删除。非压缩主题仅保留未删除的消息。为了连接压缩和非压缩主题,您可以使用时态表连接。时态表是随时间变化的表。这在 Flink 中也称为动态表。时态/动态表中的行与一个或多个时态周期相关联。时态表包含一个或多个版本化表快照。时态表联接是一项功能,允许将两个不同时态表中的数据通过公共键联接在一起,并将第二个表中的数据自动插入到第一个表中在适当的时间段或版本化表中的相关版本。当集成来自多个源的数据或处理随时间变化的数据时,这非常有用。这也意味着可以通过不断变化的元数据来丰富表,并在某一时间点检索其值。如何在非压缩和压缩 Kafka 主题之间执行时态表连接此示例将展示如何正确丰富一个 Kafka 主题中的记录当事件顺序很重要时,与另一个 Kafka 主题的相应记录相结合。临时表连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL 语法 FOR SYSTEM_TIME AS OF 来执行此操作。在本节中,您将把每笔交易(transactions)连接到截至交易发生时的正确货币汇率(currency_rates,一个版本化表)。

类似的示例是将每个订单与订单发生时的客户详细信息连接起来。这正是事件时间时态表连接的作用。 Flink SQL 中的时态表连接在两个表之间存在无序和任意时间偏差的情况下提供了正确的、确定性的结果。transactions 和currency_rates 表都由 Kafka 主题支持,但在利率的情况下,该主题被压缩(例如当更新的费率流入时,仅保留给定密钥的最新消息)。事务中的记录仅被解释为插入,因此该表由标准 Kafka 连接器 (connector=kafka) 支持;而currency_rates中的记录需要解释为基于主键的upsert,这需要Upsert Kafka连接器(connector=upsert-kafka)。Flink SQL:

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

Data Generators

这两个主题也使用 Flink SQL 作业填充。我们使用faker连接器根据Java Faker表达式在内存中生成行并将它们写入相应的Kafka主题。

currency_rates Topic

Flink SQL

CREATE TEMPORARY TABLE currency_rates_faker
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE currency_rates (EXCLUDING OPTIONS);

INSERT INTO currency_rates SELECT * FROM currency_rates_faker;

Kafka Topic

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topiccurrency_rates –property print.key=true –property key.separator=” – ”
HTG – {“currency_code”:”HTG”,”eur_rate”:0.0136,”rate_time”:”2020-12-16 22:22:02″}
BZD – {“currency_code”:”BZD”,”eur_rate”:1.6545,”rate_time”:”2020-12-16 22:22:03″}
BZD – {“currency_code”:”BZD”,”eur_rate”:3.616,”rate_time”:”2020-12-16 22:22:10″}
BHD – {“currency_code”:”BHD”,”eur_rate”:4.5308,”rate_time”:”2020-12-16 22:22:05″}
KHR – {“currency_code”:”KHR”,”eur_rate”:1.335,”rate_time”:”2020-12-16 22:22:06″}

交易

TopicFlink SQL

CREATE TEMPORARY TABLE transactions_faker
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE transactions (EXCLUDING OPTIONS);

INSERT INTO transactions SELECT * FROM transactions_faker;

Kafka Topic

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic transactions –property print.key=true –property key.separator=” – ”
e102e91f-47b9-434e-86e1-34fb1196d91d – {“id”:”e102e91f-47b9-434e-86e1-34fb1196d91d”,”currency_code”:”SGD”,”total”:494.07,”transaction_time”:”2020-12- 16 22:18:46″}
bf028363-5ee4-4a5a-9068-b08392d59f0b – {“id”:”bf028363-5ee4-4a5a-9068-b08392d59f0b”,”currency_code”:”EEK”,”total”:906.8,”transaction_time”:”2020-12- 16 22:18:46″}
e22374b5-82da-4c6d-b4c6-f27a818a58ab – {“id”:”e22374b5-82da-4c6d-b4c6-f27a818a58ab”,”currency_code”:”GYD”,”total”:80.66,”transaction_time”:”2020-12- 16 22:19:02″}
81b2ce89-26c2-4df3-b12a-8ca921902ac4 – {“id”:”81b2ce89-26c2-4df3-b12a-8ca921902ac4″,”currency_code”:”EGP”,”total”:521.98,”transaction_time”:”2020-12- 16 22:18:57″}
53c4fd3f-af6e-41d3-a677-536f4c86e010 – {“id”:”53c4fd3f-af6e-41d3-a677-536f4c86e010″,”currency_code”:”UYU”,”total”:936.26,”transaction_time”:”2020-12- 16 22:18:59″}

实时星型模式非规范化(N 路联接)实时星型模式非规范化是在星型模式中连接两个或多个表的过程,以便结果表中的数据非规范化。这样做可以是出于性能方面的原因,也可以是为了更轻松地查询数据,或者两者兼而有之。通过减少需要执行的联接数量,非规范化可用于提高联接大量表的查询的性能。它还可以通过提供一个包含所有数据的表来更轻松地查询数据,否则这些数据将分布在多个表中。

非规范化过程可以应用于任何模式,但最常用于星型模式,这些模式具有一个中心事实表,周围有许多维度表。事实表包含正在分析的数据,维度表包含可用于描述事实表中数据的数据。对星型模式进行非规范化时,维度表中的数据将合并到事实表中。如何执行实时星型模式非规范化此示例将展示如何使用 n 路时态表连接对简单的星型模式进行非规范化。星型模式是数据仓库中数据标准化的一种流行方法。星型模式的中心是一个事实表,其行包含度量、测量和有关世界的其他事实。周围的事实表是一个或多个维度表,这些表具有在计算查询时可用于丰富事实的元数据。想象一下,您正在为一家铁路公司运行一个小型数据仓库,其中包含一个事实表 (train_activity) 和三个维度表(车站、预订通道和乘客) )。对事实表的所有插入以及对维度表的所有更新都将镜像到 Apache Kafka。

事实表中的记录仅被解释为插入,因此该表由标准 Kafka 连接器 (connector=kafka) 支持;。相反,维度表中的记录是基于 prim 的更新插入ary 键,这需要 Upsert Kafka 连接器(connector=upsert-kafka)。使用 Flink SQL,您现在可以使用 5 路时态表连接轻松地将所有维度连接到我们的事实表。临时表连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL 语法 FOR SYSTEM_TIME AS OF 来执行此操作。将事实表与更多(缓慢)变化的维度表连接时,使用时态表连接可以获得一致、可重现的结果。每个事件(事实表中的行)根据事件在现实世界中发生的时间连接到每个维度的相应值。

CREATE TEMPORARY TABLE passengers (
  passenger_key STRING,
  first_name STRING,
  last_name STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (passenger_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'passengers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE stations (
  station_key STRING,
  update_time TIMESTAMP(3),
  city STRING,
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (station_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'stations',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE booking_channels (
  booking_channel_key STRING,
  update_time TIMESTAMP(3),
  channel STRING,
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (booking_channel_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'booking_channels',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE train_activities (
  scheduled_departure_time TIMESTAMP(3),
  actual_departure_date TIMESTAMP(3),
  passenger_key STRING,
  origin_station_key STRING,
  destination_station_key STRING,
  booking_channel_key STRING,
  WATERMARK FOR actual_departure_date AS actual_departure_date - INTERVAL '10' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'train_activities',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.actual_departure_date,
  p.first_name,
  p.last_name,
  b.channel,
  os.city AS origin_station,
  ds.city AS destination_station
FROM train_activities t
LEFT JOIN booking_channels FOR SYSTEM_TIME AS OF t.actual_departure_date AS b
ON t.booking_channel_key = b.booking_channel_key;
LEFT JOIN passengers FOR SYSTEM_TIME AS OF t.actual_departure_date AS p
ON t.passenger_key = p.passenger_key
LEFT JOIN stations FOR SYSTEM_TIME AS OF t.actual_departure_date AS os
ON t.origin_station_key = os.station_key
LEFT JOIN stations FOR SYSTEM_TIME AS OF t.actual_departure_date AS ds
ON t.destination_station_key = ds.station_key;

Kafka 主题

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic train_activities –property print.key=true –property key.separator=” – ”

null – {“scheduled_departure_time”:“2020-12-19 13:52:37”,“actual_departure_date”:“2020-12-19 13:52:16”,“passenger_key”:7014937,“origin_station_key”:577,” destination_station_key”:862,”booking_channel_key”:2}
null – {“scheduled_departure_time”:“2020-12-19 13:52:38”,“actual_departure_date”:“2020-12-19 13:52:23”,“passenger_key”:2244807,“origin_station_key”:735,” destination_station_key”:739,”booking_channel_key”:2}
null – {“scheduled_departure_time”:“2020-12-19 13:52:46”,“actual_departure_date”:“2020-12-19 13:52:18”,“passenger_key”:2605313,“origin_station_key”:216,” destination_station_key”:453,”booking_channel_key”:3}
null – {“scheduled_departure_time”:”2020-12-19 13:53:13″,”actual_departure_date”:”2020-12-19 13:52:19″,”passenger_key”:7111654,”origin_station_key”:234,”德stination_station_key”:833,”booking_channel_key”:5}
null – {“scheduled_departure_time”:“2020-12-19 13:52:22”,“actual_departure_date”:“2020-12-19 13:52:17”,“passenger_key”:2847474,“origin_station_key”:763,” destination_station_key”:206,”booking_channel_key”:3}

client Topic

Flink SQL

CREATE TEMPORARY TABLE passengers_faker
WITH (
  'connector' = 'faker',
  'fields.passenger_key.expression' = '#{number.numberBetween ''0'',''10000000''}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'fields.first_name.expression' = '#{Name.firstName}',
  'fields.last_name.expression' = '#{Name.lastName}',
  'rows-per-second' = '1000'
) LIKE passengers (EXCLUDING OPTIONS);

INSERT INTO passengers SELECT * FROM passengers_faker;

Kafka Topic


➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic passengers --property print.key=true --property key.separator=" - "
749049 - {"passenger_key":"749049","first_name":"Booker","last_name":"Hackett","update_time":"2020-12-19 14:02:32"}
7065702 - {"passenger_key":"7065702","first_name":"Jeramy","last_name":"Breitenberg","update_time":"2020-12-19 14:02:38"}
3690329 - {"passenger_key":"3690329","first_name":"Quiana","last_name":"Macejkovic","update_time":"2020-12-19 14:02:27"}
1212728 - {"passenger_key":"1212728","first_name":"Lawerence","last_name":"Simonis","update_time":"2020-12-19 14:02:27"}
6993699 - {"passenger_key":"6993699","first_name":"Ardelle","last_name":"Frami","update_time":"2020-12-19 14:02:19"}
stations Topic

Flink SQL

CREATE TEMPORARY TABLE stations_faker
WITH (
  'connector' = 'faker',
  'fields.station_key.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.city.expression' = '#{Address.city}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE stations (EXCLUDING OPTIONS);

INSERT INTO stations SELECT * FROM stations_faker;

Kafka Topic

➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stations --property print.key=true --property key.separator=" - "
80 - {"station_key":"80","update_time":"2020-12-19 13:59:20","city":"Harlandport"}
33 - {"station_key":"33","update_time":"2020-12-19 13:59:12","city":"North Georgine"}
369 - {"station_key":"369","update_time":"2020-12-19 13:59:12","city":"Tillmanhaven"}
580 - {"station_key":"580","update_time":"2020-12-19 13:59:12","city":"West Marianabury"}
616 - {"station_key":"616","update_time":"2020-12-19 13:59:09","city":"West Sandytown"}

Flink SQL

CREATE TEMPORARY TABLE booking_channels_faker
WITH (
  'connector' = 'faker',
  'fields.booking_channel_key.expression' = '#{number.numberBetween ''0'',''7''}',
  'fields.channel.expression' = '#{regexify ''(bahn\.de|station|retailer|app|lidl|hotline|joyn){1}''}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE booking_channels (EXCLUDING OPTIONS);

INSERT INTO booking_channels SELECT * FROM booking_channels_faker;

Kafka Topic➜ bin

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic booking_channels –property print.key=true –property key.separator=” – ”
1 – {“booking_channel_key”:”1″,”update_time”:”2020-12-19 13:57:05″,”channel”:”joyn”}
0 – {“booking_channel_key”:”0″,”update_time”:”2020-12-19 13:57:17″,”channel”:”station”}
4 – {“booking_channel_key”:”4″,”update_time”:”2020-12-19 13:57:15″,”channel”:”joyn”}
2 – {“booking_channel_key”:”2″,”update_time”:”2020-12-19 13:57:02″,”channel”:”app”}
1 – {“booking_channel_key”:”1″,”update_time”:”2020-12-19 13:57:06″,”channel”:”retailer”}

摘要在本文中,您了解了非时态表之间的连接压缩和压缩的 Kafka 主题,以及实时星型模式反规范化。您还了解了如何使用 Flink SQL 为这两种类型的场景编写查询。我们鼓励您在 Ververica Platform 上运行这些示例。您可以按照以下简单步骤安装平台。要了解有关 Flink SQL 的更多信息,请查看以下资源:Flink SQL Cookbook 入门 – Ververica Platform 上的 Flink SQL Flink SQL 官方文档 Flink Forward Talk: One SQL, Unified AnalyticsOnly SQL: Empower data Analyst end – 使用 Flink SQL 结束

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627