通用的基于日志的增量检查点

自 Flink 1.16 发布以来,基于日志的通用增量检查点(本文简称 GIC)已成为生产就绪的功能。我们之前在题为“基于通用日志的增量检查点 I”的博客文章中讨论了 GIC 的基本概念和底层机制 [1]。在这篇博文中,我们旨在通过深入的实验和分析,全面分析 GIC 的优缺点。 概述 给大家简单介绍一下,Flink 中创建检查点分为两个阶段:同步阶段和异步阶段(sync 阶段和简称异步阶段)。在同步阶段,内存中的状态被刷新到磁盘,而在异步阶段,本地状态文件被上传到远程存储。如果每个任务都成功完成其异步阶段,则检查点成功完成。需要注意的是,对于具有大量任务和大状态的作业,异步阶段的持续时间决定了检查点的速度和稳定性。但是,在未启用 GIC 的情况下,异步阶段存在两个问题:要上传的文件大小严重依赖于 Flink 状态后端的实现,这意味着这些文件可能并不总是很小。异步阶段不会启动直到同步阶段完成。例如,我们以 RocksDB 为例,它是 Flink 中常用的状态后端之一。虽然Flink支持RocksDB增量检查点,但RocksDB的compaction会导致上传文件的大小波动较大。这是因为压缩可能会创建大量新文件,需要在下一个增量检查点上传。因此,某些任务最终可能会在异步阶段花费更多时间上传新生成的文件,这可能导致检查点完成时间更长。对于大量任务,某些任务在异步阶段花费更多时间的概率很高。对于第二个问题,如果没有 GIC,要上传的文件集要到同步阶段结束才准备好。因此,异步阶段无法在此之前开始,从而浪费了两个连续检查点之间的大部分时间。也就是说,异步阶段的文件上传是在没有GIC的情况下在短时间内发生的。如果上传的文件较大,文件上传会在短时间内占用大量的CPU和网络资源。GIC的引入就是为了解决上述问题,将状态后端物化过程与Flink检查点过程解耦。具体来说,状态更新不仅被插入到状态后端,而且还以仅附加模式存储为状态更改日志。状态后端定期拍摄快照并将快照保存到远程 DFS。该过程在 GIC 中表示为物化,并且独立于 Flink 作业的检查点过程。物化的间隔通常比检查点大得多,默认为 10 分钟。同时,状态变更日志不断上传到持久存储。在检查点中,只有尚未上传的部分状态更改日志需要强制刷新到DFS。这样,每个异步阶段上传的数据变得小而稳定。GIC 旨在提高检查点过程的速度和稳定性,其好处包括更稳定和更低的一次性接收器的端到端延迟,更少由于检查点持续时间更短,并且集群资源利用率更稳定,因此故障转移后可以重放数据。另一方面,GIC 稍微降低了最大处理能力并控制了资源消耗的增加(远程 DFS 存储)。本博客分析了上述GIC的收益和成本,并详细分享了相应的评估结果。2 优点。 2.1 更稳定、更低的端到端延迟端到端处理延迟是 Flink 作业的一个关键性能指标,它代表接收输入数据和产生结果之间的时间。较低的端到端处理延迟可以提高下游系统中的数据新鲜度。 Flink 的检查点过程实现了作业内的恰好一次语义。为了保证端到端的一次性语义,源需要支持重放,而宿需要支持事务。

只有在检查点完成后,事务接收器才能提交已提交的偏移量。因此,创建检查点的速度越快,事务接收器提交的频率就越高,从而提供更低的端到端延迟。

事务接收器的端到端延迟GIC 减少了端到端处理延迟并提高稳定性。如上所述,GIC 将状态后端的物化与检查点过程解耦,从而减轻了由于状态后端实现细节(例如压缩)而导致的上传文件时间不可预测的影响。而且,在c期间只需要上传少量的状态变更日志。检查点程序。这些增强功能加速了检查点的异步阶段,使检查点过程更快,从而降低端到端延迟和更好的数据新鲜度。2.2 故障转移后更少的数据重放如果接收器不支持事务,数据重放会导致向下游重复输出系统。如果下游系统不支持重复数据删除,这会导致数据正确性问题。数据重放引入了输出重复。当从故障转移中恢复时,Flink 会回滚到最新的完整检查点。如图2所示,重放的数据量是从最后一个检查点到作业失败时的数据量。由于 GIC 允许更快地完成检查点,因此减少了重放的数据量,这对于对数据重复敏感的业务(例如 BI 聚合)尤其有利,从而提高了数据质量。

使用 RocksDB 进行故障转移后的数据重放对比。 GIC2.3 更稳定地利用资源触发检查点可能会导致CPU和网络使用量的爆发,特别是对于状态较大的作业。我们以 RocksDB 为例,在没有启用 GIC 的情况下,每个检查点都会触发从内存到磁盘的刷新,从而产生 RocksDB 压缩。压缩通常会消耗大量 IO 和 CPU 资源,这就是为什么我们在检查点触发后立即看到 CPU 和 IO 指标出现峰值的原因。此外,所有实例的状态文件的上传(检查点的异步阶段)是相互重叠的。这会导致网络资源使用量在短时间内激增,有时会导致出站流量饱和,导致整个集群不稳定。

触发检查点后,所有实例几乎同时上传文件到DFS。对于状态较大的作业,CPU 使用率突发意味着可能需要预留更多的 CPU 资源,突发流量可能会导致检查点超时。即使现代容器技术提供隔离,集群方面的作业仍可能共享 CPU 和网络资源。因此,突发的CPU和网络使用可能会导致整个集群不稳定。

RocksDB增量检查点异步过程中的文件上传。GIC可以有效减少和稳定CPU和网络使用。一方面,GIC不断地将增量状态变更日志上传到DFS,从而均匀地分散文件上传工作。另一方面,GIC可以减少状态后端的物化频率,并且每个实例的物化是随机的。如图4所示,启用GIC后,不同实例的文件上传在整个物化间隔内均匀分布。 RocksDB 状态后端的物化变得低频且分散。

GIC 中 RocksDB 状态后端物化过程中的文件上传。 2.4 可预测的开销 双写对性能的影响可以忽略不计(2 ~ 3%)双写表示GIC需要同时将状态更新写入底层状态后端和状态变更日志。直观上,双写会给每个状态更新带来额外的处理开销。如图 5 所示,GIC 引入了持久短期日志 (DSTL) 来管理状态变更日志。状态变更日志以仅附加格式顺序批处理。根据评估结果,当网络带宽不是瓶颈时,双写对最大处理能力的影响可以忽略不计(2~3%)。图5:双写流程,使用RocksDB状态后端。可预测的开销网络使用和 DFS 存储启用 GIC 的远程 DFS 中存储的数据由两部分组成:底层状态后端的物化快照和 DSTL 中的状态更改日志。在相应的更改反映在物化快照中后,状态更改日志将被删除,并且不再被任何活动检查点引用。因此,理论上,DSTL 在实现完成之前就达到了最大值,但其相应的变更日志尚未清理。鉴于给定算子的状态更新通常具有规则模式,DSTL 的最大大小是可估计的。据此,我们可以调整参数来控制大小。例如,缩短实现间隔可以减小最大 DSTL 大小。如图 6 所示,物化部分在两次相邻的物化执行之间保持不变,而 DSTL 不断累积状态更改日志。 CHK-7 是从 t1 到 t8 的检查点中最大的,即 CHK-2 到 CHK-9,因为 CHK-7 是 MID-1 之后和 MID-2 之前的最后一个检查点,包括 t1 和 t7 之间的所有状态更改日志。图6:OP-1在检查点进程中使用完整检查点大小的DFS。值得一提的是,DFS的成本仅占整个作业计费的一小部分。例如,Amazon S3 的月费为 0.021 美元/GB [2]。也就是说,10 GB 的每月存储费用不到 0.21 美元,远远低于 CPU 和内存等计算资源的费用。额外的网络开销是由状态变更日志的持续上传引起的。然而,这种开销可以通过较低的实现频率来抵消。而且,检查点通常使用内网流量,成本也可以忽略不计。3 性能评估我们通过以下实验来说明其优点。和缺点。

3.1 安装版本:Flink 1.16,FRocksDB 6.20.3DFS:阿里云 OSSM 内存:任务管理器 1 核 4GB,作业管理器 1 核 4GB 部署模式:K8S 应用并行度:50 状态后端:RocksDB(启用增量检查点)检查点间隔: 1秒材料化间隔:3分钟选择两种广泛使用的状态类型:具有聚合数据的状态和具有原始数据的状态:聚合(具有聚合数据):通常,聚合作业基于聚合函数(例如min、max、 count等。本例中选择WordCount作为典型的聚合作业。在测试中,每个实例的状态大小约为 500 MB(中等大小状态),每个字 200 字节。 窗口(包含原始数据):窗口状态保留来自流的原始输入,并在其上滑动。在这种情况下,会选择滑动窗口作业。每个实例的状态大小约为 2 GB,每个记录长度为 100 字节。 3.2 更快的检查点 该实验使用已完成的检查点数量(表 1)和检查点持续时间(表 2)来衡量检查点完成的速度。他们的结果表明,GIC 可以大大加快检查点的完成速度。表 1 中启用 GIC 后,WordCount 作业完成的检查点数量增加了 4.23 倍,而滑动窗口作业则增加了近 40 倍。在表 2 中,启用 GIC 后,wordCount 和窗口作业的平均检查点持续时间分别下降了 89.7% 和 98.8%。对于状态大小比较大的滑动窗口作业,检查点持续时间可以从分钟减少到秒。表1:12小时内完成的检查点数量。GIC启用GIC禁用增加比例Wordcount1893636214.23次Window1158029438.39次表2:检查点持续时间与或不带 GIC.P50P99P99.9Wordcount-89.7%(10.21s -> 1.05s)-79.5%(16.08s -> 3.30s)-72.3%(17.76s -> 4.92s)Window-98.8%(129.47s -> 1.58s) )-98.8%(383.63s -> 4.47s)-98.8%(408.79s -> 4.96s)由于检查点异步阶段上传到DFS的数据量减少,检查点加速。

启用GIC后增量检查点的大小减少了95%以上。这是因为变更日志数据的不断写入和上传。因此,当检查点被触发时,只有尚未上传的状态变化(小于5MB)需要刷新并传输到DFS,这比没有GIC的大小要小得多。

wordCount的增量检查点大小。

更快的检查点会缩短支持事务的接收器的端到端延迟。事务接收器仅在检查点完成后才完成提交。因此,对于Exactly-once Sinks,GIC可以显着降低端到端延迟(从分钟到秒)。 3.3更稳定和可预测的检查点检查点持续时间(完成一个检查点的持续时间)的波动范围用于衡量稳定性本实验中的检查点。启用 GIC 后,wordCount 和窗口作业的检查点持续时间保持在 5 秒以内。如果没有 GIC,久期的波动范围就会更大。在滑动窗口的情况下,范围超过100秒,极端情况下超过400秒。

检查点稳定性提高的主要原因是解耦检查点过程中状态后端的具体化。本次实验使用RocksDB。增量检查点大小取决于RocksDB快照机制,RocksDB内部compaction会影响RocksDB快照生成的增量文件大小,导致上传时间变化范围较大。 GIC 只需要在异步阶段上传增量状态变更日志,有效避免了 RocksDB Compaction 的负面影响,使得检查点持续时间在小范围内波动。

3.4 故障转移后较少的数据重播我们使用 Sources 的 lag 来粗略估计数据量故障转移后重播的数据,由 Kafka Source 中的 currentEmitEventTimeLag[3] 测量。在这种情况下使用相同的滑动窗口作业,并通过在源中注入特殊记录来触发失败。如图11所示,使用GI后数据回放时间从180秒减少到105秒C 启用,减少了 41.7%。图 11:窗口作业中 Kafka 源的 CurrentEmitEventTimeLagspan> 指标。故障转移后源的延迟也受到恢复速度的影响,所以让我们看看这部分进行相同的实验。表 3 显示了 P99 在恢复上花费的时间。启用 GIC 后,需要额外 16 秒才能重新应用状态更改日志。由于启用了本地恢复,因此节省了状态下载所花费的时间。将表3所示的结果与图11所示的源滞后相结合,我们可以得出结论,重放的数据量显着减少。数据重放会影响非事务性接收器的数据重复。

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627