gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面16 )
doris 6月 1,2023

spark jdbc模式写入异常

用spark jdbc模式写入doris报下面错误:

java.sql.BatchUpdateException: Insert has filtered data in strict mode, tracking_url=http://192.168.1.40:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:800)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Insert has filtered data in strict mode, tracking_url=http://10.0.80.54:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1046)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1371)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:746)
	... 17 more

点开上面的=http://192.168.1.40:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2,发现报下面的错误:

Reason: no partition for this tuple. tuple=

查看要写入doris对应的表,这个表是动态分区的,发现这个表没有插入数据的分区。后来匹配对应的doris表动态分区后,插入数据果然正常了。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 5月 26,2023

datax写入doris报错:Writing records to Doris failed

用datax同步数据到doris,报下面错误:

java.lang.RuntimeException: Writing records to Doris failed.
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.close(DorisWriterManager.java:113)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriter$Task.post(DorisWriter.java:150)
 at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:65)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Writing records to Doris failed.
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.checkFlushException(DorisWriterManager.java:189)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.waitAsyncFlushingDone(DorisWriterManager.java:150)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.flush(DorisWriterManager.java:98)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.close(DorisWriterManager.java:111)
 ... 3 more
Caused by: java.io.IOException: java.io.IOException: Failed to flush data to Doris.
{"TxnId":-1,"Label":"datax_doris_writer_fed9613c-8d95-4284-a9a2-949985cf3f8d","TwoPhaseCommit":"false","Status":"Fail","Message":"errCode = 7, detailMessage = unknown table, tableName=mediation","NumberTotalRows":0,"NumberLoadedRows":0,"NumberFilteredRows":0,"NumberUnselectedRows":0,"LoadBytes":0,"LoadTimeMs":0,"BeginTxnTimeMs":0,"StreamLoadPutTimeMs":0,"ReadDataTimeMs":0,"WriteDataTimeMs":0,"CommitAndPublishTimeMs":0}
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.asyncFlush(DorisWriterManager.java:170)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.access$000(DorisWriterManager.java:19)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager$1.run(DorisWriterManager.java:134)
 ... 1 more
Caused by: java.io.IOException: Failed to flush data to Doris.

在这种情况下,错误是由于未知表引起的。这可能是由许多因素引起的,例如:

表不存在。
表不可访问。
表配置不正确。
要修复错误,您需要确定错误的原因并采取措施进行纠正。如果表不存在,则需要创建表。如果无法访问表,则需要授予用户对表的写入权限。如果表配置不正确,则需要正确配置它。

一旦您纠正了错误的原因,就应该能够无问题地将数据写入Doris中。

以下是一些解决此错误的附加提示:

检查Doris日志以获取更多有关错误的信息。
尝试使用其他工具(例如Doris CLI或Doris API)将数据写入Doris。
如果仍然遇到问题,您可以联系Doris支持团队寻求帮助。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Java 5月 25,2023

Java 中的梯度下降

今天的大多数人工智能都是使用某种形式的神经网络实现的。在我的前两篇文章中,我介绍了神经网络并向您展示了如何在 Java 中构建神经网络。神经网络的力量主要来自于它的深度学习能力,而这种能力建立在梯度下降反向传播的概念和执行之上。我将通过快速深入了解 Java 中的反向传播和梯度下降来结束这个简短的系列文章。
有人说人工智能并不是那么智能,它主要是反向传播。那么,现代机器学习的基石是什么?
要了解反向传播,您必须首先了解神经网络的工作原理。基本上,神经网络是称为神经元的节点的有向图。神经元有一个特定的结构,它接受输入,将它们与权重相乘,添加一个偏差值,并通过激活函数运行所有这些。神经元将它们的输出馈送到其他神经元,直到到达输出神经元。输出神经元产生网络的输出。 (有关更完整的介绍,请参阅机器学习风格:神经网络简介。)
从这里开始,我假设您了解网络及其神经元的结构,包括前馈。示例和讨论将集中在梯度下降的反向传播上。我们的神经网络将有一个输出节点、两个“隐藏”节点和两个输入节点。使用一个相对简单的例子可以更容易地理解算法所涉及的数学。图 1 显示了示例神经网络的图表。
图 1. 我们将用于示例的神经网络图。
梯度下降反向传播的思想是将整个网络视为一个多元函数,为损失函数提供输入。损失函数通过将网络输出与已知的良好结果进行比较来计算一个表示网络执行情况的数字。与良好结果配对的输入数据集称为训练集。损失函数旨在随着网络行为远离正确而增加数值。
梯度下降算法采用损失函数并使用偏导数来确定网络中每个变量(权重和偏差)对损失值的贡献。然后它向后移动,访问每个变量并调整它以减少损失值。
理解梯度下降涉及微积分中的一些概念。首先是导数的概念。 MathsIsFun.com 对导数有很好的介绍。简而言之,导数为您提供函数在单个点处的斜率(或变化率)。换句话说,函数的导数为我们提供了给定输入的变化率。 (微积分的美妙之处在于它可以让我们在没有其他参考点的情况下找到变化——或者更确切地说,它可以让我们假设输入的变化非常小。)
下一个重要的概念是偏导数。偏导数让我们采用多维(也称为多变量)函数并仅隔离其中一个变量以找到给定维度的斜率。
导数回答了以下问题:函数在特定点的变化率(或斜率)是多少?偏导数回答了以下问题:给定方程的多个输入变量,仅这一个变量的变化率是多少?
梯度下降使用这些思想来访问方程中的每个变量并对其进行调整以最小化方程的输出。这正是我们训练网络时想要的。如果我们将损失函数视为绘制在图表上,我们希望以增量方式向函数的最小值移动。也就是说,我们要找到全局最小值。
请注意,增量的大小在机器学习中称为“学习率”。
当我们探索梯度下降反向传播的数学时,我们将紧贴代码。当数学变得过于抽象时,查看代码将有助于让我们脚踏实地。让我们首先查看我们的 Neuron 类,如清单 1 所示。
Neuron 类只有三个 Double 成员:weight1、weight2 和 bias。它也有一些方法。用于前馈的方法是 compute()。它接受两个输入并执行神经元的工作:将每个输入乘以适当的权重,加上偏差,然后通过 sigmoid 函数运行它。
在我们继续之前,让我们重新审视一下 sigmoid 激活的概念,我在神经网络简介中也讨论过它。清单 2 显示了一个基于 Java 的 sigmoid 激活函数。
sigmoid 函数接受输入并将欧拉数 (Math.exp) 提高到负数,加 1 再除以 1。效果是将输出压缩在 0 和 1 之间,越来越大和越来越小的数字逐渐接近极限。
DeepAI.org 对机器学习中的 sigmoid 函数有很好的介绍。
回到清单 1 中的 Neuron 类,除了 compute() 方法之外,我们还有 getSum() 和 getDerivedOutput()。 getSum() 只是进行权重 * 输入 + 偏差计算。请注意,compute() 采用 getSum() 并通过 sigmoid() 运行它。 getDerivedOutput() 方法通过一个不同的函数运行 getSum():sigmoid 函数的导数。
现在看一下清单 3,它显示了 Java 中的 sigmoid 导数函数。我们已经从概念上讨论了衍生品,下面是实际应用。
记住导数告诉我们函数在其图中的单个点的变化是什么,我们可以感受一下这个导数在说什么:告诉我给定输入的 sigmoid 函数的变化率。您可以说它告诉我们清单 1 中的预激活神经元对最终激活结果有何影响。
您可能想知道我们如何知道清单 3 中的 sigmoid 导数函数是正确的。答案是,如果它已经被其他人验证过,并且如果我们知道根据特定规则正确微分的函数是准确的,我们就会知道导数函数是正确的。一旦我们理解了它们在说什么并相信它们是准确的,我们就不必回到第一原理并重新发现这些规则——就像我们接受并应用简化代数方程式的规则一样。
所以,在实践中,我们是按照求导法则来求导数的。如果您查看 sigmoid 函数及其导数,您会发现后者可以通过遵循这些规则得出。出于梯度下降的目的,我们需要了解导数规则,相信它们有效,并了解它们的应用方式。我们将使用它们来找出每个权重和偏差在网络最终损失结果中所扮演的角色。
符号 f prime f'(x) 是“f 对 x 的导数”的一种表达方式。另一个是:
两者是等价的:
您很快就会看到的另一种表示法是偏导数表示法:
这就是说,给我变量 x 的 f 的导数。
最令人好奇的衍生规则是链式法则。它说当一个函数是复合的(函数中的函数,又名高阶函数)时,你可以像这样扩展它:
我们将使用链式法则来解压我们的网络并获得每个权重和偏差的偏导数。

作者 east
Hive 5月 20,2023

Hive 查询示例:您需要了解的内容

如果您在 Hadoop 上使用 Hive,了解 Hive 查询并熟悉一两个 Hive 查询示例是实现有效集群管理的关键。 Hive 查询消耗时间和资源,因此必须通过 Hive 查询调优来提高效率。在本文中,您将了解什么是 Hive 查询、它们如何影响您的集群(正面和负面)、有用的 Hive 查询方法,以及一个好的 Hive 查询示例是什么样的。让我们开始吧。

Hive 查询是来自 Hadoop 数据库的特定信息请求。这些信息请求由 Apache Hive 执行,Apache Hive 是一个在 Hadoop 之上开发的开源数据仓库平台。 Facebook 在编写 Java MapReduce 平台方面创建了 Hive 来执行数据分析、分布式处理和减少工作。

Hive 查询使用一组预定义的代码,这些代码是您的数据库语言的本机代码。然后数据库接收指令,一旦它理解了该指令,就收集并发布所请求的信息。

Hive 是为提高效率而设计的,这就是为什么它的查询需要完美调整和编写良好的原因。您还可以设置依赖项以启用查询的自动计划。这将保证一旦一个动作完成,下一个动作立即开始。

与增加网络带宽相比,提高系统的 RAM 容量和 CPU 能力可以加快 Hive 响应时间。

调优不当的查询会给组织带来重大挫折。其中最大的是错过了 SLA(服务水平协议)。

这些协议表示企业及其客户同意的服务水平,包括性能保证、数据安全、正常运行时间和客户服务标准。因此,如果低效查询导致错过 SLA,结果可能是罚款、退款,或者在某些情况下终止合同。

调整不当的 Hive 查询也会消耗资源。这些会在两个方面影响您的 Hadoop 集群。一:调优不佳的查询会耗尽集群中其他用户或功能的资源。这会导致性能下降和响应时间变慢。二:使用的资源产生成本。由于 Hive 查询调优不佳而造成的资源浪费会增加您的 AWS 账单,让您非常头疼。

低效查询的其他一些影响可能会破坏集群性能、减慢数据库速度和停机时间。

由于低效查询会产生许多负面影响,因此优化查询至关重要。

有一些非常方便的 Hive 查询调优方法,具体取决于您是针对时间还是资源使用进行优化:

适当的 Hive 调整允许您操作尽可能少的数据。一种方法是通过分区,将“键”分配给数据被隔离的子目录。当您的查询需要信息时,您可以定位数据所在的特定子集,这样可以节省您扫描不需要的数据的时间。

分桶类似于分区,尽管它有助于通过扫描更少的数据来提高连接性能。

这有助于最大限度地减少遍历查询过程中各个步骤的数据量,以及在查询状态之间移动所需的时间。

在执行方面,利用执行引擎(如 Tez、Hive on Spark 和 LLAP)可以帮助提高查询性能,而无需低级调优方法。在不需要顺序操作时利用并行执行也是明智的。您的系统可以执行的并行度取决于可用资源和整体数据结构。

此外,它有助于在所有任务之间保持工作的均匀分布,以避免出现偏差。请记住:您的整体查询速度只能与最慢的任务一样快。

最后,当您查看测试时,采样(也称为单元测试)是最好的 Hive 查询调优技术之一。通过获取数据的一个子集并一次运行一千行查询,您可以更快地找到失败、奇怪的结果和错误。这有助于您微调查询以获得更高的准确性和效率。

每个查询还应该自动进行审查和性能测量。这确保他们在晋升到更高级别的环境之前满足最低要求。将不良查询排除在生产之外。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 5月 19,2023

doris批量导出表结构java版本实现

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DorisDump {

    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    private static final String DB_URL = "jdbc:mysql://localhost:9030/database";
    private static final String USER = "user";
    private static final String PASSWORD = "password";

    public static void main(String[] args) throws SQLException, IOException {
        // Parse command line arguments
        Args args = new Args(args);

        // Connect to the database
        Connection connection = DriverManager.getConnection(DB_URL, USER, PASSWORD);

        // Create a statement
        Statement statement = connection.createStatement();

        // Get the list of tables
        ResultSet tables = statement.executeQuery("show tables;");

        // Create a list to store the create table statements
        List<String> createTableStatements = new ArrayList<>();

        // Iterate over the tables
        while (tables.next()) {
            // Get the name of the table
            String tableName = tables.getString(1);

            // Get the create table statement for the table
            ResultSet createTableStatement = statement.executeQuery("show create table " + tableName + ";");

            // Add the create table statement to the list
            createTableStatements.add(createTableStatement.getString(1));
        }

        // Write the create table statements to a file
        File file = new File("table.txt");
        try (FileWriter fileWriter = new FileWriter(file)) {
            for (String createTableStatement : createTableStatements) {
                fileWriter.write(createTableStatement + "\n");
            }
        }

        // Close the connection
        connection.close();
    }

    private static class Args {

        private String host;
        private int port;
        private String user;
        private String password;
        private String database;

        public Args(String[] args) {
            for (int i = 0; i < args.length; i++) {
                if (args[i].equals("-h")) {
                    host = args[i + 1];
                } else if (args[i].equals("-P")) {
                    port = Integer.parseInt(args[i + 1]);
                } else if (args[i].equals("-u")) {
                    user = args[i + 1];
                } else if (args[i].equals("-p")) {
                    password = args[i + 1];
                } else if (args[i].equals("-d")) {
                    database = args[i + 1];
                }
            }
        }

        public String getHost() {
            return host;
        }

        public int getPort() {
            return port;
        }

        public String getUser() {
            return user;
        }

        public String getPassword() {
            return password;
        }

        public String getDatabase() {
            return database;
        }
    }
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Java 5月 19,2023

mysql导出某个库所有表名和表名的注释并保存在excel

import java.io.File;
import java.io.FileOutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;

public class ExportMysqlSchema {

    public static void main(String[] args) throws Exception {
        // 连接mysql数据库
        String url = "jdbc:mysql://localhost:3306/test?user=root&password=root";
        Connection conn = DriverManager.getConnection(url);
        Statement stmt = conn.createStatement();

        // 获取某个库所有表名和表名的注释
        String sql = "SELECT TABLE_NAME, TABLE_COMMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA='test'";
        ResultSet rs = stmt.executeQuery(sql);

        // 创建excel文件
        File file = new File("mysql_schema.xls");
        FileOutputStream fos = new FileOutputStream(file);

        // 创建excel工作簿
        Workbook workbook = new HSSFWorkbook();
        // 创建excel工作表
        Sheet sheet = workbook.createSheet("test");
        // 创建excel表头
        Row header = sheet.createRow(0);
        header.createCell(0).setCellValue("TABLE_NAME");
        header.createCell(1).setCellValue("TABLE_COMMENT");

        // 写入excel数据
        int index = 1; // 行索引
        while (rs.next()) {
            // 获取表名和表名的注释
            String tableName = rs.getString(1);
            String tableComment = rs.getString(2);
            // 创建excel行
            Row row = sheet.createRow(index++);
            // 写入excel单元格
            row.createCell(0).setCellValue(tableName);
            row.createCell(1).setCellValue(tableComment);
        }

        // 保存excel文件
        workbook.write(fos);

        // 关闭资源
        fos.close();
        workbook.close();
        rs.close();
        stmt.close();
        conn.close();
    }
}
作者 east
Kafka 5月 18,2023

2020 年精选:Kafka 优化:四个最佳实践

2020 年不会成为十年来最好的年份之一,但我们可以肯定地说,至少有一件好事发生了:Kafka 优化的四个最佳实践的总结。这篇博文最初发表于 5 月,并迅速成为我们读者的最爱。鉴于最佳实践至今仍然有效,我们想再次强调它们,以圆满结束这一年。阅读并享受它。

阿帕奇卡夫卡很棒。它允许创建易于扩展的实时、高吞吐量、低延迟数据流。优化的 Kafka 性能还带来其他好处,例如抵抗集群内发生的机器/节点故障以及集群上数据和消息的持久化。 Kafka 框架的性能优化应该是重中之重。

但优化是一项复杂的工作。优化您的 Apache Kafka 部署可能是一个挑战,因为分布式架构有很多层,并且可以在这些层内调整参数。

例如: 通常,具有自动数据冗余的高吞吐量发布-订阅 (pub/sub) 模式是一件好事。但是,当您的消费者努力跟上您的数据流,或者如果他们无法阅读消息,因为这些消息在消费者到达它们之前就消失了,那么就需要做一些工作来支持消费应用程序的性能需求。

Kafka 优化是一个广泛的主题,可以非常深入和精细,但这里有一些关键的最佳实践可以帮助您入门:

这听起来可能非常明显,但您会惊讶于有多少人使用旧版本的 Kafka。一个非常简单的 Kafka 优化举措是升级并使用最新版本的平台。您必须确定您的客户是否使用旧版本的 Kafka(0.10 或更早版本)。如果是,他们应该立即升级。

最新版本的 Kafka(版本 0.8x)附带 Apache ZooKeeper,主要用于协调消费者群体。使用过时版本的 Kafka 会导致重新平衡运行时间过长以及重新平衡算法失败。

优化 Apache Kafka 部署是优化平台堆栈层的练习。分区是吞吐量性能所基于的存储层。每个分区的数据速率是消息的平均大小乘以每秒消息数。简而言之,它是数据通过分区的速率。所需的吞吐率决定了分区的目标架构。

解决方案架构师希望每个分区都支持相似的数据量和吞吐率。实际上,数据速率会随着时间的推移而变化,生产者和消费者的原始数量也会随之变化。

可变性带来的性能挑战是消费者滞后的可能性,也就是消费者读取率落后于生产者写入率。随着 Kafka 环境的扩展,随机分区是一种有效的方法,可确保您不会在不必要地尝试将静态定义应用于移动性能目标时引入人为瓶颈。

分区领导通常是通过由 Zookeeper 维护的元数据进行简单选举的产物。然而,领导选举并没有考虑到各个分区的性能。根据您的 Kafka 发行版,可以利用专有的平衡器,但由于缺乏此类工具,随机分区提供了最不干涉的路径来平衡性能。

外卖?在写入主题时坚持随机分区,除非体系结构要求另有要求。

在较旧的 Kafka 版本中,参数 receive.buffer.bytes 默认设置为 64kB。在较新的 Kafka 版本中,参数为 socket.receive.buffer.bytes,默认为 100kB。

这对 Kafka 优化意味着什么?对于高吞吐量环境,这些默认值太小,因此不够用。当代理和消费者之间的网络带宽延迟乘积大于 LAN(局域网)时,情况就很明显了。

如果您的网络以 10 Gbps 或更高的速度运行并且延迟为 1 毫秒或更长,建议您将套接字缓冲区调整为 8 或 16 MB。如果内存有问题,请考虑 1 MB。

优化 Apache Kafka 部署是一项持续的工作,但这五个最佳实践应该是一个坚实的开始。上面提到的性能优化技巧只是用户可以实施以提高 Kafka 性能的一些优化方法。 Kafka 越来越受到应用程序开发人员、IT 专业人员和数据管理人员的欢迎。并且有充分的理由。查看我们的其他资源,其中详细讨论了 Kafka 应用于应用程序开发和数据管理的特定领域时的最佳实践。

已经在使用 Kafka 了吗?使用 Pepperdata Streaming Spotlight 监控和改进其性能。

回顾一下,我们建议您升级到最新版本的 Kafka。这是一件小事,但可以发挥重要作用。接下来,是确保您了解数据吞吐率。除非架构需求另有要求,否则我们建议您在写入主题时选择随机分区。如果你想实现高速摄取,调整消费者套接字缓冲区。我们希望您喜欢这篇 2020 年最佳博文,其中重点介绍了我们为 Kafka 优化推荐的最佳实践。

作者 east
Hive 5月 18,2023

修复hive重命名分区后新分区为0的问题

hive分区重命名后,新的分区的分区大小为0 ,
例如

alter table entersv.ods_t_test partition(dt='2022-11-08') rename to partition(dt='2022-11-21')

ods_t_test 的2022-11-21分区大小为0。怎样修复

  • 使用 msck repair table 命令来修复表的元数据,让hive重新扫描分区目录并更新分区信息2。
  • 使用 analyze table 命令来重新计算分区的统计信息,包括分区大小,行数等3。

下面的示例代码:

-- 修复表的元数据
msck repair table entersv.ods_t_test;
-- 重新计算分区的统计信息
analyze table entersv.ods_t_test partition(dt) compute statistics;
作者 east
Spark 5月 17,2023

通过可观察性优化 Spark 中的性能

Apache Spark 为大数据行业的企业提供了许多好处。然而,与任何技术解决方案一样,它也伴随着挑战和障碍。通过优先考虑强调可观察性的工具,您可以优化 Spark 作业的性能,并开始深入了解性能问题的原因,而不仅仅是问题的本质。

Apache Spark 架构为大数据行业提供非常有用的工具有几个原因:

无论 Spark 多么强大,它仍然面临着一系列挑战。因此:根据我们的 2020 年大数据性能报告,观察到 Spark 作业的失败频率是其他作业的四到七倍。

此外,该报告进一步揭示了在没有 Spark 性能调整和优化的情况下:

这就是为什么公司需要优化 Spark 的性能。如果不应用 Spark 优化技术,集群将继续过度配置和利用资源。据一位分析师称,在全球范围内,仅闲置资源一项就产生了约 88 亿美元的年增长率。

在优化 Spark 工作负载和作业时,关键是可观察性。

以内存利用率为例。也许用户需要分配更多内存以避免垃圾回收。或者,在多租户环境中,用户可能分配了过多的内存,导致租户之间出现排队等问题。如果没有正确的优化解决方案,Spark 用户对如何为他们的集群正确分配内存一无所知。

Spark 性能调优的另一个机会是减少(如果不能避免的话)数据倾斜。 Spark 对数据倾斜很敏感,对于高度分布式和瘫痪的应用程序来说,它可能是非常具有破坏性的。数据倾斜导致某些应用程序元素的工作时间超过它们应有的时间,而其他计算资源则闲置,未得到充分利用。有助于优化 Spark 性能的工具应该跟踪数据倾斜并提出有效的建议来纠正它。

那么如何优化 Spark 的性能并衡量成功呢?再次,通过可观察性。

Spark 用户最终需要说,“嘿,我的应用程序现在运行无故障,而且我始终如一地满足我的 SLA。”为此,他们需要合适的可观察性工具来帮助他们确定内存利用率、数据倾斜以及大多数公司工作的多租户环境中可能出现的其他问题。

作者 east
Spark 5月 17,2023

银行业大数据分析:如何减少超支

在银行业有效利用大数据分析现在是一项竞争要求。大数据在银行业中的作用是多方面的。最常见的用例是访问各种类型的第一方数据,以更好地了解客户并根据他们的需求定制产品和解决方案。其他大数据银行业务示例和用例包括利用命令式数据协助银行处理 Customer 360,以及通过高级财务数据分析进行风险管理。总体而言,银行业通过大数据分析实现的自动化也最大限度地减少了人类情感和偏见对金融交易的影响。

然而,许多银行每年花费超过 1 亿美元来继续获得银行业大数据分析的好处。其中一些是不必要的超支。公司需要采用新的技术工具来帮助他们优化性能并减少超支。

多租户数据湖的挑战

所有主要银行都在随时存储和访问大量数据。这些数据位于每年都在扩展的数据湖中。通常,它们的大小会增加一倍以上。

曾几何时,大数据银行示例和用例是分开的。将有一个商业银行数据湖,一个零售数据湖,一个投资数据湖,一个家庭保险数据湖,等等。但今天,越来越多的数据涌入,形成了一个巨大的数据湖。银行的各个部分充当租户,访问同一个数据湖。这种多租户数据湖安排给系统和服务器带来了巨大压力。通常,在银行业有效使用大数据分析的能力会逐渐停止。

多租户数据湖还带来了另一个问题:数据隐私。关于银行和金融机构如何处理大量不同的、非结构化的私人数据,存在很多紧张的讨论。并且有充分的理由;当谈到银行业的数据分析时,个人信息是从消费者行为分析中收集的,他们的决策是使用大量杂乱无章的信息做出的。当然,安全协议已经到位,可以保护如此庞大的数据集合。但是,当每个人都浸入同一个湖中时,数据泄露的风险就会增加。

银行如何最终超支以跟上

金融服务公司通常如何应对多租户数据湖的挑战?通过增加基础设施。根据 ResearchandMarkets 的一份报告,在大数据和人工智能上投资至少 5000 万美元的公司去年增长了 7%。在全球范围内,每年约有 1800 亿美元用于大数据分析投资。

简而言之,在银行业有效利用大数据分析的压力促使银行迅速投资额外的本地和云基础设施,以跟上不断增长的数据量。这会导致 Dell 或 IBM 的巨额硬件账单,和/或 AWS 的巨额云账单。

此外,运营基础设施本身也需要花钱。指挥基础设施和不断扩大的数据湖的管理和维护需要具有专业知识的专业人员的服务。成本迅速增加,许多银行每年花费超过 1 亿美元来继续获得银行业大数据分析的好处。

作者 east
Spark 5月 17,2023

为什么 Spark 这么慢?今天优化 Spark 的 5 种方法

在竞争日益激烈的世界中寻找优势?通过 Kubernetes 2023 状态揭示 Kubernetes 可以为您的业务做些什么。现在阅读以探索如何利用 Kubernetes 的优势、解锁潜在解决方案并克服挑战。

当 Apache Spark 运行良好时,它确实运行良好。但有时,用户会发现自己在问这个令人沮丧的问题。

Spark之所以如此受欢迎,是因为它比传统的数据处理解决方案能够执行更多的计算和更多的流处理。与 MapReduce 等流行的传统系统相比,Spark 的速度要快 10-100 倍。但是,虽然 Spark 能够处理范围广泛的工作负载和大数据集,但有时也会遇到困难。这就是原因,这就是您可以采取的措施。

所以:您已经尝试了一些 Apache Spark 性能调优技术,但您的应用程序仍然很慢。此时,是时候深入了解您的 Spark 架构,并确定导致您的实例运行缓慢的原因。

驱动故障

在 Spark 架构中,驱动程序充当编排器。因此,它配备的内存少于执行程序。当驱动程序遇到 OutOfMemory (OOM) 错误时,可能是以下原因造成的:

简而言之,当驱动程序执行需要更多内存的服务或尝试使用比分配的内存更多的内存时,就会发生 OOM 错误。解决这种情况的两个有效的 Spark 调优技巧是:

高并发

有时,Spark 运行缓慢是因为运行的并发任务太多。

高并发能力是一个有益的特性,因为它提供了 Spark 原生的细粒度共享。这会导致最大的资源利用率,同时减少查询延迟。 Spark 将作业和查询划分为多个阶段,并将每个阶段分解为多个任务。 Spark 会根据多种因素并发执行这些任务。

但是,并行执行的任务数基于 spark.executor.cores 属性。虽然高并发意味着要执行多个任务,但如果该值设置得太高而没有适当考虑内存,执行程序将失败。

低效查询

为什么 Spark 这么慢?也许你有一个写得不好的查询潜伏在某处。

按照设计,Spark 的 Catalyst 引擎会自动尝试最大程度地优化查询。但是,如果查询本身写得不好,任何优化工作都注定会失败。例如,一个查询被编程为选择 Parquet/ORC 表的所有列。每列都需要某种程度的内存中列批处理状态。如果查询选择所有列,则会导致更高的开销。

一个好的查询读取尽可能少的列。一个好的 Spark 性能调优实践是尽可能使用过滤器。这有助于限制获取到执行程序的数据。

另一个好技巧是使用分区修剪。将查询转换为使用分区列是优化查询的一种方法,因为它可以极大地限制数据移动。

配置不正确

获得正确的内存配置对于 Spark 应用程序的整体性能至关重要。

每个 Spark 应用程序都有一组不同的内存和缓存要求。如果配置不正确,Spark 应用程序会变慢或崩溃。深入查看 spark.executor.memory 或 spark.driver.memory 值将有助于确定工作负载是否需要更多或更少的内存。

YARN 容器内存开销也会导致 Spark 应用程序变慢,因为 YARN 需要更长的时间来分配更大的内存池。实际情况是 YARN 在容器中运行每个 Spark 组件,例如驱动程序和执行程序。它产生的开销内存实际上是用于JVM(驱动程序)开销、interned字符串和JVM的其他元数据的堆外内存。

当由于 YARN 内存开销导致 Spark 性能下降时,您需要将 spark.yarn.executor.memoryOverhead 设置为正确的值。通常,为开销分配的理想内存量是执行程序内存的 10%。

您需要采取某些步骤来确保 Spark 运行不慢。以下是使您的 Spark 架构、节点和应用程序以最佳水平运行的一些有效方法。

数据序列化

这种特殊的 Spark 优化技术将内存中的数据结构转换为可以存储在文件中或通过网络传输的不同格式。使用这种策略,您可以显着提高分布式应用程序的性能。两种流行的数据序列化方法是:

Java 序列化——您使用 ObjectOutputStream 框架序列化数据,并利用 java.io.Externalizable 来完全控制序列化的性能。 Java 序列化提供轻量级持久性。

Kyro 序列化——Spark 利用 Kryo 序列化库 (v4) 比 Java 序列化更快地序列化对象。这是一种更紧凑的方法。要通过使用 Kyro 序列化真正提高 Spark 应用程序的性能,必须通过 registerKryoClasses 方法注册这些类。

缓存

缓存是一种高效的优化技术,在处理重复需要和查询的数据时使用。 Cache() 和 persist() 非常适合存储数据集、RDD 和 DataFrame 的计算。

需要记住的是,cache() 将数据放入内存中,而 persist() 将数据存储在用户指定或定义的存储级别中。缓存有助于降低成本并在处理重复计算时节省时间,因为从内存读取数据比从磁盘读取数据快得多。

数据结构调整

数据结构调整减少了 Spark 内存消耗。数据结构调优通常包括:

垃圾收集优化

垃圾回收是一种内存管理工具。每个应用程序都将数据存储在内存中,内存中的数据有一个生命周期。垃圾收集标记哪些数据不再需要,标记为删除,然后删除。删除发生在应用程序暂停期间。这些暂停是要避免的。当垃圾收集成为瓶颈时,使用带有 -XX:+UseG1GC 的 G1GC 垃圾收集器已被证明效率更高。

Spark 并不总是完美运行。这是一个很棒的数据处理平台,但不能让它完全自动运行。一致的 Spark 性能调优将帮助您的 Spark 基础设施以最佳水平运行

下次您发现自己问“为什么 Spark 这么慢?”时,请深入了解 Spark 架构并仔细研究。前面提到的 Spark 性能缓慢的原因可能只是罪魁祸首之一,而提到的提高性能的技巧可能是您需要改进的地方。

作者 east
Kafka 5月 16,2023

Kafka Streams 最佳实践:今天要尝试的 3 个

Kafka Streams 最好定义为专门为构建应用程序和微服务而设计的客户端库。考虑 Kafka 流的一种简洁方法是将其视为一种消息服务,其中数据(以消息的形式)在 Kafka 集群中从一个应用程序传输到另一个应用程序,从一个位置传输到另一个仓库。所有输入和输出数据都存放在 Apache Kafka 集群中。

为了形象化,这是 Kafka 环境中数据传输的样子:数据请求(消费者,用 Kafka 术语来说)被创建并发送到另一端。在这里,生产者响应请求生成内容,并通过 Kafka 架构将其直接传送回消费者,消费者随后消费信息。

Kafka Streams 是开发人员非常流行的工具,主要是因为它可以处理从消费者到生产者的数百万个请求,并将它们分布在数十台服务器上,以确保快速和连续传输,同时保持准确性。相反,该平台可以将大量数据从生产者转移到消费者,同时保证消费者实际上能够以他们需要的速度和他们需要的顺序消费数据。

但人们真正喜欢 Kafka Streams 的地方在于数据流没有停顿。大规模实时数据处理对于许多应用程序来说至关重要。 (没有它,Facebook 或 Uber 就会有大麻烦。)Kafka 不间断地传输数据,这与使用过时遗留应用程序的传统环境不同。 Kafka Streams 可在其集群内实现无阻碍的数据流,确保数据在来回循环中从一个应用程序传输到另一个应用程序,从消费者传输到生产者,中间没有任何停顿。其内置冗余可确保数据在传输过程中不会丢失,并完好无损地到达预定目的地。

所以:Kafka Streams 很棒。 Kafka Streams 为任何应用程序开发项目带来的优势直接证明了该平台越来越受欢迎和无处不在。但要真正从 Kafka Streams 中获得最大价值,您需要将一些最佳实践应用于底层 Kafka 平台:

如果你想优化 Kafka Streams 环境中的数据流,你需要跟踪很多事情。其中之一是速度,因为它涉及:

速度是一个关键组成部分。 Kafka Streams 可以有效地促进数据在集群内的移动。但是,如果消息移动的速度对您的应用程序来说不够快,则可能意味着麻烦。确保数据和消息以您需要的速度移动。

要完全优化 Kafka Streams,您的架构必须使用适量的资源构建,以便它获得并保持必要的数据流速度以实现其目标。简而言之,您需要解决以下问题:

必须构建消息路由以满足您的应用程序要求并且吞吐量要令您满意。这是关于为工作而建造的。你不会想要一辆半卡车来运送比萨饼。同样的原则适用于比特,而不仅仅是原子。

如果 Kafka 开始表现不佳,问题可能出在 Kafka 指标上。但这也可能是另一个问题,例如硬盘驱动器或某些性能不佳的内存。团队需要能够尽可能快速有效地进行故障排除。

借助正确的大数据分析工具,您可以集中查看堆栈中的所有硬件、应用程序和监控指标,包括来自 Kafka Streams 的指标。您得到的是一个单一的统一界面,其中指标和消息传递相互关联。这使您可以查看哪个应用程序遇到问题以及它如何影响 Kafka 及其功能。

在大多数大数据监控设置中,硬件监控不同于大数据应用程序监控。将 Kafka 监控添加到图片中,您将拥有一个大的脱节监控环境。但是使用像 Pepperdata 这样的统一监控套件,您可以查看和跟踪所有内容。您对大数据堆栈享有绝对且无与伦比的可见性和可观察性。

Kafka 是一个丰富而复杂的平台。还有很多东西要学。但是 Kafka Streams 的这三个最佳实践是让您获得成功的强大基础。

作者 east

上一 1 … 15 16 17 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?
  • 如何进行AUTOSAR模块的持续集成(CI)部署与版本控制?

文章归档

  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (42)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (484)
    • CDH (6)
    • datax (4)
    • doris (28)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (9)
    • 运维 (33)
      • Docker (2)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.