在 Flink SQL 中加入高度倾斜的流

Flink SQL 是一种功能强大的工具,可用于批处理和流式处理。它提供低代码数据分析,并符合 SQL 标准。在生产系统中,我们的客户发现,随着工作负载的扩展,以前运行良好的 SQL 作业可能会显著减慢,甚至失败。数据倾斜是一个常见且重要的原因。

数据倾斜是指变量的概率分布关于其均值的不对称性。换句话说,数据在某些属性上分布不均匀。本文讨论并分析了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。如果您是该领域的新手,或者有兴趣了解更多有关 Flink 或 Flink SQL 的信息,请查看末尾的相关信息。

加入具有键偏差的流

考虑以下场景:我们有一个 Users 表,其中包含有关某些业务应用程序的用户的信息。我们有一个 GenOrders 表,其中包含有关订单(买/卖东西)的信息。我们想知道 Users 表中每个用户的订单数。

(简化的)查询可能如下所示:

SQL
SELECT o.uid, COUNT(o.oid)
FROM GenOrders o
JOIN Users u ON o.uid = u.uid
GROUP BY o.uid;

在流连接的上下文中,了解两个表代表连续的信息流非常重要。在 Flink 中,聚合(例如 COUNT 和 SUM)是由聚合算子执行的。聚合运算符是有状态的,它将中间聚合结果存储在其状态中。默认情况下,流聚合运算符会逐条处理输入记录。当一条记录进来时,操作员会执行以下步骤:

  1. 从状态中检索累加器。
  2. 将记录累加/收回到累加器。
  3. 将累加器存储回状态。

每次读取状态的/写入是有一定成本的。

数据倾斜

在大规模的 Flink 应用中,流通常会根据特定的 key 来划分,并分发到多个任务中进行并行处理。我们将这样的 key 称为“分组 key”。如果记录分布不均匀,则某些任务会比其他任务重。而且较重的任务需要更长的时间才能完成,并且可能成为数据管道的瓶颈。当这种情况发生时,我们说数据出现了偏差。此外,如果数据是基于某些键分布的,我们将其称为“键倾斜”。

对于上述用例,我们可以按用户 ID 将记录分布到聚合任务以进行并行处理。由于我们要查找每个用户的订单数,因此按用户 ID 对数据进行分组是有意义的。下图使用颜色来指示不同用户的数据记录。我们可以看到,Red 用户有 8 条记录,比其他用户多得多。在这种情况下,我们称数据在用户 ID 上存在偏差。如果我们按照用户 ID 分配数据,那么处理红色记录的顶级聚合任务会处理更多的数据,并且可能会比其他任务花费更长的时间。

如何处理它?

MiniBatch 聚合

MiniBatch 聚合将输入记录放入缓冲区并在缓冲区满后或一段时间后执行聚合操作。这样,缓冲区中具有相同 key 值(键值)的记录会一起处理,因此每批每个键值只有一次状态写入。小批量聚合提高了吞吐量,因为聚合运算符的状态访问次数较少,并且输出较少的记录,特别是当有撤回消息且下游算子的性能不佳时。下图演示了这一点。

我们可以使用以下选项来实现:

Properties
table.exec.mini-batch.enabled: true # 启用 mini-批
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000 # [可选] MiniBatch 可以缓冲的最大输入记录数

  • 增量聚合状态

由于明显的键偏差,部分聚合可能会变得很大。一种解决方案是采用三个阶段的聚合。首先,每个上游任务对每个不同的键值进行本地聚合。不维护本地聚合的状态。这些结果按照分组键和桶键进行划分,并发送到第二阶段聚合(称为增量聚合)。增量聚合仅将不同的键存储在其状态中。增量聚合的结果按照分组键进行划分,并发送到第三阶段聚合(最终聚合),从而保持聚合函数值的状态。这种技术称为增量聚合。

我们可以使用以下选项可利用增量聚合以及小批量聚合、本地/全局聚合和拆分不同聚合:

Properties
table.exec.mini-batch.enabled: true # 启用小批量
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000
table.optimizer.agg-phase-strategy: TWO_PHASE/AUTO # 启用本地/全局聚合
table.optimizer.distinct-agg.split.enabled: true # 启用拆分不同聚合
table.optimizer.distinct-agg.split.bucket-num: 1024 # 桶数
table.optimizer.incremental-agg-enabled: true # 启用增量聚合,默认为true

注意:查询也必须支持 MiniBatch 聚合、Local/Global 聚合和 Split Distinct 聚合。

  • 结论

在本文中,我们讨论了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。我们讨论了三种不同的解决方案:MiniBatch 聚合、本地/全局聚合和增量聚合。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627