gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

年度归档2023

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 2023
  • ( 页面7 )
Flink 8月 4,2023

Flink SQL 连接 – 第 2 部分

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。我们已经看到 Flink SQL 有很多用例,我们很高兴看到什么你将用它来建造。在这个由三部分组成的博客文章系列中,我们将向您展示 Flink SQL 中不同类型的联接以及如何使用它们以多种方式处理数据。

在本文中,您将学习:什么是非压缩和压缩 Kafka 主题什么是时态表如何执行非压缩和压缩 Kafka 主题之间的时态表连接什么是实时星型模式反规范化如何执行实时星型模式反规范化时态表连接与非压缩和压缩 Kafka 主题Apache Kafka 最基本的组织单元是主题,它类似于关系数据库中的表。 Kafka 主题可以是压缩的,也可以是非压缩的。压缩主题会在特定时间段内保留所有消息,即使它们已被删除。非压缩主题仅保留未删除的消息。为了连接压缩和非压缩主题,您可以使用时态表连接。时态表是随时间变化的表。这在 Flink 中也称为动态表。时态/动态表中的行与一个或多个时态周期相关联。时态表包含一个或多个版本化表快照。时态表联接是一项功能,允许将两个不同时态表中的数据通过公共键联接在一起,并将第二个表中的数据自动插入到第一个表中在适当的时间段或版本化表中的相关版本。当集成来自多个源的数据或处理随时间变化的数据时,这非常有用。这也意味着可以通过不断变化的元数据来丰富表,并在某一时间点检索其值。如何在非压缩和压缩 Kafka 主题之间执行时态表连接此示例将展示如何正确丰富一个 Kafka 主题中的记录当事件顺序很重要时,与另一个 Kafka 主题的相应记录相结合。临时表连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL 语法 FOR SYSTEM_TIME AS OF 来执行此操作。在本节中,您将把每笔交易(transactions)连接到截至交易发生时的正确货币汇率(currency_rates,一个版本化表)。

类似的示例是将每个订单与订单发生时的客户详细信息连接起来。这正是事件时间时态表连接的作用。 Flink SQL 中的时态表连接在两个表之间存在无序和任意时间偏差的情况下提供了正确的、确定性的结果。transactions 和currency_rates 表都由 Kafka 主题支持,但在利率的情况下,该主题被压缩(例如当更新的费率流入时,仅保留给定密钥的最新消息)。事务中的记录仅被解释为插入,因此该表由标准 Kafka 连接器 (connector=kafka) 支持;而currency_rates中的记录需要解释为基于主键的upsert,这需要Upsert Kafka连接器(connector=upsert-kafka)。Flink SQL:

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

Data Generators

这两个主题也使用 Flink SQL 作业填充。我们使用faker连接器根据Java Faker表达式在内存中生成行并将它们写入相应的Kafka主题。

currency_rates Topic

Flink SQL

CREATE TEMPORARY TABLE currency_rates_faker
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE currency_rates (EXCLUDING OPTIONS);

INSERT INTO currency_rates SELECT * FROM currency_rates_faker;

Kafka Topic

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topiccurrency_rates –property print.key=true –property key.separator=” – ”
HTG – {“currency_code”:”HTG”,”eur_rate”:0.0136,”rate_time”:”2020-12-16 22:22:02″}
BZD – {“currency_code”:”BZD”,”eur_rate”:1.6545,”rate_time”:”2020-12-16 22:22:03″}
BZD – {“currency_code”:”BZD”,”eur_rate”:3.616,”rate_time”:”2020-12-16 22:22:10″}
BHD – {“currency_code”:”BHD”,”eur_rate”:4.5308,”rate_time”:”2020-12-16 22:22:05″}
KHR – {“currency_code”:”KHR”,”eur_rate”:1.335,”rate_time”:”2020-12-16 22:22:06″}

交易

TopicFlink SQL

CREATE TEMPORARY TABLE transactions_faker
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE transactions (EXCLUDING OPTIONS);

INSERT INTO transactions SELECT * FROM transactions_faker;

Kafka Topic

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic transactions –property print.key=true –property key.separator=” – ”
e102e91f-47b9-434e-86e1-34fb1196d91d – {“id”:”e102e91f-47b9-434e-86e1-34fb1196d91d”,”currency_code”:”SGD”,”total”:494.07,”transaction_time”:”2020-12- 16 22:18:46″}
bf028363-5ee4-4a5a-9068-b08392d59f0b – {“id”:”bf028363-5ee4-4a5a-9068-b08392d59f0b”,”currency_code”:”EEK”,”total”:906.8,”transaction_time”:”2020-12- 16 22:18:46″}
e22374b5-82da-4c6d-b4c6-f27a818a58ab – {“id”:”e22374b5-82da-4c6d-b4c6-f27a818a58ab”,”currency_code”:”GYD”,”total”:80.66,”transaction_time”:”2020-12- 16 22:19:02″}
81b2ce89-26c2-4df3-b12a-8ca921902ac4 – {“id”:”81b2ce89-26c2-4df3-b12a-8ca921902ac4″,”currency_code”:”EGP”,”total”:521.98,”transaction_time”:”2020-12- 16 22:18:57″}
53c4fd3f-af6e-41d3-a677-536f4c86e010 – {“id”:”53c4fd3f-af6e-41d3-a677-536f4c86e010″,”currency_code”:”UYU”,”total”:936.26,”transaction_time”:”2020-12- 16 22:18:59″}

实时星型模式非规范化(N 路联接)实时星型模式非规范化是在星型模式中连接两个或多个表的过程,以便结果表中的数据非规范化。这样做可以是出于性能方面的原因,也可以是为了更轻松地查询数据,或者两者兼而有之。通过减少需要执行的联接数量,非规范化可用于提高联接大量表的查询的性能。它还可以通过提供一个包含所有数据的表来更轻松地查询数据,否则这些数据将分布在多个表中。

非规范化过程可以应用于任何模式,但最常用于星型模式,这些模式具有一个中心事实表,周围有许多维度表。事实表包含正在分析的数据,维度表包含可用于描述事实表中数据的数据。对星型模式进行非规范化时,维度表中的数据将合并到事实表中。如何执行实时星型模式非规范化此示例将展示如何使用 n 路时态表连接对简单的星型模式进行非规范化。星型模式是数据仓库中数据标准化的一种流行方法。星型模式的中心是一个事实表,其行包含度量、测量和有关世界的其他事实。周围的事实表是一个或多个维度表,这些表具有在计算查询时可用于丰富事实的元数据。想象一下,您正在为一家铁路公司运行一个小型数据仓库,其中包含一个事实表 (train_activity) 和三个维度表(车站、预订通道和乘客) )。对事实表的所有插入以及对维度表的所有更新都将镜像到 Apache Kafka。

事实表中的记录仅被解释为插入,因此该表由标准 Kafka 连接器 (connector=kafka) 支持;。相反,维度表中的记录是基于 prim 的更新插入ary 键,这需要 Upsert Kafka 连接器(connector=upsert-kafka)。使用 Flink SQL,您现在可以使用 5 路时态表连接轻松地将所有维度连接到我们的事实表。临时表连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL 语法 FOR SYSTEM_TIME AS OF 来执行此操作。将事实表与更多(缓慢)变化的维度表连接时,使用时态表连接可以获得一致、可重现的结果。每个事件(事实表中的行)根据事件在现实世界中发生的时间连接到每个维度的相应值。

CREATE TEMPORARY TABLE passengers (
  passenger_key STRING,
  first_name STRING,
  last_name STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (passenger_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'passengers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE stations (
  station_key STRING,
  update_time TIMESTAMP(3),
  city STRING,
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (station_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'stations',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE booking_channels (
  booking_channel_key STRING,
  update_time TIMESTAMP(3),
  channel STRING,
  WATERMARK FOR update_time AS update_time - INTERVAL '10' SECONDS,
  PRIMARY KEY (booking_channel_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'booking_channels',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE train_activities (
  scheduled_departure_time TIMESTAMP(3),
  actual_departure_date TIMESTAMP(3),
  passenger_key STRING,
  origin_station_key STRING,
  destination_station_key STRING,
  booking_channel_key STRING,
  WATERMARK FOR actual_departure_date AS actual_departure_date - INTERVAL '10' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'train_activities',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.actual_departure_date,
  p.first_name,
  p.last_name,
  b.channel,
  os.city AS origin_station,
  ds.city AS destination_station
FROM train_activities t
LEFT JOIN booking_channels FOR SYSTEM_TIME AS OF t.actual_departure_date AS b
ON t.booking_channel_key = b.booking_channel_key;
LEFT JOIN passengers FOR SYSTEM_TIME AS OF t.actual_departure_date AS p
ON t.passenger_key = p.passenger_key
LEFT JOIN stations FOR SYSTEM_TIME AS OF t.actual_departure_date AS os
ON t.origin_station_key = os.station_key
LEFT JOIN stations FOR SYSTEM_TIME AS OF t.actual_departure_date AS ds
ON t.destination_station_key = ds.station_key;

Kafka 主题

➜ bin ./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic train_activities –property print.key=true –property key.separator=” – ”

null – {“scheduled_departure_time”:“2020-12-19 13:52:37”,“actual_departure_date”:“2020-12-19 13:52:16”,“passenger_key”:7014937,“origin_station_key”:577,” destination_station_key”:862,”booking_channel_key”:2}
null – {“scheduled_departure_time”:“2020-12-19 13:52:38”,“actual_departure_date”:“2020-12-19 13:52:23”,“passenger_key”:2244807,“origin_station_key”:735,” destination_station_key”:739,”booking_channel_key”:2}
null – {“scheduled_departure_time”:“2020-12-19 13:52:46”,“actual_departure_date”:“2020-12-19 13:52:18”,“passenger_key”:2605313,“origin_station_key”:216,” destination_station_key”:453,”booking_channel_key”:3}
null – {“scheduled_departure_time”:”2020-12-19 13:53:13″,”actual_departure_date”:”2020-12-19 13:52:19″,”passenger_key”:7111654,”origin_station_key”:234,”德stination_station_key”:833,”booking_channel_key”:5}
null – {“scheduled_departure_time”:“2020-12-19 13:52:22”,“actual_departure_date”:“2020-12-19 13:52:17”,“passenger_key”:2847474,“origin_station_key”:763,” destination_station_key”:206,”booking_channel_key”:3}

client Topic

Flink SQL

CREATE TEMPORARY TABLE passengers_faker
WITH (
  'connector' = 'faker',
  'fields.passenger_key.expression' = '#{number.numberBetween ''0'',''10000000''}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'fields.first_name.expression' = '#{Name.firstName}',
  'fields.last_name.expression' = '#{Name.lastName}',
  'rows-per-second' = '1000'
) LIKE passengers (EXCLUDING OPTIONS);

INSERT INTO passengers SELECT * FROM passengers_faker;

Kafka Topic


➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic passengers --property print.key=true --property key.separator=" - "
749049 - {"passenger_key":"749049","first_name":"Booker","last_name":"Hackett","update_time":"2020-12-19 14:02:32"}
7065702 - {"passenger_key":"7065702","first_name":"Jeramy","last_name":"Breitenberg","update_time":"2020-12-19 14:02:38"}
3690329 - {"passenger_key":"3690329","first_name":"Quiana","last_name":"Macejkovic","update_time":"2020-12-19 14:02:27"}
1212728 - {"passenger_key":"1212728","first_name":"Lawerence","last_name":"Simonis","update_time":"2020-12-19 14:02:27"}
6993699 - {"passenger_key":"6993699","first_name":"Ardelle","last_name":"Frami","update_time":"2020-12-19 14:02:19"}
stations Topic

Flink SQL

CREATE TEMPORARY TABLE stations_faker
WITH (
  'connector' = 'faker',
  'fields.station_key.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.city.expression' = '#{Address.city}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE stations (EXCLUDING OPTIONS);

INSERT INTO stations SELECT * FROM stations_faker;

Kafka Topic

➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stations --property print.key=true --property key.separator=" - "
80 - {"station_key":"80","update_time":"2020-12-19 13:59:20","city":"Harlandport"}
33 - {"station_key":"33","update_time":"2020-12-19 13:59:12","city":"North Georgine"}
369 - {"station_key":"369","update_time":"2020-12-19 13:59:12","city":"Tillmanhaven"}
580 - {"station_key":"580","update_time":"2020-12-19 13:59:12","city":"West Marianabury"}
616 - {"station_key":"616","update_time":"2020-12-19 13:59:09","city":"West Sandytown"}

Flink SQL

CREATE TEMPORARY TABLE booking_channels_faker
WITH (
  'connector' = 'faker',
  'fields.booking_channel_key.expression' = '#{number.numberBetween ''0'',''7''}',
  'fields.channel.expression' = '#{regexify ''(bahn\.de|station|retailer|app|lidl|hotline|joyn){1}''}',
  'fields.update_time.expression' = '#{date.past ''10'',''5'',''SECONDS''}',
  'rows-per-second' = '100'
) LIKE booking_channels (EXCLUDING OPTIONS);

INSERT INTO booking_channels SELECT * FROM booking_channels_faker;

Kafka Topic➜ bin

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic booking_channels –property print.key=true –property key.separator=” – ”
1 – {“booking_channel_key”:”1″,”update_time”:”2020-12-19 13:57:05″,”channel”:”joyn”}
0 – {“booking_channel_key”:”0″,”update_time”:”2020-12-19 13:57:17″,”channel”:”station”}
4 – {“booking_channel_key”:”4″,”update_time”:”2020-12-19 13:57:15″,”channel”:”joyn”}
2 – {“booking_channel_key”:”2″,”update_time”:”2020-12-19 13:57:02″,”channel”:”app”}
1 – {“booking_channel_key”:”1″,”update_time”:”2020-12-19 13:57:06″,”channel”:”retailer”}

摘要在本文中,您了解了非时态表之间的连接压缩和压缩的 Kafka 主题,以及实时星型模式反规范化。您还了解了如何使用 Flink SQL 为这两种类型的场景编写查询。我们鼓励您在 Ververica Platform 上运行这些示例。您可以按照以下简单步骤安装平台。要了解有关 Flink SQL 的更多信息,请查看以下资源:Flink SQL Cookbook 入门 – Ververica Platform 上的 Flink SQL Flink SQL 官方文档 Flink Forward Talk: One SQL, Unified AnalyticsOnly SQL: Empower data Analyst end – 使用 Flink SQL 结束

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 4,2023

Flink CDC v2.3 发布

Flink CDC 是一种基于数据库变更日志的变更数据捕获(CDC)技术。它是一个数据集成框架,支持读取数据库快照并平滑切换到读取binlog(包含数据库中数据和结构的所有更改的记录的二进制日志)。这对于捕获提交的更改并将其从数据库传播到下游使用者非常有用,并有助于保持多个数据存储同步并避免双重写入。凭借强大的 Flink pipeline 及其丰富的上下游生态系统,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 作为下一代实时数据集成框架,具有无锁读取、并行读取等技术优势、表模式自动同步、分布式架构。它还有自己的独立文档,您可以在这里找到。

Flink CDC 开源 2 年多来,Flink CDC 社区发展迅速,目前已有 76 名贡献者,7 名维护者,钉钉用户群超过 7800 名用户。在社区的共同努力下,Flink CDC 2.3.0从代码分布来看,我们可以看到MySQL CDC、MongoDB CDC、Oracle CDC、增量快照框架(flink-cdc-base)、文档模块等方面的新特性和改进。特性,这篇博文将回顾此版本中的主要改进和核心特性以及未来的发展。关键特性和改进出于本文的目的,我们将探讨此版本的四个最重要的特性。

DB2 CDC 简介

ConnectorDB2是IBM开发的关系型数据库管理系统。 DB2 CDC 连接器可以捕获 DB2 数据库中表的行级更改。 DB2 基于 ASN Capture/Apply 代理启用 SQL Replication,它为捕获模式下的表生成变更数据表,并将变更事件存储在变更数据表中。 DB2 CDC Connector 首先通过 JDBC 读取表中的历史数据,然后从变更数据表中读取增量变更数据。 MongoDB CDC 和 Oracle CDC Connector 的增量快照算法支持在 Flink CDC 2.3 版本中,MongoDB CDC Connector 和 Oracle CDC Connector 对接 Flink CDC 增量快照框架,实现增量快照算法。这意味着现在它们支持无锁读取、并行读取和检查点。现在,我们有更多支持增量快照算法的 Flink CDC 源。社区还计划未来将更多的连接器迁移到增量快照框架。 MySQL CDC Connector 的稳定性改进作为 Flink CDC 项目中最受欢迎的连接器,MySQL CDC Connector 在 2.3 版本中引入了许多高级功能,并且具有许多性能和稳定性方面的改进。支持从特定偏移量开始此连接器现在支持从 binlog 的指定位置开始作业。您可以通过时间戳、binlog 偏移量或 binlog gtid 指定起始 binlog 位置。您还可以将其设置为从最早的binlog偏移量开始。 chunk分割算法的优化您现在可以在快照阶段优化chunk分割算法。当前的同步算法改为异步,并且可以选择主键中的一列作为 chunk 分割算法的分割列。拆分过程支持检查点,解决了快照阶段同步分块阻塞导致的性能问题。稳定性改进连接器现在支持将所有字符集映射到 Flink SQL,解锁更多用户场景。可以处理不同类型的默认值,提高作业对不规则DDL的容忍度,并自动获取数据库服务器的时区,解决时区问题。性能改进该版本重点优化内存和读取性能,减少JobManager的内存占用TaskManager 通过 JobManager 中的元复用和 TaskManager 中的流读取进行改进。同时,通过优化 binlog 解析逻辑,提高 binlog 读取性能。 其他改进 Flink CDC 2.3 版本兼容 Flink 四大版本(1.13、1.14、1.15、1.16)。这大大降低了用户的升级和维护成本。OceanBase CDC Connector修复了时区问题,将所有数据类型映射到Flink SQL,并提供更多选项以实现更灵活的配置,例如新增的“table-list”配置用于读取多个 OceanBase 表。MongoDB CDC 连接器支持更多数据类型,并优化捕获表的过滤过程。TiDB CDC 连接器修复快照阶段后切换的数据丢失问题,并支持读取期间的区域切换。Postgres CDC 连接器支持几何类型,更多添加了选项,可以配置changelog模式来过滤数据。SQL Server CDC连接器支持更多的SQL Server版本,并完善了文档。MySQL CDC和OceanBase CDC连接器包括中文文档以及OceanBase CDC连接器的视频教程。未来计划Flink CDC的开发可以如果没有社区的贡献和反馈以及维护者的开源精神,就不可能实现这一目标。目前,Flink CDC 社区已经在制定 2.4 版本的计划。欢迎所有用户和贡献者参与并提供反馈。该项目的主要方向将来自以下几个方面:完善的数据源 – 我们计划支持更多的数据源,并将更多的连接器迁移到增量快照框架,以解锁无锁读取和并行读取。可观察性改进 – 我们希望提供读取限速功能,减少快照阶段数据库的查询压力。新版本将提供更丰富的监控指标,让用户获取任务进度相关指标,监控任务状态。 性能改进——新版本中快照阶段支持使用批处理模式,这将提高快照阶段和发布的性能快照阶段后自动空闲读者的资源。 可用性改进 – 提高连接器的易用性,例如简化开箱即用的选项并在 DataStream API 中提供示例。Ververica Platform 计划在 2.11 版本中支持 Flink CDC。

致谢:感谢为 Flink CDC 2.3 版本做出贡献的所有 49 位社区贡献者,特别是社区的四位维护者(阮航、孙家宝、龚忠强、任庆生)为本次发布做出了出色的工作。贡献者名单: 01410172, Amber Moe, Dezhi Cai, Enoch, Hang Ruan, He Wang, Jiajia, Jiabao Sun, Junwang Zhao, Kyle Dong, Leonard Xu, Matrix42, Paul Lin, Qingsheng Ren, Qishangzhong, Rinka, Sergey Nuyanzin, Tigran Manasyan, Camelus 、dujie、ehui、embcl、fbad、gongzhongqiang、hehuiyuan、hele.kc、hsldymq、jiabao.sun、legendtkl、leixin、leozlliang、lidoudou1993、lincoln lee、lxxawfl、lzshlzsh、molsion、molsionmo、pacino、rookiegao、天际线、晴朗、 vanliu、wangminchao、wangxiaojing、xieyi888、yurunchuan、zhmin、阿阳、莫贤斌

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 4,2023

关于 PyFlink 你需要了解的一切

PyFlink 作为 Apache Flink 的 Python API,为用户提供了用 Python 开发 Flink 程序并将其部署到 Flink 集群上的媒介。在这篇文章中,我们将从以下几个方面介绍 PyFlink:PyFlink 基本作业的结构和相关的一些基础知识PyFlink作业的运行机制、高层架构及其内部工作原理PyFlink的基本性能优化策略PyFlink的未来预测通过本文的结束,您应该对PyFlink及其潜在应用有一个牢固的掌握。发现自己需要实时计算解决方案,例如实时 ETL、实时特征工程、实时数据仓库、实时预测,并且您熟悉 Python 语言或想要使用一些方便的 Python 库在此过程中,PyFlink 是一个很好的起点,因为它融合了 Flink 和 Python 的世界。PyFlink 于 2019 年在 Flink 1.9 中首次引入 Flink。这个首个版本仅提供有限的功能。从那时起,Flink 社区一直致力于不断增强 PyFlink。经过近四年的努力发展,已日趋成熟。目前,它包含 Flink Java API 中的大部分功能。此外,PyFlink 还专门提供了多种功能,例如 Python 用户定义函数支持等。 PyFlink 入门PyFlink 已集成到当前版本的 Ververica Platform 中。如果您想体验 PyFlink 的功能并在支持 Kuberbetes 的环境中工作,您可以免费下载社区版并在几分钟内启动 aminikubeplayground。如果您更喜欢使用普通 Flink,那么您可以从 PyPI 安装 PyFlink: $ pip install apache-flink 对于最新的 Flink 1.17,您需要高于 Python 3.6 的 Python 版本,最高可达 Python 3.10; Flink 1.16 支持 Python 3.6 到 3.9 版本。请注意,Python/PyFlink 必须可用于集群中的每个节点。最灵活的方法是在提交 PyFlink 作业时传入 Python 环境,但如果您有很多深度的 Python 依赖项,那么将 Python 环境预安装到每个集群节点可能会更简单。您也可以从源代码构建 PyFlink ,如果您维护自己的 Flink 分支或需要挑选尚未发布的提交,您可能会想要这样做。 PyFlink 的 Flink 基础知识如果您是 Flink 新手,那么有一些基本概念很好理解,其中也与 PyFlink 相关:Flink 提供两种不同的 API,过程性且相对较低级别的 DataStream API 和关系/声明性表 API。不要被它们的名字误导:这两个 API 都可以应用于流处理或批处理,并且都具有 PyFlink API。Flink 是一个分布式计算引擎。除了在处理过程中提供即时上下文的状态之外,它没有任何存储空间。假设数据从外部数据源流向(通常但不是必需的)外部数据接收器。 Flink/PyFlink 作业至少需要一个数据源。任何 Flink/PyFlink 应用程序的核心都是从源数据计算所需结果的数据转换,这可能涉及数据重塑或采样、合并和丰富、比较或建模、处理事务,或者您可能想要对无界数据流或海量数据集执行计算的无数其他方式。定义数据源和接收器任何 PyFlink 作业的第一步都是定义数据源,以及可选的数据接收器执行结果将被写入。PyFlink完全支持Table API和DataStream API。这两个 API 都提供了多种不同的方式来定义源和接收器,单个作业可以组合这两个 API,例如在 Table API 读取和 DataStream API 写入之间进行转换,或者在 DataStream API 读取和 Table API 写入之间进行转换。下面是一个典型的读写示例对于每个 API。这些示例假设 Kafka 流提供源/接收器。

使用 Table API 从 Kafka 读取:

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.create_temporary_table(
    'kafka_source',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.STRING())
                .build())
        .option('properties.bootstrap.servers', 'localhost:9092')
        .option('properties.group.id', 'my-group')
        .option('topic', 'input-topic')
        .option('scan.startup.mode', 'earliest-offset')
        .option('value.format', 'json')
        .build())

table = t_env.from_path("kafka_source")

使用DataStream API从Kafka读取:

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder()
        .type_info(Types.ROW([Types.LONG(), Types.STRING()]))
        .build()) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

使用 Table API 写入 Kafka:

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

t_env.create_temporary_table(
    'kafka_sink',
    TableDescriptor.for_connector('kafka')
        .schema(Schema.new_builder()
                .column('id', DataTypes.BIGINT())
                .column('data', DataTypes.STRING())
                .build())
        .option('properties.bootstrap.servers', 'localhost:9092')
        .option('topic', 'output-topic')
        .option('value.format', 'json')
        .build())

table.execute_insert('kafka_sink')

使用DataStream API写入Kafka:

sink = KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("topic-name")
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(Types.ROW([Types.LONG(), Types.STRING()]))
                .build())
            .build()
    ) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

ds.sink_to(sink)

请参阅 Apache Table API 文档以了解有关表 API 连接器的更多详细信息,并参阅 Apache DataStream API 文档以了解有关 DataStream API 连接器的更多详细信息。 Apache API 转换文档展示了如何组合 Table API/DataStream API 读/写。有几点需要注意:Table API 示例将源/接收器属性定义为键/值对。所有 Table API 连接器都遵循该模式。要使用不同的连接器,或者定义 PyFlink 中未正式支持的新连接器,只需配置适当的键/值对。DataStream API 连接器不太常规;每个连接器都提供一堆完全不同的 API。请参阅特定连接器页面以查看提供了哪些 API。要使用 PyFlink 不支持的连接器,您需要为相应的 Java API 编写 Python 包装器,请参阅支持的连接器以获取示例。 转换 这两个 API 都支持多种转换。 DataStream API 包括以下功能: 映射:将一个元素转换为另一个平面映射:将一个元素作为输入并生成零个、一个或多个元素过滤器:对每个元素计算布尔函数并过滤掉返回 false 的元素聚合:累积多个元素窗口:将元素分组到不同的窗口中并为每个组执行计算连接:连接两个不同的元素流,允许在两个流进程之间共享状态:与平面地图类似,但是更灵活,因为它允许访问低级操作,例如广播:将一个流广播到另一个流的所有子任务 边输出:除了主流之外,还产生额外的边输出结果 流async io:PyFlink 中仍然不支持此功能。Table API 是一种关系型 API,具有类似 SQL 的风格。它包括以下功能: 投影:类似于DataStream中的map API过滤器:类似于DataStream中的过滤器 API聚合:类似于SQL GROUP BY,对分组键上的元素进行分组,并对每个组进行聚合窗口聚合:将元素分组到不同的窗口中并进行聚合对于每个窗口常规连接:与 SQL JOIN 类似,连接两个流查找(流表)连接:使用静态表连接流时间连接:使用版本化表连接流,类似于查找连接,但是,它允许在以下位置连接表时间窗口连接:连接属于同一窗口的两个流的元素间隔连接:在时间限制下连接两个流的元素topn和windowedtopn:按列排序的N个最小或最大值重复数据删除和窗口重复数据删除:删除在一组列上重复的元素模式识别:检测一个流中特定模式的元素同样需要注意一些事项:如果您需要对转换进行细粒度控制或访问低级功能,例如定时器、状态等,选择DataStream API。否则,在大多数情况下,Table API 是一个不错的选择。Table API 还支持直接执行 SQL 查询,提供对当前无法通过 API 提供的功能的访问,例如重复数据删除、模式识别、topn 等。虽然 API 会继续增长,但使用 SQL 提供了立即的解决方案。作业提交Flink 是一个分布式计算引擎,它在独立集群中执行 Flink/PyFlink 作业。Flink 作业是延迟执行的;您必须明确提交作业以供执行。这有点不同来自许多 Python 用户习惯的更具交互性/探索性的脚本风格。例如,如果您有一个由 Python 脚本 word_count.py 定义的 PyFlink 作业,您可以使用 $python word_count.py 通过 Flink 控制台在本地执行它,或者通过在Flink IDE中右键执行。 Flink 将启动一个迷你 Flink 集群,该集群在单个进程中运行并执行 PyFlink 作业。您还可以使用 Flink 的命令行工具将 PyFlink 作业提交到远程集群。下面是一个简单的示例,展示了如何将 PyFlink 作业提交到远程集群。用于执行的 Apache YARN 集群:

./bin/flink run-application -t yarn-application \
      -Djobmanager.memory.process.size=1024m \
      -Dtaskmanager.memory.process.size=1024m \
      -Dyarn.application.name=<ApplicationName> \
      -Dyarn.ship-files=/path/to/shipfiles \
      -pyarch shipfiles/venv.zip \
      -pyclientexec venv.zip/venv/bin/python3 \
      -pyexec venv.zip/venv/bin/python3 \
      -pyfs shipfiles \
      -pym word_count

有关 Flink 中作业提交的更多信息,请参阅 Apache 文档。您可以在 PyFlink 博文的 LINK 中阅读有关如何定义和运行 Python 脚本作为 PyFlink 作业的更多信息。调试和日志记录,一开始会执行 Python 用户定义的函数在作业启动期间启动的单独的 Python 进程中。这并不容易调试,用户必须对 Python 用户定义函数进行一些更改才能实现远程调试。从 Flink 1.14 开始,它支持在客户端的同一个 Python 进程中以本地方式执行 Python 用户定义函数。用户可以在任何想要调试的地方设置断点,例如PyFlink 框架代码、Python 用户定义函数等。这使得调试 PyFlink 作业变得非常容易,就像调试任何其他常用的 Python 程序一样。用户还可以在 Python 用户定义函数中使用日志记录来进行调试。应该注意的是,日志消息将出现在 TaskManager 的日志文件中,而不是 console.import 日志中

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("i: " + i + ", j: " + j)
    return i + j

此外,它还支持Python用户定义函数中的Metrics。这对于长时间运行的程序非常有用,可用于监视特定的统计信息和配置警报。管理依赖关系对于生产作业,您几乎肯定需要引用第三方 Python 库。您可能还需要使用其 jar 文件不属于 Flink 发行版的数据连接器 – 例如 Kafka、HBase、Hive 和 Elasticsearch 的连接器未捆绑在 Flink 发行版中。因为 PyFlink 作业在分布式集群中执行,依赖关系也需要跨集群进行管理。 PyFlink 提供了多种管理依赖关系的方法。JAR 文件您可以将 JAR 文件包含在 PyFlink 作业中:# Table API
t_env.get_config().set(“pipeline.jars”, “file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar”)

# 数据流API
env.add_jars(“file:///my/jar/path/connector1.jar”, “file:///my/jar/path/connector2.jar”)

您必须包含所有传递依赖项。对于连接器,使用名称通常包含sql的fat JAR,例如flink-sql-connector-kafka-1.16.0.jar,对于Kafka连接器优先使用flink-connector-kafka-1.16.0.jar。第三方Python库添加Python PyFlink venv 虚拟环境的依赖:# Table API
t_env.add_python_file(文件路径)

# 数据流API
env.add_python_file(file_path)包含指定库的环境将在执行期间分布在集群节点上。压缩的 Python 库如果需要包含大量 Python 库,最好将它们以存档形式传递到虚拟环境环境:#表API
t_env.add_python_archive(archive_path=”/path/to/venv.zip”)
t_env.get_config().set_python_executable(“venv.zip/venv/bin/python3”)

# 数据流API
env.add_python_archive(archive_path=”/path/to/venv.zip”)
env.set_python_executable(“venv.zip/venv/bin/python3”)

命令行配置您还可以在命令行上配置依赖项,为您提供额外的灵活性:依赖项类型配置命令行选项Jar Packagepipeline.jarspipeline.classpaths–jarfilePythonlibrariespython.files- pyfsPython

虚拟环境python.archivespython.executablepython.client.executable-pyarch-pyexec-pyclientexecPython 要求python.requirements-pyreq 有关更多详细信息,请参阅 Apache PyFlink 文档中的 Python 依赖管理。 其他提示 与 Python 本身一样,PyFlink 提供了极大的灵活性和适应性。当您探索 API 时,这里有一些有用的提示。使用 Open() 进行初始化如果您的 Python 代码依赖于大量资源,

例如机器学习模型,在作业初始化期间使用 open() 加载一次:

# DataStream API
class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def map(self, value):
        return self.model.predict(value)


# Table API
class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE())

这种简单的方法会导致资源被序列化并与Python函数本身一起分发,并在每次调用时加载;使用 open() 确保它只加载一次。 WatermarksWatermarks 触发特定运算符的计算,例如事件时间启用时的窗口、模式识别等。请务必定义水印生成器,否则您的作业可能没有输出。PyFlink 为您提供了几种不同的方式来定义水印生成器:SQL DDL:请参阅Watermark 部分了解更多详细信息。Table API:请参阅此示例以了解更多详细信息。DataStream API:请参阅有关更多详细信息,请参阅此示例。如果您的水印生成器已正确定义,但水印未按预期前进,则可能您的作业没有足够的数据。如果您的测试样本较小,则在测试过程中可能会出现这种情况。尝试将作业的并行度设置为 1 或配置源空闲以解决测试阶段的问题。有关水印行为的更多信息,请参阅“及时流处理”。Flink Web UI Web UI 是丰富的信息源 – 显示作业运行了多长时间、是否有任何异常、每个算子的输入/输出元素的数量等. 如何访问取决于部署模式: 本地:Web端口随机设置。您可以在日志文件中找到它

/path/to/python-installation-directory/lib/python3.7/site-packages/pyflink/log/.local.log):INFOorg.apache.flink.runtime .dispatcher.DispatcherRestEndpoint [] –

Web 前端监听 http://localhost:55969。

Standalone:通过配置rest.port 配置,默认为 8081。

Apache YARN:从 YARN 资源管理器的 Web Ui 中,找到与PyFlink 作业,然后单击“Tracking UI”列下的链接。Kubernetes:Web UI 可能通过以下任何一项公开:ClusterIP、NodePort 和 LoadBalancer。有关更多详细信息,请参阅 Kubernetes 文档。架构和内部结构一些背景了解可能会帮助您回答以下问题:Python API 和 Java API 之间有什么区别,我应该使用哪一个?如何在 PyFlink 中使用自定义连接器?在哪里可以找到打印的日志消息Python中的用户定义函数?如何调优PyFlink作业的性能?注意,这里我们不会谈论基本的Flink概念,例如Flink的架构、状态流处理、事件时间和水印,这些在Flink官方中都有详细描述架构图PyFlink 由两个主要部分组成:作业编译:将 PyFlink 程序转换为 JobGraph 作业执行:接受 JobGraph 并将其转换为以分布式方式运行的 Flink 算子图 PyFlink 的架构作业编译将 JobGraph 视为之间的协议一个客户端和一个 Flink 集群。它包含执行作业所需的所有必要信息:表示用户想要执行的处理逻辑的转换图作业的名称和配置执行作业所需的依赖项,例如JAR文件、Python依赖等 目前JobGraph还没有多语言支持,仅支持Java。 PyFlink 通过利用 Py4J 复用 Java API 现有的作业编译栈,使运行在 Python 进程中的 Python 程序能够访问 JVM 中的 Java 对象。方法的调用就像 Java 对象驻留在 Python 进程中一样。每个 Java API 都由相应的 Python API 包装。当Python程序进行PyFlink API调用时,会在JVM中创建相应的Java对象并调用其方法。在内部,它会在JVM中创建相应的Java对象,然后在Java对象上调用相应的API。因此它复用了与 Java API 相同的作业编译堆栈。这意味着:如果您使用 PyFlink Table API 但仅执行 Java 代码,那么性能应该与 Java Table API 相同如果您想要使用一个 Java 类,例如自定义连接器,PyFlink 中尚不支持,您可以自己包装它来执行作业,大多数情况下,包装 Java API 效果很好。然而,也有一些例外情况。

我们看下面的例子:

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(
        JsonRowDeserializationSchema.builder()
        .type_info(Types.ROW([Types.LONG(), Types.STRING()]))
        .build()) \
    .build()

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds.map(lambda x: x[1]).print()
env.execute()

这里,除了formap()传递一个lambda函数ds.map(lambda x: x[1])之外,所有Python方法都可以映射到Flink的Java API。 Java 需要一个 Java MapFunction。为了在 Java 中实现此功能,我们需要序列化 ​​lambda x: x[1] 并用 Java 包装器对象包装它,该对象会生成一个 Python 进程以在作业执行期间执行它。Flink 和 PyFlink 运算符在执行期间,Flink 作业由Flink 算子系列。每个运算符接受来自上游运算符的输入,对其进行转换并向下游运算符产生输出。对于处理逻辑为Python的转换,将生成特定的Python算子:在初始化阶段,该算子将生成一个Python进程,并将元数据(即要执行的Python函数)发送到Python进程,在接收到来自上游算子的数据后,操作符会将其发送到Python进程执行。数据异步发送到Python进程;该运算符不会等到接收到一个数据项的执行结果后才发送下一个数据项。该运算符支持访问 Python 状态,但 Python 运算符运行在 JVM 中。与数据通信不同,状态访问是同步的。状态可以缓存在Python进程中以提高性能。Python运算符还支持在Python函数中使用日志记录。日志消息被发送到在 JVM 中运行的 Python 算子,因此消息最终会出现在 TaskManager 的日志文件中。请注意:Python 函数将在作业编译期间进行序列化,并在作业执行期间进行反序列化。保持资源使用较少(请参阅上面有关使用 open() 的注释),并且仅使用可序列化的实例变量。多个 Python 函数将尽可能链接起来,以避免不必要的序列化/反序列化以及通信开销。线程模式下启动 Python 函数在大多数情况下,单独的进程运行良好,但也有一些例外情况:额外的序列化/反序列化和通信开销可能是大数据的问题,例如图像处理,其中图像尺寸可能非常大、​​长字符串等。进程间通信也意味着延迟可能更高。另外Python算子通常需要缓冲数据来提高网络性能,这会增加更多的延迟。额外的进程和进程间通信给稳定性带来了挑战。为了解决这些问题,Flink 1.15引入了线程模式作为执行Python函数的选项在 JVM 中。默认情况下线程模式是禁用的;要使用它,请配置 python.execution-mode: thread。启用线程模式后,Python 函数的执行方式与进程模式下非常不同:一次处理一行数据,这会增加延迟。但是,序列化/反序列化和通信开销被消除注意,线程模式有特定的限制,这就是为什么它默认不启用:它只支持CPython解释器,因为它依赖于CPython运行时来执行Python函数。因为CPython运行时只能在进程中加载​​一次,线程模式不能很好地支持会话模式,其中多个作业可能需要使用单独的 Python 解释器,有关线程模式的更多详细信息,请参阅博客文章探索 PyFlink 中的线程模式。状态访问和检查点 Python 函数支持状态访问。本示例使用 state 来计算每组的平均值:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class Average(MapFunction):

    def __init__(self):
        self.sum_state = None
        self.cnt_state = None

    def open(self, runtime_context: RuntimeContext):
        self.sum_state = runtime_context.get_state(ValueStateDescriptor("sum", Types.INT()))
        self.cnt_state = runtime_context.get_state(ValueStateDescriptor("cnt", Types.INT()))

   def map(self, value):
        # access the state value
        sum = self.sum_state.value()
        if sum is None:
            sum = 0

        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        sum += value[1]
        cnt += 1

        # update the state
        self.sum_state.update(sum)
        self.cnt_state.update(cnt)

        return value[0], sum / cnt


env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 3), (1, 5), (1, 7), (2, 4), (2, 2)]) \
   .key_by(lambda row: row[0]) \
   .map(Average()) \
   .print()

env.execute()

这里sum_state和cnt_state都是PyFlink状态对象。状态可以在作业执行期间访问,也可以在作业故障转移后恢复:从上图可以看出:状态的真实来源是运行在 JVM 中的 Python Operator 从用户角度来看状态访问是同步的引入了以下优化提高状态访问的性能:异步写入:维护最新状态和状态修改的 LRU 缓存,并将其异步写回到 Python Operator 延迟读取:与 LRU 缓存一样,MapState 也会延迟读取,以避免不必要的状态请求性能调优一般调整 PyFlink 作业与调整 Flink Java 作业相同。一个例外是调整 Python 运算符性能。内存调整Python 运算符启动一个单独的 Python 进程来执行 Python 函数。依赖大量资源的 Python 函数可能会占用大量内存。如果为 Python 进程配置的内存过少,则会影响作业的稳定性。如果 PyFlink 作业运行在严格要求的 Kubernetes 或 Apache YARN 部署中限制内存使用,Python进程可能会因为内存需求超出限制而崩溃。你需要仔细设计你的Python代码。此外,使用以下配置选项来帮助调整 Python 内存使用情况:taskmanager.memory.process.size:TaskExecutors 的总进程内存大小。taskmanager.memory. Managed.fraction:用作托管内存的总内存部分。 (Python 进程的内存也是托管内存的一部分)taskmanager.memory.jvm-overhead.fraction:为 JVM 开销保留的总内存比例。 (未显式使用的保留内存)taskmanager.memory.driven.consumer-weights:不同类型消费者的托管内存权重。此配置可用于调整分配给 Python 进程的托管内存的比例。Bundle Size 在进程模式下,Python 运算符批量向 Python 进程发送数据。为了提高网络性能,它在发送数据之前缓冲数据。t.在检查点期间,它必须等待所有缓冲数据被处理。如果一个batch中有很多元素并且Python处理逻辑效率低下,那么检查点时间将会延长。如果您发现检查点很长甚至失败,请尝试调整包大小配置python.fn-execution.bundle.size。执行模式在数据量很大或需要减少延迟的情况下,线程模式可以提高性能。设置配置 python.execution-mode: thread 来启用它。 PyFlink 的下一步是什么 PyFlink 已经具有丰富的功能。在其发展的下一阶段,社区的重点将是:更好地支持交互式编程,例如仅检索无界表的少数前导行。提高了易用性,例如使 API 更加 Pythonic,改进文档,并添加更多示例。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 3,2023

CDH Yarn WebUI没有显示最近的FINISHED Applications

在跑spark任务时,CDH6.3.2的Yarn WebUI没有显示最近的FINISHED Applications,想查看已跑完的spark看不到,非常不方便。

CDH(Cloudera Distribution for Hadoop)是一个大数据处理平台,其中的YARN(Yet Another Resource Negotiator)是用于资源管理和任务调度的核心组件。您提到在YARN的WEBUI中,”FINISHED Applications”(已完成的应用程序)没有显示最新的记录。这可能由多种原因导致。

  1. 刷新问题: 有时候,Web界面可能由于不正确的缓存或其他问题而未能及时刷新,导致显示的信息不是最新的。
  2. 日志滚动策略: YARN的日志滚动策略可能会导致已完成的应用程序的日志被压缩或删除,从而影响了Web界面的展示。
  3. 数据清理策略: 可能在CDH配置中设置了数据清理策略,这可能会导致一些旧的已完成应用程序被自动清理,从而在Web界面中不再显示。

解决方案:

以下是一些可能的解决方案,您可以尝试进行排查和解决问题:

  1. 手动刷新界面: 首先尝试手动刷新Web界面,看看是否能够获取到最新的数据。有时候,简单的刷新就可以解决问题。
  2. 检查日志设置: 查看CDH集群中YARN的日志滚动和保留策略。您可以调整这些设置,确保日志文件得到正确保留并且不会被过早清理。
  3. 调整数据清理策略: 如果CDH集群中配置了自动数据清理策略,您可以考虑调整清理策略,以便保留更多的已完成应用程序数据。
  4. 查看日志和错误信息: 查看YARN的日志文件以及相关的错误信息,看看是否有任何与已完成应用程序信息显示相关的问题。
  5. 升级和修复: 如果问题仍然存在,可能需要考虑升级CDH版本或者应用程序,以获取更稳定的Web界面功能。
  6. 联系支持: 如果您尝试了以上方法仍无法解决问题,可以考虑联系CDH的技术支持团队寻求进一步帮助。

看了Yarn的配置,觉得都正常,重启了Yarn组件,重新跑任务
FINISHED Applications 有显示最新任务了。仔细查看原来是之前没有用倒序,默认是顺序排列,最新的没有显示在第一页。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 2,2023

spark dataframe 如何从Seq 集合select 所有的列

要从一个 Seq 集合中选择所有的列,你可以使用 Spark DataFrame 的 select 方法,并将 Seq 集合作为参数传递给该方法。假设你已经创建了一个名为 dataframe 的 Spark DataFrame,其中包含多个列,现在你想从一个 Seq 集合中选择所有的列,可以按照以下步骤进行操作:

import org.apache.spark.sql.functions.col 
// 假设你有一个 Seq 集合,其中包含你要选择的列名
val columnsToSelect = Seq("col1", "col2", "col3", ...)
// 使用 select 方法,并将 Seq 集合中的列名转换为对应的列对象
val selectedDataFrame = dataframe.select(columnsToSelect.map(col): _*)

上述代码中,col 是 Spark 的函数,用于将列名转换为列对象。通过将 Seq 集合中的列名映射为列对象,并在 select 方法中使用 : _* 来展开参数,你可以选择所有在 Seq 集合中指定的列。

现在,selectedDataFrame 中将包含来自 dataframe 中指定的所有列。

作者 east
Spark 7月 28,2023

使用日期字段来实现数仓每月算一次的功能

在数仓开发中,为了实现对某个宽表每月执行一次计算的功能,由于宽表是关联日期维度表,有字段 is_end_month 可以判断是否是月末。

为了使调度任务简单,可以每天执行一次,判断当天不是月末是不执行具体计算任务,只有是月末时才执行。

// 假设你已经创建了SparkSession对象,命名为spark
import org.apache.spark.sql.functions.col

// 假设你的DataFrame名为df,包含is_end_month字段
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("path_to_your_data.csv") // 替换为你的数据路径

// 获取is_end_month字段的第一条数据
val firstRow = df.select(col("is_end_month")).first()

// 从Row对象中获取is_end_month的值,假设该字段为整数类型
val isEndMonthValue = firstRow.getAs[Int]("is_end_month")

// 判断is_end_month的值并执行相应操作
if (isEndMonthValue == 0) {
    spark.stop() // 退出Spark
} else if (isEndMonthValue == 1) {
    // 继续执行其他代码
    // ...
}
注意替换代码中的数据路径为你实际的数据文件路径,同时根据实际的数据类型对getAs方法中的类型进行调整。此代码将首先从DataFrame中选择is_end_month字段的第一行数据,然后将该字段的值提取为整数并进行判断,如果值为0,则停止Spark并退出,否则继续执行其他代码。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 7月 27,2023

spark任务运行出错:Container [pid=6978,containerID=container_1688954893175_0080_04_000001] is running 11698176B beyond the ‘PHYSICAL’ memory limit. Current usage: 1.5 GB of 1.5 GB physical memory used; 3.3 GB of 3.1 GB virtual memory used. Killing container.

运行spark任务报错,查看错误日志如下:
Failing this attempt.Diagnostics: [2023-07-25 09:57:03.987]Container [pid=6978,containerID=container_1688954893175_0080_04_000001] is running 11698176B beyond the ‘PHYSICAL’ memory limit. Current usage: 1.5 GB of 1.5 GB physical memory used; 3.3 GB of 3.1 GB virtual memory used. Killing container. Dump of the process-tree for container_1688954893175_0080_04_000001 :

根据错误日志,可以看出 Spark 任务执行过程中出现了内存问题。错误信息显示容器 container_1688954893175_0080_04_000001 正在超出 ‘PHYSICAL’ 内存限制。当前使用情况显示已使用了 1.5 GB 的物理内存(PHYSICAL memory),而该容器的物理内存限制是 1.5 GB,虚拟内存(virtual memory)使用了 3.3 GB,虚拟内存限制是 3.1 GB。由于超过了物理内存限制,Spark 正在终止该容器。

这种情况通常出现在 Spark 任务运行时,需要更多的内存资源,但配置的内存资源不足以满足任务的需求。这可能由以下原因导致:

  1. 数据量较大:处理的数据量超过了所分配的内存资源,导致内存不足。
  2. 计算复杂度高:Spark 任务涉及复杂的计算逻辑或涉及大量的数据转换操作,导致内存需求增加。
  3. 资源配置不合理:Spark 配置中分配给执行器(Executor)的内存资源设置过小。

解决方案:

  1. 增加物理内存:如果可行,可以在执行 Spark 任务的机器上增加物理内存,这样有更多的内存资源供 Spark 使用。
  2. 优化数据处理:考虑对数据处理逻辑进行优化,减少不必要的计算和数据转换,以降低内存需求。
  3. 调整 Spark 配置:在 Spark 任务提交时,通过 --conf 参数来调整 Executor 的内存分配。可以尝试增加 spark.executor.memory 参数的值来提高每个 Executor 的内存,如果有多个 Executor,可以适当增加 Executor 的数量。
  4. 使用分区技术:合理地对数据进行分区,以减少单个任务需要处理的数据量,从而减少内存压力。
  5. 检查资源使用情况:查看其他正在运行的任务以及系统的资源使用情况,确保没有其他任务占用了过多的资源,导致 Spark 任务无法获取足够的资源。
  6. 调整虚拟内存限制:如果虚拟内存限制过小,可以尝试增加虚拟内存限制,但这不是主要解决方案,因为虚拟内存只是在物理内存不足时充当备用。

根据具体情况选择合适的解决方案,并确保 Spark 任务有足够的内存资源来执行。如果问题持续存在,可能需要进一步分析任务的执行计划和资源使用情况,以找出更深层次的原因并进行针对性的优化。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
未分类 7月 27,2023

CDH重启cloudera-scm-agent报错:No socket could be created — ((‘127.0.0.1’, 9001): [Errno 98] Address already in use)

CDH集群重启 cloudera-scm-agent ,发现重启失败,查看日志如下:


[21/Jul/2023 19:49:10 +0000] 11006 HTTPServer Thread-2 _cplogging ERROR [21/Jul/2023:19:49:10] ENGINE Error in HTTP server: shutting down Traceback (most recent call last): File “/opt/cloudera/cm-agent/lib/python2.7/site-packages/cherrypy/process/servers.py”, line 225, in _start_http_thread self.httpserver.start() File “/opt/cloudera/cm-agent/lib/python2.7/site-packages/cheroot/server.py”, line 1326, in start raise socket.error(msg) error: No socket could be created — ((‘127.0.0.1’, 9001): [Errno 98] Address already in use)

错误表明 cloudera-scm-agent 在启动时无法创建HTTP服务器的套接字,原因是地址 127.0.0.1:9001 已经被占用了。这通常意味着另一个进程已经在监听相同的IP地址和端口号,导致 cloudera-scm-agent 无法绑定到该地址。

解决方案:

要解决这个问题,可以采取以下步骤:

  1. 查找占用端口的进程: 使用以下命令查找占用9001端口的进程:bash复制代码sudo netstat -tlnp | grep 9001 这将显示占用9001端口的进程的相关信息,包括进程ID (PID) 和进程名。
  2. 停止占用端口的进程: 如果上一步找到了占用9001端口的进程,可以尝试停止该进程,释放端口。使用以下命令杀死该进程(请注意替换 <PID> 为实际的进程ID):bash复制代码sudo kill <PID> 或者,如果是已知的服务进程,可以使用该服务的控制命令来停止服务。
  3. 修改端口号: 如果停止占用端口的进程不可行或不适合你的情况,可以考虑修改 cloudera-scm-agent 的配置,将其绑定到一个空闲的端口上。你可以编辑配置文件中的端口号,然后重新启动 cloudera-scm-agent。
  4. 确保正确配置: 确保 cloudera-scm-agent 的配置正确,并没有指定错误的端口号。
  5. 防火墙和安全组规则: 检查服务器的防火墙设置和网络安全组规则,确保端口号没有被阻止。
  6. 检查其他应用程序: 确保没有其他应用程序意外地使用了9001端口。
  7. 重新启动服务: 尝试重新启动 cloudera-scm-agent 服务。



作者 east
运维 7月 26,2023

linux提示 netstat: command not found要怎样安装

如果你在运行 netstat 命令时出现 “netstat: command not found” 的错误提示,这说明你的系统可能没有安装 netstat 工具。在一些最新的Linux发行版中,netstat 命令已经被弃用,取而代之的是其他工具,例如 ss (socket statistics)。

如果你使用的是较新的Linux发行版,可以尝试使用 ss 命令来代替 netstat,它提供了类似的功能。以下是使用 ss 命令来查看监听端口的方法:

bash复制代码sudo ss -tlnp | grep 9001

如果你仍然需要使用传统的 netstat 命令,你可以通过安装net-tools软件包来获得它。根据你的Linux发行版,可以使用以下命令来安装net-tools:

  • Ubuntu / Debian:
bash复制代码sudo apt-get update
sudo apt-get install net-tools
  • CentOS / RHEL:
bash复制代码sudo yum install net-tools
  • Fedora:
bash复制代码sudo dnf install net-tools

请注意,net-tools包含了很多网络工具,其中包括 netstat 命令。安装后,你应该能够正常使用 netstat 命令了。然而,如果你的Linux发行版已经不再支持 netstat 命令,建议使用 ss 命令来查看监听端口和网络连接的信息。

作者 east
bug清单 7月 26,2023

Datart页面报错:Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: GC overhead limit exceeded

错误信息 “Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: GC overhead limit exceeded” 表明datart应用程序在处理请求时发生了Java虚拟机(JVM)的内存溢出错误。具体来说,JVM无法及时回收垃圾对象,导致垃圾回收的开销超过了允许的阈值,从而触发了”GC overhead limit exceeded”错误。

可能的原因:

  1. 内存不足:datart应用程序所在的JVM分配的内存不足,导致垃圾回收无法正常进行,从而引发该错误。
  2. 内存泄漏:datart应用程序可能存在内存泄漏问题,导致大量对象无法被正确回收,最终耗尽了可用内存。
  3. 处理大数据量:datart应用程序在处理大规模数据或复杂查询时,可能导致内存占用过高,超过JVM限制。

解决方案:

针对上述可能的原因,可以采取以下步骤来解决问题:

  1. 增加JVM内存限制:增加datart应用程序所在JVM的内存限制,使其能够处理更大的数据量。这可以通过修改JVM启动参数中的-Xmx和-Xms选项来实现。例如,将-Xmx选项设置为较大的值,比如 “-Xmx4g” 表示最大可用内存为4GB。
  2. 检查内存泄漏:进行内存泄漏分析,查找可能导致内存泄漏的代码,并修复问题。可以使用一些Java内存分析工具(如VisualVM、MAT等)来辅助查找内存泄漏。
  3. 优化查询:对datart应用程序中的复杂查询进行优化,尽量减少内存占用。可以通过索引优化、查询优化等方式来改善查询性能。
  4. 分页查询:如果datart应用程序处理大数据量的查询,可以考虑引入分页查询,避免一次性加载过多数据到内存中。
  5. 升级应用程序:检查datart应用程序是否有已知的内存相关问题,并考虑升级到修复了这些问题的版本。
  6. 监控和警报:设置合适的监控和警报机制,当内存使用超过阈值时及时发出警报,以便及早发现和解决问题。
  7. 分析日志:查看datart应用程序的日志,特别是错误日志,以获取更多有关错误发生时的上下文信息,有助于进一步定位问题。
作者 east
bug清单, Hadoop 7月 25,2023

Transport-level exception trying to monitor health of NameNode at xxx: java.net.SocketTimeoutException: 45000 millis timeout while waiting for the channel to be ready for read

表明CDH 6.3.2中的某个组件(可能是其他节点的Datanode或NodeManager)在尝试监视位于CDH节点上的NameNode时,发生了Socket超时异常。这意味着在连接到NameNode时花费的时间超过了45秒,导致连接失败。

可能的原因:

  1. 网络问题:有可能是网络连接不稳定或者网络延迟导致连接超时。
  2. 资源不足:CDH的NameNode可能资源不足,导致响应变慢,从而引发超时异常。
  3. 防火墙或安全设置:防火墙或其他安全设置可能限制了节点之间的通信,导致连接超时。

解决方案:

针对上述可能的原因,可以采取以下步骤逐一排查和解决问题:

  1. 检查网络连接:确保所有节点之间的网络连接稳定,并且没有阻止节点之间通信的防火墙或其他网络限制。
  2. 检查资源:确认CDH上的NameNode是否具有足够的资源(CPU、内存、磁盘空间等)来处理请求。如果资源不足,可以考虑增加资源或优化配置。
  3. 检查防火墙和安全设置:确保防火墙或其他安全设置不会阻止节点之间的通信。可以检查防火墙规则和CDH安全配置。
  4. 检查NameNode日志:查看CDH上NameNode的日志,了解是否存在其他异常或错误信息,这可能有助于进一步定位问题。
  5. 调整超时时间:可以尝试增加超时时间,从而允许更长的连接时间。但这并不是根本解决问题的方法,只是一个临时调整。
  6. 更新或升级:如果发现该问题是由于已知的CDH或Hadoop bug引起的,可以尝试升级CDH版本或应用相关的补丁和更新。
  7. 联系支持:如果上述步骤无法解决问题,可以联系CDH或Hadoop的支持团队寻求进一步的帮助和调查。
作者 east
bug清单, Hadoop 7月 25,2023

CDH节点报“Role not started due to unhealthy host”,重启不了角色

CDH集群的 Datanode 挂掉了,要重新启动报错“Role not started due to unhealthy host”。查了一下,这表示主机处于不健康状态 。这个错误大概有下面的原因:

可能的原因:

  1. 主机故障:Datanode所在的主机可能存在硬件故障或者网络问题,导致主机处于不可用状态,从而Datanode无法正常启动。
  2. 资源不足:主机资源(例如CPU、内存、磁盘空间)不足,导致Datanode启动失败。
  3. 防火墙或安全设置:防火墙或其他安全设置可能会阻止Datanode与其他节点进行通信,导致启动失败。
  4. CDH组件问题:CDH组件可能出现问题,导致Datanode无法启动。

解决方案:

针对上述可能的原因,可以采取以下步骤逐一排查和解决问题:

  1. 检查主机状态:确保Datanode所在的主机处于健康状态,没有硬件故障或网络问题。可以通过运行系统命令或者在CDH管理界面查看主机状态。
  2. 检查资源:确认主机具有足够的资源(CPU、内存、磁盘空间等)来运行Datanode。如果资源不足,可以考虑升级主机或释放资源。
  3. 检查防火墙和安全设置:确保防火墙或其他安全设置不会阻止Datanode与其他节点进行通信。可以检查防火墙规则和CDH安全设置。
  4. 检查CDH组件状态:检查CDH的其他组件是否正常运行,特别是与Datanode相关的组件(如HDFS)。如果其他组件也出现问题,可能是由于CDH整体环境的故障。
  5. 查看日志:检查Datanode日志,通常在CDH的日志目录下,查看是否有相关错误信息提供更多线索。
  6. 重启服务:尝试重启Datanode服务,以便它重新连接到集群并解决任何临时问题。
  7. 联系支持:如果上述步骤无法解决问题,可以联系CDH或Hadoop支持团队寻求帮助。

后来还发现奇怪现象,jps查到datanode、namenode进程想要kill掉,一直kill不掉。经过排查,发现是服务器多块硬盘中其中一块坏了,导致CDH一直报
“Role not started due to unhealthy host ”。

作者 east

上一 1 … 6 7 8 … 19 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.