Flink SQL:Join 系列 3(Lateral Joins、LAG 聚合函数)

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。

什么是横向连接?

横向连接是一种 SQL 连接类型,允许您在 FROM 子句中指定子查询。然后针对外部查询中的每一行执行该子查询。横向联接可以通过减少表扫描次数来提高 SQL 查询的性能。换句话说,您可以将横向联接视为 SQL 中的 foreach 循环,它迭代集合,在每次迭代上应用一些转换,并且产生输出。横向联接在处理以分层或嵌套格式存储的数据时非常有用。

如何执行横向表联接

此示例将展示如何使用横向联接关联事件。给定一个包含人员地址的表,您需要找到每个州有两个人口最多的城市,并随着人们的流动而不断更新这些排名。

首先,使用连续聚合来计算每个城市的人口。虽然这很简单,但当人们移动时,Flink SQL 的真正威力就会显现出来。通过使用重复数据删除,当一个人搬家时,Flink 会自动为他们的旧城市发出撤回请求。因此,如果约翰从纽约搬到洛杉矶,纽约的人口将自动减少 1。这为我们提供了变更数据捕获的能力,而无需投资于设置它的实际基础设施!

有了这种动态手头有填充表后,您就可以使用 LATERAL 表连接来解决原始问题。与普通联接不同,横向联接允许子查询与 FROM 子句中其他参数的列相关联。与常规子查询不同,作为联接,横向可以返回多行。

sql复制代码
CREATE TABLE People (
    id INTEGER,
    city STRING,
    state STRING,
    arrival_time TIMESTAMP(3),
    arrival_watermark AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'fake',
    'fields.id.expression' = '#{number.numberBetween ''1'',''100''}',
    'fields.city.expression' = '#{regexify ''(New York|Newport|Port|Shoesfort|Springfield){1}''}',
    'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
    'fields.arrival_time.expression' = '#{date.past ''15'',''seconds''}',
    'rows-per-second' = '10'
);
sql复制代码
CREATE TEMPORARY VIEW current_population AS
SELECT
    city,
    state,
    COUNT(*) AS population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
) WHERE rownum = 1
GROUP BY city, state;
sql复制代码
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM current_population) states,
    LATERAL (
        SELECT city, population
        FROM current_population
        WHERE state = states.state
        ORDER BY population DESC
        LIMIT 2
    );

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627