Flink1.7文档 时间表函数

时间表函数提供了在特定时间点访问时间表版本的功能。为了访问时间表中的数据,必须传递一个时间属性,该属性确定返回的表的版本。Flink 使用表函数的 SQL 语法来提供这种访问方式。

与版本化表不同,时间表函数只能在追加-only 流上定义——它不支持变更日志输入。此外,时间表函数不能通过纯 SQL DDL 来定义。

定义时间表函数

时间表函数可以使用 Table API 在追加-only 流上定义。表会注册一个或多个键列,以及用于版本控制的时间属性。

假设我们有一个追加-only 的货币汇率表,我们希望将其注册为时间表函数。

SELECT * FROM currency_rates; 
update_timecurrencyrate
09:00:00Yen102
09:00:00Euro114
09:00:00USD1
11:15:00Euro119
11:49:00Pounds108

使用 Table API,我们可以使用 currency 作为键,并将 update_time 作为版本时间属性来注册该流。

Java 示例:

TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");
 
tEnv.createTemporarySystemFunction("rates", rates);      

时间表函数连接

定义时间表函数后,它可以作为标准表函数使用。追加-only 表(左输入/探测方)可以与时间表(右输入/构建方)连接,即一个随着时间变化并跟踪其变化的表,用于在特定时间点获取某个键的值。

考虑一个追加-only 表 orders,它跟踪客户的订单并使用不同的货币。

SELECT * FROM orders; 
order_timeamountcurrency
10:152Euro
10:301USD
10:3250Yen
10:523Euro
11:045USD

给定这些表,我们希望将订单转换为一种统一的货币——美元(USD)。

SQL 查询:

SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注