gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

月度归档8月 2023

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  2023   /  
  • 8月
  • ( 页面4 )
CDH 8月 7,2023

CDH启动namenode报错:java.io.IOException: NameNode is not formatted. 请分析原因和给解决方案

CDH的磁盘发生故障,磁盘恢复数据后,重启namenode发生错误:java.io.IOException: NameNode is not formatted.

这个错误说明你的Hadoop NameNode未被正确格式化。当/opt目录故障后重新恢复,可能导致NameNode元数据丢失或损坏,从而导致该错误。

原因:

  • 故障导致Hadoop NameNode的元数据损坏或丢失。
  • 在故障恢复期间可能没有正确备份Hadoop的/opt目录。

解决方案: 为了解决这个问题,你可以采取以下步骤来重新格式化Hadoop NameNode:

  1. 备份数据: 在执行任何恢复操作之前,确保你有/opt目录的备份。如果有最近的备份,可以尝试恢复备份数据。
  2. 格式化NameNode: 格式化NameNode将删除现有的元数据,并创建一个新的空的Hadoop文件系统。请注意,这将清空之前的Hadoop数据,包括HDFS中的所有文件和目录。在启动Hadoop之前,使用以下命令格式化NameNode:bash复制代码hdfs namenode -format
  3. 启动Hadoop: 在成功格式化NameNode后,启动Hadoop集群。bash复制代码start-dfs.sh start-yarn.sh
  4. 验证: 确保Hadoop集群正常运行,并且你的应用程序能够正确访问和使用HDFS中的数据。

在重新格式化NameNode之前,请确保备份你认为重要的数据,因为这将清除所有现有的数据。这个操作是不可逆的。同时,还要检查/opt目录故障的根本原因,以避免类似问题的再次发生。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
CDH 8月 7,2023

启动CDH的zookeeper报错: org.apache.zookeeper.server.persistence.FileTxnSnapLog$DatadirException: Missing data directory /opt/zookeeper/version-2, automatic data directory creation is disabled

CDH的磁盘发生故障,磁盘恢复数据后,重启Zookeeper发生错误:Missing data directory /opt/zookeeper/version-2, automatic data directory creation is disabled

这个错误是由于ZooKeeper没有找到指定的数据目录 /opt/zookeeper/version-2 导致的,而且自动数据目录创建已被禁用。

为了解决这个问题,你可以采取以下步骤:

  1. 检查目录路径: 首先,确保 /opt/zookeeper/version-2 目录路径是正确的,并且在该路径下确实存在 ZooKeeper 数据文件。
  2. 手动创建数据目录: 如果该目录不存在,你可以手动创建它,然后将之前的 ZooKeeper 数据文件移到这个目录下。
  3. 启用自动数据目录创建: 如果你希望 ZooKeeper 能够自动创建数据目录,可以修改 ZooKeeper 配置文件,将 autopurge.snapRetainCount 和 autopurge.purgeInterval 设置为适当的值,以启用自动清理和数据目录创建。示例如下:复制代码autopurge.snapRetainCount=3 autopurge.purgeInterval=1

确保在修改配置文件后重新启动 ZooKeeper 服务。

如果你仍然遇到问题,可能需要检查 ZooKeeper 配置和文件权限,确保其设置正确,并且服务具有足够的权限来访问指定的数据目录。

作者 east
Flink 8月 6,2023

混合 Shuffle 模式的性能分析和调优指南

概述

Apache Flink 社区在 Flink 1.16 中引入了 Hybrid Shuffle 模式,它将传统的 Batch Shuffle 与流处理中的 Pipelined Shuffle 结合起来,赋予 Flink 批处理更强大的能力。

Hybrid Shuffle 的核心思想是打破调度约束,根据资源的可用性来决定是否需要调度下游任务,同时在条件允许的情况下支持内存中的数据交换而不溢出到磁盘。

我们基于 Flink 1.17 在多个场景下对 Hybrid Shuffle 进行了评估。本文将根据评测结果详细分析 Hybrid Shuffle 的优势,并根据我们的经验提供一些调优指南。

Hybrid Shuffle 的优势

与传统的 Batch Blocking Shuffle 相比,Hybrid Shuffle 主要有以下优势:

  • 灵活的调度:Hybrid Shuffle 打破了 Pipelined Shuffle 模式下所有任务必须同时调度,或者 Blocking Shuffle 模式下必须分段调度的前提:当资源充足时,上下游任务可以同时运行。当资源不足时,下游任务可以批量执行。
  • 减少 IO 开销:Hybrid Shuffle 打破了批处理作业的所有数据都必须写入磁盘并从磁盘消费的约束。当上下游任务同时运行时,支持直接从内存中消费数据,在提升作业性能的同时,显着减少了磁盘 IO 的额外开销。

这些独特的优势让 Hybrid Shuffle 具备了传统 Blocking Shuffle 所缺乏的能力。为了验证其有效性,我们进行了一系列的实验和分析,主要分为以下几个方面:

  • 填补资源缺口:资源缺口是指作业执行过程中某些时间点出现的空闲任务槽位,即集群资源没有被充分利用。这种情况在 Flink Blocking Shuffle 中会出现,在某些任务存在数据倾斜的场景下尤为明显。下图是 Blocking Shuffle 和 Hybrid Shuffle 的对比。可以看到,Blocking Shuffle 中的两个任务槽位无法使用,而 Hybrid Shuffle 中的三个任务槽位都在使用中。

数据倾斜是一种普遍存在的现象。以 TPC-DS q4 为例,一个 HashJoin 算子平均读取 204 MB 数据,而一个倾斜任务读取多达 7.03 GB 数据。测试发现,Hybrid Shuffle 相比 Blocking Shuffle 减少了该查询的总执行时间 18.74%。

  • 减少磁盘负载:Flink Blocking Shuffle 将所有中间数据写入磁盘。因此,随机写入和随机读取阶段分别执行磁盘写入和读取操作。这带来了两个主要问题:
    • 磁盘 IO 负载增加,影响整个集群的吞吐量。随着集群上作业数量的增加,磁盘 IO 将成为瓶颈。
    • 大规模批量作业的 Shuffle 数据会占用相当大的磁盘存储空间,且大小难以估计。这个问题在以 Kubernetes 为代表的云原生环境中更为突出:配置值太小,会出现存储空间不足的情况;如果配置的值太大,会浪费存储资源,因为资源在大多数情况下是在 pod 级别隔离的。

Hybrid Shuffle 引入了两种数据溢出策略:

* 选择性溢出策略:当有数据溢出时,只将一部分数据溢出到磁盘,内存空间不足。该策略可以同时减少磁盘读写指令。
* 全溢出策略:所有中间数据都写入磁盘,但下游任务可以直接从内存中消费未释放的数据。该策略可以有效减少磁盘读取指令,同时还可以提高容错能力。

为了比较不同 shuffle 模式和溢出策略对磁盘 IO 负载的影响,我们进行了以下实验:

* 测试磁盘读取和写入的数据与磁盘 IO 负载的比例。
* 不同 shuffle 模式和溢出策略下的总数据。
* 测试 Hybrid Shuffle 选择性溢出策略下不同网络内存大小下磁盘读写数据占总数据的比例。

从实验结果可以看出:

* 与 Blocking Shuffle 相比,Hybrid Shuffle 大大减少了磁盘读取和写入的数据量。

Hybrid Shuffle 的调优指南

基于上面的分析和实验结果,我们总结了以下三个调优指南:

  • **使用 Hybrid Shuffle:**当作业存在数据倾斜或磁盘 I/O 负载高时,使用 Hybrid Shuffle 可以显著提高作业性能。
  • **降低算子的并行度:**当作业并行度设置过高时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以降低算子的并行度来提高作业性能。
  • **增加网络内存的大小:**当网络内存大小设置过小时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以增加网络内存的大小来提高作业性能。

结论

在本文中,我们主要研究了导致 Hybrid Shuffle 优越性能的因素。我们的研究包括对这些因素的综合实验评估和分析。此外,我们还提出了相应的增强实用性的优化指南:

  • **使用 Hybrid Shuffle:**当作业存在数据倾斜或磁盘 I/O 负载高时,使用 Hybrid Shuffle 可以显著提高作业性能。
  • **降低算子的并行度:**当作业并行度设置过高时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以降低算子的并行度来提高作业性能。
  • **增加网络内存的大小:**当网络内存大小设置过小时,可能会导致磁盘 I/O 负载高或作业执行时间长。因此,可以增加网络内存的大小来提高作业性能。

我们相信,Hybrid Shuffle 是 Apache Flink 批处理性能优化的一项重要技术。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
未分类 8月 6,2023

流处理可扩展性:挑战和解决方案

什么是流处理?

流处理是一种处理不断生成的数据的编程范例。它与批处理处理不同,批处理处理的数据是事先收集的并存储在数据库或文件中。流处理更适合处理需要实时处理的数据,例如来自传感器、社交媒体或交易系统的数据。

流处理系统擅长处理大量数据,并且可以实时处理数据,这使得它们非常适合实时分析和决策。例如,流处理系统可用于检测欺诈交易、跟踪客户行为或优化网络性能。

为什么选择流处理?

有许多原因为什么公司选择使用流处理。以下是一些最常见的原因:

  • **实时分析:**流处理允许公司实时分析数据,这对于检测欺诈、跟踪客户行为或优化网络性能等任务非常重要。
  • **决策:**流处理系统可用于实时做出决策,这对于需要快速响应瞬息万变的市场条件的公司非常重要。
  • **响应能力:**流处理系统可用于提高公司对变化的响应能力。例如,流处理系统可用于检测欺诈交易并阻止它们发生,或跟踪客户行为并优化营销活动。
  • **效率:**流处理系统可用于提高公司的效率。例如,流处理系统可用于优化网络性能,减少成本或提高客户服务水平。

流处理的挑战

流处理系统面临一些挑战,包括:

  • **数据量:**流处理系统需要能够处理大量数据。随着产生的数据量的增加,流处理系统变得更加复杂和昂贵。
  • **延迟:**流处理系统必须能够实时处理数据,这可能很困难。
  • **可扩展性:**流处理系统必须能够随着数据量的增加而扩展。
  • **容错性:**流处理系统必须能够处理故障。

流处理的未来

流处理是一项快速发展的技术,随着数据量的增加,它将变得越来越重要。流处理系统将继续发展,以满足企业的需求。未来,流处理系统将变得更加复杂、高效和可扩展。它们也将能够处理更大的数据量,具有更低的延迟。

  • 流处理系统通常使用以下步骤来处理数据:
    • 数据收集:数据来自各种来源,例如传感器、社交媒体和交易系统。
    • 数据预处理:数据在处理之前需要预处理,以使其易于处理。这可能包括数据清理、格式化和标准化。
    • 数据分析:数据分析的目的是从数据中提取洞察力。这可能包括统计分析、机器学习和自然语言处理。
    • 数据可视化:数据可视化是一种将数据以易于理解的方式显示给用户的过程。这可以帮助用户更好地理解数据并做出决策。
  • 流处理系统可用于多种应用,包括:
    • 欺诈检测
    • 客户行为分析
    • 网络性能优化
    • 实时建议
    • 工业控制
  • 流处理是一项强大的工具,可帮助企业从数据中提取洞察力并做出更好的决策。随着数据量的增加,流处理将变得越来越重要。
作者 east
Flink 8月 6,2023

减轻事件时间偏差如何减少检查点故障

简介

Apache Flink 提供对事件时间和有状态事件处理的支持,这使其与其他流处理器区分开来。但是,在各种源以不同速度随时间进展的情况下,使用事件时间可能会导致检查点失败。本文描述了导致这些问题的原因,如何检查您的 Flink 作业是否存在这种情况,并提出了缓解该问题的解决方案。

了解检查点

当定期触发检查点时,系统会生成关于 Flink 作业中所有有状态运算符的一致快照。这需要制定执行计划。有状态运算符的快照彼此对齐这一事实使其保持一致。此对齐过程从 Flink 在作业源的输入通道中注入所谓的检查点屏障开始。

了解水印

除了流中的常规数据事件之外,还有 Flink 注入流中的所谓“系统事件”。此类系统事件的示例包括检查点障碍(如上一段所述),以及水印。Apache Flink 使用水印来跟踪事件时间的进度。事件时间是从数据事件的字段之一中提取的,该字段包含最初创建该事件时的时间戳。通常,会生成水印并将其添加到源处的流中。重要的是要认识到,每个单独的源都会生成自己的水印并将其添加到流中。因此,在任何给定的时间点,系统中不只有一个事件时间,而是与源(的实例)一样多。

什么会导致事件时间偏差

事件时间偏差是操作符的各个实例的水印随着时间的推移彼此相距较远的结果。事件时间偏差的原因之一是当 Flink 作业需要使用来自具有不同特征的源的事件时。让我们以以下一组源为例:

  • 一个源每小时加载一个事件
  • 另一个源每秒加载 10K 个事件
  • 最后,具有不频繁的高突发事件的源

随着时间的推移,使用此源组合将导致各自的水印进展彼此显着不同。在 Flink 作业失败后,这种情况通常会被放大,作业需要赶上读取事件。

事件时间偏差的另一个原因是数据本身的分布不平衡。例如,某些键可能比其他键更频繁地出现,使得处理此类键的相应运算符只需进行更多处理,这可能会导致水印进展变慢。

由事件时间偏差引起的问题

许多 Flink 作业都使用与窗口相关的函数和/或键控操作,其中计时器在有状态运算符确定发出什么内容以及何时发出内容的过程中发挥着重要作用。这些计时器带有 anonTimer( …)仅当运算符的水印达到或超过这些计时器的时间戳时才调用的方法。考虑到此类操作符使用最低水位线来触发其计时器,很明显,事件时间偏度会导致许多副作用,即:

  • 待处理计时器的数量将会增加,与它们相关的资源量也会增加
  • 背压正在增加
  • 进度检查点屏障的速度会减慢,并且整个检查点过程将花费更长的时间,导致输入流被阻塞,从而进一步减慢事件的处理速度

这些副作用最终可能导致内存不足错误、检查点故障甚至作业崩溃。

减少事件时间偏差

由于事件时间偏差有多种原因,因此有多种解决方案可以应用。从 Flink 1.11 开始,对所谓的空闲检测提供了开箱即用的支持。如果一个主题在较长一段时间内不产生事件并因此导致没有水印进展,则该主题被称为空闲。 Flink 提供的解决方案是,当在可配置的时间段内没有接收到任何事件时,将此输入源标记为“暂时空闲”,从而导致在确定算子的最低水印时忽略此源。

如何检测事件时间偏差

了解此处描述的问题是否发生在您的 Flink 作业中最可靠的方法是查看每个 Flink Kafka 消费者分配的 Kafka 分区的滞后。不幸的是,这些指标不可用。当然可以查看所有 Kafka 分区的滞后,但由于您不知道这些分区是如何分配给所有 Flink Kafka 消费者实例的,因此很难得出任何结论。因此,您需要根据一些间接指标得出自己的结论,例如检查总检查点时间的增长是否快于状态大小的增长,或者状态运算符的各个实例之间的检查点确认时间是否存在差异。后者可能部分是由于您的数据分布不均匀造成的。最可靠的指标可能是 Flink Kafka Consumer 实例的不规则水印进展。

平衡读取来救援

以平衡的方式读取 Kafka 主题中的事件将极大地缓解 Kafka Consumer 的默认行为导致的问题。我们使用的平衡读取算法也被称为 K-way merge。K-way merge(或平衡读取)可以应用于两个级别:第一个是单个 Kafka 消费者实例中指定主题的平衡读取,而第二个包括所有 Kafka 消费者实例所需的协调。我们将在下面的段落中介绍这两个级别。

单个 Kafka 消费者内的平衡读取

单个 Kafka 消费者内的平衡读取可以通过利用其所谓的“消费流控制”机制来实现。此机制使您能够在轮询周期内暂时暂停使用某些分区的事件,然后再恢复使用事件。要知道何时暂停和何时恢复,需要更改 Flink Kafka 消费者的算法,如下所示:

  1. 最初消费者尝试读取所有分配的分区。
  2. 如果在第一个轮询周期中无法读取某些已分配的分区,则已读取的分区将暂停,以确保剩余的已分配分区将在下一个轮询周期或此后的任何轮询周期中读取。
  3. 在此过程中,我们将对来自所有分区的所有事件进行排队,并且此时我们不会发出任何内容。
  4. 只有当所有分区都被读取时,我们才能通过在所有非空闲主题分区上建立最高水印来确定必须发出什么。
  5. 用于组装要发出的事件集合的算法是所谓的 K 路合并算法。该算法使用 K 排序列表作为输入 – 在我们的例子中,来自指定分区的消费事件根据时间戳进行过滤,直到最高的常见水印。结果是所有分区中按事件时间排序的平衡事件列表。发出此排序列表将导致水印逐渐进展。
  6. 然后重复此过程,此时我们可以选择通过让算法计算每个轮询周期的“轮询超时”的最佳值来进一步优化它。

为 Flink 作业中的所有 Kafka 消费者实例平衡读取

到目前为止,我们已经解释了如何为单个 Kafka 消费者实例进行平衡读取。问题仍然是我们如何在 Flink 作业中的所有 Kafka 消费者的所有实例中执行此操作。我们在这里选择默认的 Fink 行为,其中具有多个输入流的 Flink 运算符管理其水印。然后通过背压间接管理,这可能会引入一些延迟,这比引入复杂的节点间通信更好。

结论

我们希望这篇文章能帮助您理解事件时间偏差是如何影响 Apache Flink 作业的,以及您可以采取哪些措施来减轻其影响。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 5,2023

通用的基于日志的增量检查点

自 Flink 1.16 发布以来,基于日志的通用增量检查点(本文简称 GIC)已成为生产就绪的功能。我们之前在题为“基于通用日志的增量检查点 I”的博客文章中讨论了 GIC 的基本概念和底层机制 [1]。在这篇博文中,我们旨在通过深入的实验和分析,全面分析 GIC 的优缺点。 概述 给大家简单介绍一下,Flink 中创建检查点分为两个阶段:同步阶段和异步阶段(sync 阶段和简称异步阶段)。在同步阶段,内存中的状态被刷新到磁盘,而在异步阶段,本地状态文件被上传到远程存储。如果每个任务都成功完成其异步阶段,则检查点成功完成。需要注意的是,对于具有大量任务和大状态的作业,异步阶段的持续时间决定了检查点的速度和稳定性。但是,在未启用 GIC 的情况下,异步阶段存在两个问题:要上传的文件大小严重依赖于 Flink 状态后端的实现,这意味着这些文件可能并不总是很小。异步阶段不会启动直到同步阶段完成。例如,我们以 RocksDB 为例,它是 Flink 中常用的状态后端之一。虽然Flink支持RocksDB增量检查点,但RocksDB的compaction会导致上传文件的大小波动较大。这是因为压缩可能会创建大量新文件,需要在下一个增量检查点上传。因此,某些任务最终可能会在异步阶段花费更多时间上传新生成的文件,这可能导致检查点完成时间更长。对于大量任务,某些任务在异步阶段花费更多时间的概率很高。对于第二个问题,如果没有 GIC,要上传的文件集要到同步阶段结束才准备好。因此,异步阶段无法在此之前开始,从而浪费了两个连续检查点之间的大部分时间。也就是说,异步阶段的文件上传是在没有GIC的情况下在短时间内发生的。如果上传的文件较大,文件上传会在短时间内占用大量的CPU和网络资源。GIC的引入就是为了解决上述问题,将状态后端物化过程与Flink检查点过程解耦。具体来说,状态更新不仅被插入到状态后端,而且还以仅附加模式存储为状态更改日志。状态后端定期拍摄快照并将快照保存到远程 DFS。该过程在 GIC 中表示为物化,并且独立于 Flink 作业的检查点过程。物化的间隔通常比检查点大得多,默认为 10 分钟。同时,状态变更日志不断上传到持久存储。在检查点中,只有尚未上传的部分状态更改日志需要强制刷新到DFS。这样,每个异步阶段上传的数据变得小而稳定。GIC 旨在提高检查点过程的速度和稳定性,其好处包括更稳定和更低的一次性接收器的端到端延迟,更少由于检查点持续时间更短,并且集群资源利用率更稳定,因此故障转移后可以重放数据。另一方面,GIC 稍微降低了最大处理能力并控制了资源消耗的增加(远程 DFS 存储)。本博客分析了上述GIC的收益和成本,并详细分享了相应的评估结果。2 优点。 2.1 更稳定、更低的端到端延迟端到端处理延迟是 Flink 作业的一个关键性能指标,它代表接收输入数据和产生结果之间的时间。较低的端到端处理延迟可以提高下游系统中的数据新鲜度。 Flink 的检查点过程实现了作业内的恰好一次语义。为了保证端到端的一次性语义,源需要支持重放,而宿需要支持事务。

只有在检查点完成后,事务接收器才能提交已提交的偏移量。因此,创建检查点的速度越快,事务接收器提交的频率就越高,从而提供更低的端到端延迟。

事务接收器的端到端延迟GIC 减少了端到端处理延迟并提高稳定性。如上所述,GIC 将状态后端的物化与检查点过程解耦,从而减轻了由于状态后端实现细节(例如压缩)而导致的上传文件时间不可预测的影响。而且,在c期间只需要上传少量的状态变更日志。检查点程序。这些增强功能加速了检查点的异步阶段,使检查点过程更快,从而降低端到端延迟和更好的数据新鲜度。2.2 故障转移后更少的数据重放如果接收器不支持事务,数据重放会导致向下游重复输出系统。如果下游系统不支持重复数据删除,这会导致数据正确性问题。数据重放引入了输出重复。当从故障转移中恢复时,Flink 会回滚到最新的完整检查点。如图2所示,重放的数据量是从最后一个检查点到作业失败时的数据量。由于 GIC 允许更快地完成检查点,因此减少了重放的数据量,这对于对数据重复敏感的业务(例如 BI 聚合)尤其有利,从而提高了数据质量。

使用 RocksDB 进行故障转移后的数据重放对比。 GIC2.3 更稳定地利用资源触发检查点可能会导致CPU和网络使用量的爆发,特别是对于状态较大的作业。我们以 RocksDB 为例,在没有启用 GIC 的情况下,每个检查点都会触发从内存到磁盘的刷新,从而产生 RocksDB 压缩。压缩通常会消耗大量 IO 和 CPU 资源,这就是为什么我们在检查点触发后立即看到 CPU 和 IO 指标出现峰值的原因。此外,所有实例的状态文件的上传(检查点的异步阶段)是相互重叠的。这会导致网络资源使用量在短时间内激增,有时会导致出站流量饱和,导致整个集群不稳定。

触发检查点后,所有实例几乎同时上传文件到DFS。对于状态较大的作业,CPU 使用率突发意味着可能需要预留更多的 CPU 资源,突发流量可能会导致检查点超时。即使现代容器技术提供隔离,集群方面的作业仍可能共享 CPU 和网络资源。因此,突发的CPU和网络使用可能会导致整个集群不稳定。

RocksDB增量检查点异步过程中的文件上传。GIC可以有效减少和稳定CPU和网络使用。一方面,GIC不断地将增量状态变更日志上传到DFS,从而均匀地分散文件上传工作。另一方面,GIC可以减少状态后端的物化频率,并且每个实例的物化是随机的。如图4所示,启用GIC后,不同实例的文件上传在整个物化间隔内均匀分布。 RocksDB 状态后端的物化变得低频且分散。

GIC 中 RocksDB 状态后端物化过程中的文件上传。 2.4 可预测的开销 双写对性能的影响可以忽略不计(2 ~ 3%)双写表示GIC需要同时将状态更新写入底层状态后端和状态变更日志。直观上,双写会给每个状态更新带来额外的处理开销。如图 5 所示,GIC 引入了持久短期日志 (DSTL) 来管理状态变更日志。状态变更日志以仅附加格式顺序批处理。根据评估结果,当网络带宽不是瓶颈时,双写对最大处理能力的影响可以忽略不计(2~3%)。图5:双写流程,使用RocksDB状态后端。可预测的开销网络使用和 DFS 存储启用 GIC 的远程 DFS 中存储的数据由两部分组成:底层状态后端的物化快照和 DSTL 中的状态更改日志。在相应的更改反映在物化快照中后,状态更改日志将被删除,并且不再被任何活动检查点引用。因此,理论上,DSTL 在实现完成之前就达到了最大值,但其相应的变更日志尚未清理。鉴于给定算子的状态更新通常具有规则模式,DSTL 的最大大小是可估计的。据此,我们可以调整参数来控制大小。例如,缩短实现间隔可以减小最大 DSTL 大小。如图 6 所示,物化部分在两次相邻的物化执行之间保持不变,而 DSTL 不断累积状态更改日志。 CHK-7 是从 t1 到 t8 的检查点中最大的,即 CHK-2 到 CHK-9,因为 CHK-7 是 MID-1 之后和 MID-2 之前的最后一个检查点,包括 t1 和 t7 之间的所有状态更改日志。图6:OP-1在检查点进程中使用完整检查点大小的DFS。值得一提的是,DFS的成本仅占整个作业计费的一小部分。例如,Amazon S3 的月费为 0.021 美元/GB [2]。也就是说,10 GB 的每月存储费用不到 0.21 美元,远远低于 CPU 和内存等计算资源的费用。额外的网络开销是由状态变更日志的持续上传引起的。然而,这种开销可以通过较低的实现频率来抵消。而且,检查点通常使用内网流量,成本也可以忽略不计。3 性能评估我们通过以下实验来说明其优点。和缺点。

3.1 安装版本:Flink 1.16,FRocksDB 6.20.3DFS:阿里云 OSSM 内存:任务管理器 1 核 4GB,作业管理器 1 核 4GB 部署模式:K8S 应用并行度:50 状态后端:RocksDB(启用增量检查点)检查点间隔: 1秒材料化间隔:3分钟选择两种广泛使用的状态类型:具有聚合数据的状态和具有原始数据的状态:聚合(具有聚合数据):通常,聚合作业基于聚合函数(例如min、max、 count等。本例中选择WordCount作为典型的聚合作业。在测试中,每个实例的状态大小约为 500 MB(中等大小状态),每个字 200 字节。 窗口(包含原始数据):窗口状态保留来自流的原始输入,并在其上滑动。在这种情况下,会选择滑动窗口作业。每个实例的状态大小约为 2 GB,每个记录长度为 100 字节。 3.2 更快的检查点 该实验使用已完成的检查点数量(表 1)和检查点持续时间(表 2)来衡量检查点完成的速度。他们的结果表明,GIC 可以大大加快检查点的完成速度。表 1 中启用 GIC 后,WordCount 作业完成的检查点数量增加了 4.23 倍,而滑动窗口作业则增加了近 40 倍。在表 2 中,启用 GIC 后,wordCount 和窗口作业的平均检查点持续时间分别下降了 89.7% 和 98.8%。对于状态大小比较大的滑动窗口作业,检查点持续时间可以从分钟减少到秒。表1:12小时内完成的检查点数量。GIC启用GIC禁用增加比例Wordcount1893636214.23次Window1158029438.39次表2:检查点持续时间与或不带 GIC.P50P99P99.9Wordcount-89.7%(10.21s -> 1.05s)-79.5%(16.08s -> 3.30s)-72.3%(17.76s -> 4.92s)Window-98.8%(129.47s -> 1.58s) )-98.8%(383.63s -> 4.47s)-98.8%(408.79s -> 4.96s)由于检查点异步阶段上传到DFS的数据量减少,检查点加速。

启用GIC后增量检查点的大小减少了95%以上。这是因为变更日志数据的不断写入和上传。因此,当检查点被触发时,只有尚未上传的状态变化(小于5MB)需要刷新并传输到DFS,这比没有GIC的大小要小得多。

wordCount的增量检查点大小。

更快的检查点会缩短支持事务的接收器的端到端延迟。事务接收器仅在检查点完成后才完成提交。因此,对于Exactly-once Sinks,GIC可以显着降低端到端延迟(从分钟到秒)。 3.3更稳定和可预测的检查点检查点持续时间(完成一个检查点的持续时间)的波动范围用于衡量稳定性本实验中的检查点。启用 GIC 后,wordCount 和窗口作业的检查点持续时间保持在 5 秒以内。如果没有 GIC,久期的波动范围就会更大。在滑动窗口的情况下,范围超过100秒,极端情况下超过400秒。

检查点稳定性提高的主要原因是解耦检查点过程中状态后端的具体化。本次实验使用RocksDB。增量检查点大小取决于RocksDB快照机制,RocksDB内部compaction会影响RocksDB快照生成的增量文件大小,导致上传时间变化范围较大。 GIC 只需要在异步阶段上传增量状态变更日志,有效避免了 RocksDB Compaction 的负面影响,使得检查点持续时间在小范围内波动。

3.4 故障转移后较少的数据重播我们使用 Sources 的 lag 来粗略估计数据量故障转移后重播的数据,由 Kafka Source 中的 currentEmitEventTimeLag[3] 测量。在这种情况下使用相同的滑动窗口作业,并通过在源中注入特殊记录来触发失败。如图11所示,使用GI后数据回放时间从180秒减少到105秒C 启用,减少了 41.7%。图 11:窗口作业中 Kafka 源的 CurrentEmitEventTimeLagspan> 指标。故障转移后源的延迟也受到恢复速度的影响,所以让我们看看这部分进行相同的实验。表 3 显示了 P99 在恢复上花费的时间。启用 GIC 后,需要额外 16 秒才能重新应用状态更改日志。由于启用了本地恢复,因此节省了状态下载所花费的时间。将表3所示的结果与图11所示的源滞后相结合,我们可以得出结论,重放的数据量显着减少。数据重放会影响非事务性接收器的数据重复。

作者 east
Flink 8月 4,2023

Flink SQL 连接 – 第 2 部分

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。我们已经看到 Flink SQL 有很多用例,我们很高兴看到什么你将用它来建造。在这个由三部分组成的博客文章系列中,我们将向您展示 Flink SQL 中不同类型的联接以及如何使用它们以多种方式处理数据。

在本文中,您将学习:什么是非压缩和压缩 Kafka 主题什么是时态表如何执行非压缩和压缩 Kafka 主题之间的时态表连接什么是实时星型模式反规范化如何执行实时星型模式反规范化时态表连接与非压缩和压缩 Kafka 主题Apache Kafka 最基本的组织单元是主题,它类似于关系数据库中的表。 Kafka 主题可以是压缩的,也可以是非压缩的。压缩主题会在特定时间段内保留所有消息,即使它们已被删除。非压缩主题仅保留未删除的消息。为了连接压缩和非压缩主题,您可以使用时态表连接。时态表是随时间变化的表。这在 Flink 中也称为动态表。时态/动态表中的行与一个或多个时态周期相关联。时态表包含一个或多个版本化表快照。时态表联接是一项功能,允许将两个不同时态表中的数据通过公共键联接在一起,并将第二个表中的数据自动插入到第一个表中在适当的时间段或版本化表中的相关版本。当集成来自多个源的数据或处理随时间变化的数据时,这非常有用。这也意味着可以通过不断变化的元数据来丰富表,并在某一时间点检索其值。如何在非压缩和压缩 Kafka 主题之间执行时态表连接此示例将展示如何正确丰富一个 Kafka 主题中的记录当事件顺序很重要时,与另一个 Kafka 主题的相应记录相结合。临时表连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL 语法 FOR SYSTEM_TIME AS OF 来执行此操作。在本节中,您将把每笔交易(transactions)连接到截至交易发生时的正确货币汇率(currency_rates,一个版本化表)。

类似的示例是将每个订单与订单发生时的客户详细信息连接起来。这正是事件时间时态表连接的作用。 Flink SQL 中的时态表连接在两个表之间存在无序和任意时间偏差的情况下提供了正确的、确定性的结果。transactions 和currency_rates 表都由 Kafka 主题支持,但在利率的情况下,该主题被压缩(例如当更新的费率流入时,仅保留给定密钥的最新消息)。事务中的记录仅被解释为插入,因此该表由标准 Kafka 连接器 (connector=kafka) 支持;而currency_rates中的记录需要解释为基于主键的upsert,这需要Upsert Kafka连接器(connector=upsert-kafka)。Flink SQL:

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

Data Generators

这两个主题也使用 Flink SQL 作业填充。我们使用faker连接器根据Java Faker表达式在内存中生成行并将它们写入相应的Kafka主题。

currency_rates Topic

Flink SQL

CREATE TEMPORARY TABLE currency_rates_faker
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE currency_rates (EXCLUDING OPTIONS);

INSERT INTO currency_rates SELECT * FROM currency_rates_faker;

Kafka Topic

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topiccurrency_rates –property print.key=true –property key.separator=” – ”
HTG – {“currency_code”:”HTG”,”eur_rate”:0.0136,”rate_time”:”2020-12-16 22:22:02″}
BZD – {“currency_code”:”BZD”,”eur_rate”:1.6545,”rate_time”:”2020-12-16 22:22:03″}
BZD – {“currency_code”:”BZD”,”eur_rate”:3.616,”rate_time”:”2020-12-16 22:22:10″}
BHD – {“currency_code”:”BHD”,”eur_rate”:4.5308,”rate_time”:”2020-12-16 22:22:05″}
KHR – {“currency_code”:”KHR”,”eur_rate”:1.335,”rate_time”:”2020-12-16 22:22:06″}

交易

TopicFlink SQL

CREATE TEMPORARY TABLE transactions_faker
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE transactions (EXCLUDING OPTIONS);

INSERT INTO transactions SELECT * FROM transactions_faker;

Kafka Topic

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic transactions –property print.key=true –property key.separator=” – ”
e102e91f-47b9-434e-86e1-34fb1196d91d – {“id”:”e102e91f-47b9-434e-86e1-34fb1196d91d”,”currency_code”:”SGD”,”total”:494.07,”transaction_time”:”2020-12- 16 22:18:46″}
bf028363-5ee4-4a5a-9068-b08392d59f0b – {“id”:”bf028363-5ee4-4a5a-9068-b08392d59f0b”,”currency_code”:”EEK”,”total”:906.8,”transaction_time”:”2020-12- 16 22:18:46″}
e22374b5-82da-4c6d-b4c6-f27a818a58ab – {“id”:”e22374b5-82da-4c6d-b4c6-f27a818a58ab”,”currency_code”:”GYD”,”total”:80.66,”transaction_time”:”2020-12- 16 22:19:02″}
81b2ce89-26c2-4df3-b12a-8ca921902ac4 – {“id”:”81b2ce89-26c2-4df3-b12a-8ca921902ac4″,”currency_code”:”EGP”,”total”:521.98,”transaction_time”:”2020-12- 16 22:18:57″}
53c4fd3f-af6e-41d3-a677-536f4c86e010 – {“id”:”53c4fd3f-af6e-41d3-a677-536f4c86e010″,”currency_code”:”UYU”,”total”:936.26,”transaction_time”:”2020-12- 16 22:18:59″}

实时星型模式非规范化(N 路联接)实时星型模式非规范化是在星型模式中连接两个或多个表的过程,以便结果表中的数据非规范化。这样做可以是出于性能方面的原因,也可以是为了更轻松地查询数据,或者两者兼而有之。通过减少需要执行的联接数量,非规范化可用于提高联接大量表的查询的性能。它还可以通过提供一个包含所有数据的表来更轻松地查询数据,否则这些数据将分布在多个表中。

非规范化过程可以应用于任何模式,但最常用于星型模式,这些模式具有一个中心事实表,周围有许多维度表。事实表包含正在分析的数据,维度表包含可用于描述事实表中数据的数据。对星型模式进行非规范化时,维度表中的数据将合并到事实表中。如何执行实时星型模式非规范化此示例将展示如何使用 n 路时态表连接对简单的星型模式进行非规范化。星型模式是数据仓库中数据标准化的一种流行方法。星型模式的中心是一个事实表,其行包含度量、测量和有关世界的其他事实。周围的事实表是一个或多个维度表,这些表具有在计算查询时可用于丰富事实的元数据。想象一下,您正在为一家铁路公司运行一个小型数据仓库,其中包含一个事实表 (train_activity) 和三个维度表(车站、预订通道和乘客) )。对事实表的所有插入以及对维度表的所有更新都将镜像到 Apache Kafka。

事实表中的记录仅被解释为插入,因此该表由标准 Kafka 连接器 (connector=kafka) 支持;。相反,维度表中的记录是基于 prim 的更新插入ary 键,这需要 Upsert Kafka 连接器(connector=upsert-kafka)。使用 Flink SQL,您现在可以使用 5 路时态表连接轻松地将所有维度连接到我们的事实表。临时表连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL 语法 FOR SYSTEM_TIME AS OF 来执行此操作。将事实表与更多(缓慢)变化的维度表连接时,使用时态表连接可以获得一致、可重现的结果。每个事件(事实表中的行)根据事件在现实世界中发生的时间连接到每个维度的相应值。

CREATE TEMPORARY TABLE passengers (
  passenger_key STRING,
  first_name STRING,
  last_name STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (passenger_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'passengers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE stations (
  station_key STRING,
  update_time TIMESTAMP(3),
  city STRING,
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (station_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'stations',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE booking_channels (
  booking_channel_key STRING,
  update_time TIMESTAMP(3),
  channel STRING,
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (booking_channel_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'booking_channels',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE train_activities (
  scheduled_departure_time TIMESTAMP(3),
  actual_departure_date TIMESTAMP(3),
  passenger_key STRING,
  origin_station_key STRING,
  destination_station_key STRING,
  booking_channel_key STRING,
  WATERMARK FOR actual_departure_date AS actual_departure_date - INTERVAL '10' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'train_activities',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.actual_departure_date,
  p.first_name,
  p.last_name,
  b.channel,
  os.city AS origin_station,
  ds.city AS destination_station
FROM train_activities t
LEFT JOIN booking_channels FOR SYSTEM_TIME AS OF t.actual_departure_date AS b
ON t.booking_channel_key = b.booking_channel_key;
LEFT JOIN passengers FOR SYSTEM_TIME AS OF t.actual_departure_date AS p
ON t.passenger_key = p.passenger_key
LEFT JOIN stations FOR SYSTEM_TIME AS OF t.actual_departure_date AS os
ON t.origin_station_key = os.station_key
LEFT JOIN stations FOR SYSTEM_TIME AS OF t.actual_departure_date AS ds
ON t.destination_station_key = ds.station_key;

Kafka 主题

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic train_activities –property print.key=true –property key.separator=” – ”

null – {“scheduled_departure_time”:“2020-12-19 13:52:37”,“actual_departure_date”:“2020-12-19 13:52:16”,“passenger_key”:7014937,“origin_station_key”:577,” destination_station_key”:862,”booking_channel_key”:2}
null – {“scheduled_departure_time”:“2020-12-19 13:52:38”,“actual_departure_date”:“2020-12-19 13:52:23”,“passenger_key”:2244807,“origin_station_key”:735,” destination_station_key”:739,”booking_channel_key”:2}
null – {“scheduled_departure_time”:“2020-12-19 13:52:46”,“actual_departure_date”:“2020-12-19 13:52:18”,“passenger_key”:2605313,“origin_station_key”:216,” destination_station_key”:453,”booking_channel_key”:3}
null – {“scheduled_departure_time”:”2020-12-19 13:53:13″,”actual_departure_date”:”2020-12-19 13:52:19″,”passenger_key”:7111654,”origin_station_key”:234,”德stination_station_key”:833,”booking_channel_key”:5}
null – {“scheduled_departure_time”:“2020-12-19 13:52:22”,“actual_departure_date”:“2020-12-19 13:52:17”,“passenger_key”:2847474,“origin_station_key”:763,” destination_station_key”:206,”booking_channel_key”:3}

client Topic

Flink SQL

CREATE TEMPORARY TABLE passengers_faker
WITH (
  'connector' = 'faker',
  'fields.passenger_key.expression' = '#{number.numberBetween ''0'',''10000000''}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'fields.first_name.expression' = '#{Name.firstName}',
  'fields.last_name.expression' = '#{Name.lastName}',
  'rows-per-second' = '1000'
) LIKE passengers (EXCLUDING OPTIONS);

INSERT INTO passengers SELECT * FROM passengers_faker;

Kafka Topic


➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic passengers --property print.key=true --property key.separator=" - "
749049 - {"passenger_key":"749049","first_name":"Booker","last_name":"Hackett","update_time":"2020-12-19 14:02:32"}
7065702 - {"passenger_key":"7065702","first_name":"Jeramy","last_name":"Breitenberg","update_time":"2020-12-19 14:02:38"}
3690329 - {"passenger_key":"3690329","first_name":"Quiana","last_name":"Macejkovic","update_time":"2020-12-19 14:02:27"}
1212728 - {"passenger_key":"1212728","first_name":"Lawerence","last_name":"Simonis","update_time":"2020-12-19 14:02:27"}
6993699 - {"passenger_key":"6993699","first_name":"Ardelle","last_name":"Frami","update_time":"2020-12-19 14:02:19"}
stations Topic

Flink SQL

CREATE TEMPORARY TABLE stations_faker
WITH (
  'connector' = 'faker',
  'fields.station_key.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.city.expression' = '#{Address.city}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE stations (EXCLUDING OPTIONS);

INSERT INTO stations SELECT * FROM stations_faker;

Kafka Topic

➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stations --property print.key=true --property key.separator=" - "
80 - {"station_key":"80","update_time":"2020-12-19 13:59:20","city":"Harlandport"}
33 - {"station_key":"33","update_time":"2020-12-19 13:59:12","city":"North Georgine"}
369 - {"station_key":"369","update_time":"2020-12-19 13:59:12","city":"Tillmanhaven"}
580 - {"station_key":"580","update_time":"2020-12-19 13:59:12","city":"West Marianabury"}
616 - {"station_key":"616","update_time":"2020-12-19 13:59:09","city":"West Sandytown"}

Flink SQL

CREATE TEMPORARY TABLE booking_channels_faker
WITH (
  'connector' = 'faker',
  'fields.booking_channel_key.expression' = '#{number.numberBetween ''0'',''7''}',
  'fields.channel.expression' = '#{regexify ''(bahn\.de|station|retailer|app|lidl|hotline|joyn){1}''}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE booking_channels (EXCLUDING OPTIONS);

INSERT INTO booking_channels SELECT * FROM booking_channels_faker;

Kafka Topic➜ bin

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic booking_channels –property print.key=true –property key.separator=” – ”
1 – {“booking_channel_key”:”1″,”update_time”:”2020-12-19 13:57:05″,”channel”:”joyn”}
0 – {“booking_channel_key”:”0″,”update_time”:”2020-12-19 13:57:17″,”channel”:”station”}
4 – {“booking_channel_key”:”4″,”update_time”:”2020-12-19 13:57:15″,”channel”:”joyn”}
2 – {“booking_channel_key”:”2″,”update_time”:”2020-12-19 13:57:02″,”channel”:”app”}
1 – {“booking_channel_key”:”1″,”update_time”:”2020-12-19 13:57:06″,”channel”:”retailer”}

摘要在本文中,您了解了非时态表之间的连接压缩和压缩的 Kafka 主题,以及实时星型模式反规范化。您还了解了如何使用 Flink SQL 为这两种类型的场景编写查询。我们鼓励您在 Ververica Platform 上运行这些示例。您可以按照以下简单步骤安装平台。要了解有关 Flink SQL 的更多信息,请查看以下资源:Flink SQL Cookbook 入门 – Ververica Platform 上的 Flink SQL Flink SQL 官方文档 Flink Forward Talk: One SQL, Unified AnalyticsOnly SQL: Empower data Analyst end – 使用 Flink SQL 结束

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 4,2023

Flink CDC v2.3 发布

Flink CDC 是一种基于数据库变更日志的变更数据捕获(CDC)技术。它是一个数据集成框架,支持读取数据库快照并平滑切换到读取binlog(包含数据库中数据和结构的所有更改的记录的二进制日志)。这对于捕获提交的更改并将其从数据库传播到下游使用者非常有用,并有助于保持多个数据存储同步并避免双重写入。凭借强大的 Flink pipeline 及其丰富的上下游生态系统,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 作为下一代实时数据集成框架,具有无锁读取、并行读取等技术优势、表模式自动同步、分布式架构。它还有自己的独立文档,您可以在这里找到。

Flink CDC 开源 2 年多来,Flink CDC 社区发展迅速,目前已有 76 名贡献者,7 名维护者,钉钉用户群超过 7800 名用户。在社区的共同努力下,Flink CDC 2.3.0从代码分布来看,我们可以看到MySQL CDC、MongoDB CDC、Oracle CDC、增量快照框架(flink-cdc-base)、文档模块等方面的新特性和改进。特性,这篇博文将回顾此版本中的主要改进和核心特性以及未来的发展。关键特性和改进出于本文的目的,我们将探讨此版本的四个最重要的特性。

DB2 CDC 简介

ConnectorDB2是IBM开发的关系型数据库管理系统。 DB2 CDC 连接器可以捕获 DB2 数据库中表的行级更改。 DB2 基于 ASN Capture/Apply 代理启用 SQL Replication,它为捕获模式下的表生成变更数据表,并将变更事件存储在变更数据表中。 DB2 CDC Connector 首先通过 JDBC 读取表中的历史数据,然后从变更数据表中读取增量变更数据。 MongoDB CDC 和 Oracle CDC Connector 的增量快照算法支持在 Flink CDC 2.3 版本中,MongoDB CDC Connector 和 Oracle CDC Connector 对接 Flink CDC 增量快照框架,实现增量快照算法。这意味着现在它们支持无锁读取、并行读取和检查点。现在,我们有更多支持增量快照算法的 Flink CDC 源。社区还计划未来将更多的连接器迁移到增量快照框架。 MySQL CDC Connector 的稳定性改进作为 Flink CDC 项目中最受欢迎的连接器,MySQL CDC Connector 在 2.3 版本中引入了许多高级功能,并且具有许多性能和稳定性方面的改进。支持从特定偏移量开始此连接器现在支持从 binlog 的指定位置开始作业。您可以通过时间戳、binlog 偏移量或 binlog gtid 指定起始 binlog 位置。您还可以将其设置为从最早的binlog偏移量开始。 chunk分割算法的优化您现在可以在快照阶段优化chunk分割算法。当前的同步算法改为异步,并且可以选择主键中的一列作为 chunk 分割算法的分割列。拆分过程支持检查点,解决了快照阶段同步分块阻塞导致的性能问题。稳定性改进连接器现在支持将所有字符集映射到 Flink SQL,解锁更多用户场景。可以处理不同类型的默认值,提高作业对不规则DDL的容忍度,并自动获取数据库服务器的时区,解决时区问题。性能改进该版本重点优化内存和读取性能,减少JobManager的内存占用TaskManager 通过 JobManager 中的元复用和 TaskManager 中的流读取进行改进。同时,通过优化 binlog 解析逻辑,提高 binlog 读取性能。 其他改进 Flink CDC 2.3 版本兼容 Flink 四大版本(1.13、1.14、1.15、1.16)。这大大降低了用户的升级和维护成本。OceanBase CDC Connector修复了时区问题,将所有数据类型映射到Flink SQL,并提供更多选项以实现更灵活的配置,例如新增的“table-list”配置用于读取多个 OceanBase 表。MongoDB CDC 连接器支持更多数据类型,并优化捕获表的过滤过程。TiDB CDC 连接器修复快照阶段后切换的数据丢失问题,并支持读取期间的区域切换。Postgres CDC 连接器支持几何类型,更多添加了选项,可以配置changelog模式来过滤数据。SQL Server CDC连接器支持更多的SQL Server版本,并完善了文档。MySQL CDC和OceanBase CDC连接器包括中文文档以及OceanBase CDC连接器的视频教程。未来计划Flink CDC的开发可以如果没有社区的贡献和反馈以及维护者的开源精神,就不可能实现这一目标。目前,Flink CDC 社区已经在制定 2.4 版本的计划。欢迎所有用户和贡献者参与并提供反馈。该项目的主要方向将来自以下几个方面:完善的数据源 – 我们计划支持更多的数据源,并将更多的连接器迁移到增量快照框架,以解锁无锁读取和并行读取。可观察性改进 – 我们希望提供读取限速功能,减少快照阶段数据库的查询压力。新版本将提供更丰富的监控指标,让用户获取任务进度相关指标,监控任务状态。 性能改进——新版本中快照阶段支持使用批处理模式,这将提高快照阶段和发布的性能快照阶段后自动空闲读者的资源。 可用性改进 – 提高连接器的易用性,例如简化开箱即用的选项并在 DataStream API 中提供示例。Ververica Platform 计划在 2.11 版本中支持 Flink CDC。

致谢:感谢为 Flink CDC 2.3 版本做出贡献的所有 49 位社区贡献者,特别是社区的四位维护者(阮航、孙家宝、龚忠强、任庆生)为本次发布做出了出色的工作。贡献者名单: 01410172, Amber Moe, Dezhi Cai, Enoch, Hang Ruan, He Wang, Jiajia, Jiabao Sun, Junwang Zhao, Kyle Dong, Leonard Xu, Matrix42, Paul Lin, Qingsheng Ren, Qishangzhong, Rinka, Sergey Nuyanzin, Tigran Manasyan, Camelus 、dujie、ehui、embcl、fbad、gongzhongqiang、hehuiyuan、hele.kc、hsldymq、jiabao.sun、legendtkl、leixin、leozlliang、lidoudou1993、lincoln lee、lxxawfl、lzshlzsh、molsion、molsionmo、pacino、rookiegao、天际线、晴朗、 vanliu、wangminchao、wangxiaojing、xieyi888、yurunchuan、zhmin、阿阳、莫贤斌

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 4,2023

关于 PyFlink 你需要了解的一切

PyFlink 作为 Apache Flink 的 Python API,为用户提供了用 Python 开发 Flink 程序并将其部署到 Flink 集群上的媒介。在这篇文章中,我们将从以下几个方面介绍 PyFlink:PyFlink 基本作业的结构和相关的一些基础知识PyFlink作业的运行机制、高层架构及其内部工作原理PyFlink的基本性能优化策略PyFlink的未来预测通过本文的结束,您应该对PyFlink及其潜在应用有一个牢固的掌握。发现自己需要实时计算解决方案,例如实时 ETL、实时特征工程、实时数据仓库、实时预测,并且您熟悉 Python 语言或想要使用一些方便的 Python 库在此过程中,PyFlink 是一个很好的起点,因为它融合了 Flink 和 Python 的世界。PyFlink 于 2019 年在 Flink 1.9 中首次引入 Flink。这个首个版本仅提供有限的功能。从那时起,Flink 社区一直致力于不断增强 PyFlink。经过近四年的努力发展,已日趋成熟。目前,它包含 Flink Java API 中的大部分功能。此外,PyFlink 还专门提供了多种功能,例如 Python 用户定义函数支持等。 PyFlink 入门PyFlink 已集成到当前版本的 Ververica Platform 中。如果您想体验 PyFlink 的功能并在支持 Kuberbetes 的环境中工作,您可以免费下载社区版并在几分钟内启动 aminikubeplayground。如果您更喜欢使用普通 Flink,那么您可以从 PyPI 安装 PyFlink: $ pip install apache-flink 对于最新的 Flink 1.17,您需要高于 Python 3.6 的 Python 版本,最高可达 Python 3.10; Flink 1.16 支持 Python 3.6 到 3.9 版本。请注意,Python/PyFlink 必须可用于集群中的每个节点。最灵活的方法是在提交 PyFlink 作业时传入 Python 环境,但如果您有很多深度的 Python 依赖项,那么将 Python 环境预安装到每个集群节点可能会更简单。您也可以从源代码构建 PyFlink ,如果您维护自己的 Flink 分支或需要挑选尚未发布的提交,您可能会想要这样做。 PyFlink 的 Flink 基础知识如果您是 Flink 新手,那么有一些基本概念很好理解,其中也与 PyFlink 相关:Flink 提供两种不同的 API,过程性且相对较低级别的 DataStream API 和关系/声明性表 API。不要被它们的名字误导:这两个 API 都可以应用于流处理或批处理,并且都具有 PyFlink API。Flink 是一个分布式计算引擎。除了在处理过程中提供即时上下文的状态之外,它没有任何存储空间。假设数据从外部数据源流向(通常但不是必需的)外部数据接收器。 Flink/PyFlink 作业至少需要一个数据源。任何 Flink/PyFlink 应用程序的核心都是从源数据计算所需结果的数据转换,这可能涉及数据重塑或采样、合并和丰富、比较或建模、处理事务,或者您可能想要对无界数据流或海量数据集执行计算的无数其他方式。定义数据源和接收器任何 PyFlink 作业的第一步都是定义数据源,以及可选的数据接收器执行结果将被写入。PyFlink完全支持Table API和DataStream API。这两个 API 都提供了多种不同的方式来定义源和接收器,单个作业可以组合这两个 API,例如在 Table API 读取和 DataStream API 写入之间进行转换,或者在 DataStream API 读取和 Table API 写入之间进行转换。下面是一个典型的读写示例对于每个 API。这些示例假设 Kafka 流提供源/接收器。

使用 Table API 从 Kafka 读取:

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.create_temporary_table(
    'kafka_source',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.STRING())
                .build())
        .option('properties.bootstrap.servers', 'localhost:9092')
        .option('properties.group.id', 'my-group')
        .option('topic', 'input-topic')
        .option('scan.startup.mode', 'earliest-offset')
        .option('value.format', 'json')
        .build())

table = t_env.from_path("kafka_source")

使用DataStream API从Kafka读取:

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder()
        .type_info(Types.ROW([Types.LONG(), Types.STRING()]))
        .build()) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

使用 Table API 写入 Kafka:

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.create_temporary_table(
    'kafka_sink',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.STRING())
                .build())
        .option('properties.bootstrap.servers', 'localhost:9092')
        .option('topic', 'output-topic')
        .option('value.format', 'json')
        .build())

table.execute_insert('kafka_sink')

使用DataStream API写入Kafka:

sink = KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("topic-name")
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(Types.ROW([Types.LONG(), Types.STRING()]))
                .build())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

ds.sink_to(sink)

请参阅 Apache Table API 文档以了解有关表 API 连接器的更多详细信息,并参阅 Apache DataStream API 文档以了解有关 DataStream API 连接器的更多详细信息。 Apache API 转换文档展示了如何组合 Table API/DataStream API 读/写。有几点需要注意:Table API 示例将源/接收器属性定义为键/值对。所有 Table API 连接器都遵循该模式。要使用不同的连接器,或者定义 PyFlink 中未正式支持的新连接器,只需配置适当的键/值对。DataStream API 连接器不太常规;每个连接器都提供一堆完全不同的 API。请参阅特定连接器页面以查看提供了哪些 API。要使用 PyFlink 不支持的连接器,您需要为相应的 Java API 编写 Python 包装器,请参阅支持的连接器以获取示例。 转换 这两个 API 都支持多种转换。 DataStream API 包括以下功能: 映射:将一个元素转换为另一个平面映射:将一个元素作为输入并生成零个、一个或多个元素过滤器:对每个元素计算布尔函数并过滤掉返回 false 的元素聚合:累积多个元素窗口:将元素分组到不同的窗口中并为每个组执行计算连接:连接两个不同的元素流,允许在两个流进程之间共享状态:与平面地图类似,但是更灵活,因为它允许访问低级操作,例如广播:将一个流广播到另一个流的所有子任务 边输出:除了主流之外,还产生额外的边输出结果 流async io:PyFlink 中仍然不支持此功能。Table API 是一种关系型 API,具有类似 SQL 的风格。它包括以下功能: 投影:类似于DataStream中的map API过滤器:类似于DataStream中的过滤器 API聚合:类似于SQL GROUP BY,对分组键上的元素进行分组,并对每个组进行聚合窗口聚合:将元素分组到不同的窗口中并进行聚合对于每个窗口常规连接:与 SQL JOIN 类似,连接两个流查找(流表)连接:使用静态表连接流时间连接:使用版本化表连接流,类似于查找连接,但是,它允许在以下位置连接表时间窗口连接:连接属于同一窗口的两个流的元素间隔连接:在时间限制下连接两个流的元素topn和windowedtopn:按列排序的N个最小或最大值重复数据删除和窗口重复数据删除:删除在一组列上重复的元素模式识别:检测一个流中特定模式的元素同样需要注意一些事项:如果您需要对转换进行细粒度控制或访问低级功能,例如定时器、状态等,选择DataStream API。否则,在大多数情况下,Table API 是一个不错的选择。Table API 还支持直接执行 SQL 查询,提供对当前无法通过 API 提供的功能的访问,例如重复数据删除、模式识别、topn 等。虽然 API 会继续增长,但使用 SQL 提供了立即的解决方案。作业提交Flink 是一个分布式计算引擎,它在独立集群中执行 Flink/PyFlink 作业。Flink 作业是延迟执行的;您必须明确提交作业以供执行。这有点不同来自许多 Python 用户习惯的更具交互性/探索性的脚本风格。例如,如果您有一个由 Python 脚本 word_count.py 定义的 PyFlink 作业,您可以使用 $python word_count.py 通过 Flink 控制台在本地执行它,或者通过在Flink IDE中右键执行。 Flink 将启动一个迷你 Flink 集群,该集群在单个进程中运行并执行 PyFlink 作业。您还可以使用 Flink 的命令行工具将 PyFlink 作业提交到远程集群。下面是一个简单的示例,展示了如何将 PyFlink 作业提交到远程集群。用于执行的 Apache YARN 集群:

./bin/flink run-application -t yarn-application \
      -Djobmanager.memory.process.size=1024m \
      -Dtaskmanager.memory.process.size=1024m \
      -Dyarn.application.name=<ApplicationName> \
      -Dyarn.ship-files=/path/to/shipfiles \
      -pyarch shipfiles/venv.zip \
      -pyclientexec venv.zip/venv/bin/python3 \
      -pyexec venv.zip/venv/bin/python3 \
      -pyfs shipfiles \
      -pym word_count

有关 Flink 中作业提交的更多信息,请参阅 Apache 文档。您可以在 PyFlink 博文的 LINK 中阅读有关如何定义和运行 Python 脚本作为 PyFlink 作业的更多信息。调试和日志记录,一开始会执行 Python 用户定义的函数在作业启动期间启动的单独的 Python 进程中。这并不容易调试,用户必须对 Python 用户定义函数进行一些更改才能实现远程调试。从 Flink 1.14 开始,它支持在客户端的同一个 Python 进程中以本地方式执行 Python 用户定义函数。用户可以在任何想要调试的地方设置断点,例如PyFlink 框架代码、Python 用户定义函数等。这使得调试 PyFlink 作业变得非常容易,就像调试任何其他常用的 Python 程序一样。用户还可以在 Python 用户定义函数中使用日志记录来进行调试。应该注意的是,日志消息将出现在 TaskManager 的日志文件中,而不是 console.import 日志中

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("i: " + i + ", j: " + j)
    return i + j

此外,它还支持Python用户定义函数中的Metrics。这对于长时间运行的程序非常有用,可用于监视特定的统计信息和配置警报。管理依赖关系对于生产作业,您几乎肯定需要引用第三方 Python 库。您可能还需要使用其 jar 文件不属于 Flink 发行版的数据连接器 – 例如 Kafka、HBase、Hive 和 Elasticsearch 的连接器未捆绑在 Flink 发行版中。因为 PyFlink 作业在分布式集群中执行,依赖关系也需要跨集群进行管理。 PyFlink 提供了多种管理依赖关系的方法。JAR 文件您可以将 JAR 文件包含在 PyFlink 作业中:# Table API
t_env.get_config().set(“pipeline.jars”, “file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar”)

# 数据流API
env.add_jars(“file:///my/jar/path/connector1.jar”, “file:///my/jar/path/connector2.jar”)

您必须包含所有传递依赖项。对于连接器,使用名称通常包含sql的fat JAR,例如flink-sql-connector-kafka-1.16.0.jar,对于Kafka连接器优先使用flink-connector-kafka-1.16.0.jar。第三方Python库添加Python PyFlink venv 虚拟环境的依赖:# Table API
t_env.add_python_file(文件路径)

# 数据流API
env.add_python_file(file_path)包含指定库的环境将在执行期间分布在集群节点上。压缩的 Python 库如果需要包含大量 Python 库,最好将它们以存档形式传递到虚拟环境环境:#表API
t_env.add_python_archive(archive_path=”/path/to/venv.zip”)
t_env.get_config().set_python_executable(“venv.zip/venv/bin/python3”)

# 数据流API
env.add_python_archive(archive_path=”/path/to/venv.zip”)
env.set_python_executable(“venv.zip/venv/bin/python3”)

命令行配置您还可以在命令行上配置依赖项,为您提供额外的灵活性:依赖项类型配置命令行选项Jar Packagepipeline.jarspipeline.classpaths–jarfilePythonlibrariespython.files- pyfsPython

虚拟环境python.archivespython.executablepython.client.executable-pyarch-pyexec-pyclientexecPython 要求python.requirements-pyreq 有关更多详细信息,请参阅 Apache PyFlink 文档中的 Python 依赖管理。 其他提示 与 Python 本身一样,PyFlink 提供了极大的灵活性和适应性。当您探索 API 时,这里有一些有用的提示。使用 Open() 进行初始化如果您的 Python 代码依赖于大量资源,

例如机器学习模型,在作业初始化期间使用 open() 加载一次:

# DataStream API
class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def map(self, value):
        return self.model.predict(value)


# Table API
class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE())

这种简单的方法会导致资源被序列化并与Python函数本身一起分发,并在每次调用时加载;使用 open() 确保它只加载一次。 WatermarksWatermarks 触发特定运算符的计算,例如事件时间启用时的窗口、模式识别等。请务必定义水印生成器,否则您的作业可能没有输出。PyFlink 为您提供了几种不同的方式来定义水印生成器:SQL DDL:请参阅Watermark 部分了解更多详细信息。Table API:请参阅此示例以了解更多详细信息。DataStream API:请参阅有关更多详细信息,请参阅此示例。如果您的水印生成器已正确定义,但水印未按预期前进,则可能您的作业没有足够的数据。如果您的测试样本较小,则在测试过程中可能会出现这种情况。尝试将作业的并行度设置为 1 或配置源空闲以解决测试阶段的问题。有关水印行为的更多信息,请参阅“及时流处理”。Flink Web UI Web UI 是丰富的信息源 – 显示作业运行了多长时间、是否有任何异常、每个算子的输入/输出元素的数量等. 如何访问取决于部署模式: 本地:Web端口随机设置。您可以在日志文件中找到它

/path/to/python-installation-directory/lib/python3.7/site-packages/pyflink/log/.local.log):INFOorg.apache.flink.runtime .dispatcher.DispatcherRestEndpoint [] –

Web 前端监听 http://localhost:55969。

Standalone:通过配置rest.port 配置,默认为 8081。

Apache YARN:从 YARN 资源管理器的 Web Ui 中,找到与PyFlink 作业,然后单击“Tracking UI”列下的链接。Kubernetes:Web UI 可能通过以下任何一项公开:ClusterIP、NodePort 和 LoadBalancer。有关更多详细信息,请参阅 Kubernetes 文档。架构和内部结构一些背景了解可能会帮助您回答以下问题:Python API 和 Java API 之间有什么区别,我应该使用哪一个?如何在 PyFlink 中使用自定义连接器?在哪里可以找到打印的日志消息Python中的用户定义函数?如何调优PyFlink作业的性能?注意,这里我们不会谈论基本的Flink概念,例如Flink的架构、状态流处理、事件时间和水印,这些在Flink官方中都有详细描述架构图PyFlink 由两个主要部分组成:作业编译:将 PyFlink 程序转换为 JobGraph 作业执行:接受 JobGraph 并将其转换为以分布式方式运行的 Flink 算子图 PyFlink 的架构作业编译将 JobGraph 视为之间的协议一个客户端和一个 Flink 集群。它包含执行作业所需的所有必要信息:表示用户想要执行的处理逻辑的转换图作业的名称和配置执行作业所需的依赖项,例如JAR文件、Python依赖等 目前JobGraph还没有多语言支持,仅支持Java。 PyFlink 通过利用 Py4J 复用 Java API 现有的作业编译栈,使运行在 Python 进程中的 Python 程序能够访问 JVM 中的 Java 对象。方法的调用就像 Java 对象驻留在 Python 进程中一样。每个 Java API 都由相应的 Python API 包装。当Python程序进行PyFlink API调用时,会在JVM中创建相应的Java对象并调用其方法。在内部,它会在JVM中创建相应的Java对象,然后在Java对象上调用相应的API。因此它复用了与 Java API 相同的作业编译堆栈。这意味着:如果您使用 PyFlink Table API 但仅执行 Java 代码,那么性能应该与 Java Table API 相同如果您想要使用一个 Java 类,例如自定义连接器,PyFlink 中尚不支持,您可以自己包装它来执行作业,大多数情况下,包装 Java API 效果很好。然而,也有一些例外情况。

我们看下面的例子:

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder()
        .type_info(Types.ROW([Types.LONG(), Types.STRING()]))
        .build()) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds.map(lambda x: x[1]).print()
env.execute()

这里,除了formap()传递一个lambda函数ds.map(lambda x: x[1])之外,所有Python方法都可以映射到Flink的Java API。 Java 需要一个 Java MapFunction。为了在 Java 中实现此功能,我们需要序列化 ​​lambda x: x[1] 并用 Java 包装器对象包装它,该对象会生成一个 Python 进程以在作业执行期间执行它。Flink 和 PyFlink 运算符在执行期间,Flink 作业由Flink 算子系列。每个运算符接受来自上游运算符的输入,对其进行转换并向下游运算符产生输出。对于处理逻辑为Python的转换,将生成特定的Python算子:在初始化阶段,该算子将生成一个Python进程,并将元数据(即要执行的Python函数)发送到Python进程,在接收到来自上游算子的数据后,操作符会将其发送到Python进程执行。数据异步发送到Python进程;该运算符不会等到接收到一个数据项的执行结果后才发送下一个数据项。该运算符支持访问 Python 状态,但 Python 运算符运行在 JVM 中。与数据通信不同,状态访问是同步的。状态可以缓存在Python进程中以提高性能。Python运算符还支持在Python函数中使用日志记录。日志消息被发送到在 JVM 中运行的 Python 算子,因此消息最终会出现在 TaskManager 的日志文件中。请注意:Python 函数将在作业编译期间进行序列化,并在作业执行期间进行反序列化。保持资源使用较少(请参阅上面有关使用 open() 的注释),并且仅使用可序列化的实例变量。多个 Python 函数将尽可能链接起来,以避免不必要的序列化/反序列化以及通信开销。线程模式下启动 Python 函数在大多数情况下,单独的进程运行良好,但也有一些例外情况:额外的序列化/反序列化和通信开销可能是大数据的问题,例如图像处理,其中图像尺寸可能非常大、​​长字符串等。进程间通信也意味着延迟可能更高。另外Python算子通常需要缓冲数据来提高网络性能,这会增加更多的延迟。额外的进程和进程间通信给稳定性带来了挑战。为了解决这些问题,Flink 1.15引入了线程模式作为执行Python函数的选项在 JVM 中。默认情况下线程模式是禁用的;要使用它,请配置 python.execution-mode: thread。启用线程模式后,Python 函数的执行方式与进程模式下非常不同:一次处理一行数据,这会增加延迟。但是,序列化/反序列化和通信开销被消除注意,线程模式有特定的限制,这就是为什么它默认不启用:它只支持CPython解释器,因为它依赖于CPython运行时来执行Python函数。因为CPython运行时只能在进程中加载​​一次,线程模式不能很好地支持会话模式,其中多个作业可能需要使用单独的 Python 解释器,有关线程模式的更多详细信息,请参阅博客文章探索 PyFlink 中的线程模式。状态访问和检查点 Python 函数支持状态访问。本示例使用 state 来计算每组的平均值:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class Average(MapFunction):

    def __init__(self):
        self.sum_state = None
        self.cnt_state = None

    def open(self, runtime_context: RuntimeContext):
        self.sum_state = runtime_context.get_state(ValueStateDescriptor("sum", Types.INT()))
        self.cnt_state = runtime_context.get_state(ValueStateDescriptor("cnt", Types.INT()))

   def map(self, value):
        # access the state value
        sum = self.sum_state.value()
        if sum is None:
            sum = 0

        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        sum += value[1]
        cnt += 1

        # update the state
        self.sum_state.update(sum)
        self.cnt_state.update(cnt)

        return value[0], sum / cnt


env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 3), (1, 5), (1, 7), (2, 4), (2, 2)]) \
   .key_by(lambda row: row[0]) \
   .map(Average()) \
   .print()

env.execute()

这里sum_state和cnt_state都是PyFlink状态对象。状态可以在作业执行期间访问,也可以在作业故障转移后恢复:从上图可以看出:状态的真实来源是运行在 JVM 中的 Python Operator 从用户角度来看状态访问是同步的引入了以下优化提高状态访问的性能:异步写入:维护最新状态和状态修改的 LRU 缓存,并将其异步写回到 Python Operator 延迟读取:与 LRU 缓存一样,MapState 也会延迟读取,以避免不必要的状态请求性能调优一般调整 PyFlink 作业与调整 Flink Java 作业相同。一个例外是调整 Python 运算符性能。内存调整Python 运算符启动一个单独的 Python 进程来执行 Python 函数。依赖大量资源的 Python 函数可能会占用大量内存。如果为 Python 进程配置的内存过少,则会影响作业的稳定性。如果 PyFlink 作业运行在严格要求的 Kubernetes 或 Apache YARN 部署中限制内存使用,Python进程可能会因为内存需求超出限制而崩溃。你需要仔细设计你的Python代码。此外,使用以下配置选项来帮助调整 Python 内存使用情况:taskmanager.memory.process.size:TaskExecutors 的总进程内存大小。taskmanager.memory. Managed.fraction:用作托管内存的总内存部分。 (Python 进程的内存也是托管内存的一部分)taskmanager.memory.jvm-overhead.fraction:为 JVM 开销保留的总内存比例。 (未显式使用的保留内存)taskmanager.memory.driven.consumer-weights:不同类型消费者的托管内存权重。此配置可用于调整分配给 Python 进程的托管内存的比例。Bundle Size 在进程模式下,Python 运算符批量向 Python 进程发送数据。为了提高网络性能,它在发送数据之前缓冲数据。t.在检查点期间,它必须等待所有缓冲数据被处理。如果一个batch中有很多元素并且Python处理逻辑效率低下,那么检查点时间将会延长。如果您发现检查点很长甚至失败,请尝试调整包大小配置python.fn-execution.bundle.size。执行模式在数据量很大或需要减少延迟的情况下,线程模式可以提高性能。设置配置 python.execution-mode: thread 来启用它。 PyFlink 的下一步是什么 PyFlink 已经具有丰富的功能。在其发展的下一阶段,社区的重点将是:更好地支持交互式编程,例如仅检索无界表的少数前导行。提高了易用性,例如使 API 更加 Pythonic,改进文档,并添加更多示例。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 3,2023

CDH Yarn WebUI没有显示最近的FINISHED Applications

在跑spark任务时,CDH6.3.2的Yarn WebUI没有显示最近的FINISHED Applications,想查看已跑完的spark看不到,非常不方便。

CDH(Cloudera Distribution for Hadoop)是一个大数据处理平台,其中的YARN(Yet Another Resource Negotiator)是用于资源管理和任务调度的核心组件。您提到在YARN的WEBUI中,”FINISHED Applications”(已完成的应用程序)没有显示最新的记录。这可能由多种原因导致。

  1. 刷新问题: 有时候,Web界面可能由于不正确的缓存或其他问题而未能及时刷新,导致显示的信息不是最新的。
  2. 日志滚动策略: YARN的日志滚动策略可能会导致已完成的应用程序的日志被压缩或删除,从而影响了Web界面的展示。
  3. 数据清理策略: 可能在CDH配置中设置了数据清理策略,这可能会导致一些旧的已完成应用程序被自动清理,从而在Web界面中不再显示。

解决方案:

以下是一些可能的解决方案,您可以尝试进行排查和解决问题:

  1. 手动刷新界面: 首先尝试手动刷新Web界面,看看是否能够获取到最新的数据。有时候,简单的刷新就可以解决问题。
  2. 检查日志设置: 查看CDH集群中YARN的日志滚动和保留策略。您可以调整这些设置,确保日志文件得到正确保留并且不会被过早清理。
  3. 调整数据清理策略: 如果CDH集群中配置了自动数据清理策略,您可以考虑调整清理策略,以便保留更多的已完成应用程序数据。
  4. 查看日志和错误信息: 查看YARN的日志文件以及相关的错误信息,看看是否有任何与已完成应用程序信息显示相关的问题。
  5. 升级和修复: 如果问题仍然存在,可能需要考虑升级CDH版本或者应用程序,以获取更稳定的Web界面功能。
  6. 联系支持: 如果您尝试了以上方法仍无法解决问题,可以考虑联系CDH的技术支持团队寻求进一步帮助。

看了Yarn的配置,觉得都正常,重启了Yarn组件,重新跑任务
FINISHED Applications 有显示最新任务了。仔细查看原来是之前没有用倒序,默认是顺序排列,最新的没有显示在第一页。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 2,2023

spark dataframe 如何从Seq 集合select 所有的列

要从一个 Seq 集合中选择所有的列,你可以使用 Spark DataFrame 的 select 方法,并将 Seq 集合作为参数传递给该方法。假设你已经创建了一个名为 dataframe 的 Spark DataFrame,其中包含多个列,现在你想从一个 Seq 集合中选择所有的列,可以按照以下步骤进行操作:

import org.apache.spark.sql.functions.col 
// 假设你有一个 Seq 集合,其中包含你要选择的列名
val columnsToSelect = Seq("col1", "col2", "col3", ...)
// 使用 select 方法,并将 Seq 集合中的列名转换为对应的列对象
val selectedDataFrame = dataframe.select(columnsToSelect.map(col): _*)

上述代码中,col 是 Spark 的函数,用于将列名转换为列对象。通过将 Seq 集合中的列名映射为列对象,并在 select 方法中使用 : _* 来展开参数,你可以选择所有在 Seq 集合中指定的列。

现在,selectedDataFrame 中将包含来自 dataframe 中指定的所有列。

作者 east

上一 1 … 3 4

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.