关于 PyFlink 你需要了解的一切

PyFlink 作为 Apache Flink 的 Python API,为用户提供了用 Python 开发 Flink 程序并将其部署到 Flink 集群上的媒介。在这篇文章中,我们将从以下几个方面介绍 PyFlink:PyFlink 基本作业的结构和相关的一些基础知识PyFlink作业的运行机制、高层架构及其内部工作原理PyFlink的基本性能优化策略PyFlink的未来预测通过本文的结束,您应该对PyFlink及其潜在应用有一个牢固的掌握。发现自己需要实时计算解决方案,例如实时 ETL、实时特征工程、实时数据仓库、实时预测,并且您熟悉 Python 语言或想要使用一些方便的 Python 库在此过程中,PyFlink 是一个很好的起点,因为它融合了 Flink 和 Python 的世界。PyFlink 于 2019 年在 Flink 1.9 中首次引入 Flink。这个首个版本仅提供有限的功能。从那时起,Flink 社区一直致力于不断增强 PyFlink。经过近四年的努力发展,已日趋成熟。目前,它包含 Flink Java API 中的大部分功能。此外,PyFlink 还专门提供了多种功能,例如 Python 用户定义函数支持等。 PyFlink 入门PyFlink 已集成到当前版本的 Ververica Platform 中。如果您想体验 PyFlink 的功能并在支持 Kuberbetes 的环境中工作,您可以免费下载社区版并在几分钟内启动 aminikubeplayground。如果您更喜欢使用普通 Flink,那么您可以从 PyPI 安装 PyFlink: $ pip install apache-flink 对于最新的 Flink 1.17,您需要高于 Python 3.6 的 Python 版本,最高可达 Python 3.10; Flink 1.16 支持 Python 3.6 到 3.9 版本。请注意,Python/PyFlink 必须可用于集群中的每个节点。最灵活的方法是在提交 PyFlink 作业时传入 Python 环境,但如果您有很多深度的 Python 依赖项,那么将 Python 环境预安装到每个集群节点可能会更简单。您也可以从源代码构建 PyFlink ,如果您维护自己的 Flink 分支或需要挑选尚未发布的提交,您可能会想要这样做。 PyFlink 的 Flink 基础知识如果您是 Flink 新手,那么有一些基本概念很好理解,其中也与 PyFlink 相关:Flink 提供两种不同的 API,过程性且相对较低级别的 DataStream API 和关系/声明性表 API。不要被它们的名字误导:这两个 API 都可以应用于流处理或批处理,并且都具有 PyFlink API。Flink 是一个分布式计算引擎。除了在处理过程中提供即时上下文的状态之外,它没有任何存储空间。假设数据从外部数据源流向(通常但不是必需的)外部数据接收器。 Flink/PyFlink 作业至少需要一个数据源。任何 Flink/PyFlink 应用程序的核心都是从源数据计算所需结果的数据转换,这可能涉及数据重塑或采样、合并和丰富、比较或建模、处理事务,或者您可能想要对无界数据流或海量数据集执行计算的无数其他方式。定义数据源和接收器任何 PyFlink 作业的第一步都是定义数据源,以及可选的数据接收器执行结果将被写入。PyFlink完全支持Table API和DataStream API。这两个 API 都提供了多种不同的方式来定义源和接收器,单个作业可以组合这两个 API,例如在 Table API 读取和 DataStream API 写入之间进行转换,或者在 DataStream API 读取和 Table API 写入之间进行转换。下面是一个典型的读写示例对于每个 API。这些示例假设 Kafka 流提供源/接收器。

使用 Table API 从 Kafka 读取:

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.create_temporary_table(
    'kafka_source',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.STRING())
                .build())
        .option('properties.bootstrap.servers', 'localhost:9092')
        .option('properties.group.id', 'my-group')
        .option('topic', 'input-topic')
        .option('scan.startup.mode', 'earliest-offset')
        .option('value.format', 'json')
        .build())

table = t_env.from_path("kafka_source")

使用DataStream API从Kafka读取:

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder()
        .type_info(Types.ROW([Types.LONG(), Types.STRING()]))
        .build()) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

使用 Table API 写入 Kafka:

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.create_temporary_table(
    'kafka_sink',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.STRING())
                .build())
        .option('properties.bootstrap.servers', 'localhost:9092')
        .option('topic', 'output-topic')
        .option('value.format', 'json')
        .build())

table.execute_insert('kafka_sink')

使用DataStream API写入Kafka:

sink = KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("topic-name")
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(Types.ROW([Types.LONG(), Types.STRING()]))
                .build())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

ds.sink_to(sink)

请参阅 Apache Table API 文档以了解有关表 API 连接器的更多详细信息,并参阅 Apache DataStream API 文档以了解有关 DataStream API 连接器的更多详细信息。 Apache API 转换文档展示了如何组合 Table API/DataStream API 读/写。有几点需要注意:Table API 示例将源/接收器属性定义为键/值对。所有 Table API 连接器都遵循该模式。要使用不同的连接器,或者定义 PyFlink 中未正式支持的新连接器,只需配置适当的键/值对。DataStream API 连接器不太常规;每个连接器都提供一堆完全不同的 API。请参阅特定连接器页面以查看提供了哪些 API。要使用 PyFlink 不支持的连接器,您需要为相应的 Java API 编写 Python 包装器,请参阅支持的连接器以获取示例。 转换 这两个 API 都支持多种转换。 DataStream API 包括以下功能: 映射:将一个元素转换为另一个平面映射:将一个元素作为输入并生成零个、一个或多个元素过滤器:对每个元素计算布尔函数并过滤掉返回 false 的元素聚合:累积多个元素窗口:将元素分组到不同的窗口中并为每个组执行计算连接:连接两个不同的元素流,允许在两个流进程之间共享状态:与平面地图类似,但是更灵活,因为它允许访问低级操作,例如广播:将一个流广播到另一个流的所有子任务 边输出:除了主流之外,还产生额外的边输出结果 流async io:PyFlink 中仍然不支持此功能。Table API 是一种关系型 API,具有类似 SQL 的风格。它包括以下功能: 投影:类似于DataStream中的map API过滤器:类似于DataStream中的过滤器 API聚合:类似于SQL GROUP BY,对分组键上的元素进行分组,并对每个组进行聚合窗口聚合:将元素分组到不同的窗口中并进行聚合对于每个窗口常规连接:与 SQL JOIN 类似,连接两个流查找(流表)连接:使用静态表连接流时间连接:使用版本化表连接流,类似于查找连接,但是,它允许在以下位置连接表时间窗口连接:连接属于同一窗口的两个流的元素间隔连接:在时间限制下连接两个流的元素topn和windowedtopn:按列排序的N个最小或最大值重复数据删除和窗口重复数据删除:删除在一组列上重复的元素模式识别:检测一个流中特定模式的元素同样需要注意一些事项:如果您需要对转换进行细粒度控制或访问低级功能,例如定时器、状态等,选择DataStream API。否则,在大多数情况下,Table API 是一个不错的选择。Table API 还支持直接执行 SQL 查询,提供对当前无法通过 API 提供的功能的访问,例如重复数据删除、模式识别、topn 等。虽然 API 会继续增长,但使用 SQL 提供了立即的解决方案。作业提交Flink 是一个分布式计算引擎,它在独立集群中执行 Flink/PyFlink 作业。Flink 作业是延迟执行的;您必须明确提交作业以供执行。这有点不同来自许多 Python 用户习惯的更具交互性/探索性的脚本风格。例如,如果您有一个由 Python 脚本 word_count.py 定义的 PyFlink 作业,您可以使用 $python word_count.py 通过 Flink 控制台在本地执行它,或者通过在Flink IDE中右键执行。 Flink 将启动一个迷你 Flink 集群,该集群在单个进程中运行并执行 PyFlink 作业。您还可以使用 Flink 的命令行工具将 PyFlink 作业提交到远程集群。下面是一个简单的示例,展示了如何将 PyFlink 作业提交到远程集群。用于执行的 Apache YARN 集群:

./bin/flink run-application -t yarn-application \
      -Djobmanager.memory.process.size=1024m \
      -Dtaskmanager.memory.process.size=1024m \
      -Dyarn.application.name=<ApplicationName> \
      -Dyarn.ship-files=/path/to/shipfiles \
      -pyarch shipfiles/venv.zip \
      -pyclientexec venv.zip/venv/bin/python3 \
      -pyexec venv.zip/venv/bin/python3 \
      -pyfs shipfiles \
      -pym word_count

有关 Flink 中作业提交的更多信息,请参阅 Apache 文档。您可以在 PyFlink 博文的 LINK 中阅读有关如何定义和运行 Python 脚本作为 PyFlink 作业的更多信息。调试和日志记录,一开始会执行 Python 用户定义的函数在作业启动期间启动的单独的 Python 进程中。这并不容易调试,用户必须对 Python 用户定义函数进行一些更改才能实现远程调试。从 Flink 1.14 开始,它支持在客户端的同一个 Python 进程中以本地方式执行 Python 用户定义函数。用户可以在任何想要调试的地方设置断点,例如PyFlink 框架代码、Python 用户定义函数等。这使得调试 PyFlink 作业变得非常容易,就像调试任何其他常用的 Python 程序一样。用户还可以在 Python 用户定义函数中使用日志记录来进行调试。应该注意的是,日志消息将出现在 TaskManager 的日志文件中,而不是 console.import 日志中

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("i: " + i + ", j: " + j)
    return i + j

此外,它还支持Python用户定义函数中的Metrics。这对于长时间运行的程序非常有用,可用于监视特定的统计信息和配置警报。管理依赖关系对于生产作业,您几乎肯定需要引用第三方 Python 库。您可能还需要使用其 jar 文件不属于 Flink 发行版的数据连接器 – 例如 Kafka、HBase、Hive 和 Elasticsearch 的连接器未捆绑在 Flink 发行版中。因为 PyFlink 作业在分布式集群中执行,依赖关系也需要跨集群进行管理。 PyFlink 提供了多种管理依赖关系的方法。JAR 文件您可以将 JAR 文件包含在 PyFlink 作业中:# Table API
t_env.get_config().set(“pipeline.jars”, “file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar”)

# 数据流API
env.add_jars(“file:///my/jar/path/connector1.jar”, “file:///my/jar/path/connector2.jar”)

您必须包含所有传递依赖项。对于连接器,使用名称通常包含sql的fat JAR,例如flink-sql-connector-kafka-1.16.0.jar,对于Kafka连接器优先使用flink-connector-kafka-1.16.0.jar。第三方Python库添加Python PyFlink venv 虚拟环境的依赖:# Table API
t_env.add_python_file(文件路径)

# 数据流API
env.add_python_file(file_path)包含指定库的环境将在执行期间分布在集群节点上。压缩的 Python 库如果需要包含大量 Python 库,最好将它们以存档形式传递到虚拟环境环境:#表API
t_env.add_python_archive(archive_path=”/path/to/venv.zip”)
t_env.get_config().set_python_executable(“venv.zip/venv/bin/python3”)

# 数据流API
env.add_python_archive(archive_path=”/path/to/venv.zip”)
env.set_python_executable(“venv.zip/venv/bin/python3”)

命令行配置您还可以在命令行上配置依赖项,为您提供额外的灵活性:依赖项类型配置命令行选项Jar Packagepipeline.jarspipeline.classpaths–jarfilePythonlibrariespython.files- pyfsPython

虚拟环境python.archivespython.executablepython.client.executable-pyarch-pyexec-pyclientexecPython 要求python.requirements-pyreq 有关更多详细信息,请参阅 Apache PyFlink 文档中的 Python 依赖管理。 其他提示 与 Python 本身一样,PyFlink 提供了极大的灵活性和适应性。当您探索 API 时,这里有一些有用的提示。使用 Open() 进行初始化如果您的 Python 代码依赖于大量资源,

例如机器学习模型,在作业初始化期间使用 open() 加载一次:

# DataStream API
class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def map(self, value):
        return self.model.predict(value)


# Table API
class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE())

这种简单的方法会导致资源被序列化并与Python函数本身一起分发,并在每次调用时加载;使用 open() 确保它只加载一次。 WatermarksWatermarks 触发特定运算符的计算,例如事件时间启用时的窗口、模式识别等。请务必定义水印生成器,否则您的作业可能没有输出。PyFlink 为您提供了几种不同的方式来定义水印生成器:SQL DDL:请参阅Watermark 部分了解更多详细信息。Table API:请参阅此示例以了解更多详细信息。DataStream API:请参阅有关更多详细信息,请参阅此示例。如果您的水印生成器已正确定义,但水印未按预期前进,则可能您的作业没有足够的数据。如果您的测试样本较小,则在测试过程中可能会出现这种情况。尝试将作业的并行度设置为 1 或配置源空闲以解决测试阶段的问题。有关水印行为的更多信息,请参阅“及时流处理”。Flink Web UI Web UI 是丰富的信息源 – 显示作业运行了多长时间、是否有任何异常、每个算子的输入/输出元素的数量等. 如何访问取决于部署模式: 本地:Web端口随机设置。您可以在日志文件中找到它

/path/to/python-installation-directory/lib/python3.7/site-packages/pyflink/log/.local.log):INFOorg.apache.flink.runtime .dispatcher.DispatcherRestEndpoint [] –

Web 前端监听 http://localhost:55969。

Standalone:通过配置rest.port 配置,默认为 8081。

Apache YARN:从 YARN 资源管理器的 Web Ui 中,找到与PyFlink 作业,然后单击“Tracking UI”列下的链接。Kubernetes:Web UI 可能通过以下任何一项公开:ClusterIP、NodePort 和 LoadBalancer。有关更多详细信息,请参阅 Kubernetes 文档。架构和内部结构一些背景了解可能会帮助您回答以下问题:Python API 和 Java API 之间有什么区别,我应该使用哪一个?如何在 PyFlink 中使用自定义连接器?在哪里可以找到打印的日志消息Python中的用户定义函数?如何调优PyFlink作业的性能?注意,这里我们不会谈论基本的Flink概念,例如Flink的架构、状态流处理、事件时间和水印,这些在Flink官方中都有详细描述架构图PyFlink 由两个主要部分组成:作业编译:将 PyFlink 程序转换为 JobGraph 作业执行:接受 JobGraph 并将其转换为以分布式方式运行的 Flink 算子图 PyFlink 的架构作业编译将 JobGraph 视为之间的协议一个客户端和一个 Flink 集群。它包含执行作业所需的所有必要信息:表示用户想要执行的处理逻辑的转换图作业的名称和配置执行作业所需的依赖项,例如JAR文件、Python依赖等 目前JobGraph还没有多语言支持,仅支持Java。 PyFlink 通过利用 Py4J 复用 Java API 现有的作业编译栈,使运行在 Python 进程中的 Python 程序能够访问 JVM 中的 Java 对象。方法的调用就像 Java 对象驻留在 Python 进程中一样。每个 Java API 都由相应的 Python API 包装。当Python程序进行PyFlink API调用时,会在JVM中创建相应的Java对象并调用其方法。在内部,它会在JVM中创建相应的Java对象,然后在Java对象上调用相应的API。因此它复用了与 Java API 相同的作业编译堆栈。这意味着:如果您使用 PyFlink Table API 但仅执行 Java 代码,那么性能应该与 Java Table API 相同如果您想要使用一个 Java 类,例如自定义连接器,PyFlink 中尚不支持,您可以自己包装它来执行作业,大多数情况下,包装 Java API 效果很好。然而,也有一些例外情况。

我们看下面的例子:

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder()
        .type_info(Types.ROW([Types.LONG(), Types.STRING()]))
        .build()) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds.map(lambda x: x[1]).print()
env.execute()

这里,除了formap()传递一个lambda函数ds.map(lambda x: x[1])之外,所有Python方法都可以映射到Flink的Java API。 Java 需要一个 Java MapFunction。为了在 Java 中实现此功能,我们需要序列化 ​​lambda x: x[1] 并用 Java 包装器对象包装它,该对象会生成一个 Python 进程以在作业执行期间执行它。Flink 和 PyFlink 运算符在执行期间,Flink 作业由Flink 算子系列。每个运算符接受来自上游运算符的输入,对其进行转换并向下游运算符产生输出。对于处理逻辑为Python的转换,将生成特定的Python算子:在初始化阶段,该算子将生成一个Python进程,并将元数据(即要执行的Python函数)发送到Python进程,在接收到来自上游算子的数据后,操作符会将其发送到Python进程执行。数据异步发送到Python进程;该运算符不会等到接收到一个数据项的执行结果后才发送下一个数据项。该运算符支持访问 Python 状态,但 Python 运算符运行在 JVM 中。与数据通信不同,状态访问是同步的。状态可以缓存在Python进程中以提高性能。Python运算符还支持在Python函数中使用日志记录。日志消息被发送到在 JVM 中运行的 Python 算子,因此消息最终会出现在 TaskManager 的日志文件中。请注意:Python 函数将在作业编译期间进行序列化,并在作业执行期间进行反序列化。保持资源使用较少(请参阅上面有关使用 open() 的注释),并且仅使用可序列化的实例变量。多个 Python 函数将尽可能链接起来,以避免不必要的序列化/反序列化以及通信开销。线程模式下启动 Python 函数在大多数情况下,单独的进程运行良好,但也有一些例外情况:额外的序列化/反序列化和通信开销可能是大数据的问题,例如图像处理,其中图像尺寸可能非常大、​​长字符串等。进程间通信也意味着延迟可能更高。另外Python算子通常需要缓冲数据来提高网络性能,这会增加更多的延迟。额外的进程和进程间通信给稳定性带来了挑战。为了解决这些问题,Flink 1.15引入了线程模式作为执行Python函数的选项在 JVM 中。默认情况下线程模式是禁用的;要使用它,请配置 python.execution-mode: thread。启用线程模式后,Python 函数的执行方式与进程模式下非常不同:一次处理一行数据,这会增加延迟。但是,序列化/反序列化和通信开销被消除注意,线程模式有特定的限制,这就是为什么它默认不启用:它只支持CPython解释器,因为它依赖于CPython运行时来执行Python函数。因为CPython运行时只能在进程中加载​​一次,线程模式不能很好地支持会话模式,其中多个作业可能需要使用单独的 Python 解释器,有关线程模式的更多详细信息,请参阅博客文章探索 PyFlink 中的线程模式。状态访问和检查点 Python 函数支持状态访问。本示例使用 state 来计算每组的平均值:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class Average(MapFunction):

    def __init__(self):
        self.sum_state = None
        self.cnt_state = None

    def open(self, runtime_context: RuntimeContext):
        self.sum_state = runtime_context.get_state(ValueStateDescriptor("sum", Types.INT()))
        self.cnt_state = runtime_context.get_state(ValueStateDescriptor("cnt", Types.INT()))

   def map(self, value):
        # access the state value
        sum = self.sum_state.value()
        if sum is None:
            sum = 0

        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        sum += value[1]
        cnt += 1

        # update the state
        self.sum_state.update(sum)
        self.cnt_state.update(cnt)

        return value[0], sum / cnt


env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 3), (1, 5), (1, 7), (2, 4), (2, 2)]) \
   .key_by(lambda row: row[0]) \
   .map(Average()) \
   .print()

env.execute()

这里sum_state和cnt_state都是PyFlink状态对象。状态可以在作业执行期间访问,也可以在作业故障转移后恢复:从上图可以看出:状态的真实来源是运行在 JVM 中的 Python Operator 从用户角度来看状态访问是同步的引入了以下优化提高状态访问的性能:异步写入:维护最新状态和状态修改的 LRU 缓存,并将其异步写回到 Python Operator 延迟读取:与 LRU 缓存一样,MapState 也会延迟读取,以避免不必要的状态请求性能调优一般调整 PyFlink 作业与调整 Flink Java 作业相同。一个例外是调整 Python 运算符性能。内存调整Python 运算符启动一个单独的 Python 进程来执行 Python 函数。依赖大量资源的 Python 函数可能会占用大量内存。如果为 Python 进程配置的内存过少,则会影响作业的稳定性。如果 PyFlink 作业运行在严格要求的 Kubernetes 或 Apache YARN 部署中限制内存使用,Python进程可能会因为内存需求超出限制而崩溃。你需要仔细设计你的Python代码。此外,使用以下配置选项来帮助调整 Python 内存使用情况:taskmanager.memory.process.size:TaskExecutors 的总进程内存大小。taskmanager.memory. Managed.fraction:用作托管内存的总内存部分。 (Python 进程的内存也是托管内存的一部分)taskmanager.memory.jvm-overhead.fraction:为 JVM 开销保留的总内存比例。 (未显式使用的保留内存)taskmanager.memory.driven.consumer-weights:不同类型消费者的托管内存权重。此配置可用于调整分配给 Python 进程的托管内存的比例。Bundle Size 在进程模式下,Python 运算符批量向 Python 进程发送数据。为了提高网络性能,它在发送数据之前缓冲数据。t.在检查点期间,它必须等待所有缓冲数据被处理。如果一个batch中有很多元素并且Python处理逻辑效率低下,那么检查点时间将会延长。如果您发现检查点很长甚至失败,请尝试调整包大小配置python.fn-execution.bundle.size。执行模式在数据量很大或需要减少延迟的情况下,线程模式可以提高性能。设置配置 python.execution-mode: thread 来启用它。 PyFlink 的下一步是什么 PyFlink 已经具有丰富的功能。在其发展的下一阶段,社区的重点将是:更好地支持交互式编程,例如仅检索无界表的少数前导行。提高了易用性,例如使 API 更加 Pythonic,改进文档,并添加更多示例。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627