gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面12 )
Java, python 9月 7,2023

用ChatGPT自动生成流程图

我们看别人代码时,总希望有流程图,这样可以一目了然,不过自己写的代码,又不想花几个小时去画流程图。有没有更好的方法呢?

方法就是用ChatGPT等大模型自动生成流程图,并用python等语言实现自动输出流程图。

1、生成流程图的 Mermaid语法

ChatGPT提示语:

对下面的代码生成流程图,并用Mermaid语法输出 。

2、把Mermaid语法的流程图输出图片

要使用Python或Java生成Mermaid语法输出的流程图图片,您可以使用以下方法:

Python 方法:

  1. 使用 mermaid-cli 工具来将Mermaid代码转换为图片。首先,安装mmdc(mermaid-cli的执行程序)。
  2. 使用Python调用mmdc。
import os

def generate_mermaid_image(mermaid_code, output_path):
    with open("temp.mmd", "w") as file:
        file.write(mermaid_code)
    os.system(f"mmdc -i temp.mmd -o {output_path}")
    os.remove("temp.mmd")

mermaid_code = """
graph TD;
    A-->B;
    A-->C;
    B-->D;
    C-->D;
"""

generate_mermaid_image(mermaid_code, "output.png")

Java 方法:

  1. 与Python方法相同,首先安装 mermaid-cli。
  2. 使用Java的 Runtime 类来调用 mmdc。
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class MermaidGenerator {

    public static void generateMermaidImage(String mermaidCode, String outputPath) throws IOException, InterruptedException {
        File tempFile = File.createTempFile("temp", ".mmd");
        try (FileWriter writer = new FileWriter(tempFile)) {
            writer.write(mermaidCode);
        }

        Process process = Runtime.getRuntime().exec("mmdc -i " + tempFile.getAbsolutePath() + " -o " + outputPath);
        process.waitFor();

        tempFile.delete();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String mermaidCode = """
            graph TD;
                A-->B;
                A-->C;
                B-->D;
                C-->D;
            """;
        generateMermaidImage(mermaidCode, "output.png");
    }
}

请注意,这两种方法都需要您在计算机上安装并配置mermaid-cli。此外,这两种方法都是使用临时文件来存储Mermaid代码,并在转换后删除它。这是为了简化调用mmdc的过程,但您可以根据需要进行调整。

作者 east
mysql, 大数据开发, 提示词 9月 7,2023

java批量生成海量测试数据及用ChatGPT提示语一键生成的方法

在做大数据开发时,为了测试性能等,需要上千万,甚至TB或PB级别的,在测试环境可能没有那么多数据,这时可以考虑进行造测试数据。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Random;

public class TestDataGenerator {
    public static void main(String[] args) {
        String url = "jdbc:mysql://localhost:3306/your_database";
        String username = "your_username";
        String password = "your_password";
        int batchSize = 1000; // 每批次插入的数据量
        int totalRecords = 1000000; // 总共要生成的数据量

        try {
            Connection connection = DriverManager.getConnection(url, username, password);
            connection.setAutoCommit(false);

            String insertQuery = "INSERT INTO test (id, callid, type, ...其他列...) VALUES (?, ?, ?, ...其他值...)";
            PreparedStatement preparedStatement = connection.prepareStatement(insertQuery);

            Random random = new Random();

            for (int i = 1; i <= totalRecords; i++) {
                // 设置每个字段的值,根据表结构设置对应的数据生成逻辑
                preparedStatement.setLong(1, i);
                preparedStatement.setString(2, "CallSheet" + i);
                preparedStatement.setString(3, "Type" + (random.nextInt(5) + 1));
                // 设置其他字段的值...

                preparedStatement.addBatch();

                if (i % batchSize == 0) {
                    preparedStatement.executeBatch();
                    connection.commit();
                }
            }

            preparedStatement.executeBatch();
            connection.commit();

            preparedStatement.close();
            connection.close();

            System.out.println("测试数据生成完成!");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

请将上述示例中的数据库连接信息和插入逻辑根据您的数据库设置和表结构进行相应的修改。此程序将会在数据库中插入海量测试数据。

更方便的方法是在ChatGPT等大模型,输入下面提示语:

根据下面的表结构,生成100万的测试数据,给出详细的java实现代码或存储过程代码:【表结构】

亲测在New Bing是可以生成可以运行的代码。

作者 east
Flink 9月 5,2023

Flink CDC对接数据报错:you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code

这个错误消息表明在运行 Flink CDC 连接到 MySQL 数据库时,出现了权限问题。错误消息中提到需要 “REPLICATION SLAVE privilege” 权限来执行操作,但是当前用户似乎没有该权限。

错误原因:

  • Flink CDC 需要通过 MySQL 的二进制日志来捕获数据库的变更,以便进行实时流式处理。这需要 MySQL 用户具有 REPLICATION SLAVE 权限,以允许 Flink CDC 作为 MySQL 复制从机来读取二进制日志。

解决方案: 为了解决这个问题,您可以采取以下步骤:

  1. 授予 REPLICATION SLAVE 权限:
    • 通过 MySQL 的 root 或具有足够权限的用户登录。
    • 执行以下 SQL 命令,将 REPLICATION SLAVE 权限授予 Flink CDC 使用的用户名(在 Flink 配置中指定的用户名):sql复制代码GRANT REPLICATION SLAVE ON *.* TO 'your_cdc_user'@'%' IDENTIFIED BY 'your_password';
      • your_cdc_user 替换为 Flink CDC 使用的用户名。
      • your_password 替换为 Flink CDC 使用的密码。
  2. 重新启动 Flink CDC 应用:
    • 确保 Flink CDC 应用程序重新启动,并尝试重新连接到 MySQL 数据库。
  3. 检查 Flink CDC 配置:
    • 确保 Flink CDC 配置文件中的连接字符串、用户名和密码正确配置,以匹配 MySQL 数据库的设置。
  4. 检查防火墙和网络配置:
    • 确保 MySQL 数据库的防火墙和网络配置允许 Flink CDC 应用程序连接到数据库端口。
  5. 查看 MySQL 错误日志:
    • 检查 MySQL 错误日志以获取更多关于访问被拒绝的详细信息。可能会提供有关错误原因的更多线索。
  6. 升级或重新配置 Flink CDC:
    • 如果问题仍然存在,考虑升级 Flink CDC 或重新配置其版本,以确保与 MySQL 数据库兼容性。

通过执行上述步骤,您应该能够解决 Flink CDC 连接到 MySQL 数据库时出现的权限问题。确保授予足够的权限,并检查配置以确保准确性。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
大数据开发 9月 4,2023

scala比较日期字符串的大小

使用字符串的compareTo方法:如果您的日期字符串是按照“年-月-日”的格式排列的,那么您可以直接使用字符串的compareTo方法来比较它们,无需转换为日期对象。例如,您可以使用以下的Scala代码来比较两个日期字符串1:

val date1 = "2023-09-03"
val date2 = "2023-08-21"
val result = date1.compareTo(date2)
// result: Int = 1
// result > 0 表示 date1 晚于 date2
// result < 0 表示 date1 早于 date2
// result == 0 表示 date1 等于 date2
作者 east
datax 8月 30,2023

DataX对接数据脱敏数据的实例

datax对接mysql数据,对姓名只保留姓,名变成**。对这种简单的脱敏,可以不用修改datax源码,直接在配置文件上实现。

//要脱敏的字段在第2个,也就是record.getColumn(1)
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "name", // 姓名的字段
              "age"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "test"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "name",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "INT"
              }
            ],
            // 省略其他参数
          }
        },
        // 添加transformer部分
        "transformer": [
                                 {
                                "name": "dx_groovy",
                                "parameter": {
                                  "code": "Column name = record.getColumn(1);def first = name.asString()[0];def last =null; last= \"*\" * (name.asString().length() - 1);def masked = null; masked = first + last; record.setColumn(1, new StringColumn(masked)); return record;"             
                                 }
                                }],
    // 省略其他部分
  }
}
作者 east
Spark 8月 30,2023

监控Spark运行超时及kill掉重跑

在用oozie的调度任务,用shell调度spark任务,在生产环境运行时,正常1-2个小时跑完的任务,有时出现跑了5、6个小时还没跑完,造成的原因很奇怪,有可能是数据倾斜,任务占用太多资源偶尔出错。为了监控这种现象,并设定阈值为3个小时,如果超过3小时没跑完就kill掉。可以结合oozie失败重试机制实现重跑。

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个Configuration对象,用于加载Hadoop和Yarn的配置文件
        Configuration conf = new Configuration();
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("yarn-site.xml");

        // 创建一个YarnClient对象,用于访问Yarn的api
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // 调用Yarn的api,获取所有正在运行的应用程序
        List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));

        // 遍历每个应用程序
        for (ApplicationReport app : apps) {
            // 获取应用程序的ID和名称
            ApplicationId appId = app.getApplicationId();
            String appName = app.getName();
            // 判断应用程序是否是由Oozie Shell命令启动的spark任务
            if (appName.startsWith("oozie:launcher")) {
                // 如果是,打印日志或者做其他操作
                System.out.println("Found Oozie Shell spark job: " + appId);
                // 获取应用程序的开始时间和当前时间
                long startTime = app.getStartTime();
                long currentTime = System.currentTimeMillis();
                // 计算应用程序的运行时间(毫秒)
                long jobDuration = currentTime - startTime;
                // 判断应用程序的运行时间是否超过阈值
                if (jobDuration > TIMEOUT_THRESHOLD) {
                    // 如果超过阈值,调用Yarn的api,终止应用程序
                    yarnClient.killApplication(appId);
                    // 打印日志或者做其他操作
                    System.out.println("Killed Oozie Shell spark job: " + appId);
                    // 重新运行应用程序或者做其他操作
                    // ...
                } else {
                    // 如果没有超过阈值,打印日志或者做其他操作
                    System.out.println("Job " + appId + " is running normally");
                }
            }
        }

        // 关闭YarnClient对象
        yarnClient.stop();
    }
}

如果要监控oozie的调度任务,也可以用下面的方法:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class OozieJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个OozieClient对象,用于调用oozie的api
        OozieClient oozieClient = new OozieClient("http://localhost:11000/oozie");
        // 调用oozie的api,查询所有正在运行的作业
        List<WorkflowJob> jobs = oozieClient.getJobsInfo("status=RUNNING");
        // 遍历每个作业
        for (WorkflowJob job : jobs) {
            // 获取作业的ID和信息
            String jobId = job.getId();
            String jobInfo = job.toString();
            // 解析作业的信息,获取作业的运行时间
            long jobDuration = parseJobDuration(jobInfo);
            // 判断作业的运行时间是否超过阈值
            if (jobDuration > TIMEOUT_THRESHOLD) {
                // 如果超过阈值,调用oozie的api,终止作业
                oozieClient.kill(jobId);
                // 打印日志或者做其他操作
                System.out.println("Job " + jobId + " is killed due to timeout");
                // 重新运行作业或者做其他操作
                // ...
            } else {
                // 如果没有超过阈值,打印日志或者做其他操作
                System.out.println("Job " + jobId + " is running normally");
            }
        }
    }

    // 定义一个方法,用于解析作业的信息,并返回作业的运行时间(毫秒)
    private static long parseJobDuration(String jobInfo) {
        // 创建一个Matcher对象,用于匹配正则表达式和作业信息
        Matcher matcher = DURATION_PATTERN.matcher(jobInfo);
        // 如果找到了匹配的结果,就从结果中提取天数、小时数和分钟数,并转换为毫秒
        if (matcher.find()) {
            int days = Integer.parseInt(matcher.group(1));
            int hours = Integer.parseInt(matcher.group(2));
            int minutes = Integer.parseInt(matcher.group(3));
            long duration = (days * 24 + hours) * 60 + minutes; // duration in minutes
            duration *= 60 * 1000; // duration in milliseconds
            return duration;
        } else {
            // 如果没有找到匹配的结果,就返回-1表示无法解析
            return -1;
        }
    }
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 8月 29,2023

druid连接不上doris有哪些可能原因

如果你在使用Druid连接池连接Doris时遇到问题,无法连接上数据库,可能有以下几个原因和解决方案:

  1. 网络配置问题:确保你的应用程序能够与Doris数据库所在的服务器进行通信。检查防火墙设置、网络配置以及Doris数据库的监听端口是否正确开放。
  2. Doris JDBC驱动问题:确认你使用的是正确版本的Doris JDBC驱动程序。不同版本的驱动程序可能与特定版本的Doris不兼容。建议检查并使用与你所连接的Doris版本匹配的驱动程序。
  3. 连接参数错误:确认你在连接字符串中提供了正确的主机名、端口号、数据库名称以及用户名和密码。确保这些参数与Doris数据库的实际配置相匹配。
  4. Druid连接池配置问题:检查你的Druid连接池配置是否正确。确保连接URL正确设置为Doris数据库的URL,并提供有效的用户名和密码。
  5. 数据库服务问题:确保Doris数据库服务正在运行,并且具有足够的资源来处理连接请求。检查Doris数据库的日志文件,查看是否有任何错误或异常信息。
  6. 数据库权限问题:验证你使用的用户名和密码是否具有足够的权限来连接Doris数据库。确保用户拥有所需的数据库访问权限。
  7. 依赖项问题:检查你的项目依赖项是否正确配置。确保在项目中包含了Druid连接池和Doris JDBC驱动的正确版本。

如果你仔细检查并解决了上述问题,应该能够成功连接到Doris数据库。如果问题仍然存在,建议进一步检查错误日志、调试信息和网络连接状态,以确定具体的故障原因,并考虑寻求更深入的技术支持。

作者 east
doris, Java 8月 29,2023

如何用druid连接池连接doris

在对doris进行单元测试,要测试大量sql,没有连接池时,太多连接会出现问题。于是想采用druid连接池。

要使用Druid连接池连接Doris,你需要在Java代码中进行配置。以下是一个简单的示例,包含Druid连接池的配置和连接到Doris的Java代码。

首先,你需要在项目的依赖中添加Druid和Doris的驱动程序。在Maven项目中,可以在pom.xml文件中添加以下依赖:

<dependencies>
    <!-- Druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.2.4</version>
    </dependency>

    <!-- Doris JDBC Driver -->
    <dependency>
        <groupId>com.mysql.jdbc</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
</dependencies>

接下来,创建一个名为DruidDorisExample的Java类,并编写以下代码:

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DruidDorisExample {
    public static void main(String[] args) {
        // 创建Druid连接池
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:9030/my_database");
        dataSource.setUsername("username");
        dataSource.setPassword("password");

        try (Connection connection = dataSource.getConnection()) {
            // 执行查询语句
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery("SELECT * FROM my_table");

            // 遍历结果集并输出数据
            while (resultSet.next()) {
                // 处理每一行的数据
                // 例如:String columnValue = resultSet.getString("column_name");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

在上面的代码中,你需要将jdbc:mysql://localhost:9030/my_database替换为Doris的连接信息,以及正确的用户名和密码。

此外,你还可以通过在代码中设置其他Druid连接池的配置来优化连接性能。例如,你可以设置最大连接数、是否开启预处理语句缓存等。

要进行更详细的Druid连接池和Doris配置,你需要创建一个名为druid.properties的配置文件,并在main()方法中加载它:

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DruidDorisExample {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream inputStream = DruidDorisExample.class.getClassLoader().getResourceAsStream("druid.properties")) {
            properties.load(inputStream);
        }

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            Connection connection = dataSource.getConnection();

            // 执行查询语句...
            
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
java复制代码import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DruidDorisExample {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream inputStream = DruidDorisExample.class.getClassLoader().getResourceAsStream("druid.properties")) {
            properties.load(inputStream);
        }

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            Connection connection = dataSource.getConnection();

            // 执行查询语句...
            
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

上述代码会从类路径下加载名为druid.properties的配置文件,该文件包含了Druid连接池的详细配置项。你可以根据需要在配置文件中设置相关属性,例如连接URL、用户名、密码、最大连接数等。配置文件的示例:

# Druid连接池配置
driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://localhost:9030/my_database
username=username
password=password

# 连接池配置
initialSize=5
maxActive=20
minIdle=1
maxWait=60000

请确保druid.properties文件与Java代码在同一个目录下或处于类路径中。

以上是连接到Doris的简单示例代码和配置文件。你可以根据自己的需求进行进一步调整和优化。

我自己实践时,参考网上别的配置文件,后来发现还是连接不上,而且一直在运行, 通过排查代码,参考
Druid连接池关键代码解读 ,发现是下面的配置影响了:

#测试SQL  
validationQuery=select 1 from test
原来这个表test并不存在,所以测试不通过,又没有明显报错。
作者 east
Java 8月 29,2023

Druid连接池关键代码解读

Druid连接池中的一个方法,作用是获取一个数据库连接(DruidPooledConnection)。下面对其中的主要逻辑进行解释:

  1. getConnectionInternal(maxWaitMillis):调用内部方法获取数据库连接。如果连接池已满或获取连接超时,则会抛出异常GetConnectionTimeoutException。
  2. isTestOnBorrow():检查是否需要在借用连接时进行连接有效性验证。
    • 如果需要验证连接有效性:
      • 调用testConnectionInternal(poolableConnection.getConnection())方法测试连接的有效性。如果连接有效,则跳出循环。
      • 如果连接无效,将其丢弃并从连接池中移除。
  3. 如果不需要在借用连接时进行连接有效性验证:
    • 检查连接是否已关闭,如果是,则丢弃该连接。
    • 如果开启了空闲连接验证(isTestWhileIdle()):
      • 计算连接的空闲时间。
      • 如果空闲时间超过了设定的时间间隔(timeBetweenEvictionRunsMillis),则检查连接的有效性。
      • 如果连接有效,则跳出循环。
      • 如果连接无效,将其丢弃并从连接池中移除。
  4. 如果开启了移除废弃连接(isRemoveAbandoned()):
    • 获取当前线程的堆栈信息,并将其设置到连接对象中。
    • 设置连接开始时间和追踪状态。
    • 将连接加入活跃连接集合中。
  5. 如果未开启默认自动提交(isDefaultAutoCommit()):
    • 将连接的自动提交设置为false。
  6. 返回获取的连接对象。

总体来说,这段代码的作用是从Druid连接池中获取一个可用的数据库连接,并在一系列条件判断和验证后返回该连接对象。其中包括了连接超时处理、连接有效性验证、废弃连接移除等功能,保证连接的可用性和质量。

解读的代码如下:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;

        DruidPooledConnection poolableConnection;
        while(true) {
            while(true) {
                try {
                    poolableConnection = this.getConnectionInternal(maxWaitMillis);
                    break;
                } catch (GetConnectionTimeoutException var17) {
                    if (notFullTimeoutRetryCnt > this.notFullTimeoutRetryCount || this.isFull()) {
                        throw var17;
                    }

                    ++notFullTimeoutRetryCnt;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt);
                    }
                }
            }

            if (this.isTestOnBorrow()) {
                boolean validate = this.testConnectionInternal(poolableConnection.getConnection());
                if (validate) {
                    break;
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }

                Connection realConnection = poolableConnection.getConnection();
                this.discardConnection(realConnection);
            } else {
                Connection realConnection = poolableConnection.getConnection();
                if (realConnection.isClosed()) {
                    this.discardConnection((Connection)null);
                } else {
                    if (!this.isTestWhileIdle()) {
                        break;
                    }

                    long currentTimeMillis = System.currentTimeMillis();
                    long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
                    long idleMillis = currentTimeMillis - lastActiveTimeMillis;
                    long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
                    if (timeBetweenEvictionRunsMillis <= 0L) {
                        timeBetweenEvictionRunsMillis = 60000L;
                    }

                    if (idleMillis < timeBetweenEvictionRunsMillis) {
                        break;
                    }

                    boolean validate = this.testConnectionInternal(poolableConnection.getConnection());
                    if (validate) {
                        break;
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }

                    this.discardConnection(realConnection);
                }
            }
        }

        if (this.isRemoveAbandoned()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.setConnectStackTrace(stackTrace);
            poolableConnection.setConnectedTimeNano();
            poolableConnection.setTraceEnable(true);
            synchronized(this.activeConnections) {
                this.activeConnections.put(poolableConnection, PRESENT);
            }
        }

        if (!this.isDefaultAutoCommit()) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
作者 east
datax 8月 25,2023

DataX Core TransformerRegistry类详细解读

TransformerRegistry 类,用于注册、加载和管理数据转换器。以下是对各个部分的作用解释:

  • 首先,该类维护了一个名为 registedTransformer 的映射,用于存储已注册的转换器信息。
  • 在静态代码块中,内置了一些原生转换器实例,并注册到 registedTransformer 中。
  • loadTransformerFromLocalStorage 方法用于从本地存储加载转换器,可以选择加载指定的转换器。它遍历指定目录下的转换器文件,尝试加载

每个转换器,如果加载失败则记录错误日志。

  • loadTransformer 方法用于加载单个转换器。它根据转换器配置文件的路径加载配置,然后根据配置中的类名加载对应的类。根据类的类型(是否继承自 ComplexTransformer 或 Transformer),将转换器实例注册到 registedTransformer 中。
  • getTransformer 方法用于获取指定名称的转换器信息,从 registedTransformer 中查找,如果找不到则可能会从磁盘读取(TODO: 根据注释,这部分可能是未实现的功能)。
  • registTransformer 和 registComplexTransformer 方法用于注册转换器。它们会检查转换器名称是否满足命名规则,并将转换器信息构建成 TransformerInfo 实例后添加到 registedTransformer 中。
  • checkName 方法用于检查转换器名称是否满足命名规则,根据 isNative 参数判断是否需要以 “dx_” 开头。
  • buildTransformerInfo 方法用于构建 TransformerInfo 实例,其中包含了转换器的类加载器、是否为原生转换器以及实际的转换器实例。
  • getAllSuportTransformer 方法返回支持的所有转换器的名称列表。

这个类的主要作用是提供了转换器的注册、加载和管理功能,使得数据转换器可以被动态添加和使用。它在数据处理流程中,特别是数据抽取和转换阶段,起到了很重要的作用。

public class TransformerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        // 添加内置的一些原生转换器
        // 本地存储和从服务器加载的转换器将延迟加载
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new DigestTransformer());
    }

    // 从本地存储加载转换器(默认情况下加载所有转换器)
    public static void loadTransformerFromLocalStorage() {
        loadTransformerFromLocalStorage(null);
    }

    // 从本地存储加载转换器(可选加载特定转换器)
    public static void loadTransformerFromLocalStorage(List<String> transformers) {
        String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
        if (null == paths) {
            return;
        }

        for (final String each : paths) {
            try {
                if (transformers == null || transformers.contains(each)) {
                    loadTransformer(each);
                }
            } catch (Exception e) {
                LOG.error(String.format("跳过转换器(%s)的加载,loadTransformer 出现异常(%s)", each, e.getMessage()), e);
            }
        }
    }

    // 加载指定的转换器
    public static void loadTransformer(String each) {
        String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
        Configuration transformerConfiguration;
        try {
            transformerConfiguration = loadTransFormerConfig(transformerPath);
        } catch (Exception e) {
            LOG.error(String.format("跳过转换器(%s),加载 transformer.json 出错,路径 = %s", each, transformerPath), e);
            return;
        }

        String className = transformerConfiguration.getString("class");
        if (StringUtils.isEmpty(className)) {
            LOG.error(String.format("跳过转换器(%s),未配置 class,路径 = %s,配置 = %s", each, transformerPath, transformerConfiguration.beautify()));
            return;
        }

        String funName = transformerConfiguration.getString("name");
        if (!each.equals(funName)) {
            LOG.warn(String.format("转换器(%s) 的名称与 transformer.json 配置的名称[%s] 不匹配,将忽略 JSON 的名称,路径 = %s,配置 = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
        }
        JarLoader jarLoader = new JarLoader(new String[]{transformerPath});

        try {
            Class<?> transformerClass = jarLoader.loadClass(className);
            Object transformer = transformerClass.newInstance();
            if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
                ((ComplexTransformer) transformer).setTransformerName(each);
                registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
            } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
                ((Transformer) transformer).setTransformerName(each);
                registTransformer((Transformer) transformer, jarLoader, false);
            } else {
                LOG.error(String.format("加载 Transformer 类(%s) 出错,路径 = %s", className, transformerPath));
            }
        } catch (Exception e) {
            // 错误的转换器跳过
            LOG.error(String.format("跳过转换器(%s),加载 Transformer 类出错,路径 = %s ", each, transformerPath), e);
        }
    }

    private static Configuration loadTransFormerConfig(String transformerPath) {
        return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
    }

    public static TransformerInfo getTransformer(String transformerName) {
        TransformerInfo result = registedTransformer.get(transformerName);

        // 如果 result == null,则尝试从磁盘读取
        // TODO: 这部分可能是未实现的功能,待开发

        return result;
    }

    public static synchronized void registTransformer(Transformer transformer) {
        registTransformer(transformer, null, true);
    }

    public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {
        checkName(transformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(transformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
        }

        registedTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));
    }

    public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {
        checkName(complexTransformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
        }

        registedTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
    }

    private static void checkName(String functionName, boolean isNative) {
        boolean checkResult = true;
        if (isNative) {
            if (!functionName.startsWith("dx_")) {
                checkResult = false;
            }
        } else {
            if (functionName.startsWith("dx_")) {
                checkResult = false;
            }
        }

        if (!checkResult) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
        }
    }

    private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }

    public static List<String> getAllSuportTransformer() {
        return new ArrayList<String>(registedTransformer.keySet());
    }
}
作者 east
doris 8月 25,2023

DataX DorisWriter 插件DorisStreamLoadObserver类详细解读

DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:

  1. DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。
  2. void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple 对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。
  3. private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。
  4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。
  5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。
  6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。
  7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。
  8. private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。

DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DorisStreamLoadObserver {
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);

    private Keys options;

    private long pos;
    private static final String RESULT_FAILED = "Fail";
    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
    private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
    private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
    private static final String RESULT_LABEL_PREPARE = "PREPARE";
    private static final String RESULT_LABEL_ABORTED = "ABORTED";
    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";

    public DorisStreamLoadObserver(Keys options) {
        this.options = options;
    }

    // 数据写入 Doris 的主要方法
    public void streamLoad(WriterTuple data) throws Exception {
        String host = getLoadHost();
        if (host == null) {
            throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");
        }
        String loadUrl = new StringBuilder(host)
                .append("/api/")
                .append(options.getDatabase())
                .append("/")
                .append(options.getTable())
                .append("/_stream_load")
                .toString();
        LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
        Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
        LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));
        final String keyStatus = "Status";
        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
            throw new IOException("Unable to flush data to Doris: unknown result status.");
        }
        LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
        if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
            throw new IOException(
                    new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString()
            );
        } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
            LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
            checkStreamLoadState(host, data.getLabel());
        }
    }

    // 检查数据加载状态的方法
    private void checkStreamLoadState(String host, String label) throws IOException {
        int idx = 0;
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
            } catch (InterruptedException ex) {
                break;
            }
            try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
                HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
                httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
                httpGet.setHeader("Connection", "close");

                try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
                    HttpEntity respEntity = getHttpEntity(resp);
                    if (respEntity == null) {
                        throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                "could not get the final state of label[%s].\n", label), null);
                    }
                    Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
                    String labelState = (String) result.get("state");
                    if (null == labelState) {
                        throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
                    }
                    LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
                    switch (labelState) {
                        case LAEBL_STATE_VISIBLE:
                        case LAEBL_STATE_COMMITTED:
                            return;
                        case RESULT_LABEL_PREPARE:
                            continue;
                        case RESULT_LABEL_ABORTED:
                            throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +
                                    "label[%s] state[%s]\n", label, labelState), null, true);
                        case RESULT_LABEL_UNKNOWN:
                        default:
                            throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                    "label[%s] state[%s]\n", label, labelState), null);
                    }
                }
            }
        }
    }

    // 根据格式将数据行拼接成字节数组
    private byte[] addRows(List<byte[]> rows, int totalBytes) {
        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
            byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                bos.put(row);
                bos.put(lineDelimiter);
            }
            return bos.array();
        }

        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    bos.put(jsonDelimiter);
                }
                bos.put(row);
                isFirstElement = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
    }

private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(120 * 1000)
                .setConnectTimeout(120 * 1000)
                .setConnectionRequestTimeout(120 * 1000)
                .build();
        try (CloseableHttpClient httpclient = HttpClientBuilder.create()
                .setDefaultRequestConfig(requestConfig)
                .setRedirectStrategy(new DefaultRedirectStrategy())
                .build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
            httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
            httpPut.setEntity(new ByteArrayEntity(data));
            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
                HttpEntity respEntity = getHttpEntity(resp);
                if (respEntity == null) {
                    throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");
                }
                return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
            }
        }
    }

    // 构造 HTTP 请求中的基本认证头部
    private String getBasicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);
        String base64Credentials = Base64.encodeBase64String(credentialsBytes);
        return "Basic " + base64Credentials;
    }

    // 从 HTTP 响应中获取实体内容
    private HttpEntity getHttpEntity(CloseableHttpResponse response) {
        if (response != null) {
            return response.getEntity();
        }
        return null;
    }

    // 获取用于加载数据的主机地址
    private String getLoadHost() {
        List<String> hosts = options.getDorisStreamLoadUrls();
        for (String host : hosts) {
            try {
                HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();
                connection.setRequestMethod("HEAD");
                int responseCode = connection.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    return host;
                }
            } catch (IOException e) {
                LOG.warn("Failed to connect to host: {}", host);
            }
        }
        return null;
    }
}
作者 east
doris 8月 25,2023

DataX DorisWriter 插件DorisWriterManager类详细解读

DorisWriterManager 的类,用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释:

  1. 导入必要的包和类: 代码开头导入了所需的包和类,包括日志记录、线程池、字符编码和其他相关工具类。
  2. 类成员变量定义: 下面是一些类的成员变量定义,这些变量在类的不同方法中使用:
    • LOG: 用于记录日志的 Logger 对象。
    • visitor: DorisStreamLoadObserver 类的实例,用于处理数据写入 Doris 的观察者。
    • options: Keys 类的实例,包含了一些配置选项。
    • buffer: 存储待写入 Doris 的数据。
    • batchCount: 当前批次中的记录数量。
    • batchSize: 当前批次中的数据大小。
    • closed: 标志位,表示是否已关闭写入。
    • flushException: 异步刷新数据时可能发生的异常。
    • flushQueue: 用于异步刷新数据的队列。
    • scheduler: 用于定期刷新数据的调度器。
    • scheduledFuture: 用于取消定时任务的句柄。
  3. 构造函数 DorisWriterManager: 构造函数接受一个 Keys 对象作为参数,设置了初始化的配置信息,并初始化了 visitor 和 flushQueue。接着,它调用 startScheduler() 启动定期刷新任务,以及 startAsyncFlushing() 启动异步刷新线程。
  4. startScheduler() 方法: 此方法负责启动定时刷新任务。它首先调用 stopScheduler() 停止之前的定时任务。然后,创建一个单线程的调度器(scheduler),并设置一个定时任务,定期触发数据刷新操作。在定时任务内部,它会检查是否关闭了写入操作,然后根据配置信息进行数据刷新。如果当前批次为空,重新启动定时任务,确保数据持续刷新。
  5. stopScheduler() 方法: 此方法用于停止定时任务。它会取消之前的定时任务并关闭调度器。
  6. writeRecord(String record) 方法: 该方法用于将记录写入缓冲区。它首先调用 checkFlushException() 方法检查是否存在刷新异常。然后,将记录转换成字节数组并添加到缓冲区中,同时更新批次计数和数据大小。如果当前批次的记录数量或数据大小超过了阈值,就会触发数据刷新。
  7. flush(String label, boolean waitUntilDone) 方法: 此方法用于手动触发数据刷新操作。它首先检查是否存在刷新异常,然后根据当前批次的情况决定是否执行刷新。如果当前批次为空,且 waitUntilDone 为真,它会等待之前的异步刷新操作完成。否则,它将当前批次的数据放入刷新队列,并根据 waitUntilDone 参数决定是否等待刷新操作完成。
  8. close() 方法: 此方法用于关闭 DorisWriterManager。它首先检查是否已经关闭,然后触发一次最终的数据刷新操作。如果当前批次有数据,会记录相应日志。最后,它检查是否有刷新异常并抛出相应异常。
  9. createBatchLabel() 方法: 此方法用于创建批次标签,用于标识一批数据。它根据配置的前缀和随机 UUID 生成标签。
  10. startAsyncFlushing() 方法: 此方法启动一个异步刷新线程。线程会循环调用 asyncFlush() 方法,将数据异步刷新到 Doris 中。
  11. waitAsyncFlushingDone() 方法: 该方法用于等待之前的异步刷新操作完成。它向刷新队列添加空的 WriterTuple,以确保之前的刷新操作完成。然后,它检查是否存在刷新异常。
  12. asyncFlush() 方法: 此方法用于异步刷新数据到 Doris。它从刷新队列中取出 WriterTuple,然后根据批次的标签执行数据刷新操作。如果发生异常,它会尝试多次,直到达到最大重试次数。如果需要重新创建批次标签,则生成新的标签。重试之间会休眠一段时间。成功后,重新启动定时任务。
  13. checkFlushException() 方法: 此方法用于检查是否存在刷新异常,如果存在则抛出异常。

这个 DorisWriterManager 类的目的是管理数据写入到 Doris 数据库的操作。它通过定时任务和异步刷新线程来控制数据的批量写入,同时处理异常情况,确保数据的稳定写入。

添加详细注释代码如下:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisWriterManager {

    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);

    private final DorisStreamLoadObserver visitor;
    private final Keys options;
    private final List<byte[]> buffer = new ArrayList<>(); // 缓冲区,用于存储待写入 Doris 的数据
    private int batchCount = 0; // 当前批次中的记录数量
    private long batchSize = 0; // 当前批次中的数据大小
    private volatile boolean closed = false; // 标志位,表示是否已关闭
    private volatile Exception flushException; // 异步刷新数据时可能发生的异常
    private final LinkedBlockingDeque<WriterTuple> flushQueue; // 用于异步刷新数据的队列
    private ScheduledExecutorService scheduler; // 用于定期刷新数据的调度器
    private ScheduledFuture<?> scheduledFuture;

    public DorisWriterManager(Keys options) {
        this.options = options;
        this.visitor = new DorisStreamLoadObserver(options);
        flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
        this.startScheduler(); // 启动定期刷新调度器
        this.startAsyncFlushing(); // 启动异步刷新线程
    }

    // 启动定期刷新调度器
    public void startScheduler() {
        stopScheduler(); // 停止之前的调度器
        this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder()
                .namingPattern("Doris-interval-flush").daemon(true).build());
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (DorisWriterManager.this) {
                if (!closed) {
                    try {
                        String label = createBatchLabel();
                        LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));
                        if (batchCount == 0) {
                            startScheduler(); // 如果当前批次为空,重新启动定时任务
                        }
                        flush(label, false);
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        }, options.getFlushInterval(), TimeUnit.MILLISECONDS);
    }

    // 停止定期刷新调度器
    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    // 写入一条记录到缓冲区
    public final synchronized void writeRecord(String record) throws IOException {
        checkFlushException(); // 检查是否有刷新异常
        try {
            byte[] bts = record.getBytes(StandardCharsets.UTF_8);
            buffer.add(bts);
            batchCount++;
            batchSize += bts.length;
            if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
                String label = createBatchLabel();
                LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
                flush(label, false); // 当记录数量或数据大小超过阈值时触发刷新
            }
        } catch (Exception e) {
            throw new IOException("Writing records to Doris failed.", e);
        }
    }

    // 手动触发刷新缓冲区的数据
    public synchronized void flush(String label, boolean waitUntilDone) throws Exception {
        checkFlushException(); // 检查是否有刷新异常
        if (batchCount == 0) {
            if (waitUntilDone) {
                waitAsyncFlushingDone(); // 如果当前批次为空,等待之前的刷新操作完成
            }
            return;
        }
        flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer))); // 将数据放入刷新队列
        if (waitUntilDone) {
            waitAsyncFlushingDone(); // 等待刷新操作完成
        }
        buffer.clear();
        batchCount = 0;
        batchSize = 0;
    }

    // 关闭 DorisWriterManager,触发最后一次刷新操作
    public synchronized void close() {
        if (!closed) {
            closed = true;
            try {
                String label = createBatchLabel();
                if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));
                flush(label, true); // 关闭时触发刷新操作
            } catch (Exception e) {
                throw new RuntimeException("Writing records to Doris failed.", e);
            }
        }
        checkFlushException();
    }

    // 创建批次标签,通常用于标识一批数据
    public String createBatchLabel() {
        StringBuilder sb = new StringBuilder();
        if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
            sb.append(options.getLabelPrefix());
        }
        return sb.append(UUID.randomUUID().toString()).toString();
    }

    // 启动异步刷新线程
    private void startAsyncFlushing() {
        Thread flushThread = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        asyncFlush(); // 异步刷新数据
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        });
        flushThread.setDaemon(true);
        flushThread.start();
    }

    // 等待之前的刷新操作完成
    private void waitAsyncFlushingDone() throws InterruptedException {
        for (int i = 0; i <= options.getFlushQueueLength(); i++) {
            flushQueue.put(new WriterTuple("", 0L, null));
        }
        checkFlushException();
    }

    // 异步刷新数据到 Doris
    private void asyncFlush() throws Exception {
        WriterTuple flushData = flushQueue.take();
        if (Strings.isNullOrEmpty(flushData.getLabel())) {
            return;
        }
        stopScheduler(); // 停止定时任务
        LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
        for (int i = 0; i <= options.getMaxRetries(); i++) {
            try {
                // 利用 DorisStreamLoadObserver 进行数据刷新
                visitor.streamLoad(flushData);
                LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
                startScheduler(); // 
     break;
            } catch (Exception e) {
                LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
                if (i >= options.getMaxRetries()) {
                    throw new IOException(e);
                }
                if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
                    String newLabel = createBatchLabel();
                    LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
                    flushData.setLabel(newLabel);
                }
                try {
                    Thread.sleep(1000l * Math.min(i + 1, 10));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
    }

    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Writing records to Doris failed.", flushException);
        }
    }
}
作者 east

上一 1 … 11 12 13 … 42 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (494)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (36)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.