Flink SQL:Join 系列 3(Lateral Joins、LAG 聚合函数)
Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。
什么是横向连接?
横向连接是一种 SQL 连接类型,允许您在 FROM 子句中指定子查询。然后针对外部查询中的每一行执行该子查询。横向联接可以通过减少表扫描次数来提高 SQL 查询的性能。换句话说,您可以将横向联接视为 SQL 中的 foreach 循环,它迭代集合,在每次迭代上应用一些转换,并且产生输出。横向联接在处理以分层或嵌套格式存储的数据时非常有用。
如何执行横向表联接
此示例将展示如何使用横向联接关联事件。给定一个包含人员地址的表,您需要找到每个州有两个人口最多的城市,并随着人们的流动而不断更新这些排名。
首先,使用连续聚合来计算每个城市的人口。虽然这很简单,但当人们移动时,Flink SQL 的真正威力就会显现出来。通过使用重复数据删除,当一个人搬家时,Flink 会自动为他们的旧城市发出撤回请求。因此,如果约翰从纽约搬到洛杉矶,纽约的人口将自动减少 1。这为我们提供了变更数据捕获的能力,而无需投资于设置它的实际基础设施!
有了这种动态手头有填充表后,您就可以使用 LATERAL 表连接来解决原始问题。与普通联接不同,横向联接允许子查询与 FROM 子句中其他参数的列相关联。与常规子查询不同,作为联接,横向可以返回多行。
sql复制代码
CREATE TABLE People (
id INTEGER,
city STRING,
state STRING,
arrival_time TIMESTAMP(3),
arrival_watermark AS arrival_time - INTERVAL '1' MINUTE
) WITH (
'connector' = 'fake',
'fields.id.expression' = '#{number.numberBetween ''1'',''100''}',
'fields.city.expression' = '#{regexify ''(New York|Newport|Port|Shoesfort|Springfield){1}''}',
'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
'fields.arrival_time.expression' = '#{date.past ''15'',''seconds''}',
'rows-per-second' = '10'
);
sql复制代码
CREATE TEMPORARY VIEW current_population AS
SELECT
city,
state,
COUNT(*) AS population
FROM (
SELECT
city,
state,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
FROM People
) WHERE rownum = 1
GROUP BY city, state;
sql复制代码
SELECT
state,
city,
population
FROM
(SELECT DISTINCT state FROM current_population) states,
LATERAL (
SELECT city, population
FROM current_population
WHERE state = states.state
ORDER BY population DESC
LIMIT 2
);
关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书