gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

月度归档8月 2023

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  2023   /  
  • 8月
  • ( 页面3 )
Flink 8月 16,2023

Flink SQL:重复数据删除

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了额外的工具,用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器,以及操作生成的长时间运行的查询。我们已经看到 Flink SQL 有很多用例,我们很高兴看到什么你将用它来建造。在这篇博文中,我们将解释什么是重复数据删除,并展示如何使用 Flink SQL 来实现这一目标。

什么是流处理中的重复数据删除?重复数据删除是一个从数据集中删除重复数据的过程。这样做通常是为了提高数据的质量。在流处理中,重复数据删除是一个非常重要的过程,因为它可以帮助提高系统的性能。重复数据删除的工作原理是识别并删除数据流中的重复记录。这通常是通过将流中的数据与参考数据集进行比较来完成的。当发现重复记录时,会将其从流中删除。 重复数据删除的好处 对数据进行重复数据删除有很多好处,

包括:

提高性能 – 通过删除重复数据,可以减少需要处理的数据量,从而可以提高性能性能降低存储要求 – 重复数据占用不必要的空间,因此删除它可以释放宝贵的存储空间更高的准确性 – 重复数据可能导致结果不准确,因此删除它可以提高数据分析的准确性提高效率 – 重复数据删除可以使数据处理更加高效通过减少需要处理的数据量来提高效率如何使用 Flink SQL 删除重复数据重复事件可能以多种方式最终出现在数据源中,从人为错误到应用程序错误。无论来源如何,不干净的数据都会对结果的质量(和正确性)产生真正的影响。在某些情况下,数据生产者会为流数据更改生成具有相同 ID 的记录。这些记录可能包括插入、更新和删除记录,并且在与其他流聚合或联接之前,它们可能需要作为管道中业务逻辑的一部分进行重复数据删除。在这种情况下,重复数据删除的目的是确保只处理唯一的记录,并避免重复数据可能引起的任何问题。假设您的订单系统偶尔会生成具有相同order_id的重复事件,但您只想保留用于下游处理的最新事件。第一步,您可以结合使用 COUNT 函数和 HAVING 子句来检查是否以及哪些订单具有多个事件,然后使用 ROW_NUMBER() 过滤掉这些事件。实际上,重复数据删除是 Top-N 聚合的一种特殊情况,其中 N 为 1 (rownum = 1),排序列是事件的处理时间或事件时间。在下面的示例查询中,源表顺序由内置的indatagen连接器,它在内存中不断生成行。

CREATE TABLE orders (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);

--Check for duplicates in the `orders` table
SELECT id AS order_id,
       COUNT(*) AS order_cnt
FROM orders o
GROUP BY id
HAVING COUNT(*) > 1;

--Use deduplication to keep only the latest record for each `order_id`
SELECT
  order_id,
  order_time
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
  FROM orders
     )
WHERE rownum = 1;

摘要在本文中,您了解了数据的重复数据删除。您还了解了如何使用 Flink SQL 为此类问题编写查询。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Java 8月 16,2023

当月最后一天的Java工具类

需求要求:
根据年月(YYYY-MM格式),返回该月最后一天(YYYY-MM-dd),例如输入 2023-06,返回2023-06-31

以下是用于根据年月返回该月最后一天的Java工具类实现:

java
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

public class LastDayOfMonthFinder {

    public static String getLastDayOfMonth(String yearMonth) {
        try {
            YearMonth ym = YearMonth.parse(yearMonth, DateTimeFormatter.ofPattern("yyyy-MM"));
            LocalDate lastDayOfMonth = ym.atEndOfMonth();
            return lastDayOfMonth.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) {
        String yearMonth = "2023-06";
        String lastDay = getLastDayOfMonth(yearMonth);
        if (lastDay != null) {
            System.out.println("Last day of " + yearMonth + ": " + lastDay);
        } else {
            System.out.println("Invalid input or calculation error.");
        }
    }
}
在这个工具类中,我们使用YearMonth类来解析年月字符串,并通过atEndOfMonth()方法获取该月的最后一天。然后,我们使用DateTimeFormatter将最后一天的日期格式化为"yyyy-MM-dd"格式。请注意,在getLastDayOfMonth方法中,我捕获了通用的Exception来处理可能的解析和计算错误。你可以根据需要进一步优化错误处理部分。最后,示例的main方法演示了如何使用这个工具类来获取指定年月的最后一天。
作者 east
Flink 8月 15,2023

Flink 的测试工具揭秘

使用 Flink 的测试工具测试用户定义函数 (UDF)

使用 Apache Flink 时,开发人员在测试利用状态和计时器的用户定义函数 (UDF) 时经常面临挑战。在本文中,我们将回答一个问题“如何使用 Flink 的测试工具来测试用户定义函数 (UDF)”。

使用 Flink 的测试工具测试用户定义函数 (UDF)

测试使用 Flink 状态和计时器的用户定义函数 (UDF) 可能会很复杂,尤其是在使用诸如 KeyedProcessFunction 之类的函数时。 Flink 包含一组专门为简化此任务而设计的测试工具。这些测试工具是在 Flink 1.15 中引入的,并且被认为是实验性的。

测试工具的配置

要使用 Flink 的测试工具,您需要添加一些依赖项到您的项目。要测试 DataStream 作业,您可以在 Maven 项目的 pom.xml 的依赖项块中添加以下内容:

XML
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>
Use code with caution. Learn more

要测试 Table/SQL 作业,除了前面提到的 flink-test-utils 之外,您还可以在 Maven 项目的 pom.xml 的依赖项块中添加以下内容:

XML
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.0</version>
    <scope>test</scope>
</dependency>
Use code with caution. Learn more

通过运算符测试用户函数

通过运算符测试用户函数可能是一种有用的方法,因为它允许您测试函数的功能。需要注意的是,Flink 只为算子提供测试工具,因此您需要手动实例化适当的算子才能使用这些测试工具。这意味着您需要创建运算符的实例并使用必要的输入和参数对其进行配置。完成此操作后,您可以使用测试工具来验证用户功能的正确性。值得注意的是,这种方法可能比使用其他测试方法更复杂,因为它需要您手动设置操作员并使用适当的输入对其进行配置。

Java
@Test
public void testMyProcessFunction() throws Exception {
    KeyedProcessOperator<String, String, String> operator =
        new KeyedProcessOperator<>(new MyKeyedProcessFunction());

    // 设置测试工具
    // 推送数据
    // 验证结果
}
 

测试计时器行为

在 Flink 中测试用户函数时,必须测试其行为的各个方面。测试的一件常见事情是计时器的创建和触发。您可以使用 TestHarness 通过操作员发送测试数据并验证是否创建了计时器。然后,您可以推进水印并验证计时器是否已触发。除了测试计时器之外,您可能还想测试处理元素是否会创建状态并验证此处理的结果。假设我们希望在 20 毫秒内为我们的测试目标触发计时器。下面的示例代码演示了如何使用测试工具对其进行测试。

Java
@Test
public void testTimelyOperator() throws Exception {
    // 设置初始条件
    testHarness.processWatermark(0L);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // 发送一些数据
    testHarness.processElement(3L, 100L);

    // 验证定时器
    assertThat(testHarness.numEventTimeTimers(), is(1));

    // 将时间提前到 20 毫秒以触发计时器。
    testHarness.processWatermark(20);
    assertThat(testHarness.numEventTimeTimers(), is(0));

    // 验证结果
    assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
    assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0));

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink, mysql 8月 15,2023

操作指南:使用 Flink CDC 同步 MySQL 分库分表

线事务处理(OLTP)系统中,为了解决单表数据量大的问题,通常采用分库分表的方法对单张大表进行拆分,以提高系统的吞吐量。但为了方便数据分析,在同步到数据仓库或数据湖时,一般需要将分库分表的数据合并成一张大表。本教程将向您展示如何使用 Flink CDC 为上述场景构建实时数据湖。本文中的示例将全部基于 Docker 并使用 Flink SQL。无需一行 Java/Scala 代码或安装 IDE。本指南的全部内容包含 docker-compose 文件。整个过程将通过从 MySQL 同步数据到 Iceberg 来展示,如下图所示。

步骤1:创建 docker-compose.yml 文件 创建一个 Docker Compose 文件(docker-compose.yml),内容如下:

Version: ‘2.1’

Services:

sql-client: user: flink

image: yuxialuo/flink-sql-client:1.13.2.v1

depends_on:

– jobmanager

– mysql

environment: FLINK_JOBMANAGER_HOST: jobmanager MYSQL_HOST: mysql volumes: – shared tmpfs:/tmp/iceberg jobmanager: user: flink image: flink:1.13.2-scala_2.11 ports: – “8081:8081” command: jobmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: – shared tmpfs:/tmp/iceberg taskmanager: user: flink image: flink:1.13.2-scala_2.11 depends_on: – jobmanager command: taskmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 2 volumes: – shared tmpfs:/tmp/iceberg mysql: image: debezium/example-mysql:1.1 ports: – “3306:3306” environment: – MYSQL_ROOT_PASSWORD=123456 – MYSQL_USER=mysql用户 – MYSQL_PASSWORD=mysqlpw volumes: – shared tmpfs driver-options: type: “tmpfs” device: “tmpfs”

这个 docker-compose 文件中的容器包括:

  • SQL-Client:Flink SQL Client,用于提交 SQL 查询和查看 SQL 执行结果
  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用于执行 Flink SQL
  • MySQL:作为数据源分库分表,存储用户表

注意:如果您想在自己的 Flink 环境中运行本指南,您需要下载下面列出的包并将其放在 Flink 目录的 lib 目录中,即 FLINK_HOME/lib/。

  • flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
  • iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

步骤 2:准备 MySQL 数据库中的数据 进入 MySQL 容器,执行以下命令:

shell复制代码
docker-compose exec mysql mysql -uroot -p123456

然后在 MySQL 中创建数据、表,并填充数据:

sql复制代码
CREATE DATABASE db_1;
USE db_1;
CREATE TABLE user_1 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234","user_110@foo.com");

CREATE TABLE user_2 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (120,"user_120","上海","123567891234","user_120@foo.com");

CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234", NULL);

CREATE TABLE user_2 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","上海","123567891234","user_220@foo.com");

步骤3:使用 Flink DDL 和 Flink SQL CLI 创建表 进入 Flink SQL CLI 容器,执行以下命令:

shell复制代码
docker-compose exec sql-client ./sql-client

在 Flink SQL CLI 中,执行以下命令:

sql复制代码
-- Flink SQL
SET execution.checkpointing.interval = 3s;

-- 创建源表 user_source 来捕获 MySQL 中所有数据库和表的数据并使用正则表达式来匹配这些数据库和表的配置项中使用的表。
-- 而且表还定义了一个元数据列来区分数据来自哪个数据库和表。
CREATE TABLE user_source(
  database_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,
  `id` DECIMAL(20, 0) NOT NULL,
  name STRING,
  address STRING,
  phone STRING,
  email STRING,
  PRIMARY KEY (`id`)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'db_[0-9]+',
  'table-name' = 'user_[0-9]+'
);

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 15,2023

如何测试您的 Flink SQL 应用程序

测试 Apache Flink SQL 代码

测试 Apache Flink SQL 代码是确保应用程序顺利运行并提供预期结果的关键步骤。 Flink SQL 应用程序用于广泛的数据处理任务,从复杂的分析到简单的 SQL 作业。全面的测试流程可以帮助您在开发过程的早期发现潜在问题,并确保您的应用程序按预期工作。

这篇文章将介绍 Flink SQL 应用程序的几种测试可能性。

手动测试和自动测试

使用 SQL 客户端是快速测试的有效方法。并轻松测试您的 Flink SQL 代码。 SQL 客户端旨在提供一个交互式环境,您可以在其中运行 SQL 查询并查看结果。这使得您可以轻松测试代码并快速进行更改。但是,您大多只能使用 SQL 客户端执行手动测试。为了进行更全面的测试,您应该使用自动化测试工具。

自动化测试工具可以提供一种使用多个数据集和各种场景测试代码的方法。这可以帮助识别手动测试中可能无法立即看到的问题。自动化测试包括单元测试和集成测试。单元测试用于测试应用程序的各个组件,而集成测试用于测试不同组件之间的集成。它们都有助于识别与数据处理相关的问题,例如不正确的 SQL 语法或不正确的数据转换。

通过单元和集成测试进行测试

此测试方法涉及使用单元和集成测试来验证代码的行为。它很容易实现自动化,这使得它成为测试较小单元(例如查询或函数的一部分)的便捷选择。此外,它是高度可定制的,允许您使用特定的输入数据定义单独的测试场景,而无需更改日志流并控制行时间属性。这种方法的另一个好处是,它可以在开发用户定义的函数时实现快速周转时间。这可以帮助您在开发过程的早期识别类型推断问题和其他问题。但是,这种测试方法涉及使用非 SQL 代码,这可能需要一些 Java 或 Scala 知识才能有效地使用它。这可能包括理解这些编程语言的语法和基本概念,以及与它们一起使用的任何专用库或框架。虽然这可能需要一些前期学习投资,但对于那些熟悉这些语言并希望自动化某些流程或任务的人来说,这可能是一个很好的方法。

Flink 1.12 或更低版本中的单元和集成测试

如果您需要提供输入数据出于测试目的,一种选择是使用 TableEnvironment.fromValues() 方法。但是,需要注意的是,此方法仅支持插入,不允许更改日志流。此外,它不提供对数据顺序的任何控制,这在某些情况下可能是必要的。此外,没有可用的行时间或水印选项。另一种方法是将值连接器与 TestValuesTableFactory 结合使用。这允许您使用表 DDL 中可用的全部选项来定义输入表。这包括指定行时间和水印属性,以及表 DDL 的任何其他特性。但是,需要注意的是,这是一个非公共 API,这意味着它不适合在生产环境中使用,并且可能不会得到供应商的完全支持。

Flink 1.13 或更高版本中的单元和集成测试

如果您需要在 Flink 1.13(或更高版本)中提供输入数据用于测试目的,一种选择是使用 TableEnvironment.fromValues() 方法。不过,需要注意的是,该方法与之前版本的 Flink(1.12 或更早版本)具有相同的限制。另一种方法是使用 StreamTableEnvironment.fromChangelogStream() 方法,该方法允许您将输入定义为 aDataStreamorDataStream 具有 RowKind 属性。此方法提供自动类型转换并在操作之间保留事件时间和水印。此外,它允许您像在表 DDL 中一样定义自定义架构定义,从而提供更大的灵活性和对输入数据的控制。总的来说,在 Flink 1.13(或更高版本)中,使用 fromChangelogStream() 方法可以是一种更强大、更通用的方法来提供用于测试的输入数据。

在表中收集查询或操作的结果

测试 Apache Flink SQL 代码的注意事项

以下是测试 Apache Flink SQL 代码时需要注意的一些事情:

  • 使用正确的测试工具。 有许多不同的测试工具可用于测试 Apache Flink SQL 代码。选择正确的工具取决于您的特定需求。例如,如果您需要测试复杂的应用程序,则需要使用功能更强大的测试工具。
  • 编写可重复的测试。 您的测试应该是可重复的,这意味着您应该能够在不同的环境中运行它们并获得相同的结果。这将有助于您发现代码中的错误并验证您的更改。
  • 编写可维护的测试。 您的测试应该是可维护的,这意味着您应该能够轻松地添加新测试或更改现有的测试。这将有助于您随着代码的更改保持您的测试库的最新状态。
  • 编写可读的测试。 您的测试应该是可读的,这意味着您应该能够轻松地理解它们的目的和输出。这将有助于您在需要时更轻松地调试您的测试。
  • 编写全面的测试。 您的测试应该全面,这意味着您应该测试代码的所有方面。这将有助于您确保您的代码是正确的和可靠的。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 15,2023

优化spark dataframe left join速度

如果 Spark DataFrame 的左连接速度很慢,你可以考虑以下几种方法来提高速度:

  1. 分区优化: 确保参与左连接的两个表都进行了适当的分区,以便能够并行处理数据。可以使用 repartition 或者 partitionBy 方法来调整分区,将数据均匀地分布到集群的节点上。
  2. 广播小表: 如果左连接中的一个表很小,可以将其广播到每个节点上,以减少网络传输开销。使用 broadcast 方法可以将小表广播,以优化连接性能。
  3. 数据倾斜处理: 如果左连接的键存在数据倾斜,即某些键的数据量远大于其他键,这可能导致性能下降。可以考虑使用一些技术来处理数据倾斜,如改变连接键、使用随机前缀、或者使用coalesce 等方法重新平衡数据。
  4. 适当选择算法: Spark 会根据连接表的大小和分区情况选择不同的连接算法。可以尝试在连接操作前使用 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) 来强制 Spark 选择适当的连接算法。
  5. 调整资源配置: 确保集群的资源配置足够,以避免资源瓶颈。可以调整 Spark 配置参数,如内存、CPU 核心数等,以适应连接操作的需求。
  6. 内存优化: 使用合适的内存管理策略,确保内存不会出现溢出或者过度分配。可以通过合理设置 Spark 内存分配参数来进行优化。
  7. 索引优化: 如果连接的键有索引,可以优化连接速度。但注意,Spark 并不像传统数据库系统那样直接支持索引优化,因此需要结合其他优化方法来使用。
  8. 避免多次连接: 在同一流程中,避免多次连接相同的表。如果有多个连接操作,考虑将数据缓存起来或者将连接操作分阶段执行。
  9. 持久化中间结果: 如果有复杂的计算逻辑,可以在中间步骤持久化数据,以避免重复计算。
  10. 监控调优: 使用 Spark UI 和相关日志来监控连接操作的执行计划和性能。根据监控结果进行调优。

最终的优化策略可能会因数据量、数据分布、集群配置等因素而有所不同。建议根据实际情况进行测试和调整,以找到最适合你数据和环境的优化方法。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 13,2023

Flink SQL:查询、窗口和时间 – 第 2 部分

如何创建时间窗口

在上一篇文章中,我们讨论了创建时间窗口的必要性以及如何选择时间窗口的长度。在本文中,我们将更深入地了解如何创建时间窗口。

时间窗口 是收集数据的一段时间段。时间窗口的长度将取决于所收集的数据类型和研究的目的。例如,如果您有兴趣研究新产品对消费者行为的影响,则需要收集与有兴趣研究新营销活动对销售影响的消费者行为相比更长的时间段的数据。

在选择适合您案例的时间窗口时,您应该考虑以下属性:

  • 数据的频率:如果每天、每周、每月等收集数据。
  • 数据的季节性:如果数据受季节性影响(例如,销售数据通常在假期期间更高),您需要在选择时间窗口时考虑这一点。
  • 数据的稳定性:如果数据不稳定(例如,股票价格),您需要在选择时间窗口时考虑这一点。
  • 时间窗口的长度:时间窗口的长度将取决于所收集的数据类型和研究的目的。

一旦您考虑了数据的属性,您就可以为您的案例选择最合适的时间窗口。

如何创建时间窗口

有几种不同方法可以创建时间窗口。最常见的方法是使用滚动窗口、滑动窗口 或固定窗口。

  • 滚动窗口:滚动窗口是一组连续的时间段。每个时间段都会随着新数据的到来而向前移动。
  • 滑动窗口:滑动窗口是一组重叠的时间段。每个时间段都会随着新数据的到来而向前移动,但不会覆盖之前的时间段。
  • 固定窗口:固定窗口是一组大小相同的时间段。这些窗口不会随着新数据的到来而移动。

哪种方法最适合您将取决于您的特定需求。例如,如果您想跟踪一段时间内的趋势,则滚动窗口可能是最好的选择。如果您想跟踪一段时间内的变化,则滑动窗口可能是最好的选择。如果您想跟踪一段时间内的固定间隔的数据,则固定窗口可能是最好的选择。

如何使用时间窗口

一旦您创建了时间窗口,您就可以使用它来聚合数据或运行查询。例如,您可以使用时间窗口来计算一段时间内的平均值、最大值或最小值。您还可以使用时间窗口来运行查询以查找特定时间段内的数据。

使用时间窗口的示例

以下是使用时间窗口的示例:

  • 计算一段时间内的平均值:您可以使用时间窗口来计算一段时间内的平均值。例如,您可以使用滚动窗口来计算过去 1 分钟内的平均温度。
  • 查找特定时间段内的数据:您可以使用时间窗口来查找特定时间段内的数据。例如,您可以使用滑动窗口来查找过去 1 小时内发生的所有交易。

结论

时间窗口是用于聚合数据或运行查询的强大工具。了解如何创建和使用时间窗口可以帮助您更好地分析您的数据。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 13,2023

Flink-Kafka连接器的流模式

介绍

这篇博文将介绍 Flink Table API 中提供的 Kafka 连接器。读完这篇博文,您将更好地了解哪种连接器更适合特定的应用程序。

Flink DataStream API 中的 Kafka 连接器

Flink DataStream API 提供了一个 Kafka 连接器,它工作在附加模式下,可以被您用 Scala/Java API 编写的 Flink 程序使用。除了这个,Flink 的 Table API 还提供了两种 Kafka 连接器:

  • Kafka-unboundedsource,对sink使用“append 模式”
  • Upsert Kafka-unboundedsource,对sink使用“upsert 模式”

这篇博文将专注于用于 Table API 的 Kafka 连接器。我还将尝试回答何时使用 Kafka 连接器(追加)或选择 Upsert Kafka 连接器的问题。

简单的 Kafka 连接器 – 追加模式

以下示例是将数据从内存数据流复制到输出 Kafka 主题。在生产场景中,输入数据可以丰富或聚合,但我们将保持这个示例简单,以展示 Flink 在使用第一个 Kafka 连接器时的行为。

首先,创建一个表,其中包含订单作为流数据的来源,这些数据是由数据生成连接器提供的:

CREATE TABLE `orders` (
`id` INT,
`bid` DOUBLE,
`order_time` AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND() * -3 + 5) * -1 AS INTEGER), CURRENT_TIMESTAMP)
)
WITH (
'connector' = 'datagen',
'fields.id.kind' = 'random',
'fields.id.max' = '100',
'fields.id.min' = '1',
'每秒行数' = '100'
);

然后,使用 Kafka 连接器创建一个输出表作为接收器来存储输入数据:

CREATE TABLE `orders_sink_append` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3)
)
WITH (
'connector' = 'kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_sink_append',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'order-receiver-append',
'值.格式' = 'csv'
);

要运行本文中的所有 Flink 代码示例,您需要使用 Ververica Platform (VVP),它可以在任何 Kubernetes 集群上轻松安装:

  • VVP 文档:安装在 Google Kubernetes Engine 上开始使用 Ververica Platform
  • 在 Azure Kubernetes 服务上开始使用 Ververica Platform
  • 在 AWS EKS 上开始使用 Ververica Platform

执行上述表 DDL 以在 VVP 的内置目录中注册新表。这可以通过打开 VVP -> SQL -> 编辑器窗口来完成。然后选择每个“CREATE TABLE … ;”单独声明并单击右侧的“运行选择”。

现在我们可以使用以下 SQL 脚本在 VVP 中创建并启动 Flink SQL 部署。它将生成的数据流连续存储到带有 Kafka 连接器的 Kafka 主题中,即以追加模式运行。

选择 SQL 查询并单击“运行选择”来运行下面的 SQL 查询:

INSERT INTO `orders_sink_append` SELECT * FROM `orders`;

VVP 将引导您完成新的 VVP 部署过程。只需遵循它并单击“开始”按钮即可。

以下是从上面的 SQL 查询创建的 VVP 部署的概述:

Kafka 连接器 – Upsert 模式

让我们看看另一个连接器以及它的不同之处。输入表的定义保持不变,但接收器连接器设置为“upsert-kafka”。为了清楚起见,让我们使用“upsert-kafka”连接器创建一个克隆表。

CREATE TABLE `orders_sink_upserts` (
`id` INT,
`bid` DOUBLE,
`order_time` TIMESTAMP(3),
`PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka',
'key.format' = 'csv',
'key.fields' = 'id',
'properties.bootstrap.servers' = '....kafka.svc.cluster.local:9092',
'主题' = 'orders_upserts',
'properties.group.id' = 'order-upserts-consumers',
'值.格式' = 'csv'
);

与上一节类似,我们创建另一个 VVP 部署将数据存储到表 orders_sink_upserts 中,使用“upsert-kafka”连接器和以下 SQL 语句:

INSERT INTO `orders_sink_upserts` SELECT * FROM `orders`;

VVP 部署的概述和作业图看起来与以前一样:

Flink Job 图的拓扑保持不变:

让我们检查 orders_sink_upserts 主题/表的输出:

SELECT * FROM `orders_sink_upserts`;

您可以看到 VVP SQL 编辑器会话 i 显示 100 个插入 (-I),然后其余更改是更新 (+U、-U)。datagen 中配置了 100 个唯一的订单 ID。这就是为什么仅在此处获取 100 条插入的原因,其余所有都是对这 100 个唯一订单的更新。

当您使用 Kafka 支持的 SQL 表时,这是两种流模式“append”和“upsert”之间的主要区别。Upsert 模式可以轻松获取最新更改或了解流数据是否是新的或是否应视为更新或删除。当特定键的任何值为 NULL 时,就会检测到删除。

“upsert-kafka”如何检测 upsert?

首先,任何使用“upsert-kafka”连接器的表都必须有一个主键。在上面的示例中,它是:

PRIMARY KEY (`id`) NOT ENFORCED

您还可以看到,Flink 在使用“upsert-kafka”表中的数据时又注入了一个运算符“ChangeLogNormalize”。注入的运算符聚合输入数据并返回特定主键的最新记录。

下面是另一个 VVP 部署来展示这一点。它将 upsert 表中的数据打印到标准输出:

CREATE TEMPORARY TABLE SinkTable WITH (‘connector’ = ‘print’) LIKE orders_sink_upserts (EXCLUDING OPTIONS);

INSERT INTO `SinkTable` SELECT * FROM `orders_sink_upserts`;

相反,如果从使用 append 模式工作的 orders_sink_append 读取数据,Flink 不会将 ChangelogNormalize 操作符注入到作业图中:

CREATE TEMPORARY TABLE `SinkTable`
WITH ('connector' = 'print')
LIKE `orders_sink_append`
(EXCLUDING OPTIONS);
INSERT INTO `SinkTable` SELECT * FROM `orders_sink_append`;

连接表:upsert 与追加模式

当多个表连接在一起时,两种不同的流模式会产生很大的差异。这种差异可能会导致数据重复。以下示例展示了如何在连接由 Kafka 主题支持的两个 Flink 表时避免数据重复。

以下是一个关于出租车运行的 Flink SQL 作业示例。我们有一个汽车注册表,每个汽车在第一个表中都有一个“蓝色”或“黑色”类别。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 12,2023

在 Flink SQL 中加入高度倾斜的流

Flink SQL 是一种功能强大的工具,可用于批处理和流式处理。它提供低代码数据分析,并符合 SQL 标准。在生产系统中,我们的客户发现,随着工作负载的扩展,以前运行良好的 SQL 作业可能会显著减慢,甚至失败。数据倾斜是一个常见且重要的原因。

数据倾斜是指变量的概率分布关于其均值的不对称性。换句话说,数据在某些属性上分布不均匀。本文讨论并分析了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。如果您是该领域的新手,或者有兴趣了解更多有关 Flink 或 Flink SQL 的信息,请查看末尾的相关信息。

加入具有键偏差的流

考虑以下场景:我们有一个 Users 表,其中包含有关某些业务应用程序的用户的信息。我们有一个 GenOrders 表,其中包含有关订单(买/卖东西)的信息。我们想知道 Users 表中每个用户的订单数。

(简化的)查询可能如下所示:

SQL
SELECT o.uid, COUNT(o.oid)
FROM GenOrders o
JOIN Users u ON o.uid = u.uid
GROUP BY o.uid;

在流连接的上下文中,了解两个表代表连续的信息流非常重要。在 Flink 中,聚合(例如 COUNT 和 SUM)是由聚合算子执行的。聚合运算符是有状态的,它将中间聚合结果存储在其状态中。默认情况下,流聚合运算符会逐条处理输入记录。当一条记录进来时,操作员会执行以下步骤:

  1. 从状态中检索累加器。
  2. 将记录累加/收回到累加器。
  3. 将累加器存储回状态。

每次读取状态的/写入是有一定成本的。

数据倾斜

在大规模的 Flink 应用中,流通常会根据特定的 key 来划分,并分发到多个任务中进行并行处理。我们将这样的 key 称为“分组 key”。如果记录分布不均匀,则某些任务会比其他任务重。而且较重的任务需要更长的时间才能完成,并且可能成为数据管道的瓶颈。当这种情况发生时,我们说数据出现了偏差。此外,如果数据是基于某些键分布的,我们将其称为“键倾斜”。

对于上述用例,我们可以按用户 ID 将记录分布到聚合任务以进行并行处理。由于我们要查找每个用户的订单数,因此按用户 ID 对数据进行分组是有意义的。下图使用颜色来指示不同用户的数据记录。我们可以看到,Red 用户有 8 条记录,比其他用户多得多。在这种情况下,我们称数据在用户 ID 上存在偏差。如果我们按照用户 ID 分配数据,那么处理红色记录的顶级聚合任务会处理更多的数据,并且可能会比其他任务花费更长的时间。

如何处理它?

MiniBatch 聚合

MiniBatch 聚合将输入记录放入缓冲区并在缓冲区满后或一段时间后执行聚合操作。这样,缓冲区中具有相同 key 值(键值)的记录会一起处理,因此每批每个键值只有一次状态写入。小批量聚合提高了吞吐量,因为聚合运算符的状态访问次数较少,并且输出较少的记录,特别是当有撤回消息且下游算子的性能不佳时。下图演示了这一点。

我们可以使用以下选项来实现:

Properties
table.exec.mini-batch.enabled: true # 启用 mini-批
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000 # [可选] MiniBatch 可以缓冲的最大输入记录数

  • 增量聚合状态

由于明显的键偏差,部分聚合可能会变得很大。一种解决方案是采用三个阶段的聚合。首先,每个上游任务对每个不同的键值进行本地聚合。不维护本地聚合的状态。这些结果按照分组键和桶键进行划分,并发送到第二阶段聚合(称为增量聚合)。增量聚合仅将不同的键存储在其状态中。增量聚合的结果按照分组键进行划分,并发送到第三阶段聚合(最终聚合),从而保持聚合函数值的状态。这种技术称为增量聚合。

我们可以使用以下选项可利用增量聚合以及小批量聚合、本地/全局聚合和拆分不同聚合:

Properties
table.exec.mini-batch.enabled: true # 启用小批量
table.exec.mini-batch.allow-latency: 5s # 将记录放入缓冲区并在 5 秒内进行聚合
table.exec.mini-batch.size: 10000
table.optimizer.agg-phase-strategy: TWO_PHASE/AUTO # 启用本地/全局聚合
table.optimizer.distinct-agg.split.enabled: true # 启用拆分不同聚合
table.optimizer.distinct-agg.split.bucket-num: 1024 # 桶数
table.optimizer.incremental-agg-enabled: true # 启用增量聚合,默认为true

注意:查询也必须支持 MiniBatch 聚合、Local/Global 聚合和 Split Distinct 聚合。

  • 结论

在本文中,我们讨论了数据倾斜对聚合相关案例的流连接的影响,以及潜在的解决方案。我们讨论了三种不同的解决方案:MiniBatch 聚合、本地/全局聚合和增量聚合。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
CDH 8月 8,2023

修复HDFS JournalNode和NameNode硬盘损坏

HDFS JournalNode和NameNode是HDFS文件系统的关键组件。JournalNode负责记录NameNode的所有操作,NameNode负责管理HDFS文件系统中的所有数据。如果JournalNode或NameNode的硬盘损坏,可能会导致HDFS文件系统不可用。

如果部署了Cloudera的HDFS HA,那么在更换了新硬盘后,重启JournalNode会出现”JournalNotFormattedException: Journal Storage Directory /opt/dfs/jn/nameservice1 not formatted”这样的错误。原因是在新的硬盘上的jn目录下没有VERSION文件用来恢复。

解决方法是重新创建对应的目录结构,并拷贝其他JournalNode上的VERSION文件。之后重启JournalNode,会自动进行目录的初始化,并自动同步日志。

以下是修复HDFS JournalNode硬盘损坏的步骤:

  • 重新创建对应的目录结构。

mkdir -p /opt/dfs/jn/nameservice1/current/

  • 拷贝其他JournalNode上的VERSION文件。

scp 目标服务器:/opt/dfs/jn/nameservice1/current/VERSION /opt/dfs/jn/nameservice1/current/VERSION

(3)更改JournalNode的权限。

chown -R hdfs:hdfs /opt/dfs/jn

重启JournalNode。

  • service hadoop-journalnode restart

以下是修复HDFS NameNode硬盘损坏的步骤:

如果启动NameNode出现

We expected txid 266899638, but got txid 267088002.

方法一:可以运行以下命令进行修复:

hadoop namenode -recover

方法二:如果上面的命令出错,如果NameNode是HA的,首先从健康节点的NameNode拷贝到出故障的NameNode,然后重启看是否正常。

如果仍然无法修复,可以联系Cloudera支持获取帮助。

以下是一些额外的提示:

定期备份HDFS JournalNode和NameNode的硬盘。

使用高可用性(HA)配置HDFS JournalNode和NameNode。

监控HDFS JournalNode和NameNode的健康状况。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 7,2023

Flink SQL:查询、窗口和时间 – 第 1 部分

时间是流处理中的一个关键因素,因为数据在到达时就被处理,并且必须快速处理以避免延迟。流处理中时间的普遍性意味着数据处理的设计必须考虑到时间因素。基于时间的窗口是流处理中常用的技术,用于确保数据得到及时处理。流处理可用于各种应用程序,例如监控系统、欺诈检测以及任何想要提供实时数据的应用程序。 -时间洞察。流处理中普遍存在的时间给数据处理带来了挑战和机遇。通过正确的设计,流处理可用于提供对数据流的实时洞察。在这篇文章中,我们将了解在使用 Flink SQL 时如何考虑时间。

时间戳和查询

在流处理中,时间戳是用于记录事件发生的时间。此信息可用于确定处理事件所需的时间,或监视流处理系统的性能。时间戳还可以用于对同时发生的事件进行排序。

示例:

  • 用户交互:点击
  • 应用程序日志:应用程序
  • 机器事务:信用卡、广告服务
  • 传感器:手机、汽车、物联网

流处理中涉及时间的查询通常是用于分析一段时间内的数据。这可能涉及查找数据中的趋势或模式,或比较不同时间段的数据。流处理系统通常提供对数据加窗的方法,以便仅考虑特定时间段的数据。这使得可以对传入的数据进行实时分析,或者对历史数据进行分析。

示例:

  • 最后一分钟的平均值
  • 使用最新汇率加入
  • 在 5 分钟内尝试 3 次失败后发出警报

时间属性

流处理中有多种不同的时间属性。它们是事件时间、处理时间和摄取时间。

  • 事件时间是事件发生时的时间戳。
  • 处理时间是处理事件的时间戳。
  • 摄取时间是事件被摄取到系统中的时间戳。

事件时间是唯一完全由用户控制的时间属性。所有其他时间属性均由系统控制。

事件时间允许用户控制事件发生的时间,这在某些情况下非常重要。处理时间可能会受到系统速度的影响。在某些情况下,系统可能会很慢并且处理时间可能会延迟。在其他情况下,系统可能很快,处理时间可能比预期早。

摄取时间完全不受用户控制。提取时间由系统控制,取决于系统的速度。

事件时间属性

事件时间属性是带有关联水印的 TIMESTAMP 或 TIMESTAMP_LTZ。水印使用有界无序水印策略。

  • TIMESTAMP 是一种记录精确到小数秒的日期和时间的数据类型,而 TIMESTAMP_LTZ(本地时区)是一种存储日期、时间和本地时区的数据类型。

访问 Flink 官方文档网站了解更多信息。

CREATE TABLE clicks (
  user STRING,
  url STRING,
  cTime TIMESTAMP_LTZ(3),
  cTime WITH WATERMARK AS cTime - INTERVAL '2' MINUTES
)

处理时间属性

处理时间属性是一个计算列,不保存数据;每当访问该属性时都会查询本地计算机时间。处理时间属性可以像常规 TIMESTAMP_LTZ 一样使用。

CREATE TABLE clicks (
  user STRING,
  url STRING,
  cTime AS PROCTIME()
)

随着时间推移,状态可能会过期,例如,在长达一小时的窗口中对每个用户的点击进行计数。

  • 事件时间 windows:clicks 将计入其发生的小时。水印触发关闭窗口并丢弃其状态。
  • 处理时间 windows:clicks 将计入当前时间处理时的小时。本地系统时钟触发关闭窗口并丢弃其状态。

如果输入表中没有时间属性,窗口操作员将不知道窗口何时完成。

时间戳与时间属性

在流处理中有两种表示时间的常见方法:时间戳和时间属性。时间戳是与事件关联的时间点,而时间属性可以存在于每个表模式中。时间戳更精确,但时间属性更灵活,可用于表示复杂的时间关系。

时态运算符

时态运算符是流处理中处理基于时间的数据的一种方法。有几种不同类型的时态运算符:

  • 窗口:窗口是数据的集合,在特定的时间范围内被处理。窗口可以是滚动的、跳跃的,或者会话的。
  • 聚合:聚合是对数据的集合进行操作,以产生单个值。聚合可以是平均值、计数或总和等。
  • 连接:连接是将两个数据集合结合起来,以便可以根据它们的时间属性进行比较。连接可以是基于事件时间、处理时间或摄取时间的。
  • 模式匹配:模式匹配是对数据的集合进行操作,以查找满足特定模式的数据。模式可以是简单的,如连续的数字,也可以是复杂的,如识别欺诈交易的模式。

时态运算符及时跟踪进度,以确定输入何时完成。它们发出无法更新的最终结果行,并且能够丢弃不再需要的状态(记录和结果)。

窗口示例

窗口是流处理中处理基于时间的数据的一种常见方法。有几种不同类型的窗口,每个窗口都有其优点和缺点。

  • 滚动窗口:滚动窗口是数据的集合,在特定的时间范围内被处理。滚动窗口随着时间的推移而移动,因此每次窗口中的数据都不同。滚动窗口适用于需要实时分析的数据。
  • 跳跃窗口:跳跃窗口是数据的集合,在特定的时间间隔内被处理。跳跃窗口不随着时间的推移而移动,因此每次窗口中的数据相同。跳跃窗口适用于需要每隔一段时间分析的数据。
  • 会话窗口:会话窗口是数据的集合,在没有活动的情况下保持打开状态。会话窗口适用于需要分析数据流中的活动的数据。

窗口表值函数

窗口表值函数 (WTVF) 是一种特殊函数,可用于从窗口中返回数据。WTVF 可以用于聚合窗口数据、计算窗口统计信息或查找模式。

结论

在流处理中,时间是一个关键因素。时间属性和时态运算符是流处理中处理基于时间的数据的强大工具。通过正确的设计,可以使用这些工具来提供对数据流的实时洞察。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 7,2023

Flink SQL:Join 系列 3(Lateral Joins、LAG 聚合函数)

Flink SQL 已成为低代码数据分析的事实上的标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。

什么是横向连接?

横向连接是一种 SQL 连接类型,允许您在 FROM 子句中指定子查询。然后针对外部查询中的每一行执行该子查询。横向联接可以通过减少表扫描次数来提高 SQL 查询的性能。换句话说,您可以将横向联接视为 SQL 中的 foreach 循环,它迭代集合,在每次迭代上应用一些转换,并且产生输出。横向联接在处理以分层或嵌套格式存储的数据时非常有用。

如何执行横向表联接

此示例将展示如何使用横向联接关联事件。给定一个包含人员地址的表,您需要找到每个州有两个人口最多的城市,并随着人们的流动而不断更新这些排名。

首先,使用连续聚合来计算每个城市的人口。虽然这很简单,但当人们移动时,Flink SQL 的真正威力就会显现出来。通过使用重复数据删除,当一个人搬家时,Flink 会自动为他们的旧城市发出撤回请求。因此,如果约翰从纽约搬到洛杉矶,纽约的人口将自动减少 1。这为我们提供了变更数据捕获的能力,而无需投资于设置它的实际基础设施!

有了这种动态手头有填充表后,您就可以使用 LATERAL 表连接来解决原始问题。与普通联接不同,横向联接允许子查询与 FROM 子句中其他参数的列相关联。与常规子查询不同,作为联接,横向可以返回多行。

sql复制代码
CREATE TABLE People (
    id INTEGER,
    city STRING,
    state STRING,
    arrival_time TIMESTAMP(3),
    arrival_watermark AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'fake',
    'fields.id.expression' = '#{number.numberBetween ''1'',''100''}',
    'fields.city.expression' = '#{regexify ''(New York|Newport|Port|Shoesfort|Springfield){1}''}',
    'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
    'fields.arrival_time.expression' = '#{date.past ''15'',''seconds''}',
    'rows-per-second' = '10'
);
sql复制代码
CREATE TEMPORARY VIEW current_population AS
SELECT
    city,
    state,
    COUNT(*) AS population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
) WHERE rownum = 1
GROUP BY city, state;
sql复制代码
SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM current_population) states,
    LATERAL (
        SELECT city, population
        FROM current_population
        WHERE state = states.state
        ORDER BY population DESC
        LIMIT 2
    );

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

上一 1 2 3 4 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.