Flink1.7文档 时间表函数
时间表函数提供了在特定时间点访问时间表版本的功能。为了访问时间表中的数据,必须传递一个时间属性,该属性确定返回的表的版本。Flink 使用表函数的 SQL 语法来提供这种访问方式。
与版本化表不同,时间表函数只能在追加-only 流上定义——它不支持变更日志输入。此外,时间表函数不能通过纯 SQL DDL 来定义。
定义时间表函数
时间表函数可以使用 Table API 在追加-only 流上定义。表会注册一个或多个键列,以及用于版本控制的时间属性。
假设我们有一个追加-only 的货币汇率表,我们希望将其注册为时间表函数。
SELECT * FROM currency_rates; | update_time | currency | rate | 
|---|---|---|
| 09:00:00 | Yen | 102 | 
| 09:00:00 | Euro | 114 | 
| 09:00:00 | USD | 1 | 
| 11:15:00 | Euro | 119 | 
| 11:49:00 | Pounds | 108 | 
使用 Table API,我们可以使用 currency 作为键,并将 update_time 作为版本时间属性来注册该流。
Java 示例:
TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");
 
tEnv.createTemporarySystemFunction("rates", rates);      时间表函数连接
定义时间表函数后,它可以作为标准表函数使用。追加-only 表(左输入/探测方)可以与时间表(右输入/构建方)连接,即一个随着时间变化并跟踪其变化的表,用于在特定时间点获取某个键的值。
考虑一个追加-only 表 orders,它跟踪客户的订单并使用不同的货币。
SELECT * FROM orders; | order_time | amount | currency | 
|---|---|---|
| 10:15 | 2 | Euro | 
| 10:30 | 1 | USD | 
| 10:32 | 50 | Yen | 
| 10:52 | 3 | Euro | 
| 11:04 | 5 | USD | 
给定这些表,我们希望将订单转换为一种统一的货币——美元(USD)。
SQL 查询:
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency