gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

年度归档2023

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 2023
  • ( 页面6 )
Flink 8月 13,2023

Flink SQL:查询、窗口和时间 – 第 2 部分

如何创建时间窗口

在上一篇文章中,我们讨论了创建时间窗口的必要性以及如何选择时间窗口的长度。在本文中,我们将更深入地了解如何创建时间窗口。

时间窗口 是收集数据的一段时间段。时间窗口的长度将取决于所收集的数据类型和研究的目的。例如,如果您有兴趣研究新产品对消费者行为的影响,则需要收集与有兴趣研究新营销活动对销售影响的消费者行为相比更长的时间段的数据。

在选择适合您案例的时间窗口时,您应该考虑以下属性:

  • 数据的频率:如果每天、每周、每月等收集数据。
  • 数据的季节性:如果数据受季节性影响(例如,销售数据通常在假期期间更高),您需要在选择时间窗口时考虑这一点。
  • 数据的稳定性:如果数据不稳定(例如,股票价格),您需要在选择时间窗口时考虑这一点。
  • 时间窗口的长度:时间窗口的长度将取决于所收集的数据类型和研究的目的。

一旦您考虑了数据的属性,您就可以为您的案例选择最合适的时间窗口。

如何创建时间窗口

有几种不同方法可以创建时间窗口。最常见的方法是使用滚动窗口、滑动窗口 或固定窗口。

  • 滚动窗口:滚动窗口是一组连续的时间段。每个时间段都会随着新数据的到来而向前移动。
  • 滑动窗口:滑动窗口是一组重叠的时间段。每个时间段都会随着新数据的到来而向前移动,但不会覆盖之前的时间段。
  • 固定窗口:固定窗口是一组大小相同的时间段。这些窗口不会随着新数据的到来而移动。

哪种方法最适合您将取决于您的特定需求。例如,如果您想跟踪一段时间内的趋势,则滚动窗口可能是最好的选择。如果您想跟踪一段时间内的变化,则滑动窗口可能是最好的选择。如果您想跟踪一段时间内的固定间隔的数据,则固定窗口可能是最好的选择。

如何使用时间窗口

一旦您创建了时间窗口,您就可以使用它来聚合数据或运行查询。例如,您可以使用时间窗口来计算一段时间内的平均值、最大值或最小值。您还可以使用时间窗口来运行查询以查找特定时间段内的数据。

使用时间窗口的示例

以下是使用时间窗口的示例:

  • 计算一段时间内的平均值:您可以使用时间窗口来计算一段时间内的平均值。例如,您可以使用滚动窗口来计算过去 1 分钟内的平均温度。
  • 查找特定时间段内的数据:您可以使用时间窗口来查找特定时间段内的数据。例如,您可以使用滑动窗口来查找过去 1 小时内发生的所有交易。

结论

时间窗口是用于聚合数据或运行查询的强大工具。了解如何创建和使用时间窗口可以帮助您更好地分析您的数据。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 13,2023

Flink-Kafka连接器的流模式

介绍

这篇博文将介绍 Flink Table API 中提供的 Kafka 连接器。读完这篇博文,您将更好地了解哪种连接器更适合特定的应用程序。

Flink DataStream API 中的 Kafka 连接器

Flink DataStream API 提供了一个 Kafka 连接器,它工作在附加模式下,可以被您用 Scala/Java API 编写的 Flink 程序使用。除了这个,Flink 的 Table API 还提供了两种 Kafka 连接器:

  • Kafka-unboundedsource,对sink使用“append 模式”
  • Upsert Kafka-unboundedsource,对sink使用“upsert 模式”

这篇博文将专注于用于 Table API 的 Kafka 连接器。我还将尝试回答何时使用 Kafka 连接器(追加)或选择 Upsert Kafka 连接器的问题。

简单的 Kafka 连接器 – 追加模式

以下示例是将数据从内存数据流复制到输出 Kafka 主题。在生产场景中,输入数据可以丰富或聚合,但我们将保持这个示例简单,以展示 Flink 在使用第一个 Kafka 连接器时的行为。

首先,创建一个表,其中包含订单作为流数据的来源,这些数据是由数据生成连接器提供的:

CREATE TABLE `orders` (
`id` INT,
`bid` DOUBLE,
`order_time` AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND() * -3 + 5) * -1 AS INTEGER), CURRENT_TIMESTAMP)
)
WITH (
'connector' = 'datagen',
'fields.id.kind' = 'random',
'fields.id.max' = '100',
'fields.id.min' = '1',
'每秒行数' = '100'
);

然后,使用 Kafka 连接器创建一个输出表作为接收器来存储输入数据:

CREATE TABLE `orders_sink_append` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3)
)
WITH (
'connector' = 'kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_sink_append',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'order-receiver-append',
'值.格式' = 'csv'
);

要运行本文中的所有 Flink 代码示例,您需要使用 Ververica Platform (VVP),它可以在任何 Kubernetes 集群上轻松安装:

  • VVP 文档:安装在 Google Kubernetes Engine 上开始使用 Ververica Platform
  • 在 Azure Kubernetes 服务上开始使用 Ververica Platform
  • 在 AWS EKS 上开始使用 Ververica Platform

执行上述表 DDL 以在 VVP 的内置目录中注册新表。这可以通过打开 VVP -> SQL -> 编辑器窗口来完成。然后选择每个“CREATE TABLE … ;”单独声明并单击右侧的“运行选择”。

现在我们可以使用以下 SQL 脚本在 VVP 中创建并启动 Flink SQL 部署。它将生成的数据流连续存储到带有 Kafka 连接器的 Kafka 主题中,即以追加模式运行。

选择 SQL 查询并单击“运行选择”来运行下面的 SQL 查询:

INSERT INTO `orders_sink_append` SELECT * FROM `orders`;

VVP 将引导您完成新的 VVP 部署过程。只需遵循它并单击“开始”按钮即可。

以下是从上面的 SQL 查询创建的 VVP 部署的概述:

Kafka 连接器 – Upsert 模式

让我们看看另一个连接器以及它的不同之处。输入表的定义保持不变,但接收器连接器设置为“upsert-kafka”。为了清楚起见,让我们使用“upsert-kafka”连接器创建一个克隆表。

CREATE TABLE `orders_sink_upserts` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3),
`PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_upserts',
'properties.group.id' = 'order-upserts-consumers',
'值.格式' = 'csv'
);

与上一节类似,我们创建另一个 VVP 部署将数据存储到表 orders_sink_upserts 中,使用“upsert-kafka”连接器和以下 SQL 语句:

INSERT INTO `orders_sink_upserts` SELECT * FROM `orders`;

VVP 部署的概述和作业图看起来与以前一样:

Flink Job 图的拓扑保持不变:

让我们检查 orders_sink_upserts 主题/表的输出:

SELECT * FROM `orders_sink_upserts`;

您可以看到 VVP SQL 编辑器会话 i 显示 100 个插入 (-I),然后其余更改是更新 (+U、-U)。datagen 中配置了 100 个唯一的订单 ID。这就是为什么仅在此处获取 100 条插入的原因,其余所有都是对这 100 个唯一订单的更新。

当您使用 Kafka 支持的 SQL 表时,这是两种流模式“append”和“upsert”之间的主要区别。Upsert 模式可以轻松获取最新更改或了解流数据是否是新的或是否应视为更新或删除。当特定键的任何值为 NULL 时,就会检测到删除。

“upsert-kafka”如何检测 upsert?

首先,任何使用“upsert-kafka”连接器的表都必须有一个主键。在上面的示例中,它是:

PRIMARY KEY (`id`) NOT ENFORCED

您还可以看到,Flink 在使用“upsert-kafka”表中的数据时又注入了一个运算符“ChangeLogNormalize”。注入的运算符聚合输入数据并返回特定主键的最新记录。

下面是另一个 VVP 部署来展示这一点。它将 upsert 表中的数据打印到标准输出:

CREATE TEMPORARY TABLE SinkTable WITH (‘connector’ = ‘print’) LIKE orders_sink_upserts (EXCLUDING OPTIONS);

INSERT INTO `SinkTable` SELECT * FROM `orders_sink_upserts`;

相反,如果从使用 append 模式工作的 orders_sink_append 读取数据,Flink 不会将 ChangelogNormalize 操作符注入到作业图中:

CREATE TEMPORARY TABLE `SinkTable`
WITH ('connector' = 'print')
LIKE `orders_sink_append`
(EXCLUDING OPTIONS);
INSERT INTO `SinkTable` SELECT * FROM `orders_sink_append`;

连接表:upsert 与追加模式

当多个表连接在一起时,两种不同的流模式会产生很大的差异。这种差异可能会导致数据重复。以下示例展示了如何在连接由 Kafka 主题支持的两个 Flink 表时避免数据重复。

以下是一个关于出租车运行的 Flink SQL 作业示例。我们有一个汽车注册表,每个汽车在第一个表中都有一个“蓝色”或“黑色”类别。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 12,2023

在 Flink SQL 中加入高度倾斜的流

Flink SQL 是一种功能强大的工具,可用于批处理和流式处理。它提供低代码数据分析,并符合 SQL 标准。在生产系统中,我们的客户发现,随着工作负载的扩展,以前运行良好的 SQL 作业可能会显著减慢,甚至失败。数据倾斜是一个常见且重要的原因。

数据倾斜是指变量的概率分布关于其均值的不对称性。换句话说,数据在某些属性上分布不均匀。本文讨论并分析了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。如果您是该领域的新手,或者有兴趣了解更多有关 Flink 或 Flink SQL 的信息,请查看末尾的相关信息。

加入具有键偏差的流

考虑以下场景:我们有一个 Users 表,其中包含有关某些业务应用程序的用户的信息。我们有一个 GenOrders 表,其中包含有关订单(买/卖东西)的信息。我们想知道 Users 表中每个用户的订单数。

(简化的)查询可能如下所示:

SQL
SELECT o.uid, COUNT(o.oid)
FROM GenOrders o
JOIN Users u ON o.uid = u.uid
GROUP BY o.uid;

在流连接的上下文中,了解两个表代表连续的信息流非常重要。在 Flink 中,聚合(例如 COUNT 和 SUM)是由聚合算子执行的。聚合运算符是有状态的,它将中间聚合结果存储在其状态中。默认情况下,流聚合运算符会逐条处理输入记录。当一条记录进来时,操作员会执行以下步骤:

  1. 从状态中检索累加器。
  2. 将记录累加/收回到累加器。
  3. 将累加器存储回状态。

每次读取状态的/写入是有一定成本的。

数据倾斜

在大规模的 Flink 应用中,流通常会根据特定的 key 来划分,并分发到多个任务中进行并行处理。我们将这样的 key 称为“分组 key”。如果记录分布不均匀,则某些任务会比其他任务重。而且较重的任务需要更长的时间才能完成,并且可能成为数据管道的瓶颈。当这种情况发生时,我们说数据出现了偏差。此外,如果数据是基于某些键分布的,我们将其称为“键倾斜”。

对于上述用例,我们可以按用户 ID 将记录分布到聚合任务以进行并行处理。由于我们要查找每个用户的订单数,因此按用户 ID 对数据进行分组是有意义的。下图使用颜色来指示不同用户的数据记录。我们可以看到,Red 用户有 8 条记录,比其他用户多得多。在这种情况下,我们称数据在用户 ID 上存在偏差。如果我们按照用户 ID 分配数据,那么处理红色记录的顶级聚合任务会处理更多的数据,并且可能会比其他任务花费更长的时间。

如何处理它?

MiniBatch 聚合

MiniBatch 聚合将输入记录放入缓冲区并在缓冲区满后或一段时间后执行聚合操作。这样,缓冲区中具有相同 key 值(键值)的记录会一起处理,因此每批每个键值只有一次状态写入。小批量聚合提高了吞吐量,因为聚合运算符的状态访问次数较少,并且输出较少的记录,特别是当有撤回消息且下游算子的性能不佳时。下图演示了这一点。

我们可以使用以下选项来实现:

Properties
table.exec.mini-batch.enabled: true # 启用 mini-批
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000 # [可选] MiniBatch 可以缓冲的最大输入记录数

  • 增量聚合状态

由于明显的键偏差,部分聚合可能会变得很大。一种解决方案是采用三个阶段的聚合。首先,每个上游任务对每个不同的键值进行本地聚合。不维护本地聚合的状态。这些结果按照分组键和桶键进行划分,并发送到第二阶段聚合(称为增量聚合)。增量聚合仅将不同的键存储在其状态中。增量聚合的结果按照分组键进行划分,并发送到第三阶段聚合(最终聚合),从而保持聚合函数值的状态。这种技术称为增量聚合。

我们可以使用以下选项可利用增量聚合以及小批量聚合、本地/全局聚合和拆分不同聚合:

Properties
table.exec.mini-batch.enabled: true # 启用小批量
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000
table.optimizer.agg-phase-strategy: TWO_PHASE/AUTO # 启用本地/全局聚合
table.optimizer.distinct-agg.split.enabled: true # 启用拆分不同聚合
table.optimizer.distinct-agg.split.bucket-num: 1024 # 桶数
table.optimizer.incremental-agg-enabled: true # 启用增量聚合,默认为true

注意:查询也必须支持 MiniBatch 聚合、Local/Global 聚合和 Split Distinct 聚合。

  • 结论

在本文中,我们讨论了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。我们讨论了三种不同的解决方案:MiniBatch 聚合、本地/全局聚合和增量聚合。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
CDH 8月 8,2023

修复HDFS JournalNode和NameNode硬盘损坏

HDFS JournalNode和NameNode是HDFS文件系统的关键组件。JournalNode负责记录NameNode的所有操作,NameNode负责管理HDFS文件系统中的所有数据。如果JournalNode或NameNode的硬盘损坏,可能会导致HDFS文件系统不可用。

如果部署了Cloudera的HDFS HA,那么在更换了新硬盘后,重启JournalNode会出现”JournalNotFormattedException: Journal Storage Directory /opt/dfs/jn/nameservice1 not formatted”这样的错误。原因是在新的硬盘上的jn目录下没有VERSION文件用来恢复。

解决方法是重新创建对应的目录结构,并拷贝其他JournalNode上的VERSION文件。之后重启JournalNode,会自动进行目录的初始化,并自动同步日志。

以下是修复HDFS JournalNode硬盘损坏的步骤:

  • 重新创建对应的目录结构。

mkdir -p /opt/dfs/jn/nameservice1/current/

  • 拷贝其他JournalNode上的VERSION文件。

scp 目标服务器:/opt/dfs/jn/nameservice1/current/VERSION /opt/dfs/jn/nameservice1/current/VERSION

(3)更改JournalNode的权限。

chown -R hdfs:hdfs /opt/dfs/jn

重启JournalNode。

  • service hadoop-journalnode restart

以下是修复HDFS NameNode硬盘损坏的步骤:

如果启动NameNode出现

We expected txid 266899638, but got txid 267088002.

方法一:可以运行以下命令进行修复:

hadoop namenode -recover

方法二:如果上面的命令出错,如果NameNode是HA的,首先从健康节点的NameNode拷贝到出故障的NameNode,然后重启看是否正常。

如果仍然无法修复,可以联系Cloudera支持获取帮助。

以下是一些额外的提示:

定期备份HDFS JournalNode和NameNode的硬盘。

使用高可用性(HA)配置HDFS JournalNode和NameNode。

监控HDFS JournalNode和NameNode的健康状况。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 7,2023

Flink SQL:查询、窗口和时间 – 第 1 部分

时间是流处理中的一个关键因素,因为数据在到达时就被处理,并且必须快速处理以避免延迟。流处理中时间的普遍性意味着数据处理的设计必须考虑到时间因素。基于时间的窗口是流处理中常用的技术,用于确保数据得到及时处理。流处理可用于各种应用程序,例如监控系统、欺诈检测以及任何想要提供实时数据的应用程序。 -时间洞察。流处理中普遍存在的时间给数据处理带来了挑战和机遇。通过正确的设计,流处理可用于提供对数据流的实时洞察。在这篇文章中,我们将了解在使用 Flink SQL 时如何考虑时间。

时间戳和查询

在流处理中,时间戳是用于记录事件发生的时间。此信息可用于确定处理事件所需的时间,或监视流处理系统的性能。时间戳还可以用于对同时发生的事件进行排序。

示例:

  • 用户交互:点击
  • 应用程序日志:应用程序
  • 机器事务:信用卡、广告服务
  • 传感器:手机、汽车、物联网

流处理中涉及时间的查询通常是用于分析一段时间内的数据。这可能涉及查找数据中的趋势或模式,或比较不同时间段的数据。流处理系统通常提供对数据加窗的方法,以便仅考虑特定时间段的数据。这使得可以对传入的数据进行实时分析,或者对历史数据进行分析。

示例:

  • 最后一分钟的平均值
  • 使用最新汇率加入
  • 在 5 分钟内尝试 3 次失败后发出警报

时间属性

流处理中有多种不同的时间属性。它们是事件时间、处理时间和摄取时间。

  • 事件时间是事件发生时的时间戳。
  • 处理时间是处理事件的时间戳。
  • 摄取时间是事件被摄取到系统中的时间戳。

事件时间是唯一完全由用户控制的时间属性。所有其他时间属性均由系统控制。

事件时间允许用户控制事件发生的时间,这在某些情况下非常重要。处理时间可能会受到系统速度的影响。在某些情况下,系统可能会很慢并且处理时间可能会延迟。在其他情况下,系统可能很快,处理时间可能比预期早。

摄取时间完全不受用户控制。提取时间由系统控制,取决于系统的速度。

事件时间属性

事件时间属性是带有关联水印的 TIMESTAMP 或 TIMESTAMP_LTZ。水印使用有界无序水印策略。

  • TIMESTAMP 是一种记录精确到小数秒的日期和时间的数据类型,而 TIMESTAMP_LTZ(本地时区)是一种存储日期、时间和本地时区的数据类型。

访问 Flink 官方文档网站了解更多信息。

CREATE TABLE clicks (
  user STRING,
  url STRING,
  cTime TIMESTAMP_LTZ(3),
  cTime WITH WATERMARK AS cTime - INTERVAL '2' MINUTES
)

处理时间属性

处理时间属性是一个计算列,不保存数据;每当访问该属性时都会查询本地计算机时间。处理时间属性可以像常规 TIMESTAMP_LTZ 一样使用。

CREATE TABLE clicks (
  user STRING,
  url STRING,
  cTime AS PROCTIME()
)

随着时间推移,状态可能会过期,例如,在长达一小时的窗口中对每个用户的点击进行计数。

  • 事件时间 windows:clicks 将计入其发生的小时。水印触发关闭窗口并丢弃其状态。
  • 处理时间 windows:clicks 将计入当前时间处理时的小时。本地系统时钟触发关闭窗口并丢弃其状态。

如果输入表中没有时间属性,窗口操作员将不知道窗口何时完成。

时间戳与时间属性

在流处理中有两种表示时间的常见方法:时间戳和时间属性。时间戳是与事件关联的时间点,而时间属性可以存在于每个表模式中。时间戳更精确,但时间属性更灵活,可用于表示复杂的时间关系。

时态运算符

时态运算符是流处理中处理基于时间的数据的一种方法。有几种不同类型的时态运算符:

  • 窗口:窗口是数据的集合,在特定的时间范围内被处理。窗口可以是滚动的、跳跃的,或者会话的。
  • 聚合:聚合是对数据的集合进行操作,以产生单个值。聚合可以是平均值、计数或总和等。
  • 连接:连接是将两个数据集合结合起来,以便可以根据它们的时间属性进行比较。连接可以是基于事件时间、处理时间或摄取时间的。
  • 模式匹配:模式匹配是对数据的集合进行操作,以查找满足特定模式的数据。模式可以是简单的,如连续的数字,也可以是复杂的,如识别欺诈交易的模式。

时态运算符及时跟踪进度,以确定输入何时完成。它们发出无法更新的最终结果行,并且能够丢弃不再需要的状态(记录和结果)。

窗口示例

窗口是流处理中处理基于时间的数据的一种常见方法。有几种不同类型的窗口,每个窗口都有其优点和缺点。

  • 滚动窗口:滚动窗口是数据的集合,在特定的时间范围内被处理。滚动窗口随着时间的推移而移动,因此每次窗口中的数据都不同。滚动窗口适用于需要实时分析的数据。
  • 跳跃窗口:跳跃窗口是数据的集合,在特定的时间间隔内被处理。跳跃窗口不随着时间的推移而移动,因此每次窗口中的数据相同。跳跃窗口适用于需要每隔一段时间分析的数据。
  • 会话窗口:会话窗口是数据的集合,在没有活动的情况下保持打开状态。会话窗口适用于需要分析数据流中的活动的数据。

窗口表值函数

窗口表值函数 (WTVF) 是一种特殊函数,可用于从窗口中返回数据。WTVF 可以用于聚合窗口数据、计算窗口统计信息或查找模式。

结论

在流处理中,时间是一个关键因素。时间属性和时态运算符是流处理中处理基于时间的数据的强大工具。通过正确的设计,可以使用这些工具来提供对数据流的实时洞察。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 7,2023

Flink SQL:Join 系列 3(Lateral Joins、LAG 聚合函数)

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。

什么是横向连接?

横向连接是一种 SQL 连接类型,允许您在 FROM 子句中指定子查询。然后针对外部查询中的每一行执行该子查询。横向联接可以通过减少表扫描次数来提高 SQL 查询的性能。换句话说,您可以将横向联接视为 SQL 中的 foreach 循环,它迭代集合,在每次迭代上应用一些转换,并且产生输出。横向联接在处理以分层或嵌套格式存储的数据时非常有用。

如何执行横向表联接

此示例将展示如何使用横向联接关联事件。给定一个包含人员地址的表,您需要找到每个州有两个人口最多的城市,并随着人们的流动而不断更新这些排名。

首先,使用连续聚合来计算每个城市的人口。虽然这很简单,但当人们移动时,Flink SQL 的真正威力就会显现出来。通过使用重复数据删除,当一个人搬家时,Flink 会自动为他们的旧城市发出撤回请求。因此,如果约翰从纽约搬到洛杉矶,纽约的人口将自动减少 1。这为我们提供了变更数据捕获的能力,而无需投资于设置它的实际基础设施!

有了这种动态手头有填充表后,您就可以使用 LATERAL 表连接来解决原始问题。与普通联接不同,横向联接允许子查询与 FROM 子句中其他参数的列相关联。与常规子查询不同,作为联接,横向可以返回多行。

sql复制代码
CREATE TABLE People (
    id INTEGER,
    city STRING,
    state STRING,
    arrival_time TIMESTAMP(3),
    arrival_watermark AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'fake',
    'fields.id.expression' = '#{number.numberBetween ''1'',''100''}',
    'fields.city.expression' = '#{regexify ''(New York|Newport|Port|Shoesfort|Springfield){1}''}',
    'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
    'fields.arrival_time.expression' = '#{date.past ''15'',''seconds''}',
    'rows-per-second' = '10'
);
sql复制代码
CREATE TEMPORARY VIEW current_population AS
SELECT
    city,
    state,
    COUNT(*) AS population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
) WHERE rownum = 1
GROUP BY city, state;
sql复制代码
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM current_population) states,
    LATERAL (
        SELECT city, population
        FROM current_population
        WHERE state = states.state
        ORDER BY population DESC
        LIMIT 2
    );

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
CDH 8月 7,2023

CDH启动namenode报错:java.io.IOException: NameNode is not formatted. 请分析原因和给解决方案

CDH的磁盘发生故障,磁盘恢复数据后,重启namenode发生错误:java.io.IOException: NameNode is not formatted.

这个错误说明你的Hadoop NameNode未被正确格式化。当/opt目录故障后重新恢复,可能导致NameNode元数据丢失或损坏,从而导致该错误。

原因:

  • 故障导致Hadoop NameNode的元数据损坏或丢失。
  • 在故障恢复期间可能没有正确备份Hadoop的/opt目录。

解决方案: 为了解决这个问题,你可以采取以下步骤来重新格式化Hadoop NameNode:

  1. 备份数据: 在执行任何恢复操作之前,确保你有/opt目录的备份。如果有最近的备份,可以尝试恢复备份数据。
  2. 格式化NameNode: 格式化NameNode将删除现有的元数据,并创建一个新的空的Hadoop文件系统。请注意,这将清空之前的Hadoop数据,包括HDFS中的所有文件和目录。在启动Hadoop之前,使用以下命令格式化NameNode:bash复制代码hdfs namenode -format
  3. 启动Hadoop: 在成功格式化NameNode后,启动Hadoop集群。bash复制代码start-dfs.sh start-yarn.sh
  4. 验证: 确保Hadoop集群正常运行,并且你的应用程序能够正确访问和使用HDFS中的数据。

在重新格式化NameNode之前,请确保备份你认为重要的数据,因为这将清除所有现有的数据。这个操作是不可逆的。同时,还要检查/opt目录故障的根本原因,以避免类似问题的再次发生。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
CDH 8月 7,2023

启动CDH的zookeeper报错: org.apache.zookeeper.server.persistence.FileTxnSnapLog$DatadirException: Missing data directory /opt/zookeeper/version-2, automatic data directory creation is disabled

CDH的磁盘发生故障,磁盘恢复数据后,重启Zookeeper发生错误:Missing data directory /opt/zookeeper/version-2, automatic data directory creation is disabled

这个错误是由于ZooKeeper没有找到指定的数据目录 /opt/zookeeper/version-2 导致的,而且自动数据目录创建已被禁用。

为了解决这个问题,你可以采取以下步骤:

  1. 检查目录路径: 首先,确保 /opt/zookeeper/version-2 目录路径是正确的,并且在该路径下确实存在 ZooKeeper 数据文件。
  2. 手动创建数据目录: 如果该目录不存在,你可以手动创建它,然后将之前的 ZooKeeper 数据文件移到这个目录下。
  3. 启用自动数据目录创建: 如果你希望 ZooKeeper 能够自动创建数据目录,可以修改 ZooKeeper 配置文件,将 autopurge.snapRetainCount 和 autopurge.purgeInterval 设置为适当的值,以启用自动清理和数据目录创建。示例如下:复制代码autopurge.snapRetainCount=3 autopurge.purgeInterval=1

确保在修改配置文件后重新启动 ZooKeeper 服务。

如果你仍然遇到问题,可能需要检查 ZooKeeper 配置和文件权限,确保其设置正确,并且服务具有足够的权限来访问指定的数据目录。

作者 east
Flink 8月 6,2023

混合 Shuffle 模式的性能分析和调优指南

概述

Apache Flink 社区在 Flink 1.16 中引入了 Hybrid Shuffle 模式,它将传统的 Batch Shuffle 与流处理中的 Pipelined Shuffle 结合起来,赋予 Flink 批处理更强大的能力。

Hybrid Shuffle 的核心思想是打破调度约束,根据资源的可用性来决定是否需要调度下游任务,同时在条件允许的情况下支持内存中的数据交换而不溢出到磁盘。

我们基于 Flink 1.17 在多个场景下对 Hybrid Shuffle 进行了评估。本文将根据评测结果详细分析 Hybrid Shuffle 的优势,并根据我们的经验提供一些调优指南。

Hybrid Shuffle 的优势

与传统的 Batch Blocking Shuffle 相比,Hybrid Shuffle 主要有以下优势:

  • 灵活的调度:Hybrid Shuffle 打破了 Pipelined Shuffle 模式下所有任务必须同时调度,或者 Blocking Shuffle 模式下必须分段调度的前提:当资源充足时,上下游任务可以同时运行。当资源不足时,下游任务可以批量执行。
  • 减少 IO 开销:Hybrid Shuffle 打破了批处理作业的所有数据都必须写入磁盘并从磁盘消费的约束。当上下游任务同时运行时,支持直接从内存中消费数据,在提升作业性能的同时,显着减少了磁盘 IO 的额外开销。

这些独特的优势让 Hybrid Shuffle 具备了传统 Blocking Shuffle 所缺乏的能力。为了验证其有效性,我们进行了一系列的实验和分析,主要分为以下几个方面:

  • 填补资源缺口:资源缺口是指作业执行过程中某些时间点出现的空闲任务槽位,即集群资源没有被充分利用。这种情况在 Flink Blocking Shuffle 中会出现,在某些任务存在数据倾斜的场景下尤为明显。下图是 Blocking Shuffle 和 Hybrid Shuffle 的对比。可以看到,Blocking Shuffle 中的两个任务槽位无法使用,而 Hybrid Shuffle 中的三个任务槽位都在使用中。

数据倾斜是一种普遍存在的现象。以 TPC-DS q4 为例,一个 HashJoin 算子平均读取 204 MB 数据,而一个倾斜任务读取多达 7.03 GB 数据。测试发现,Hybrid Shuffle 相比 Blocking Shuffle 减少了该查询的总执行时间 18.74%。

  • 减少磁盘负载:Flink Blocking Shuffle 将所有中间数据写入磁盘。因此,随机写入和随机读取阶段分别执行磁盘写入和读取操作。这带来了两个主要问题:
    • 磁盘 IO 负载增加,影响整个集群的吞吐量。随着集群上作业数量的增加,磁盘 IO 将成为瓶颈。
    • 大规模批量作业的 Shuffle 数据会占用相当大的磁盘存储空间,且大小难以估计。这个问题在以 Kubernetes 为代表的云原生环境中更为突出:配置值太小,会出现存储空间不足的情况;如果配置的值太大,会浪费存储资源,因为资源在大多数情况下是在 pod 级别隔离的。

Hybrid Shuffle 引入了两种数据溢出策略:

* 选择性溢出策略:当有数据溢出时,只将一部分数据溢出到磁盘,内存空间不足。该策略可以同时减少磁盘读写指令。
* 全溢出策略:所有中间数据都写入磁盘,但下游任务可以直接从内存中消费未释放的数据。该策略可以有效减少磁盘读取指令,同时还可以提高容错能力。

为了比较不同 shuffle 模式和溢出策略对磁盘 IO 负载的影响,我们进行了以下实验:

* 测试磁盘读取和写入的数据与磁盘 IO 负载的比例。
* 不同 shuffle 模式和溢出策略下的总数据。
* 测试 Hybrid Shuffle 选择性溢出策略下不同网络内存大小下磁盘读写数据占总数据的比例。

从实验结果可以看出:

* 与 Blocking Shuffle 相比,Hybrid Shuffle 大大减少了磁盘读取和写入的数据量。

Hybrid Shuffle 的调优指南

基于上面的分析和实验结果,我们总结了以下三个调优指南:

  • **使用 Hybrid Shuffle:**当作业存在数据倾斜或磁盘 I/O 负载高时,使用 Hybrid Shuffle 可以显著提高作业性能。
  • **降低算子的并行度:**当作业并行度设置过高时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以降低算子的并行度来提高作业性能。
  • **增加网络内存的大小:**当网络内存大小设置过小时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以增加网络内存的大小来提高作业性能。

结论

在本文中,我们主要研究了导致 Hybrid Shuffle 优越性能的因素。我们的研究包括对这些因素的综合实验评估和分析。此外,我们还提出了相应的增强实用性的优化指南:

  • **使用 Hybrid Shuffle:**当作业存在数据倾斜或磁盘 I/O 负载高时,使用 Hybrid Shuffle 可以显著提高作业性能。
  • **降低算子的并行度:**当作业并行度设置过高时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以降低算子的并行度来提高作业性能。
  • **增加网络内存的大小:**当网络内存大小设置过小时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以增加网络内存的大小来提高作业性能。

我们相信,Hybrid Shuffle 是 Apache Flink 批处理性能优化的一项重要技术。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
未分类 8月 6,2023

流处理可扩展性:挑战和解决方案

什么是流处理?

流处理是一种处理不断生成的数据的编程范例。它与批处理处理不同,批处理处理的数据是事先收集的并存储在数据库或文件中。流处理更适合处理需要实时处理的数据,例如来自传感器、社交媒体或交易系统的数据。

流处理系统擅长处理大量数据,并且可以实时处理数据,这使得它们非常适合实时分析和决策。例如,流处理系统可用于检测欺诈交易、跟踪客户行为或优化网络性能。

为什么选择流处理?

有许多原因为什么公司选择使用流处理。以下是一些最常见的原因:

  • **实时分析:**流处理允许公司实时分析数据,这对于检测欺诈、跟踪客户行为或优化网络性能等任务非常重要。
  • **决策:**流处理系统可用于实时做出决策,这对于需要快速响应瞬息万变的市场条件的公司非常重要。
  • **响应能力:**流处理系统可用于提高公司对变化的响应能力。例如,流处理系统可用于检测欺诈交易并阻止它们发生,或跟踪客户行为并优化营销活动。
  • **效率:**流处理系统可用于提高公司的效率。例如,流处理系统可用于优化网络性能,减少成本或提高客户服务水平。

流处理的挑战

流处理系统面临一些挑战,包括:

  • **数据量:**流处理系统需要能够处理大量数据。随着产生的数据量的增加,流处理系统变得更加复杂和昂贵。
  • **延迟:**流处理系统必须能够实时处理数据,这可能很困难。
  • **可扩展性:**流处理系统必须能够随着数据量的增加而扩展。
  • **容错性:**流处理系统必须能够处理故障。

流处理的未来

流处理是一项快速发展的技术,随着数据量的增加,它将变得越来越重要。流处理系统将继续发展,以满足企业的需求。未来,流处理系统将变得更加复杂、高效和可扩展。它们也将能够处理更大的数据量,具有更低的延迟。

  • 流处理系统通常使用以下步骤来处理数据:
    • 数据收集:数据来自各种来源,例如传感器、社交媒体和交易系统。
    • 数据预处理:数据在处理之前需要预处理,以使其易于处理。这可能包括数据清理、格式化和标准化。
    • 数据分析:数据分析的目的是从数据中提取洞察力。这可能包括统计分析、机器学习和自然语言处理。
    • 数据可视化:数据可视化是一种将数据以易于理解的方式显示给用户的过程。这可以帮助用户更好地理解数据并做出决策。
  • 流处理系统可用于多种应用,包括:
    • 欺诈检测
    • 客户行为分析
    • 网络性能优化
    • 实时建议
    • 工业控制
  • 流处理是一项强大的工具,可帮助企业从数据中提取洞察力并做出更好的决策。随着数据量的增加,流处理将变得越来越重要。
作者 east
Flink 8月 6,2023

减轻事件时间偏差如何减少检查点故障

简介

Apache Flink 提供对事件时间和有状态事件处理的支持,这使其与其他流处理器区分开来。但是,在各种源以不同速度随时间进展的情况下,使用事件时间可能会导致检查点失败。本文描述了导致这些问题的原因,如何检查您的 Flink 作业是否存在这种情况,并提出了缓解该问题的解决方案。

了解检查点

当定期触发检查点时,系统会生成关于 Flink 作业中所有有状态运算符的一致快照。这需要制定执行计划。有状态运算符的快照彼此对齐这一事实使其保持一致。此对齐过程从 Flink 在作业源的输入通道中注入所谓的检查点屏障开始。

了解水印

除了流中的常规数据事件之外,还有 Flink 注入流中的所谓“系统事件”。此类系统事件的示例包括检查点障碍(如上一段所述),以及水印。Apache Flink 使用水印来跟踪事件时间的进度。事件时间是从数据事件的字段之一中提取的,该字段包含最初创建该事件时的时间戳。通常,会生成水印并将其添加到源处的流中。重要的是要认识到,每个单独的源都会生成自己的水印并将其添加到流中。因此,在任何给定的时间点,系统中不只有一个事件时间,而是与源(的实例)一样多。

什么会导致事件时间偏差

事件时间偏差是操作符的各个实例的水印随着时间的推移彼此相距较远的结果。事件时间偏差的原因之一是当 Flink 作业需要使用来自具有不同特征的源的事件时。让我们以以下一组源为例:

  • 一个源每小时加载一个事件
  • 另一个源每秒加载 10K 个事件
  • 最后,具有不频繁的高突发事件的源

随着时间的推移,使用此源组合将导致各自的水印进展彼此显着不同。在 Flink 作业失败后,这种情况通常会被放大,作业需要赶上读取事件。

事件时间偏差的另一个原因是数据本身的分布不平衡。例如,某些键可能比其他键更频繁地出现,使得处理此类键的相应运算符只需进行更多处理,这可能会导致水印进展变慢。

由事件时间偏差引起的问题

许多 Flink 作业都使用与窗口相关的函数和/或键控操作,其中计时器在有状态运算符确定发出什么内容以及何时发出内容的过程中发挥着重要作用。这些计时器带有 anonTimer( …)仅当运算符的水印达到或超过这些计时器的时间戳时才调用的方法。考虑到此类操作符使用最低水位线来触发其计时器,很明显,事件时间偏度会导致许多副作用,即:

  • 待处理计时器的数量将会增加,与它们相关的资源量也会增加
  • 背压正在增加
  • 进度检查点屏障的速度会减慢,并且整个检查点过程将花费更长的时间,导致输入流被阻塞,从而进一步减慢事件的处理速度

这些副作用最终可能导致内存不足错误、检查点故障甚至作业崩溃。

减少事件时间偏差

由于事件时间偏差有多种原因,因此有多种解决方案可以应用。从 Flink 1.11 开始,对所谓的空闲检测提供了开箱即用的支持。如果一个主题在较长一段时间内不产生事件并因此导致没有水印进展,则该主题被称为空闲。 Flink 提供的解决方案是,当在可配置的时间段内没有接收到任何事件时,将此输入源标记为“暂时空闲”,从而导致在确定算子的最低水印时忽略此源。

如何检测事件时间偏差

了解此处描述的问题是否发生在您的 Flink 作业中最可靠的方法是查看每个 Flink Kafka 消费者分配的 Kafka 分区的滞后。不幸的是,这些指标不可用。当然可以查看所有 Kafka 分区的滞后,但由于您不知道这些分区是如何分配给所有 Flink Kafka 消费者实例的,因此很难得出任何结论。因此,您需要根据一些间接指标得出自己的结论,例如检查总检查点时间的增长是否快于状态大小的增长,或者状态运算符的各个实例之间的检查点确认时间是否存在差异。后者可能部分是由于您的数据分布不均匀造成的。最可靠的指标可能是 Flink Kafka Consumer 实例的不规则水印进展。

平衡读取来救援

以平衡的方式读取 Kafka 主题中的事件将极大地缓解 Kafka Consumer 的默认行为导致的问题。我们使用的平衡读取算法也被称为 K-way merge。K-way merge(或平衡读取)可以应用于两个级别:第一个是单个 Kafka 消费者实例中指定主题的平衡读取,而第二个包括所有 Kafka 消费者实例所需的协调。我们将在下面的段落中介绍这两个级别。

单个 Kafka 消费者内的平衡读取

单个 Kafka 消费者内的平衡读取可以通过利用其所谓的“消费流控制”机制来实现。此机制使您能够在轮询周期内暂时暂停使用某些分区的事件,然后再恢复使用事件。要知道何时暂停和何时恢复,需要更改 Flink Kafka 消费者的算法,如下所示:

  1. 最初消费者尝试读取所有分配的分区。
  2. 如果在第一个轮询周期中无法读取某些已分配的分区,则已读取的分区将暂停,以确保剩余的已分配分区将在下一个轮询周期或此后的任何轮询周期中读取。
  3. 在此过程中,我们将对来自所有分区的所有事件进行排队,并且此时我们不会发出任何内容。
  4. 只有当所有分区都被读取时,我们才能通过在所有非空闲主题分区上建立最高水印来确定必须发出什么。
  5. 用于组装要发出的事件集合的算法是所谓的 K 路合并算法。该算法使用 K 排序列表作为输入 – 在我们的例子中,来自指定分区的消费事件根据时间戳进行过滤,直到最高的常见水印。结果是所有分区中按事件时间排序的平衡事件列表。发出此排序列表将导致水印逐渐进展。
  6. 然后重复此过程,此时我们可以选择通过让算法计算每个轮询周期的“轮询超时”的最佳值来进一步优化它。

为 Flink 作业中的所有 Kafka 消费者实例平衡读取

到目前为止,我们已经解释了如何为单个 Kafka 消费者实例进行平衡读取。问题仍然是我们如何在 Flink 作业中的所有 Kafka 消费者的所有实例中执行此操作。我们在这里选择默认的 Fink 行为,其中具有多个输入流的 Flink 运算符管理其水印。然后通过背压间接管理,这可能会引入一些延迟,这比引入复杂的节点间通信更好。

结论

我们希望这篇文章能帮助您理解事件时间偏差是如何影响 Apache Flink 作业的,以及您可以采取哪些措施来减轻其影响。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 5,2023

通用的基于日志的增量检查点

自 Flink 1.16 发布以来,基于日志的通用增量检查点(本文简称 GIC)已成为生产就绪的功能。我们之前在题为“基于通用日志的增量检查点 I”的博客文章中讨论了 GIC 的基本概念和底层机制 [1]。在这篇博文中,我们旨在通过深入的实验和分析,全面分析 GIC 的优缺点。 概述 给大家简单介绍一下,Flink 中创建检查点分为两个阶段:同步阶段和异步阶段(sync 阶段和简称异步阶段)。在同步阶段,内存中的状态被刷新到磁盘,而在异步阶段,本地状态文件被上传到远程存储。如果每个任务都成功完成其异步阶段,则检查点成功完成。需要注意的是,对于具有大量任务和大状态的作业,异步阶段的持续时间决定了检查点的速度和稳定性。但是,在未启用 GIC 的情况下,异步阶段存在两个问题:要上传的文件大小严重依赖于 Flink 状态后端的实现,这意味着这些文件可能并不总是很小。异步阶段不会启动直到同步阶段完成。例如,我们以 RocksDB 为例,它是 Flink 中常用的状态后端之一。虽然Flink支持RocksDB增量检查点,但RocksDB的compaction会导致上传文件的大小波动较大。这是因为压缩可能会创建大量新文件,需要在下一个增量检查点上传。因此,某些任务最终可能会在异步阶段花费更多时间上传新生成的文件,这可能导致检查点完成时间更长。对于大量任务,某些任务在异步阶段花费更多时间的概率很高。对于第二个问题,如果没有 GIC,要上传的文件集要到同步阶段结束才准备好。因此,异步阶段无法在此之前开始,从而浪费了两个连续检查点之间的大部分时间。也就是说,异步阶段的文件上传是在没有GIC的情况下在短时间内发生的。如果上传的文件较大,文件上传会在短时间内占用大量的CPU和网络资源。GIC的引入就是为了解决上述问题,将状态后端物化过程与Flink检查点过程解耦。具体来说,状态更新不仅被插入到状态后端,而且还以仅附加模式存储为状态更改日志。状态后端定期拍摄快照并将快照保存到远程 DFS。该过程在 GIC 中表示为物化,并且独立于 Flink 作业的检查点过程。物化的间隔通常比检查点大得多,默认为 10 分钟。同时,状态变更日志不断上传到持久存储。在检查点中,只有尚未上传的部分状态更改日志需要强制刷新到DFS。这样,每个异步阶段上传的数据变得小而稳定。GIC 旨在提高检查点过程的速度和稳定性,其好处包括更稳定和更低的一次性接收器的端到端延迟,更少由于检查点持续时间更短,并且集群资源利用率更稳定,因此故障转移后可以重放数据。另一方面,GIC 稍微降低了最大处理能力并控制了资源消耗的增加(远程 DFS 存储)。本博客分析了上述GIC的收益和成本,并详细分享了相应的评估结果。2 优点。 2.1 更稳定、更低的端到端延迟端到端处理延迟是 Flink 作业的一个关键性能指标,它代表接收输入数据和产生结果之间的时间。较低的端到端处理延迟可以提高下游系统中的数据新鲜度。 Flink 的检查点过程实现了作业内的恰好一次语义。为了保证端到端的一次性语义,源需要支持重放,而宿需要支持事务。

只有在检查点完成后,事务接收器才能提交已提交的偏移量。因此,创建检查点的速度越快,事务接收器提交的频率就越高,从而提供更低的端到端延迟。

事务接收器的端到端延迟GIC 减少了端到端处理延迟并提高稳定性。如上所述,GIC 将状态后端的物化与检查点过程解耦,从而减轻了由于状态后端实现细节(例如压缩)而导致的上传文件时间不可预测的影响。而且,在c期间只需要上传少量的状态变更日志。检查点程序。这些增强功能加速了检查点的异步阶段,使检查点过程更快,从而降低端到端延迟和更好的数据新鲜度。2.2 故障转移后更少的数据重放如果接收器不支持事务,数据重放会导致向下游重复输出系统。如果下游系统不支持重复数据删除,这会导致数据正确性问题。数据重放引入了输出重复。当从故障转移中恢复时,Flink 会回滚到最新的完整检查点。如图2所示,重放的数据量是从最后一个检查点到作业失败时的数据量。由于 GIC 允许更快地完成检查点,因此减少了重放的数据量,这对于对数据重复敏感的业务(例如 BI 聚合)尤其有利,从而提高了数据质量。

使用 RocksDB 进行故障转移后的数据重放对比。 GIC2.3 更稳定地利用资源触发检查点可能会导致CPU和网络使用量的爆发,特别是对于状态较大的作业。我们以 RocksDB 为例,在没有启用 GIC 的情况下,每个检查点都会触发从内存到磁盘的刷新,从而产生 RocksDB 压缩。压缩通常会消耗大量 IO 和 CPU 资源,这就是为什么我们在检查点触发后立即看到 CPU 和 IO 指标出现峰值的原因。此外,所有实例的状态文件的上传(检查点的异步阶段)是相互重叠的。这会导致网络资源使用量在短时间内激增,有时会导致出站流量饱和,导致整个集群不稳定。

触发检查点后,所有实例几乎同时上传文件到DFS。对于状态较大的作业,CPU 使用率突发意味着可能需要预留更多的 CPU 资源,突发流量可能会导致检查点超时。即使现代容器技术提供隔离,集群方面的作业仍可能共享 CPU 和网络资源。因此,突发的CPU和网络使用可能会导致整个集群不稳定。

RocksDB增量检查点异步过程中的文件上传。GIC可以有效减少和稳定CPU和网络使用。一方面,GIC不断地将增量状态变更日志上传到DFS,从而均匀地分散文件上传工作。另一方面,GIC可以减少状态后端的物化频率,并且每个实例的物化是随机的。如图4所示,启用GIC后,不同实例的文件上传在整个物化间隔内均匀分布。 RocksDB 状态后端的物化变得低频且分散。

GIC 中 RocksDB 状态后端物化过程中的文件上传。 2.4 可预测的开销 双写对性能的影响可以忽略不计(2 ~ 3%)双写表示GIC需要同时将状态更新写入底层状态后端和状态变更日志。直观上,双写会给每个状态更新带来额外的处理开销。如图 5 所示,GIC 引入了持久短期日志 (DSTL) 来管理状态变更日志。状态变更日志以仅附加格式顺序批处理。根据评估结果,当网络带宽不是瓶颈时,双写对最大处理能力的影响可以忽略不计(2~3%)。图5:双写流程,使用RocksDB状态后端。可预测的开销网络使用和 DFS 存储启用 GIC 的远程 DFS 中存储的数据由两部分组成:底层状态后端的物化快照和 DSTL 中的状态更改日志。在相应的更改反映在物化快照中后,状态更改日志将被删除,并且不再被任何活动检查点引用。因此,理论上,DSTL 在实现完成之前就达到了最大值,但其相应的变更日志尚未清理。鉴于给定算子的状态更新通常具有规则模式,DSTL 的最大大小是可估计的。据此,我们可以调整参数来控制大小。例如,缩短实现间隔可以减小最大 DSTL 大小。如图 6 所示,物化部分在两次相邻的物化执行之间保持不变,而 DSTL 不断累积状态更改日志。 CHK-7 是从 t1 到 t8 的检查点中最大的,即 CHK-2 到 CHK-9,因为 CHK-7 是 MID-1 之后和 MID-2 之前的最后一个检查点,包括 t1 和 t7 之间的所有状态更改日志。图6:OP-1在检查点进程中使用完整检查点大小的DFS。值得一提的是,DFS的成本仅占整个作业计费的一小部分。例如,Amazon S3 的月费为 0.021 美元/GB [2]。也就是说,10 GB 的每月存储费用不到 0.21 美元,远远低于 CPU 和内存等计算资源的费用。额外的网络开销是由状态变更日志的持续上传引起的。然而,这种开销可以通过较低的实现频率来抵消。而且,检查点通常使用内网流量,成本也可以忽略不计。3 性能评估我们通过以下实验来说明其优点。和缺点。

3.1 安装版本:Flink 1.16,FRocksDB 6.20.3DFS:阿里云 OSSM 内存:任务管理器 1 核 4GB,作业管理器 1 核 4GB 部署模式:K8S 应用并行度:50 状态后端:RocksDB(启用增量检查点)检查点间隔: 1秒材料化间隔:3分钟选择两种广泛使用的状态类型:具有聚合数据的状态和具有原始数据的状态:聚合(具有聚合数据):通常,聚合作业基于聚合函数(例如min、max、 count等。本例中选择WordCount作为典型的聚合作业。在测试中,每个实例的状态大小约为 500 MB(中等大小状态),每个字 200 字节。 窗口(包含原始数据):窗口状态保留来自流的原始输入,并在其上滑动。在这种情况下,会选择滑动窗口作业。每个实例的状态大小约为 2 GB,每个记录长度为 100 字节。 3.2 更快的检查点 该实验使用已完成的检查点数量(表 1)和检查点持续时间(表 2)来衡量检查点完成的速度。他们的结果表明,GIC 可以大大加快检查点的完成速度。表 1 中启用 GIC 后,WordCount 作业完成的检查点数量增加了 4.23 倍,而滑动窗口作业则增加了近 40 倍。在表 2 中,启用 GIC 后,wordCount 和窗口作业的平均检查点持续时间分别下降了 89.7% 和 98.8%。对于状态大小比较大的滑动窗口作业,检查点持续时间可以从分钟减少到秒。表1:12小时内完成的检查点数量。GIC启用GIC禁用增加比例Wordcount1893636214.23次Window1158029438.39次表2:检查点持续时间与或不带 GIC.P50P99P99.9Wordcount-89.7%(10.21s -> 1.05s)-79.5%(16.08s -> 3.30s)-72.3%(17.76s -> 4.92s)Window-98.8%(129.47s -> 1.58s) )-98.8%(383.63s -> 4.47s)-98.8%(408.79s -> 4.96s)由于检查点异步阶段上传到DFS的数据量减少,检查点加速。

启用GIC后增量检查点的大小减少了95%以上。这是因为变更日志数据的不断写入和上传。因此,当检查点被触发时,只有尚未上传的状态变化(小于5MB)需要刷新并传输到DFS,这比没有GIC的大小要小得多。

wordCount的增量检查点大小。

更快的检查点会缩短支持事务的接收器的端到端延迟。事务接收器仅在检查点完成后才完成提交。因此,对于Exactly-once Sinks,GIC可以显着降低端到端延迟(从分钟到秒)。 3.3更稳定和可预测的检查点检查点持续时间(完成一个检查点的持续时间)的波动范围用于衡量稳定性本实验中的检查点。启用 GIC 后,wordCount 和窗口作业的检查点持续时间保持在 5 秒以内。如果没有 GIC,久期的波动范围就会更大。在滑动窗口的情况下,范围超过100秒,极端情况下超过400秒。

检查点稳定性提高的主要原因是解耦检查点过程中状态后端的具体化。本次实验使用RocksDB。增量检查点大小取决于RocksDB快照机制,RocksDB内部compaction会影响RocksDB快照生成的增量文件大小,导致上传时间变化范围较大。 GIC 只需要在异步阶段上传增量状态变更日志,有效避免了 RocksDB Compaction 的负面影响,使得检查点持续时间在小范围内波动。

3.4 故障转移后较少的数据重播我们使用 Sources 的 lag 来粗略估计数据量故障转移后重播的数据,由 Kafka Source 中的 currentEmitEventTimeLag[3] 测量。在这种情况下使用相同的滑动窗口作业,并通过在源中注入特殊记录来触发失败。如图11所示,使用GI后数据回放时间从180秒减少到105秒C 启用,减少了 41.7%。图 11:窗口作业中 Kafka 源的 CurrentEmitEventTimeLagspan> 指标。故障转移后源的延迟也受到恢复速度的影响,所以让我们看看这部分进行相同的实验。表 3 显示了 P99 在恢复上花费的时间。启用 GIC 后,需要额外 16 秒才能重新应用状态更改日志。由于启用了本地恢复,因此节省了状态下载所花费的时间。将表3所示的结果与图11所示的源滞后相结合,我们可以得出结论,重放的数据量显着减少。数据重放会影响非事务性接收器的数据重复。

作者 east

上一 1 … 5 6 7 … 19 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.