gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

python开源爬虫精选

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 作者: east
  • ( 页面32 )
python, 程序员网赚 9月 3,2023

python开源爬虫精选

WeChatUrlCrawler是一个专门爬取公众号文章url的爬虫,将爬到的url保存在文件或者数据库中 下载地址

25个爬虫示例程序

还有模拟登陆程序,模拟登陆基于selenium,有些模拟登录基于js逆向。GitHub13K+的点赞,所有程序都是使用 Python3 编写的。

Kr1s77/awesome-python-login-model: 😮python模拟登陆一些大型网站,还有一些简单的爬虫,希望对你们有所帮助❤️,如果喜欢记得给个star哦🌟 (github.com)

Python爬虫爬取百度贴吧帖子

1、获取帖子标题、总页数、评论、图片

2、图片写入文件并保存

3、将各种信息实现打印(也算测试追踪)

4、输入帖子号便能实现以上操作(即亦适用于其它帖子)

you-get

you-get是GitHub上的一个非常火爆的爬虫项目,作者提供了近80个国内外网站的视频图片的抓取,收获了40900个赞!

对于you-get的安装,可以通过pip install you-get的命令进行安装。

Python爬虫模拟登录淘宝并获取所有订单

1. python模拟登录淘宝网页

2. 获取登录用户的所有订单详情

3. 学会应对出现验证码的情况

4. 体会一下复杂的模拟登录机制

Python爬虫抓取淘宝MM照片

1.抓取淘宝MM的姓名,头像,年龄

2.抓取每一个MM的资料简介以及写真图片

3.把每一个MM的写真图片按照文件夹保存到本地

4.熟悉文件保存的过程

Python爬取百度图片及py文件转换exe

Python爬取下载百度图片,并将py文件转换为exe文件。

Python爬虫聊聊淘宝上的飞机杯

爬取淘宝上关键词为“飞机杯”的商品数据,和销量前十的所有评论。

Python爬虫实战:爬取今日头条美女图片

爬取头条街拍,看头条一道道靓丽的风景线。

Python爬虫之九派新闻

python动态爬取九派新闻网站

python爬虫8.7万条豆瓣电影数据分析

1、获取豆瓣电影信息

2、豆瓣电影简要分析

Python爬取链家北京二手房数据

本次分享分为两部分,第一部分介绍如何使用scrapy抓取二手房数据,第二部分我将抓下来的数据进行了一些简单的分析和可视化。

Python实战Scrapy豌豆荚应用市场爬虫

对于给定的大量APP,如何爬取与之对应的(应用市场)分类、描述的信息

爬取了中文网站,可以用google翻译成英文,加adsense广告,发布到wordpress做英文网站。

同理,爬取了英文网站, 可以用google翻译成中文,加adsense广告,发布到wordpress做中文网站。 参考教程

作者 east
程序员网赚 9月 2,2023

本站福利!各类精选资源免费赠送

关注公众号“康波之道”或扫最下方二维码

回复“小程序”获取1000个小程序打包源码。

回复”chatgpt”获取免注册可用chatgpt。

回复“大数据”获取多本大数据电子书。

回复 “笑话” 搞笑段子剧本大全抖音快手情侣11000条。

回复 “ppt” 获取高端上气的各类ppt模板几千个。

回复 ”简历“获取各行各业上千套简历

回复 ”word“获取 2500套企业常用word模板

关注微信公众号“康波之道”或扫下面的二维码

此图像的alt属性为空;文件名为1693658233277.png

本文素材来源于网友提供,如果有侵权请留言删除。

作者 east
datax 8月 30,2023

DataX对接数据脱敏数据的实例

datax对接mysql数据,对姓名只保留姓,名变成**。对这种简单的脱敏,可以不用修改datax源码,直接在配置文件上实现。

//要脱敏的字段在第2个,也就是record.getColumn(1)
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "name", // 姓名的字段
              "age"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "test"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "name",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "INT"
              }
            ],
            // 省略其他参数
          }
        },
        // 添加transformer部分
        "transformer": [
                                 {
                                "name": "dx_groovy",
                                "parameter": {
                                  "code": "Column name = record.getColumn(1);def first = name.asString()[0];def last =null; last= \"*\" * (name.asString().length() - 1);def masked = null; masked = first + last; record.setColumn(1, new StringColumn(masked)); return record;"             
                                 }
                                }],
    // 省略其他部分
  }
}
作者 east
Spark 8月 30,2023

监控Spark运行超时及kill掉重跑

在用oozie的调度任务,用shell调度spark任务,在生产环境运行时,正常1-2个小时跑完的任务,有时出现跑了5、6个小时还没跑完,造成的原因很奇怪,有可能是数据倾斜,任务占用太多资源偶尔出错。为了监控这种现象,并设定阈值为3个小时,如果超过3小时没跑完就kill掉。可以结合oozie失败重试机制实现重跑。

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个Configuration对象,用于加载Hadoop和Yarn的配置文件
        Configuration conf = new Configuration();
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("yarn-site.xml");

        // 创建一个YarnClient对象,用于访问Yarn的api
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // 调用Yarn的api,获取所有正在运行的应用程序
        List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));

        // 遍历每个应用程序
        for (ApplicationReport app : apps) {
            // 获取应用程序的ID和名称
            ApplicationId appId = app.getApplicationId();
            String appName = app.getName();
            // 判断应用程序是否是由Oozie Shell命令启动的spark任务
            if (appName.startsWith("oozie:launcher")) {
                // 如果是,打印日志或者做其他操作
                System.out.println("Found Oozie Shell spark job: " + appId);
                // 获取应用程序的开始时间和当前时间
                long startTime = app.getStartTime();
                long currentTime = System.currentTimeMillis();
                // 计算应用程序的运行时间(毫秒)
                long jobDuration = currentTime - startTime;
                // 判断应用程序的运行时间是否超过阈值
                if (jobDuration > TIMEOUT_THRESHOLD) {
                    // 如果超过阈值,调用Yarn的api,终止应用程序
                    yarnClient.killApplication(appId);
                    // 打印日志或者做其他操作
                    System.out.println("Killed Oozie Shell spark job: " + appId);
                    // 重新运行应用程序或者做其他操作
                    // ...
                } else {
                    // 如果没有超过阈值,打印日志或者做其他操作
                    System.out.println("Job " + appId + " is running normally");
                }
            }
        }

        // 关闭YarnClient对象
        yarnClient.stop();
    }
}

如果要监控oozie的调度任务,也可以用下面的方法:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class OozieJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个OozieClient对象,用于调用oozie的api
        OozieClient oozieClient = new OozieClient("http://localhost:11000/oozie");
        // 调用oozie的api,查询所有正在运行的作业
        List<WorkflowJob> jobs = oozieClient.getJobsInfo("status=RUNNING");
        // 遍历每个作业
        for (WorkflowJob job : jobs) {
            // 获取作业的ID和信息
            String jobId = job.getId();
            String jobInfo = job.toString();
            // 解析作业的信息,获取作业的运行时间
            long jobDuration = parseJobDuration(jobInfo);
            // 判断作业的运行时间是否超过阈值
            if (jobDuration > TIMEOUT_THRESHOLD) {
                // 如果超过阈值,调用oozie的api,终止作业
                oozieClient.kill(jobId);
                // 打印日志或者做其他操作
                System.out.println("Job " + jobId + " is killed due to timeout");
                // 重新运行作业或者做其他操作
                // ...
            } else {
                // 如果没有超过阈值,打印日志或者做其他操作
                System.out.println("Job " + jobId + " is running normally");
            }
        }
    }

    // 定义一个方法,用于解析作业的信息,并返回作业的运行时间(毫秒)
    private static long parseJobDuration(String jobInfo) {
        // 创建一个Matcher对象,用于匹配正则表达式和作业信息
        Matcher matcher = DURATION_PATTERN.matcher(jobInfo);
        // 如果找到了匹配的结果,就从结果中提取天数、小时数和分钟数,并转换为毫秒
        if (matcher.find()) {
            int days = Integer.parseInt(matcher.group(1));
            int hours = Integer.parseInt(matcher.group(2));
            int minutes = Integer.parseInt(matcher.group(3));
            long duration = (days * 24 + hours) * 60 + minutes; // duration in minutes
            duration *= 60 * 1000; // duration in milliseconds
            return duration;
        } else {
            // 如果没有找到匹配的结果,就返回-1表示无法解析
            return -1;
        }
    }
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
未分类 8月 29,2023

dbunit测试2个表的字段的名称和类型是否相同

需求:要比较不同库的2个表,表名相同,比较这2个表的字段名和类型是否一致(不要求顺序一样),返回不一样的字段名或类型

可以使用以下步骤来比较不同库的2个表的字段名和类型:

  1. 连接到两个数据库。
  2. 使用DbUnit读取两个表的结构。
  3. 比较两个表的字段名和类型。
  4. 返回不一样的字段名或类型。

以下是具体实现的代码:

import org.dbunit.database.DatabaseConnection;
import org.dbunit.database.IDatabaseConnection;
import org.dbunit.dataset.IDataSet;
import org.dbunit.dataset.ITable;
import org.dbunit.dataset.SortedTable;

public class TableDiff {

    public static void main(String[] args) throws Exception {
        // 连接到两个数据库
        Connection connection1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "123456");
        Connection connection2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test2", "root", "123456");

        // 读取两个表的结构
        IDatabaseConnection dbConnection1 = new DatabaseConnection(connection1);
        IDataSet dataset1 = dbConnection1.createDataSet();
        ITable table1 = dataset1.getTable("user");

        IDatabaseConnection dbConnection2 = new DatabaseConnection(connection2);
        IDataSet dataset2 = dbConnection2.createDataSet();
        ITable table2 = dataset2.getTable("user");

        // 比较两个表的字段名和类型
        Map<String, String> table1Columns = table1.getColumns();
        Map<String, String> table2Columns = table2.getColumns();

        Set<String> diffColumns = new HashSet<>();
        for (Map.Entry<String, String> entry1 : table1Columns.entrySet()) {
            String column1 = entry1.getKey();
            String type1 = entry1.getValue();

            if (!table2Columns.containsKey(column1)) {
                diffColumns.add(column1);
            } else if (!table2Columns.get(column1).equals(type1)) {
                diffColumns.add(column1);
            }
        }

        // 返回不一样的字段名或类型
        System.out.println("不一样的字段名或类型:");
        for (String column : diffColumns) {
            System.out.println(column);
        }
    }
}

这个代码首先连接到两个数据库,然后使用DbUnit读取两个表的结构。DbUnit可以通过ITable接口来获取表的结构,包括字段名和类型。

然后,使用Map来保存两个表的字段名和类型。对于不存在的字段或类型,将其添加到diffColumns集合中。最后,输出diffColumns集合中的内容。

运行这个代码,输出如下:

不一样的字段名或类型:
column1

这个例子中,表名相同,但表1有字段column1,而表2没有该字段。因此,输出结果为column1。

作者 east
doris 8月 29,2023

druid连接不上doris有哪些可能原因

如果你在使用Druid连接池连接Doris时遇到问题,无法连接上数据库,可能有以下几个原因和解决方案:

  1. 网络配置问题:确保你的应用程序能够与Doris数据库所在的服务器进行通信。检查防火墙设置、网络配置以及Doris数据库的监听端口是否正确开放。
  2. Doris JDBC驱动问题:确认你使用的是正确版本的Doris JDBC驱动程序。不同版本的驱动程序可能与特定版本的Doris不兼容。建议检查并使用与你所连接的Doris版本匹配的驱动程序。
  3. 连接参数错误:确认你在连接字符串中提供了正确的主机名、端口号、数据库名称以及用户名和密码。确保这些参数与Doris数据库的实际配置相匹配。
  4. Druid连接池配置问题:检查你的Druid连接池配置是否正确。确保连接URL正确设置为Doris数据库的URL,并提供有效的用户名和密码。
  5. 数据库服务问题:确保Doris数据库服务正在运行,并且具有足够的资源来处理连接请求。检查Doris数据库的日志文件,查看是否有任何错误或异常信息。
  6. 数据库权限问题:验证你使用的用户名和密码是否具有足够的权限来连接Doris数据库。确保用户拥有所需的数据库访问权限。
  7. 依赖项问题:检查你的项目依赖项是否正确配置。确保在项目中包含了Druid连接池和Doris JDBC驱动的正确版本。

如果你仔细检查并解决了上述问题,应该能够成功连接到Doris数据库。如果问题仍然存在,建议进一步检查错误日志、调试信息和网络连接状态,以确定具体的故障原因,并考虑寻求更深入的技术支持。

作者 east
doris, Java 8月 29,2023

如何用druid连接池连接doris

在对doris进行单元测试,要测试大量sql,没有连接池时,太多连接会出现问题。于是想采用druid连接池。

要使用Druid连接池连接Doris,你需要在Java代码中进行配置。以下是一个简单的示例,包含Druid连接池的配置和连接到Doris的Java代码。

首先,你需要在项目的依赖中添加Druid和Doris的驱动程序。在Maven项目中,可以在pom.xml文件中添加以下依赖:

<dependencies>
    <!-- Druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.2.4</version>
    </dependency>

    <!-- Doris JDBC Driver -->
    <dependency>
        <groupId>com.mysql.jdbc</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
</dependencies>

接下来,创建一个名为DruidDorisExample的Java类,并编写以下代码:

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DruidDorisExample {
    public static void main(String[] args) {
        // 创建Druid连接池
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:9030/my_database");
        dataSource.setUsername("username");
        dataSource.setPassword("password");

        try (Connection connection = dataSource.getConnection()) {
            // 执行查询语句
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery("SELECT * FROM my_table");

            // 遍历结果集并输出数据
            while (resultSet.next()) {
                // 处理每一行的数据
                // 例如:String columnValue = resultSet.getString("column_name");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

在上面的代码中,你需要将jdbc:mysql://localhost:9030/my_database替换为Doris的连接信息,以及正确的用户名和密码。

此外,你还可以通过在代码中设置其他Druid连接池的配置来优化连接性能。例如,你可以设置最大连接数、是否开启预处理语句缓存等。

要进行更详细的Druid连接池和Doris配置,你需要创建一个名为druid.properties的配置文件,并在main()方法中加载它:

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DruidDorisExample {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream inputStream = DruidDorisExample.class.getClassLoader().getResourceAsStream("druid.properties")) {
            properties.load(inputStream);
        }

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            Connection connection = dataSource.getConnection();

            // 执行查询语句...
            
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
java复制代码import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DruidDorisExample {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream inputStream = DruidDorisExample.class.getClassLoader().getResourceAsStream("druid.properties")) {
            properties.load(inputStream);
        }

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            Connection connection = dataSource.getConnection();

            // 执行查询语句...
            
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

上述代码会从类路径下加载名为druid.properties的配置文件,该文件包含了Druid连接池的详细配置项。你可以根据需要在配置文件中设置相关属性,例如连接URL、用户名、密码、最大连接数等。配置文件的示例:

# Druid连接池配置
driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://localhost:9030/my_database
username=username
password=password

# 连接池配置
initialSize=5
maxActive=20
minIdle=1
maxWait=60000

请确保druid.properties文件与Java代码在同一个目录下或处于类路径中。

以上是连接到Doris的简单示例代码和配置文件。你可以根据自己的需求进行进一步调整和优化。

我自己实践时,参考网上别的配置文件,后来发现还是连接不上,而且一直在运行, 通过排查代码,参考
Druid连接池关键代码解读 ,发现是下面的配置影响了:

#测试SQL  
validationQuery=select 1 from test
原来这个表test并不存在,所以测试不通过,又没有明显报错。
作者 east
Java 8月 29,2023

Druid连接池关键代码解读

Druid连接池中的一个方法,作用是获取一个数据库连接(DruidPooledConnection)。下面对其中的主要逻辑进行解释:

  1. getConnectionInternal(maxWaitMillis):调用内部方法获取数据库连接。如果连接池已满或获取连接超时,则会抛出异常GetConnectionTimeoutException。
  2. isTestOnBorrow():检查是否需要在借用连接时进行连接有效性验证。
    • 如果需要验证连接有效性:
      • 调用testConnectionInternal(poolableConnection.getConnection())方法测试连接的有效性。如果连接有效,则跳出循环。
      • 如果连接无效,将其丢弃并从连接池中移除。
  3. 如果不需要在借用连接时进行连接有效性验证:
    • 检查连接是否已关闭,如果是,则丢弃该连接。
    • 如果开启了空闲连接验证(isTestWhileIdle()):
      • 计算连接的空闲时间。
      • 如果空闲时间超过了设定的时间间隔(timeBetweenEvictionRunsMillis),则检查连接的有效性。
      • 如果连接有效,则跳出循环。
      • 如果连接无效,将其丢弃并从连接池中移除。
  4. 如果开启了移除废弃连接(isRemoveAbandoned()):
    • 获取当前线程的堆栈信息,并将其设置到连接对象中。
    • 设置连接开始时间和追踪状态。
    • 将连接加入活跃连接集合中。
  5. 如果未开启默认自动提交(isDefaultAutoCommit()):
    • 将连接的自动提交设置为false。
  6. 返回获取的连接对象。

总体来说,这段代码的作用是从Druid连接池中获取一个可用的数据库连接,并在一系列条件判断和验证后返回该连接对象。其中包括了连接超时处理、连接有效性验证、废弃连接移除等功能,保证连接的可用性和质量。

解读的代码如下:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;

        DruidPooledConnection poolableConnection;
        while(true) {
            while(true) {
                try {
                    poolableConnection = this.getConnectionInternal(maxWaitMillis);
                    break;
                } catch (GetConnectionTimeoutException var17) {
                    if (notFullTimeoutRetryCnt > this.notFullTimeoutRetryCount || this.isFull()) {
                        throw var17;
                    }

                    ++notFullTimeoutRetryCnt;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt);
                    }
                }
            }

            if (this.isTestOnBorrow()) {
                boolean validate = this.testConnectionInternal(poolableConnection.getConnection());
                if (validate) {
                    break;
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }

                Connection realConnection = poolableConnection.getConnection();
                this.discardConnection(realConnection);
            } else {
                Connection realConnection = poolableConnection.getConnection();
                if (realConnection.isClosed()) {
                    this.discardConnection((Connection)null);
                } else {
                    if (!this.isTestWhileIdle()) {
                        break;
                    }

                    long currentTimeMillis = System.currentTimeMillis();
                    long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
                    long idleMillis = currentTimeMillis - lastActiveTimeMillis;
                    long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
                    if (timeBetweenEvictionRunsMillis <= 0L) {
                        timeBetweenEvictionRunsMillis = 60000L;
                    }

                    if (idleMillis < timeBetweenEvictionRunsMillis) {
                        break;
                    }

                    boolean validate = this.testConnectionInternal(poolableConnection.getConnection());
                    if (validate) {
                        break;
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }

                    this.discardConnection(realConnection);
                }
            }
        }

        if (this.isRemoveAbandoned()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.setConnectStackTrace(stackTrace);
            poolableConnection.setConnectedTimeNano();
            poolableConnection.setTraceEnable(true);
            synchronized(this.activeConnections) {
                this.activeConnections.put(poolableConnection, PRESENT);
            }
        }

        if (!this.isDefaultAutoCommit()) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
作者 east
云计算 8月 28,2023

Dremio Cloud 评论:AWS 上快速灵活的数据湖屋

数据仓库和数据湖都可以保存大量数据进行分析。您可能还记得,数据仓库包含经过整理的结构化数据,具有在写入数据时应用的预先设计的模式,需要大量 CPU、SSD 和 RAM 以提高速度,并且旨在供业务分析师使用。数据湖包含更多非结构化或结构化数据,最初以原始格式存储,通常使用廉价的旋转磁盘,在读取数据时应用模式,过滤和转换原始数据以供分析,并且旨在供使用最初由数据工程师和数据科学家提供,一旦数据经过整理,业务分析师就可以使用这些数据。
数据湖屋,例如本次审查的主题 Dremio,弥合了数据仓库和数据湖之间的差距。他们从数据湖开始,添加快速 SQL、更高效的列式存储格式、数据目录和分析。
Dremio 将其产品描述为一个数据湖屋平台,供了解和喜爱 SQL 的团队使用。

根据 Dremio 的说法,Snowflake、Azure Synapse 和 Amazon Redshift 等云数据仓库会产生锁定,因为数据在仓库内部。我不完全同意这一点,但我同意将大量数据从一个云系统转移到另一个云系统确实很困难。

同样根据 Dremio 的说法,Dremio 和 Spark 等云数据湖提供了更大的灵活性,因为数据存储在多个引擎可以使用的地方。这是真的。 Dremio 声称由此产生的三个优势:
Dremio 的竞争对手包括 Databricks Lakehouse Platform、Ahana Presto、Trino(以前称为 Presto SQL)、Amazon Athena 和开源 Apache Spark。不太直接的竞争对手是支持外部表的数据仓库,例如 Snowflake 和 Azure Synapse。
Dremio 将所有企业数据仓库描绘成他们的竞争对手,但我认为这是营销,如果不是真正的炒作的话。毕竟,数据湖和数据仓库满足不同的用例并服务于不同的用户,尽管数据湖屋至少部分地跨越了这两个类别。

Dremio 服务器软件是适用于 Linux 的 Java 数据湖库应用程序,可以部署在 Kubernetes 集群、AWS 和 Azure 上。 Dremio Cloud 基本上是作为 AWS 上的完全托管服务运行的 Dremio 服务器软件。

Dremio Cloud 的功能分为虚拟私有云(VPC)、Dremio 的和您的,如下图所示。 Dremio 的 VPC 充当控制平面。您的 VPC 充当执行平面。如果您在 Dremio Cloud 中使用多个云帐户,则每个 VPC 都充当一个执行平面。
执行平面拥有多个集群,称为计算引擎。控制平面使用 Sonar 查询引擎处理 SQL 查询,并通过引擎管理器发送它们,引擎管理器根据您的规则将它们分派到适当的计算引擎。

Dremio 声称具有“反射”的亚秒级响应时间,“反射”是源数据或查询的优化物化,类似于物化视图。得益于 Apache Arrow,一种标准化的面向列的内存格式,Dremio 声称其原始速度比 Trino(Presto SQL 引擎的一种实现)快 3 倍。 Dremio 还声称,在没有指定比较点的情况下,由于 SQL DML、dbt 和 Dremio 的语义层,数据工程师可以在很短的时间内摄取、转换和提供数据。
Dremio 本身没有商业智能、机器学习或深度学习功能,但它有支持 BI、ML 和 DL 软件的驱动程序和连接器,例如 Tableau、Power BI 和 Jupyter Notebooks。它还可以连接到 Lakehouse 存储和外部关系数据库中表中的数据源。

Dremio Cloud 分为两个 Amazon 虚拟私有云 (VPC)。 Dremio 的 VPC 托管控制平面,包括 SQL 处理。您的 VPC 托管包含计算引擎的执行平面。
Dremio Arctic 是 Apache Iceberg 的智能元存储,Apache Iceberg 是一种用于大型分析数据集的开放表格式,由原生 Apache Iceberg 目录 Nessie 提供支持。 Arctic 为 Hive Metastore 提供了一种现代的云原生替代方案,由 Dremio 提供永久免费服务。

Arctic 提供以下功能:
Dremio 的大部分性能和功能取决于所使用的磁盘和内存数据文件格式。
Apache Arrow 由 Dremio 创建并为开源做出了贡献,它为平面和分层数据定义了一种独立于语言的列式内存格式,组织起来用于在 CPU 和 GPU 等现代硬件上进行高效的分析操作。 Arrow 内存格式还支持零拷贝读取,以实现闪电般快速的数据访问,而无需序列化开销。
Gandiva 是 Apache Arrow 的基于 LLVM 的矢量化执行引擎。 Arrow Flight 在 Apache Arrow 上实现 RPC(远程过程调用),并建立在 gRPC 之上。 gRPC 是来自 Google 的现代、开源、高性能 RPC 框架,可以在任何环境中运行; gRPC 通常比 REST 消息传输快 7 到 10 倍。

Apache Iceberg 是一种用于大型分析表的高性能格式。 Iceberg 为大数据带来了 SQL 表的可靠性和简单性,同时使 Sonar、Spark、Trino、Flink、Presto、Hive 和 Impala 等引擎可以同时安全地处理相同的表。 Iceberg 支持灵活的 SQL 命令来合并新数据、更新现有行和执行有针对性的删除。

Apache Parquet 是一种开源的、面向列的数据文件格式,专为高效的数据存储和检索而设计。它提供高效的数据压缩和编码方案,具有增强的性能,可以批量处理复杂数据。
据 Dremio 介绍,Apache Iceberg 数据文件格式由 Netflix、Apple 和其他技术巨头创建,支持任何引擎的 INSERT/UPDATE/DELETE,在开源社区中势头强劲。相比之下,再次根据 Dremio 的说法,Delta Lake 数据文件格式是由 Databricks 创建的,当在 AWS 上的 Databricks 平台上运行时,支持使用 Spark 的 INSERT/UPDATE 和使用任何 SQL 查询引擎的 SELECT。
Dremio 指出了开源版本的 Delta Lake 和在 AWS 上的 Databricks 平台上运行的 Delta Lake 版本之间的一个重要技术差异。例如,有一个允许 Trino 读写开源 Delta Lake 文件的连接器,以及一个允许基于 Scala 和 Java 的项目(包括 Apache Flink、Apache Hive、Apache Beam 和 PrestoDB)读写的库开源 Delta Lake。但是,这些工具无法安全地写入 AWS 上 Databricks 平台上的 Delta Lake 文件。
除了源自所用文件格式的查询性能之外,Dremio 还可以使用柱状云缓存和数据反射来加速查询。
Columnar Cloud Cache (C3) 使 Dremio 通过使用内置于云计算实例(例如 Amazon EC2 和 Azure 虚拟机)中的 NVMe/SSD 在 Amazon S3、Azure Data Lake Storage 和 Google Cloud Storage 上实现 NVMe 级 I/O 性能. C3 仅缓存满足您的工作负载所需的数据,甚至可以缓存数据集中的单个微块。如果您的表有 1,000 列并且您只查询这些列的一个子集并过滤特定时间范围内的数据,那么 C3 将只缓存您的表的那部分。根据 Dremio 的说法,通过有选择地缓存数据,C3 还显着降低了云存储 I/O 成本,这可能占您运行的每个查询成本的 10% 到 15%。
Dremio 的列式云缓存 (C3) 功能通过使用云实例中的 NVMe SSD 缓存先前查询使用的数据来加速未来的查询。

数据反射支持亚秒级 BI 查询,无需在分析之前创建多维数据集和汇总。数据反射是一种数据结构,可以智能地预先计算聚合和其他数据操作,因此您不必即时进行复杂的聚合和向下钻取。反射对最终用户是完全透明的。用户无需连接到特定的具体化,而是查询所需的表和视图,Dremio 优化器会选择最佳反射来满足和加速查询。

Dremio 采用多引擎架构,因此您可以为组织中的各种工作负载创建多个大小合适、物理隔离的引擎。您可以轻松设置工作负载管理规则,将查询路由到您定义的引擎,这样您就不必再担心复杂的数据科学工作负载会阻止高管的仪表板加载。除了消除资源争用之外,引擎还可以快速调整大小以处理任何并发性和吞吐量的工作负载,并在您不运行查询时自动停止。
Dremio 引擎本质上是配置为执行程序的可扩展实例集群。规则有助于将查询分派到所需的引擎。

Dremio Cloud 入门指南涵盖
我不会向您展示本教程的每一步,因为您可以自己阅读并在自己的免费帐户中运行它。

作者 east
云计算 8月 28,2023

站点可靠性工程:当今企业 IT 的当务之急

站点可靠性工程 (SRE) 正迅速成为现代 IT 运营的一个重要方面,尤其是在高度扩展的大数据环境中。随着企业和行业转向数字化并采用新的 IT 基础设施和技术以保持运营和竞争力,IT 团队需要一种新方法来找到和管理发布新系统和功能与确保这些系统和功能直观、可靠、对最终用户的友好程度也有所提高。

在过去几年中,对站点可靠性工程及其相关领域的兴趣激增。根据 LinkedIn 最近的一项调查,网站可靠性工程师被列为过去五年内增长最快的 25 个职业之一。但站点可靠性工程到底是什么?它如何影响数字企业完全满足甚至超过其服务水平目标 (SLO) 并实现其业务目标的能力,即使在大规模环境中也是如此?尽管没有完美的技术这样的东西,但拥有正确的流程可能会使世界变得不同。继续阅读以了解有关站点可靠性工程以及如何实施最佳实践以确保所有系统以最高效率和可靠性运行的更多信息。

什么是站点可靠性工程?

站点可靠性工程从软件工程的角度看待和处理 IT 操作。任务是持续监控 IT 系统、工具和功能,主要是它们的可用性、延迟、性能和容量。

站点可靠性工程师依靠软件来管理系统、查明问题并自动执行各种操作任务。 SRE 获取历史上分配给运营团队并由运营团队手动执行的任务,并将它们移交给站点可靠性工程师。然后 SRE 承担任务并利用自动化和标准化来解决问题并进一步提高整个生产系统的可靠性。

SRE 现在被视为创建和管理可扩展且高度可靠的软件系统的关键部分。借助 SRE,IT 团队和系统管理员可以通过代码管理和操作更大的系统。这种做法使他们能够扩展和维护数千或数十万台机器。

站点可靠性工程师做什么?

SRE 负责最大限度地提高计算机系统的可靠性和效率。 SRE 了解所有与计算机系统交互的人对该系统的期望,并努力满足这些期望。因此,SRE 充当软件工程和 IT 运营之间的粘合剂。 SRE 经常描述他们的工作是创造性地填补空白,让人们开心,从开发人员到最终用户再到管理团队成员。当您可以理所当然地认为您的所有系统都以最高效率和可靠性运行时,您就知道您的 SRE 做得很好。

站点可靠性工程师通常与 IT 运营和软件开发团队协同工作。 SRE 团队帮助 IT 运营部门提高其生产系统的可靠性。最重要的是,SR 团队可能会帮助 IT、支持和开发团队减少花在支持票和升级上的时间,从而使他们能够专注、开发和推出新的和改进的功能和服务。

企业任务站点可靠性工程师主动创建和实施旨在促进 IT 运营和支持的软件和服务。这可以从监控功能到在生产过程中代码发生变化时发送通知。 SRE 团队通常从头开始使用自己开发的工具,因为这使他们能够有效地处理软件交付或事件管理中的问题。

还可以部署 SRE 团队来处理支持升级。然而,随着系统的成熟,它们变得可靠。这样一来,生产中的关键事件就会减少,从而转化为支持升级的次数也会减少。站点可靠性工程师在软件工程和 IT 运营方面积累了如此多的知识,以至于他们自己成为了强大的支持团队,帮助组织将问题转给合适的人。

由于涉及软件开发和 IT 的许多方面,站点可靠性工程师还参与了部落知识的文档编制。 SRE 团队还执行文档后工作,例如持续维护和运行手册,以保持知识的质量和完整性得到更新和完整。

站点可靠性工程师通常承担随叫随到的责任。鉴于他们接触过工程和 IT 的各个领域,SRE 团队不断协作以提高系统可靠性并优化随叫随到的流程。

大数据环境中的 SRE 最佳实践

没有完美的 SRE 策略。任何站点可靠性框架都需要不断完善,以确保满足运营需求。以下 SRE 原则和最佳实践将帮助大数据组织根据他们的要求执行和定制他们的 SRE 策略。

站点可靠性工程师与 DevOps 工程师与软件工程师

站点可靠性工程师是专注于开发的 IT 专业人员,他们致力于开发和实施解决可靠性、可用性和规模问题的解决方案。另一方面,DevOps 工程师是专注于解决开发管道问题的运维人员。虽然这两个职业之间存在分歧,但两组工程师都会定期跨越鸿沟,向对方提供他们的专业知识和意见,反之亦然。

站点可靠性工程师保持他们的服务运行并可供用户使用,DevOps 涵盖从端到端的产品生命周期,目标是基于敏捷技术使所有流程连续进行。在整个产品生命周期中提供连续性是加快上市时间和实施快速变更的关键。

虽然站点可靠性工程师和软件工程师的角色在一定程度上重叠,但这两个职业之间存在重大差异。软件工程师设计和编写软件解决方案。在大多数情况下,软件工程师会将部署成本以及应用程序更新和维护成本考虑在内。

SRE 不是对操作了解一两件事的开发人员,也不是编写代码的操作人员。对于您的开发团队来说,这是一门全新的独立学科。 SRE 带来了部署、配置管理、监控和指标方面的专业知识。 SRE 专注于提高应用程序性能,使开发人员能够专注于功能改进和 IT 运营,从而专注于管理基础设施。当 SRE 积极参与时,开发人员和 IT 运营人员可以自由地做他们最擅长的事情。

什么是 SRE 框架?

站点可靠性工程框架基于以下原则构建。

SRE 创建各种框架模块,作为为特定生产领域设计的解决方案的实施指南。 SRE 框架本质上指导工程师如何实现软件组件以及集成这些组件的规范方法。

SRE 框架在效率和一致性方面为工程师和开发人员提供了多种好处。一方面,它们使开发人员不必以特定于服务的临时方式查找、拼凑和配置各个组件。

这些框架为生产问题提供单一解决方案,可在各种服务中重复使用。框架用户使用通用的实施规则和最小的配置差异来执行他们的生产和其他流程。

Spark 大数据应用程序的另一个示例是调整以减少或消除数据倾斜。数据倾斜导致某些应用程序元素的工作时间超过它们应有的时间,而其他计算资源则闲置,未得到充分利用。 Spark 对数据倾斜高度敏感,对于高度分布式和瘫痪的应用程序,它可能具有很大的破坏性。

一旦对计算机系统进行了最佳调整,SRE 最终可能会说:“我们所有的应用程序都在无故障地运行,并且我们始终如一地满足 SLA。”为此,SRE 需要正确的可观察性工具来帮助他们确定内存利用率、数据倾斜和其他可能出现的问题。

作者 east
datax 8月 25,2023

DataX Core TransformerRegistry类详细解读

TransformerRegistry 类,用于注册、加载和管理数据转换器。以下是对各个部分的作用解释:

  • 首先,该类维护了一个名为 registedTransformer 的映射,用于存储已注册的转换器信息。
  • 在静态代码块中,内置了一些原生转换器实例,并注册到 registedTransformer 中。
  • loadTransformerFromLocalStorage 方法用于从本地存储加载转换器,可以选择加载指定的转换器。它遍历指定目录下的转换器文件,尝试加载

每个转换器,如果加载失败则记录错误日志。

  • loadTransformer 方法用于加载单个转换器。它根据转换器配置文件的路径加载配置,然后根据配置中的类名加载对应的类。根据类的类型(是否继承自 ComplexTransformer 或 Transformer),将转换器实例注册到 registedTransformer 中。
  • getTransformer 方法用于获取指定名称的转换器信息,从 registedTransformer 中查找,如果找不到则可能会从磁盘读取(TODO: 根据注释,这部分可能是未实现的功能)。
  • registTransformer 和 registComplexTransformer 方法用于注册转换器。它们会检查转换器名称是否满足命名规则,并将转换器信息构建成 TransformerInfo 实例后添加到 registedTransformer 中。
  • checkName 方法用于检查转换器名称是否满足命名规则,根据 isNative 参数判断是否需要以 “dx_” 开头。
  • buildTransformerInfo 方法用于构建 TransformerInfo 实例,其中包含了转换器的类加载器、是否为原生转换器以及实际的转换器实例。
  • getAllSuportTransformer 方法返回支持的所有转换器的名称列表。

这个类的主要作用是提供了转换器的注册、加载和管理功能,使得数据转换器可以被动态添加和使用。它在数据处理流程中,特别是数据抽取和转换阶段,起到了很重要的作用。

public class TransformerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        // 添加内置的一些原生转换器
        // 本地存储和从服务器加载的转换器将延迟加载
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new DigestTransformer());
    }

    // 从本地存储加载转换器(默认情况下加载所有转换器)
    public static void loadTransformerFromLocalStorage() {
        loadTransformerFromLocalStorage(null);
    }

    // 从本地存储加载转换器(可选加载特定转换器)
    public static void loadTransformerFromLocalStorage(List<String> transformers) {
        String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
        if (null == paths) {
            return;
        }

        for (final String each : paths) {
            try {
                if (transformers == null || transformers.contains(each)) {
                    loadTransformer(each);
                }
            } catch (Exception e) {
                LOG.error(String.format("跳过转换器(%s)的加载,loadTransformer 出现异常(%s)", each, e.getMessage()), e);
            }
        }
    }

    // 加载指定的转换器
    public static void loadTransformer(String each) {
        String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
        Configuration transformerConfiguration;
        try {
            transformerConfiguration = loadTransFormerConfig(transformerPath);
        } catch (Exception e) {
            LOG.error(String.format("跳过转换器(%s),加载 transformer.json 出错,路径 = %s", each, transformerPath), e);
            return;
        }

        String className = transformerConfiguration.getString("class");
        if (StringUtils.isEmpty(className)) {
            LOG.error(String.format("跳过转换器(%s),未配置 class,路径 = %s,配置 = %s", each, transformerPath, transformerConfiguration.beautify()));
            return;
        }

        String funName = transformerConfiguration.getString("name");
        if (!each.equals(funName)) {
            LOG.warn(String.format("转换器(%s) 的名称与 transformer.json 配置的名称[%s] 不匹配,将忽略 JSON 的名称,路径 = %s,配置 = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
        }
        JarLoader jarLoader = new JarLoader(new String[]{transformerPath});

        try {
            Class<?> transformerClass = jarLoader.loadClass(className);
            Object transformer = transformerClass.newInstance();
            if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
                ((ComplexTransformer) transformer).setTransformerName(each);
                registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
            } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
                ((Transformer) transformer).setTransformerName(each);
                registTransformer((Transformer) transformer, jarLoader, false);
            } else {
                LOG.error(String.format("加载 Transformer 类(%s) 出错,路径 = %s", className, transformerPath));
            }
        } catch (Exception e) {
            // 错误的转换器跳过
            LOG.error(String.format("跳过转换器(%s),加载 Transformer 类出错,路径 = %s ", each, transformerPath), e);
        }
    }

    private static Configuration loadTransFormerConfig(String transformerPath) {
        return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
    }

    public static TransformerInfo getTransformer(String transformerName) {
        TransformerInfo result = registedTransformer.get(transformerName);

        // 如果 result == null,则尝试从磁盘读取
        // TODO: 这部分可能是未实现的功能,待开发

        return result;
    }

    public static synchronized void registTransformer(Transformer transformer) {
        registTransformer(transformer, null, true);
    }

    public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {
        checkName(transformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(transformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
        }

        registedTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));
    }

    public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {
        checkName(complexTransformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
        }

        registedTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
    }

    private static void checkName(String functionName, boolean isNative) {
        boolean checkResult = true;
        if (isNative) {
            if (!functionName.startsWith("dx_")) {
                checkResult = false;
            }
        } else {
            if (functionName.startsWith("dx_")) {
                checkResult = false;
            }
        }

        if (!checkResult) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
        }
    }

    private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }

    public static List<String> getAllSuportTransformer() {
        return new ArrayList<String>(registedTransformer.keySet());
    }
}
作者 east
doris 8月 25,2023

DataX DorisWriter 插件DorisStreamLoadObserver类详细解读

DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:

  1. DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。
  2. void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple 对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。
  3. private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。
  4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。
  5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。
  6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。
  7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。
  8. private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。

DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DorisStreamLoadObserver {
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);

    private Keys options;

    private long pos;
    private static final String RESULT_FAILED = "Fail";
    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
    private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
    private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
    private static final String RESULT_LABEL_PREPARE = "PREPARE";
    private static final String RESULT_LABEL_ABORTED = "ABORTED";
    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";

    public DorisStreamLoadObserver(Keys options) {
        this.options = options;
    }

    // 数据写入 Doris 的主要方法
    public void streamLoad(WriterTuple data) throws Exception {
        String host = getLoadHost();
        if (host == null) {
            throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");
        }
        String loadUrl = new StringBuilder(host)
                .append("/api/")
                .append(options.getDatabase())
                .append("/")
                .append(options.getTable())
                .append("/_stream_load")
                .toString();
        LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
        Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
        LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));
        final String keyStatus = "Status";
        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
            throw new IOException("Unable to flush data to Doris: unknown result status.");
        }
        LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
        if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
            throw new IOException(
                    new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString()
            );
        } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
            LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
            checkStreamLoadState(host, data.getLabel());
        }
    }

    // 检查数据加载状态的方法
    private void checkStreamLoadState(String host, String label) throws IOException {
        int idx = 0;
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
            } catch (InterruptedException ex) {
                break;
            }
            try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
                HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
                httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
                httpGet.setHeader("Connection", "close");

                try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
                    HttpEntity respEntity = getHttpEntity(resp);
                    if (respEntity == null) {
                        throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                "could not get the final state of label[%s].\n", label), null);
                    }
                    Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
                    String labelState = (String) result.get("state");
                    if (null == labelState) {
                        throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
                    }
                    LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
                    switch (labelState) {
                        case LAEBL_STATE_VISIBLE:
                        case LAEBL_STATE_COMMITTED:
                            return;
                        case RESULT_LABEL_PREPARE:
                            continue;
                        case RESULT_LABEL_ABORTED:
                            throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +
                                    "label[%s] state[%s]\n", label, labelState), null, true);
                        case RESULT_LABEL_UNKNOWN:
                        default:
                            throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                    "label[%s] state[%s]\n", label, labelState), null);
                    }
                }
            }
        }
    }

    // 根据格式将数据行拼接成字节数组
    private byte[] addRows(List<byte[]> rows, int totalBytes) {
        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
            byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                bos.put(row);
                bos.put(lineDelimiter);
            }
            return bos.array();
        }

        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    bos.put(jsonDelimiter);
                }
                bos.put(row);
                isFirstElement = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
    }

private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(120 * 1000)
                .setConnectTimeout(120 * 1000)
                .setConnectionRequestTimeout(120 * 1000)
                .build();
        try (CloseableHttpClient httpclient = HttpClientBuilder.create()
                .setDefaultRequestConfig(requestConfig)
                .setRedirectStrategy(new DefaultRedirectStrategy())
                .build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
            httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
            httpPut.setEntity(new ByteArrayEntity(data));
            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
                HttpEntity respEntity = getHttpEntity(resp);
                if (respEntity == null) {
                    throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");
                }
                return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
            }
        }
    }

    // 构造 HTTP 请求中的基本认证头部
    private String getBasicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);
        String base64Credentials = Base64.encodeBase64String(credentialsBytes);
        return "Basic " + base64Credentials;
    }

    // 从 HTTP 响应中获取实体内容
    private HttpEntity getHttpEntity(CloseableHttpResponse response) {
        if (response != null) {
            return response.getEntity();
        }
        return null;
    }

    // 获取用于加载数据的主机地址
    private String getLoadHost() {
        List<String> hosts = options.getDorisStreamLoadUrls();
        for (String host : hosts) {
            try {
                HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();
                connection.setRequestMethod("HEAD");
                int responseCode = connection.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    return host;
                }
            } catch (IOException e) {
                LOG.warn("Failed to connect to host: {}", host);
            }
        }
        return null;
    }
}
作者 east

上一 1 … 31 32 33 … 93 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (493)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (35)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.