Flink1.7文档 时间表函数
时间表函数提供了在特定时间点访问时间表版本的功能。为了访问时间表中的数据,必须传递一个时间属性,该属性确定返回的表的版本。Flink 使用表函数的 SQL 语法来提供这种访问方式。
与版本化表不同,时间表函数只能在追加-only 流上定义——它不支持变更日志输入。此外,时间表函数不能通过纯 SQL DDL 来定义。
定义时间表函数
时间表函数可以使用 Table API 在追加-only 流上定义。表会注册一个或多个键列,以及用于版本控制的时间属性。
假设我们有一个追加-only 的货币汇率表,我们希望将其注册为时间表函数。
SELECT * FROM currency_rates;
update_time | currency | rate |
---|---|---|
09:00:00 | Yen | 102 |
09:00:00 | Euro | 114 |
09:00:00 | USD | 1 |
11:15:00 | Euro | 119 |
11:49:00 | Pounds | 108 |
使用 Table API,我们可以使用 currency
作为键,并将 update_time
作为版本时间属性来注册该流。
Java 示例:
TemporalTableFunction rates = tEnv
.from("currency_rates")
.createTemporalTableFunction("update_time", "currency");
tEnv.createTemporarySystemFunction("rates", rates);
时间表函数连接
定义时间表函数后,它可以作为标准表函数使用。追加-only 表(左输入/探测方)可以与时间表(右输入/构建方)连接,即一个随着时间变化并跟踪其变化的表,用于在特定时间点获取某个键的值。
考虑一个追加-only 表 orders
,它跟踪客户的订单并使用不同的货币。
SELECT * FROM orders;
order_time | amount | currency |
---|---|---|
10:15 | 2 | Euro |
10:30 | 1 | USD |
10:32 | 50 | Yen |
10:52 | 3 | Euro |
11:04 | 5 | USD |
给定这些表,我们希望将订单转换为一种统一的货币——美元(USD)。
SQL 查询:
SELECT
SUM(amount * rate) AS amount
FROM
orders,
LATERAL TABLE (rates(order_time))
WHERE
rates.currency = orders.currency