gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆
    • json格式化及任意折叠展开
    • PDF常用工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面12 )
doris 10月 17,2023

doris手动添加分区自动消失的问题

在数据仓库开发指标时,需要回溯历史数据,对下面的表改为非动态表并添加更早时间的分区:

-- 改为非动态分区
ALTER TABLE test SET ("dynamic_partition.enable" = "false")
-- 手动添加更早的时间分区
ALTER TABLE test
ADD PARTITION p20230912 VALUES [("2023-09-12"), ("2023-09-13"));
  --改为动态分区
ALTER TABLE test SET ("dynamic_partition.enable" = "true")
CREATE TABLE `test` (
  `id` bigint(20) NOT NULL COMMENT '主键',  
  `dt` date NULL COMMENT '创建时间'
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'olap'
PARTITION BY RANGE(`dt`)
PARTITION p20231010 VALUES [('2023-10-10'), ('2023-10-11')),
PARTITION p20231011 VALUES [('2023-10-11'), ('2023-10-12')),
PARTITION p20231012 VALUES [('2023-10-12'), ('2023-10-13')),
PARTITION p20231013 VALUES [('2023-10-13'), ('2023-10-14')),
PARTITION p20231014 VALUES [('2023-10-14'), ('2023-10-15')),
PARTITION p20231015 VALUES [('2023-10-15'), ('2023-10-16')),
PARTITION p20231016 VALUES [('2023-10-16'), ('2023-10-17')),
PARTITION p20231017 VALUES [('2023-10-17'), ('2023-10-18')),
PARTITION p20231018 VALUES [('2023-10-18'), ('2023-10-19')),
PARTITION p20231019 VALUES [('2023-10-19'), ('2023-10-20')),
PARTITION p20231020 VALUES [('2023-10-20'), ('2023-10-21')))
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 1",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "-1",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);

然而奇怪的事情发生了,创建的p20230912 不见了,后来考虑到可能是动态分区搞的鬼,是由于没有正确动态分区的属性 。

看到表结构的”dynamic_partition.start” = “-7”,这表示
动态分区的起始范围。表示从今天开始向前7天。 而
p20230912 远远超过这个范围,后来把这个值调大后,果然不会凭空消失了。

ALTER TABLE test SET
(    "dynamic_partition.start" = "-75"
);

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
mysql, 大数据开发 10月 12,2023

sql聚合函数嵌套问题 aggregate function cannot contain aggregate parameters

在需求的应用场景,需要对create_time字段求最小值并求和,刚开始理所当然写成像下面这样:

 SUM(COALESCE (CASE WHEN MIN(crl.create_time)  BETWEEN date_add(date_sub(current_date(), 1), -1 * (open_case_day_num % 6)) AND current_date() AND  crl.approval_status in('0','1') and crl.del_flag = '0' THEN  CAST(crl.repay_total AS DECIMAL(10, 2)) END,0))

SUM聚合函数又嵌套MIN函数,果然报错了:
1105 – errCode = 2, detailMessage = aggregate function cannot contain aggregate parameters

这是因为 根据错误日志,问题出在了对于SUM函数中的聚合参数的使用上。在Spark SQL中,聚合函数不能包含聚合参数,也就是不允许在聚合函数内部再使用其他聚合函数。

解决方案是可以先使用子查询,将MIN(create_time)计算结果作为一个新的列,然后在外层查询中对这个新列和其他列进行求和,如下所示:

SELECT SUM(amount), SUM(min_create_time)
FROM (
  SELECT amount, MIN(create_time) AS min_create_time
  FROM your_table
  GROUP BY amount
) t;

其中,我们首先在子查询中计算每个amount的最小create_time,并将结果存储在名为min_create_time的新列中。然后,在外部查询中,我们对amount列和min_create_time列同时进行汇总。

需要注意的是,在子查询中,我们只能按照amount列进行分组,否则会导致结果不正确。如果还有其他需要汇总的列,则需要在子查询中一并处理。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 10月 7,2023

spark sql如何行转列

在数据仓库中,行转列通常称为”变形”(Pivoting) 或 “透视”(Pivoting),可使用Spark SQL的pivot语句实现。下面是一个简单的示例:

假设我们有如下表格:

+-------+-------+------+
| name  | brand | year |
+-------+-------+------+
| Alice | BMW   | 2017 |
| Bob   | Tesla | 2018 |
| Alice | Tesla | 2019 |
| Bob   | BMW   | 2020 |
+-------+-------+------+

我们想要把该表中的品牌列转换为4列,分别表示不同的品牌,而值则为对应品牌的年份。具体操作如下:

SELECT name, 
       COALESCE(BMW, 0) AS BMW, 
       COALESCE(Tesla, 0) AS Tesla 
FROM (
  SELECT name,
         brand,
         year 
  FROM my_table
) T
PIVOT (
  MAX(year) 
  FOR brand IN ('BMW', 'Tesla')
)

运行上面的代码,得到的结果如下所示:

+-------+------+------+
| name  | BMW  | Tesla|
+-------+------+------+
| Alice | 2017 | 2019 |
| Bob   | 2020 | 2018 |
+-------+------+------+

可以看到,原本的品牌列已经被转换成了两个新的列,并且对于每个人名,都有对应的品牌年份信息填充其中。这就是典型的行转列操作。其中,COALESCE函数用于处理可能存在的空值情况。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Java, python 9月 7,2023

用ChatGPT自动生成流程图

我们看别人代码时,总希望有流程图,这样可以一目了然,不过自己写的代码,又不想花几个小时去画流程图。有没有更好的方法呢?

方法就是用ChatGPT等大模型自动生成流程图,并用python等语言实现自动输出流程图。

1、生成流程图的 Mermaid语法

ChatGPT提示语:

对下面的代码生成流程图,并用Mermaid语法输出 。

2、把Mermaid语法的流程图输出图片

要使用Python或Java生成Mermaid语法输出的流程图图片,您可以使用以下方法:

Python 方法:

  1. 使用 mermaid-cli 工具来将Mermaid代码转换为图片。首先,安装mmdc(mermaid-cli的执行程序)。
  2. 使用Python调用mmdc。
import os

def generate_mermaid_image(mermaid_code, output_path):
    with open("temp.mmd", "w") as file:
        file.write(mermaid_code)
    os.system(f"mmdc -i temp.mmd -o {output_path}")
    os.remove("temp.mmd")

mermaid_code = """
graph TD;
    A-->B;
    A-->C;
    B-->D;
    C-->D;
"""

generate_mermaid_image(mermaid_code, "output.png")

Java 方法:

  1. 与Python方法相同,首先安装 mermaid-cli。
  2. 使用Java的 Runtime 类来调用 mmdc。
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class MermaidGenerator {

    public static void generateMermaidImage(String mermaidCode, String outputPath) throws IOException, InterruptedException {
        File tempFile = File.createTempFile("temp", ".mmd");
        try (FileWriter writer = new FileWriter(tempFile)) {
            writer.write(mermaidCode);
        }

        Process process = Runtime.getRuntime().exec("mmdc -i " + tempFile.getAbsolutePath() + " -o " + outputPath);
        process.waitFor();

        tempFile.delete();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String mermaidCode = """
            graph TD;
                A-->B;
                A-->C;
                B-->D;
                C-->D;
            """;
        generateMermaidImage(mermaidCode, "output.png");
    }
}

请注意,这两种方法都需要您在计算机上安装并配置mermaid-cli。此外,这两种方法都是使用临时文件来存储Mermaid代码,并在转换后删除它。这是为了简化调用mmdc的过程,但您可以根据需要进行调整。

作者 east
mysql, 大数据开发, 提示词 9月 7,2023

java批量生成海量测试数据及用ChatGPT提示语一键生成的方法

在做大数据开发时,为了测试性能等,需要上千万,甚至TB或PB级别的,在测试环境可能没有那么多数据,这时可以考虑进行造测试数据。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Random;

public class TestDataGenerator {
    public static void main(String[] args) {
        String url = "jdbc:mysql://localhost:3306/your_database";
        String username = "your_username";
        String password = "your_password";
        int batchSize = 1000; // 每批次插入的数据量
        int totalRecords = 1000000; // 总共要生成的数据量

        try {
            Connection connection = DriverManager.getConnection(url, username, password);
            connection.setAutoCommit(false);

            String insertQuery = "INSERT INTO test (id, callid, type, ...其他列...) VALUES (?, ?, ?, ...其他值...)";
            PreparedStatement preparedStatement = connection.prepareStatement(insertQuery);

            Random random = new Random();

            for (int i = 1; i <= totalRecords; i++) {
                // 设置每个字段的值,根据表结构设置对应的数据生成逻辑
                preparedStatement.setLong(1, i);
                preparedStatement.setString(2, "CallSheet" + i);
                preparedStatement.setString(3, "Type" + (random.nextInt(5) + 1));
                // 设置其他字段的值...

                preparedStatement.addBatch();

                if (i % batchSize == 0) {
                    preparedStatement.executeBatch();
                    connection.commit();
                }
            }

            preparedStatement.executeBatch();
            connection.commit();

            preparedStatement.close();
            connection.close();

            System.out.println("测试数据生成完成!");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

请将上述示例中的数据库连接信息和插入逻辑根据您的数据库设置和表结构进行相应的修改。此程序将会在数据库中插入海量测试数据。

更方便的方法是在ChatGPT等大模型,输入下面提示语:

根据下面的表结构,生成100万的测试数据,给出详细的java实现代码或存储过程代码:【表结构】

亲测在New Bing是可以生成可以运行的代码。

作者 east
Flink 9月 5,2023

Flink CDC对接数据报错:you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code

这个错误消息表明在运行 Flink CDC 连接到 MySQL 数据库时,出现了权限问题。错误消息中提到需要 “REPLICATION SLAVE privilege” 权限来执行操作,但是当前用户似乎没有该权限。

错误原因:

  • Flink CDC 需要通过 MySQL 的二进制日志来捕获数据库的变更,以便进行实时流式处理。这需要 MySQL 用户具有 REPLICATION SLAVE 权限,以允许 Flink CDC 作为 MySQL 复制从机来读取二进制日志。

解决方案: 为了解决这个问题,您可以采取以下步骤:

  1. 授予 REPLICATION SLAVE 权限:
    • 通过 MySQL 的 root 或具有足够权限的用户登录。
    • 执行以下 SQL 命令,将 REPLICATION SLAVE 权限授予 Flink CDC 使用的用户名(在 Flink 配置中指定的用户名):sql复制代码GRANT REPLICATION SLAVE ON *.* TO 'your_cdc_user'@'%' IDENTIFIED BY 'your_password';
      • your_cdc_user 替换为 Flink CDC 使用的用户名。
      • your_password 替换为 Flink CDC 使用的密码。
  2. 重新启动 Flink CDC 应用:
    • 确保 Flink CDC 应用程序重新启动,并尝试重新连接到 MySQL 数据库。
  3. 检查 Flink CDC 配置:
    • 确保 Flink CDC 配置文件中的连接字符串、用户名和密码正确配置,以匹配 MySQL 数据库的设置。
  4. 检查防火墙和网络配置:
    • 确保 MySQL 数据库的防火墙和网络配置允许 Flink CDC 应用程序连接到数据库端口。
  5. 查看 MySQL 错误日志:
    • 检查 MySQL 错误日志以获取更多关于访问被拒绝的详细信息。可能会提供有关错误原因的更多线索。
  6. 升级或重新配置 Flink CDC:
    • 如果问题仍然存在,考虑升级 Flink CDC 或重新配置其版本,以确保与 MySQL 数据库兼容性。

通过执行上述步骤,您应该能够解决 Flink CDC 连接到 MySQL 数据库时出现的权限问题。确保授予足够的权限,并检查配置以确保准确性。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
大数据开发 9月 4,2023

scala比较日期字符串的大小

使用字符串的compareTo方法:如果您的日期字符串是按照“年-月-日”的格式排列的,那么您可以直接使用字符串的compareTo方法来比较它们,无需转换为日期对象。例如,您可以使用以下的Scala代码来比较两个日期字符串1:

val date1 = "2023-09-03"
val date2 = "2023-08-21"
val result = date1.compareTo(date2)
// result: Int = 1
// result > 0 表示 date1 晚于 date2
// result < 0 表示 date1 早于 date2
// result == 0 表示 date1 等于 date2
作者 east
datax 8月 30,2023

DataX对接数据脱敏数据的实例

datax对接mysql数据,对姓名只保留姓,名变成**。对这种简单的脱敏,可以不用修改datax源码,直接在配置文件上实现。

//要脱敏的字段在第2个,也就是record.getColumn(1)
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "name", // 姓名的字段
              "age"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "test"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "name",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "INT"
              }
            ],
            // 省略其他参数
          }
        },
        // 添加transformer部分
        "transformer": [
                                 {
                                "name": "dx_groovy",
                                "parameter": {
                                  "code": "Column name = record.getColumn(1);def first = name.asString()[0];def last =null; last= \"*\" * (name.asString().length() - 1);def masked = null; masked = first + last; record.setColumn(1, new StringColumn(masked)); return record;"             
                                 }
                                }],
    // 省略其他部分
  }
}
作者 east
Spark 8月 30,2023

监控Spark运行超时及kill掉重跑

在用oozie的调度任务,用shell调度spark任务,在生产环境运行时,正常1-2个小时跑完的任务,有时出现跑了5、6个小时还没跑完,造成的原因很奇怪,有可能是数据倾斜,任务占用太多资源偶尔出错。为了监控这种现象,并设定阈值为3个小时,如果超过3小时没跑完就kill掉。可以结合oozie失败重试机制实现重跑。

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个Configuration对象,用于加载Hadoop和Yarn的配置文件
        Configuration conf = new Configuration();
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("yarn-site.xml");

        // 创建一个YarnClient对象,用于访问Yarn的api
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // 调用Yarn的api,获取所有正在运行的应用程序
        List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));

        // 遍历每个应用程序
        for (ApplicationReport app : apps) {
            // 获取应用程序的ID和名称
            ApplicationId appId = app.getApplicationId();
            String appName = app.getName();
            // 判断应用程序是否是由Oozie Shell命令启动的spark任务
            if (appName.startsWith("oozie:launcher")) {
                // 如果是,打印日志或者做其他操作
                System.out.println("Found Oozie Shell spark job: " + appId);
                // 获取应用程序的开始时间和当前时间
                long startTime = app.getStartTime();
                long currentTime = System.currentTimeMillis();
                // 计算应用程序的运行时间(毫秒)
                long jobDuration = currentTime - startTime;
                // 判断应用程序的运行时间是否超过阈值
                if (jobDuration > TIMEOUT_THRESHOLD) {
                    // 如果超过阈值,调用Yarn的api,终止应用程序
                    yarnClient.killApplication(appId);
                    // 打印日志或者做其他操作
                    System.out.println("Killed Oozie Shell spark job: " + appId);
                    // 重新运行应用程序或者做其他操作
                    // ...
                } else {
                    // 如果没有超过阈值,打印日志或者做其他操作
                    System.out.println("Job " + appId + " is running normally");
                }
            }
        }

        // 关闭YarnClient对象
        yarnClient.stop();
    }
}

如果要监控oozie的调度任务,也可以用下面的方法:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class OozieJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个OozieClient对象,用于调用oozie的api
        OozieClient oozieClient = new OozieClient("http://localhost:11000/oozie");
        // 调用oozie的api,查询所有正在运行的作业
        List<WorkflowJob> jobs = oozieClient.getJobsInfo("status=RUNNING");
        // 遍历每个作业
        for (WorkflowJob job : jobs) {
            // 获取作业的ID和信息
            String jobId = job.getId();
            String jobInfo = job.toString();
            // 解析作业的信息,获取作业的运行时间
            long jobDuration = parseJobDuration(jobInfo);
            // 判断作业的运行时间是否超过阈值
            if (jobDuration > TIMEOUT_THRESHOLD) {
                // 如果超过阈值,调用oozie的api,终止作业
                oozieClient.kill(jobId);
                // 打印日志或者做其他操作
                System.out.println("Job " + jobId + " is killed due to timeout");
                // 重新运行作业或者做其他操作
                // ...
            } else {
                // 如果没有超过阈值,打印日志或者做其他操作
                System.out.println("Job " + jobId + " is running normally");
            }
        }
    }

    // 定义一个方法,用于解析作业的信息,并返回作业的运行时间(毫秒)
    private static long parseJobDuration(String jobInfo) {
        // 创建一个Matcher对象,用于匹配正则表达式和作业信息
        Matcher matcher = DURATION_PATTERN.matcher(jobInfo);
        // 如果找到了匹配的结果,就从结果中提取天数、小时数和分钟数,并转换为毫秒
        if (matcher.find()) {
            int days = Integer.parseInt(matcher.group(1));
            int hours = Integer.parseInt(matcher.group(2));
            int minutes = Integer.parseInt(matcher.group(3));
            long duration = (days * 24 + hours) * 60 + minutes; // duration in minutes
            duration *= 60 * 1000; // duration in milliseconds
            return duration;
        } else {
            // 如果没有找到匹配的结果,就返回-1表示无法解析
            return -1;
        }
    }
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 8月 29,2023

druid连接不上doris有哪些可能原因

如果你在使用Druid连接池连接Doris时遇到问题,无法连接上数据库,可能有以下几个原因和解决方案:

  1. 网络配置问题:确保你的应用程序能够与Doris数据库所在的服务器进行通信。检查防火墙设置、网络配置以及Doris数据库的监听端口是否正确开放。
  2. Doris JDBC驱动问题:确认你使用的是正确版本的Doris JDBC驱动程序。不同版本的驱动程序可能与特定版本的Doris不兼容。建议检查并使用与你所连接的Doris版本匹配的驱动程序。
  3. 连接参数错误:确认你在连接字符串中提供了正确的主机名、端口号、数据库名称以及用户名和密码。确保这些参数与Doris数据库的实际配置相匹配。
  4. Druid连接池配置问题:检查你的Druid连接池配置是否正确。确保连接URL正确设置为Doris数据库的URL,并提供有效的用户名和密码。
  5. 数据库服务问题:确保Doris数据库服务正在运行,并且具有足够的资源来处理连接请求。检查Doris数据库的日志文件,查看是否有任何错误或异常信息。
  6. 数据库权限问题:验证你使用的用户名和密码是否具有足够的权限来连接Doris数据库。确保用户拥有所需的数据库访问权限。
  7. 依赖项问题:检查你的项目依赖项是否正确配置。确保在项目中包含了Druid连接池和Doris JDBC驱动的正确版本。

如果你仔细检查并解决了上述问题,应该能够成功连接到Doris数据库。如果问题仍然存在,建议进一步检查错误日志、调试信息和网络连接状态,以确定具体的故障原因,并考虑寻求更深入的技术支持。

作者 east
doris, Java 8月 29,2023

如何用druid连接池连接doris

在对doris进行单元测试,要测试大量sql,没有连接池时,太多连接会出现问题。于是想采用druid连接池。

要使用Druid连接池连接Doris,你需要在Java代码中进行配置。以下是一个简单的示例,包含Druid连接池的配置和连接到Doris的Java代码。

首先,你需要在项目的依赖中添加Druid和Doris的驱动程序。在Maven项目中,可以在pom.xml文件中添加以下依赖:

<dependencies>
    <!-- Druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.2.4</version>
    </dependency>

    <!-- Doris JDBC Driver -->
    <dependency>
        <groupId>com.mysql.jdbc</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
</dependencies>

接下来,创建一个名为DruidDorisExample的Java类,并编写以下代码:

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DruidDorisExample {
    public static void main(String[] args) {
        // 创建Druid连接池
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:9030/my_database");
        dataSource.setUsername("username");
        dataSource.setPassword("password");

        try (Connection connection = dataSource.getConnection()) {
            // 执行查询语句
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery("SELECT * FROM my_table");

            // 遍历结果集并输出数据
            while (resultSet.next()) {
                // 处理每一行的数据
                // 例如:String columnValue = resultSet.getString("column_name");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

在上面的代码中,你需要将jdbc:mysql://localhost:9030/my_database替换为Doris的连接信息,以及正确的用户名和密码。

此外,你还可以通过在代码中设置其他Druid连接池的配置来优化连接性能。例如,你可以设置最大连接数、是否开启预处理语句缓存等。

要进行更详细的Druid连接池和Doris配置,你需要创建一个名为druid.properties的配置文件,并在main()方法中加载它:

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DruidDorisExample {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream inputStream = DruidDorisExample.class.getClassLoader().getResourceAsStream("druid.properties")) {
            properties.load(inputStream);
        }

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            Connection connection = dataSource.getConnection();

            // 执行查询语句...
            
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
java复制代码import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DruidDorisExample {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        try (InputStream inputStream = DruidDorisExample.class.getClassLoader().getResourceAsStream("druid.properties")) {
            properties.load(inputStream);
        }

        try {
            DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
            Connection connection = dataSource.getConnection();

            // 执行查询语句...
            
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

上述代码会从类路径下加载名为druid.properties的配置文件,该文件包含了Druid连接池的详细配置项。你可以根据需要在配置文件中设置相关属性,例如连接URL、用户名、密码、最大连接数等。配置文件的示例:

# Druid连接池配置
driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://localhost:9030/my_database
username=username
password=password

# 连接池配置
initialSize=5
maxActive=20
minIdle=1
maxWait=60000

请确保druid.properties文件与Java代码在同一个目录下或处于类路径中。

以上是连接到Doris的简单示例代码和配置文件。你可以根据自己的需求进行进一步调整和优化。

我自己实践时,参考网上别的配置文件,后来发现还是连接不上,而且一直在运行, 通过排查代码,参考
Druid连接池关键代码解读 ,发现是下面的配置影响了:

#测试SQL  
validationQuery=select 1 from test
原来这个表test并不存在,所以测试不通过,又没有明显报错。
作者 east
Java 8月 29,2023

Druid连接池关键代码解读

Druid连接池中的一个方法,作用是获取一个数据库连接(DruidPooledConnection)。下面对其中的主要逻辑进行解释:

  1. getConnectionInternal(maxWaitMillis):调用内部方法获取数据库连接。如果连接池已满或获取连接超时,则会抛出异常GetConnectionTimeoutException。
  2. isTestOnBorrow():检查是否需要在借用连接时进行连接有效性验证。
    • 如果需要验证连接有效性:
      • 调用testConnectionInternal(poolableConnection.getConnection())方法测试连接的有效性。如果连接有效,则跳出循环。
      • 如果连接无效,将其丢弃并从连接池中移除。
  3. 如果不需要在借用连接时进行连接有效性验证:
    • 检查连接是否已关闭,如果是,则丢弃该连接。
    • 如果开启了空闲连接验证(isTestWhileIdle()):
      • 计算连接的空闲时间。
      • 如果空闲时间超过了设定的时间间隔(timeBetweenEvictionRunsMillis),则检查连接的有效性。
      • 如果连接有效,则跳出循环。
      • 如果连接无效,将其丢弃并从连接池中移除。
  4. 如果开启了移除废弃连接(isRemoveAbandoned()):
    • 获取当前线程的堆栈信息,并将其设置到连接对象中。
    • 设置连接开始时间和追踪状态。
    • 将连接加入活跃连接集合中。
  5. 如果未开启默认自动提交(isDefaultAutoCommit()):
    • 将连接的自动提交设置为false。
  6. 返回获取的连接对象。

总体来说,这段代码的作用是从Druid连接池中获取一个可用的数据库连接,并在一系列条件判断和验证后返回该连接对象。其中包括了连接超时处理、连接有效性验证、废弃连接移除等功能,保证连接的可用性和质量。

解读的代码如下:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;

        DruidPooledConnection poolableConnection;
        while(true) {
            while(true) {
                try {
                    poolableConnection = this.getConnectionInternal(maxWaitMillis);
                    break;
                } catch (GetConnectionTimeoutException var17) {
                    if (notFullTimeoutRetryCnt > this.notFullTimeoutRetryCount || this.isFull()) {
                        throw var17;
                    }

                    ++notFullTimeoutRetryCnt;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt);
                    }
                }
            }

            if (this.isTestOnBorrow()) {
                boolean validate = this.testConnectionInternal(poolableConnection.getConnection());
                if (validate) {
                    break;
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }

                Connection realConnection = poolableConnection.getConnection();
                this.discardConnection(realConnection);
            } else {
                Connection realConnection = poolableConnection.getConnection();
                if (realConnection.isClosed()) {
                    this.discardConnection((Connection)null);
                } else {
                    if (!this.isTestWhileIdle()) {
                        break;
                    }

                    long currentTimeMillis = System.currentTimeMillis();
                    long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
                    long idleMillis = currentTimeMillis - lastActiveTimeMillis;
                    long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
                    if (timeBetweenEvictionRunsMillis <= 0L) {
                        timeBetweenEvictionRunsMillis = 60000L;
                    }

                    if (idleMillis < timeBetweenEvictionRunsMillis) {
                        break;
                    }

                    boolean validate = this.testConnectionInternal(poolableConnection.getConnection());
                    if (validate) {
                        break;
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }

                    this.discardConnection(realConnection);
                }
            }
        }

        if (this.isRemoveAbandoned()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.setConnectStackTrace(stackTrace);
            poolableConnection.setConnectedTimeNano();
            poolableConnection.setTraceEnable(true);
            synchronized(this.activeConnections) {
                this.activeConnections.put(poolableConnection, PRESENT);
            }
        }

        if (!this.isDefaultAutoCommit()) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
作者 east

上一 1 … 11 12 13 … 42 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年12月
  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (497)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (39)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.