Flink SQL:使用 MATCH_RECOGNIZE 检测模式

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 是两全其美的:它使您能够使用 SQL 处理流数据,但它还支持批处理。

Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。

我们已经看到 Flink SQL 有很多用例,我们很兴奋看看您将用它构建什么。在这篇博文中,我们将向您展示 MATCH_RECOGNIZE 函数可以做什么。

什么是 MATCH_RECOGNIZE?

MATCH_RECOGNIZE 是 SQL 标准中的一个子句,允许您检测数据中的模式。它类似于许多编程语言中的正则表达式功能。MATCH_RECOGNIZE 允许您:

  • 定义模式
  • 根据这些模式匹配数据
  • 提取与模式匹配的数据部分
  • 对与模式匹配的数据执行操作

例如,您可以使用 MATCH_RECOGNIZE 查找所有表中代表股票价格趋势的行。然后,您可以提取与模式匹配的数据并对其执行进一步分析。

SQL 日常工作中的一个常见(但历史上很复杂)任务是识别数据集中有意义的事件序列 – 也称为复杂事件处理(CEP)。在处理流数据时,这一点变得更加重要,因为您希望对已知模式或不断变化的趋势做出快速反应,以提供最新的业务洞察。在 Flink SQL 中,您可以使用标准 SQL 子句 MATCH_RECOGNIZE 轻松执行此类任务。

如何使用 MATCH_RECOGNIZE 的示例

在此示例中,您将使用 Flink SQL 和 MATCH_RECOGNIZE 来查找从高级级别之一降级其服务订阅的用户( type IN (‘premium’,’platinum’)) 到基本层。完整的 Flink SQL 查询源表(订阅)由 thefaker 连接器支持,它基于 Java Faker 表达式在内存中不断生成行。

 
CREATE TABLE 订阅 (
    ID STRING,
    user ID INT,
    输入 STRING,
    开始日期 TIMESTAMP(3),
    结束日期 TIMESTAMP(3),
    payment_expiration TIMESTAMP(3),
    proc_time AS PROCTIME()
)
WITH (
    'connector' = 'faker',
    'fields.id.expression' = '#{Internet.uuid}',
    'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
    'fields.type.expression' = '#{regexify ''(basic|premium|platinum){1}''}',
    'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
    'fields.end_date.expression' = '#{date.future ''15'',''DAYS''}',
    'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);

SELECT *
FROM 订阅
MATCH_RECOGNIZE (
    PARTITION BY user ID
    ORDER BY proc_time
    MEASURES
        LAST(PREMIUM.type) AS premium_type,
        AVG(TIMESTAMPDIFF(DAY, PREMIUM.start_date, PREMIUM.end_date)) AS premium_avg_duration,
        BASIC.start_date AS downgrade_date
    AFTER MATCH SKIP TO LAST ROW
    PATTERN (PREMIUM+ BASIC)
    DEFINE
        PREMIUM AS PREMIUM.type IN ('premium', 'platinum'),
        BASIC AS BASIC.type = 'basic'
) AS MR;

MATCH_RECOGNIZE 的输入参数将是基于订阅的行模式表。第一步,必须对输入行模式表应用逻辑分区和排序,以确保事件处理正确且具有确定性:

 
PARTITION BY user ID
ORDER BY proc_time

然后在 MEASURES 子句中定义 ORDER BY proc_timeOutputRow 模式列,可以将其视为 MATCH_RECOGNIZE 的 SELECT。如果您有兴趣获取与降级之前的最后一个事件关联的高级订阅类型,可以使用逻辑偏移运算符 LAST 获取它。降级日期可以从任何现有高级订阅事件之后的第一个基本订阅事件的开始日期推断出来。AFTER MATCH SKIP 子句指定在非空之后模式匹配恢复的位置找到 y 匹配项。

sql复制代码
AFTER MATCH SKIP TO LAST ROW

模式定义模式在 PATTERN 子句中使用行模式变量(即事件类型)和正则表达式来指定。这些变量还必须使用 DEFINE 子句与事件必须满足才能包含在模式中的匹配条件相关联。在这里,您有兴趣匹配一个或多个高级订阅事件 (PREMIUM+),后跟基本订阅事件 (BASIC)。您可以使用正则表达式语法来定义模式的结构。在这个例子中,PREMIUM+ 表示一个或多个连续的高级订阅事件,而 BASIC 表示一个基本订阅事件。

在 MEASURES 子句中,您定义了要在匹配后提取的数据。在这个查询中,您从匹配的最后一个 PREMIUM 事件中提取高级订阅类型(premium_type),从所有匹配的 PREMIUM 事件中计算平均持续时间(premium_avg_duration),以及降级的日期(downgrade_date)。

最终的查询将返回按用户 ID 分区的每个用户的匹配结果。这些结果显示了每个用户从高级订阅降级到基本订阅的情况,以及有关此过程的相关信息。

 

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627