gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面16 )
doris 7月 18,2023

java代码判断doris的某一天的分区是否存在

需要使用doris的SHOW PARTITIONS命令来查看表的分区信息。这个命令会显示表的所有分区信息,包括分区名、范围、副本数等。您可以根据您设定的动态分区规则判断是否有缺失或多余的分区1。

要通过java代码执行doris的SHOW PARTITIONS命令,您可以使用以下步骤:

  • 导入doris的jdbc驱动包,例如doris-jdbc-0.15.0.jar2。
  • 创建一个数据库连接对象,指定doris的url、用户名和密码。
  • 创建一个语句对象,执行SHOW PARTITIONS FROM tbl1命令,其中tbl1是您要查询的表名。
  • 获取一个结果集对象,遍历结果集,判断某一天的分区是否存在。

以下是一个可能的java代码示例,仅供参考:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DorisPartitionTest {

    public static void main(String[] args) throws Exception {
        // Load the driver
        Class.forName("org.apache.doris.jdbc.Driver");
        // Create a connection object
        Connection conn = DriverManager.getConnection("jdbc:doris://127.0.0.1:9030?database=test_db", "root", "");
        // Create a statement object
        Statement stmt = conn.createStatement();
        // Execute the command
        ResultSet rs = stmt.executeQuery("SHOW PARTITIONS FROM tbl1");
        // Define a variable to store the partition name of the day to be queried
        String partitionName = "p20210717";
        // Define a variable to store the result of whether the partition exists
        boolean partitionExists = false;
        // Traverse the result set
        while (rs.next()) {
            // Get the partition name column
            String name = rs.getString("PartitionName");
            // Compare with the partition name to be queried
            if (name.equals(partitionName)) {
                // If equal, set the result to true and break the loop
                partitionExists = true;
                break;
            }
        }
        // Close the resources
        rs.close();
        stmt.close();
        conn.close();
        // Print the result
        System.out.println("The partition " + partitionName + " exists: " + partitionExists);
    }
}
作者 east
Spark 7月 17,2023

spark dataframe如何提取某一行的字段的值

有几种方法可以从Spark dataframe中提取某一行的字段的值,使用scala语言实现。我会给你一些例子和链接,你可以参考一下。

  • 一种方法是使用**row.getString(index)**方法,其中index是字段在行中的位置。例如,如果你想要获取第一行的第一个字段的值,你可以这样写:
val row = df.first() // 获取第一行
val value = row.getString(0) // 获取第一个字段的值

这个方法需要你知道字段的索引和类型。1

  • 另一种方法是使用**row.getAsT**方法,其中T是字段的类型,columnName是字段的名称。例如,如果你想要获取第一行的name字段的值,你可以这样写:
val row = df.first() // 获取第一行
val value = row.getAs[String]("name") // 获取name字段的值

这个方法不需要你知道字段的索引,但需要你知道字段的类型和名称。2

  • 还有一种方法是使用**df.select(columnName).collect()**方法,其中columnName是字段的名称。这个方法会返回一个包含所有行的字段值的数组。例如,如果你想要获取所有行的name字段的值,你可以这样写:
val values = df.select("name").collect() // 获取所有行的name字段的值

复制

这个方法不需要你知道字段的索引和类型,但需要你知道字段的名称。

作者 east
Spark 6月 19,2023

spark dataframe left join另一个dataframe 空值异常的问题

当在Spark开发时,某个字段有空值会出现不少意想不到的情况。Spark SQL中对两个Dataframe使用join时,当作为连接的字段的值含有null值。由于null表示的含义是未知,既不知道有没有,在SQL中null值与任何其他值的比较(即使是null)永远不会为真。故在进行连接操作时null == null不为True,所以结果中不会出现该条记录,即左侧表格的这条记录对应右侧的值均为null。

解决方法一:

如果两个DataFrame进行left join时,多个字段的值有空值,那么结果就会为空。为了解决这个问题,我们可以先对这两个DataFrame进行处理,在处理的过程中将空值替换成一个特殊值,例如:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}

// 左边的DataFrame为leftDF,右边的DataFrame为rightDF
// 给定leftDF和rightDF共同进行left join的字段列
val joinColumns: Seq[String] = Seq("col1", "col2", "col3")

// 定义替换的特殊值
val specialValue: String = "__NULL__"

// 对leftDF和rightDF的joinColumns列进行空值替换
val leftJoinDF: DataFrame = replaceNullsWithSpecialValue(leftDF, joinColumns, specialValue)
val rightJoinDF: DataFrame = replaceNullsWithSpecialValue(rightDF, joinColumns, specialValue)

// 对leftJoinDF和rightJoinDF进行join操作
val joinedDF: DataFrame = leftJoinDF.join(rightJoinDF, joinColumns, "left")

// 定义空值替换函数
def replaceNullsWithSpecialValue(df: DataFrame, columns: Seq[String], replacement: String): DataFrame = {
  val columnsToReplace: Seq[Column] = columns.map(col(_))
  val columnsToKeep = df.columns.filterNot(columns.contains(_)).map(col)
  df.select((columnsToReplace ++ columnsToKeep):_*).na.fill(replacement, columnsToReplace)
}

在这里,我们使用na.fill()函数将DataFrame中的空值替换为特殊值。在处理完之后,我们就可以对两个DataFrame进行left join操作了。

解决方法二:

一种可能的解决方案是使用NULL safe equality operator(<=>),它可以在join条件中处理NULL值,使得NULL值与NULL值相等。例如,如果你有两个dataframe,df1和df2,你想要根据多个字段进行left join,你可以写成:

import org.apache.spark.sql.functions._
val joinedDF = df1.join(df2, df1("col1") <=> df2("col1") && df1("col2") <=> df2("col2"), "left")

这样,即使col1或col2中有NULL值,也不会影响join的结果。


关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书


作者 east
doris, Spark 6月 16,2023

Spark写数据到Doris报错node and exceeded the max retry times

用spark dataframe向doris写数据时,报下面错误:

Failed to load data on BE: http://192.168.50.10:18040/api/mydb/dwd_virtual_table/_stream_load? node and exceeded the max retry times.

发现表没写入成功。刚开始很困惑,后来发现是 dataFrame中的字段和目标表不一致 。

这种提示很不友好,有没有更好方式提示,方法是有的,可以用jdbc写入,发现错误时可以看到具体的提示。代码参考如下:

def writeByJDBC(dataframe: DataFrame, dorisTable: String): Unit = {
    dataframe.write.format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://" + DORIS_HOST + ":9030/" +DATABASE_NAME + "?rewriteBatchedStatements=false")
      .option("batchsize", "" + WRITE_BATCH_SIZE)
      .option("user", DORIS_USER)
      .option("password", DORIS_PASSWORD)
      .option("isolationLevel", "NONE")
      //  .option("doris.write.fields","case_id,defendant_name,finance_name,mediation_name,mediator_name,dt")
      .option("dbtable", dorisTable)
      .save()
  }

不过这种方式还是没有Spark Doris Connector的方式写效率高,可以用上面jdbc方式调试,没问题再切换 Spark Doris Connector 方式:

def writeByDoris(dataframe: DataFrame, dorisTable: String): Unit = {
dataframe.write.format(“doris”)
.option(“doris.table.identifier”, dorisTable)
.option(“doris.fenodes”, DORIS_HOST + “:” + DORIS_FE_HTTP_PORT)
.option(“user”, DORIS_USER)
.option(“password”, DORIS_PASSWORD)
.option(“sink.batch.size”, WRITE_BATCH_SIZE)
.option(“sink.max-retries”, 3)
.option(“doris.request.retries”, 6)
.option(“doris.request.retries”, 100)
.option(“doris.request.connect.timeout.ms”, 60000)
.save()
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 6月 16,2023

spark sql多个指标合并为1条sql

在 Spark SQL 指标开发中,使用一条 SQL 语句来计算多个指标是常见的做法。以下是几种常用的方法:

  1. 使用子查询:将多个指标的计算逻辑放在子查询中,再在外层 SELECT 语句中进行聚合。这种方式可以避免代码冗余,但可能会降低 SQL 语句的可读性。
  2. 使用 Window 函数:使用 Window 函数可以在一条 SQL 语句中计算多个指标,并且可以避免对同一个数据集进行多次扫描。具体操作是使用 over() 函数配合 sum、avg、max、min等聚合函数。这种方式相比子查询可以提升 SQL 的效率,但需要熟练掌握 Window 函数的使用方法。
  3. 使用 UDAF:如果需要计算的指标比较复杂,可以考虑开发自定义聚合函数(UDAF),这样可以将计算逻辑封装到统一的函数中,提高代码的可复用性和可维护性。

例子:

  1. 使用子查询进行多指标计算

假设我们有一个订单表 orders,包含以下字段:order_id, customer_id, order_date, amount。我们需要计算每个客户在 2022 年的订单数量和总订单金额。可以使用如下 SQL 语句实现:

SELECT 
  customer_id, 
  (SELECT COUNT(*) 
   FROM orders 
   WHERE customer_id = o.customer_id AND YEAR(order_date) = 2022) AS order_count, 
  (SELECT SUM(amount) 
   FROM orders 
   WHERE customer_id = o.customer_id AND YEAR(order_date) = 2022) AS total_amount 
FROM orders o 
GROUP BY customer_id;

这个 SQL 语句使用了两个子查询来计算每个客户在 2022 年的订单数量和总订单金额。子查询返回的结果会作为 select 语句中的一个列,因此可以使用 group by 对客户进行分组。

  1. 使用 Window 函数进行多指标计算

使用 Window 函数可以更方便地对查询结果进行分析。假设我们有一个销售表 sales,包含以下字段:sale_id, customer_id, product_id, sale_date, sale_amount。我们需要计算以下指标:每个客户的总销售额、每个客户的最大销售额、每个客户的销售额排名。可以使用如下 SQL 语句实现:

SELECT 
  customer_id, 
  SUM(sale_amount) OVER (PARTITION BY customer_id) AS total_sale, 
  MAX(sale_amount) OVER (PARTITION BY customer_id) AS max_sale, 
  RANK() OVER (PARTITION BY customer_id ORDER BY sale_amount DESC) AS sale_rank 
FROM sales;

这个 SQL 语句使用了三个 Window 函数,分别计算每个客户的总销售额、最大销售额和销售额排名。这里我们将结果按客户分组,然后使用了 Partition By 子句指定了客户维度。

  1. 使用自定义聚合函数进行多指标计算

自定义聚合函数(UDAF)可以针对特定的需求编写自己的聚合逻辑。以计算最小值、最大值和平均值为例,我们可以实现一个自定义平均值 UDAF:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
 
class CustomAvg extends UserDefinedAggregateFunction {
 
  // 输入参数的数据类型
  def inputSchema: StructType = new StructType().add("inputColumn", DoubleType)
 
  // 中间结果的数据类型
  def bufferSchema: StructType = new StructType().add("sum", DoubleType).add("count", LongType)
 
  // 输出结果的数据类型
  def dataType: DataType = DoubleType
 
  // 是否固定输入参数类型
  def deterministic: Boolean = true
 
  // 初始化中间结果缓存
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0 // sum
    buffer(1) = 0L // count
  }
 
  // 更新中间结果缓存
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
 
  // 合并中间结果缓存
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
 
  // 计算最终结果
  def evaluate(buffer: Row): Any = {
    buffer.getDouble(0) / buffer.getLong(1)
  }
}

这里我们实现了一个 CustomAvg UDAF,可以通过输入若干个 Double 类型的值,计算它们的平均值并返回。我们可以在 SparkSQL 中使用这个 UDAF 计算多个指标的平均值:

SELECT 
  CustomAvg(column1) AS avg1, 
  CustomAvg(column2) AS avg2, 
  CustomAvg(column3) AS avg3 
FROM table;

这个 SQL 语句使用了 CustomAvg UDAF 计算了三个列的平均值,可以根据实际需求进行扩展。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 6月 13,2023

Spark 写入数据到doris异常

用spark写入到doris,由于服务器配置不高, sink.batch.size 设置过大时,超出服务器内存限制,doris be端直接闪崩。如果 sink.batch.size 设置过小,则出现写入次数太频繁无法写入。 调整 write 的
sink.batch.size 参数大小,逐步增加 batchsize 大小,以达到更好的写入性能与内存占用的平衡。例如,可以逐渐将 batchsize 参数的值从 200 调整到 500,1000,2000,以找到最好的性能与内存占用平衡点。 刚开始设置10000时服务器be端闪崩,后来设置200时又写到中途报错,最后找到2000这个平衡点。

def writeDoris(dataframe: DataFrame, dorisTable: String): Unit = {
dataframe.write.format(“doris”)
.option(“doris.table.identifier”, dorisTable)
.option(“doris.fenodes”, DORIS_HOST + “:” + DORIS_FE_HTTP_PORT)
.option(“user”, DORIS_USER)
.option(“password”, DORIS_PASSWORD)
.option(“sink.batch.size”, 2000)
.save()
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 6月 8,2023

spark运行写入doris的任务报错park.DorisStreamLoad: Doris BE’s response cannot map to schema

在CDH的集群上跑spark任务,运行报错:
ERROR spark.DorisStreamLoad: Doris BE’s response cannot map to schema. res: “Access denied for user ‘default_cluster:hdfs@10.0.10.101’ (using password: YES)”
org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of org.apache.doris.spark.rest.models.BackendV2 (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value (‘Access denied for user ‘default_cluster:hdfs@10.0.10.101′ (using password: YES)’)
at [Source: (String)””Access denied for user ‘default_cluster:hdfs@10.0.10.101’

查了一下,连接doris的用户名和密码没错。后来发现,spark-submit的参数–conf spark.user=hdfs 影响了,把这个参数去掉就正常了。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 6月 1,2023

spark jdbc模式写入异常

用spark jdbc模式写入doris报下面错误:

java.sql.BatchUpdateException: Insert has filtered data in strict mode, tracking_url=http://192.168.1.40:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:800)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Insert has filtered data in strict mode, tracking_url=http://10.0.80.54:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1046)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1371)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:746)
	... 17 more

点开上面的=http://192.168.1.40:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2,发现报下面的错误:

Reason: no partition for this tuple. tuple=

查看要写入doris对应的表,这个表是动态分区的,发现这个表没有插入数据的分区。后来匹配对应的doris表动态分区后,插入数据果然正常了。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 5月 26,2023

datax写入doris报错:Writing records to Doris failed

用datax同步数据到doris,报下面错误:

java.lang.RuntimeException: Writing records to Doris failed.
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.close(DorisWriterManager.java:113)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriter$Task.post(DorisWriter.java:150)
 at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:65)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Writing records to Doris failed.
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.checkFlushException(DorisWriterManager.java:189)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.waitAsyncFlushingDone(DorisWriterManager.java:150)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.flush(DorisWriterManager.java:98)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.close(DorisWriterManager.java:111)
 ... 3 more
Caused by: java.io.IOException: java.io.IOException: Failed to flush data to Doris.
{"TxnId":-1,"Label":"datax_doris_writer_fed9613c-8d95-4284-a9a2-949985cf3f8d","TwoPhaseCommit":"false","Status":"Fail","Message":"errCode = 7, detailMessage = unknown table, tableName=mediation","NumberTotalRows":0,"NumberLoadedRows":0,"NumberFilteredRows":0,"NumberUnselectedRows":0,"LoadBytes":0,"LoadTimeMs":0,"BeginTxnTimeMs":0,"StreamLoadPutTimeMs":0,"ReadDataTimeMs":0,"WriteDataTimeMs":0,"CommitAndPublishTimeMs":0}
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.asyncFlush(DorisWriterManager.java:170)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager.access$000(DorisWriterManager.java:19)
 at com.alibaba.datax.plugin.writer.doriswriter.DorisWriterManager$1.run(DorisWriterManager.java:134)
 ... 1 more
Caused by: java.io.IOException: Failed to flush data to Doris.

在这种情况下,错误是由于未知表引起的。这可能是由许多因素引起的,例如:

表不存在。
表不可访问。
表配置不正确。
要修复错误,您需要确定错误的原因并采取措施进行纠正。如果表不存在,则需要创建表。如果无法访问表,则需要授予用户对表的写入权限。如果表配置不正确,则需要正确配置它。

一旦您纠正了错误的原因,就应该能够无问题地将数据写入Doris中。

以下是一些解决此错误的附加提示:

检查Doris日志以获取更多有关错误的信息。
尝试使用其他工具(例如Doris CLI或Doris API)将数据写入Doris。
如果仍然遇到问题,您可以联系Doris支持团队寻求帮助。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Java 5月 25,2023

Java 中的梯度下降

今天的大多数人工智能都是使用某种形式的神经网络实现的。在我的前两篇文章中,我介绍了神经网络并向您展示了如何在 Java 中构建神经网络。神经网络的力量主要来自于它的深度学习能力,而这种能力建立在梯度下降反向传播的概念和执行之上。我将通过快速深入了解 Java 中的反向传播和梯度下降来结束这个简短的系列文章。
有人说人工智能并不是那么智能,它主要是反向传播。那么,现代机器学习的基石是什么?
要了解反向传播,您必须首先了解神经网络的工作原理。基本上,神经网络是称为神经元的节点的有向图。神经元有一个特定的结构,它接受输入,将它们与权重相乘,添加一个偏差值,并通过激活函数运行所有这些。神经元将它们的输出馈送到其他神经元,直到到达输出神经元。输出神经元产生网络的输出。 (有关更完整的介绍,请参阅机器学习风格:神经网络简介。)
从这里开始,我假设您了解网络及其神经元的结构,包括前馈。示例和讨论将集中在梯度下降的反向传播上。我们的神经网络将有一个输出节点、两个“隐藏”节点和两个输入节点。使用一个相对简单的例子可以更容易地理解算法所涉及的数学。图 1 显示了示例神经网络的图表。
图 1. 我们将用于示例的神经网络图。
梯度下降反向传播的思想是将整个网络视为一个多元函数,为损失函数提供输入。损失函数通过将网络输出与已知的良好结果进行比较来计算一个表示网络执行情况的数字。与良好结果配对的输入数据集称为训练集。损失函数旨在随着网络行为远离正确而增加数值。
梯度下降算法采用损失函数并使用偏导数来确定网络中每个变量(权重和偏差)对损失值的贡献。然后它向后移动,访问每个变量并调整它以减少损失值。
理解梯度下降涉及微积分中的一些概念。首先是导数的概念。 MathsIsFun.com 对导数有很好的介绍。简而言之,导数为您提供函数在单个点处的斜率(或变化率)。换句话说,函数的导数为我们提供了给定输入的变化率。 (微积分的美妙之处在于它可以让我们在没有其他参考点的情况下找到变化——或者更确切地说,它可以让我们假设输入的变化非常小。)
下一个重要的概念是偏导数。偏导数让我们采用多维(也称为多变量)函数并仅隔离其中一个变量以找到给定维度的斜率。
导数回答了以下问题:函数在特定点的变化率(或斜率)是多少?偏导数回答了以下问题:给定方程的多个输入变量,仅这一个变量的变化率是多少?
梯度下降使用这些思想来访问方程中的每个变量并对其进行调整以最小化方程的输出。这正是我们训练网络时想要的。如果我们将损失函数视为绘制在图表上,我们希望以增量方式向函数的最小值移动。也就是说,我们要找到全局最小值。
请注意,增量的大小在机器学习中称为“学习率”。
当我们探索梯度下降反向传播的数学时,我们将紧贴代码。当数学变得过于抽象时,查看代码将有助于让我们脚踏实地。让我们首先查看我们的 Neuron 类,如清单 1 所示。
Neuron 类只有三个 Double 成员:weight1、weight2 和 bias。它也有一些方法。用于前馈的方法是 compute()。它接受两个输入并执行神经元的工作:将每个输入乘以适当的权重,加上偏差,然后通过 sigmoid 函数运行它。
在我们继续之前,让我们重新审视一下 sigmoid 激活的概念,我在神经网络简介中也讨论过它。清单 2 显示了一个基于 Java 的 sigmoid 激活函数。
sigmoid 函数接受输入并将欧拉数 (Math.exp) 提高到负数,加 1 再除以 1。效果是将输出压缩在 0 和 1 之间,越来越大和越来越小的数字逐渐接近极限。
DeepAI.org 对机器学习中的 sigmoid 函数有很好的介绍。
回到清单 1 中的 Neuron 类,除了 compute() 方法之外,我们还有 getSum() 和 getDerivedOutput()。 getSum() 只是进行权重 * 输入 + 偏差计算。请注意,compute() 采用 getSum() 并通过 sigmoid() 运行它。 getDerivedOutput() 方法通过一个不同的函数运行 getSum():sigmoid 函数的导数。
现在看一下清单 3,它显示了 Java 中的 sigmoid 导数函数。我们已经从概念上讨论了衍生品,下面是实际应用。
记住导数告诉我们函数在其图中的单个点的变化是什么,我们可以感受一下这个导数在说什么:告诉我给定输入的 sigmoid 函数的变化率。您可以说它告诉我们清单 1 中的预激活神经元对最终激活结果有何影响。
您可能想知道我们如何知道清单 3 中的 sigmoid 导数函数是正确的。答案是,如果它已经被其他人验证过,并且如果我们知道根据特定规则正确微分的函数是准确的,我们就会知道导数函数是正确的。一旦我们理解了它们在说什么并相信它们是准确的,我们就不必回到第一原理并重新发现这些规则——就像我们接受并应用简化代数方程式的规则一样。
所以,在实践中,我们是按照求导法则来求导数的。如果您查看 sigmoid 函数及其导数,您会发现后者可以通过遵循这些规则得出。出于梯度下降的目的,我们需要了解导数规则,相信它们有效,并了解它们的应用方式。我们将使用它们来找出每个权重和偏差在网络最终损失结果中所扮演的角色。
符号 f prime f'(x) 是“f 对 x 的导数”的一种表达方式。另一个是:
两者是等价的:
您很快就会看到的另一种表示法是偏导数表示法:
这就是说,给我变量 x 的 f 的导数。
最令人好奇的衍生规则是链式法则。它说当一个函数是复合的(函数中的函数,又名高阶函数)时,你可以像这样扩展它:
我们将使用链式法则来解压我们的网络并获得每个权重和偏差的偏导数。

作者 east
Hive 5月 20,2023

Hive 查询示例:您需要了解的内容

如果您在 Hadoop 上使用 Hive,了解 Hive 查询并熟悉一两个 Hive 查询示例是实现有效集群管理的关键。 Hive 查询消耗时间和资源,因此必须通过 Hive 查询调优来提高效率。在本文中,您将了解什么是 Hive 查询、它们如何影响您的集群(正面和负面)、有用的 Hive 查询方法,以及一个好的 Hive 查询示例是什么样的。让我们开始吧。

Hive 查询是来自 Hadoop 数据库的特定信息请求。这些信息请求由 Apache Hive 执行,Apache Hive 是一个在 Hadoop 之上开发的开源数据仓库平台。 Facebook 在编写 Java MapReduce 平台方面创建了 Hive 来执行数据分析、分布式处理和减少工作。

Hive 查询使用一组预定义的代码,这些代码是您的数据库语言的本机代码。然后数据库接收指令,一旦它理解了该指令,就收集并发布所请求的信息。

Hive 是为提高效率而设计的,这就是为什么它的查询需要完美调整和编写良好的原因。您还可以设置依赖项以启用查询的自动计划。这将保证一旦一个动作完成,下一个动作立即开始。

与增加网络带宽相比,提高系统的 RAM 容量和 CPU 能力可以加快 Hive 响应时间。

调优不当的查询会给组织带来重大挫折。其中最大的是错过了 SLA(服务水平协议)。

这些协议表示企业及其客户同意的服务水平,包括性能保证、数据安全、正常运行时间和客户服务标准。因此,如果低效查询导致错过 SLA,结果可能是罚款、退款,或者在某些情况下终止合同。

调整不当的 Hive 查询也会消耗资源。这些会在两个方面影响您的 Hadoop 集群。一:调优不佳的查询会耗尽集群中其他用户或功能的资源。这会导致性能下降和响应时间变慢。二:使用的资源产生成本。由于 Hive 查询调优不佳而造成的资源浪费会增加您的 AWS 账单,让您非常头疼。

低效查询的其他一些影响可能会破坏集群性能、减慢数据库速度和停机时间。

由于低效查询会产生许多负面影响,因此优化查询至关重要。

有一些非常方便的 Hive 查询调优方法,具体取决于您是针对时间还是资源使用进行优化:

适当的 Hive 调整允许您操作尽可能少的数据。一种方法是通过分区,将“键”分配给数据被隔离的子目录。当您的查询需要信息时,您可以定位数据所在的特定子集,这样可以节省您扫描不需要的数据的时间。

分桶类似于分区,尽管它有助于通过扫描更少的数据来提高连接性能。

这有助于最大限度地减少遍历查询过程中各个步骤的数据量,以及在查询状态之间移动所需的时间。

在执行方面,利用执行引擎(如 Tez、Hive on Spark 和 LLAP)可以帮助提高查询性能,而无需低级调优方法。在不需要顺序操作时利用并行执行也是明智的。您的系统可以执行的并行度取决于可用资源和整体数据结构。

此外,它有助于在所有任务之间保持工作的均匀分布,以避免出现偏差。请记住:您的整体查询速度只能与最慢的任务一样快。

最后,当您查看测试时,采样(也称为单元测试)是最好的 Hive 查询调优技术之一。通过获取数据的一个子集并一次运行一千行查询,您可以更快地找到失败、奇怪的结果和错误。这有助于您微调查询以获得更高的准确性和效率。

每个查询还应该自动进行审查和性能测量。这确保他们在晋升到更高级别的环境之前满足最低要求。将不良查询排除在生产之外。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 5月 19,2023

doris批量导出表结构java版本实现

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DorisDump {

    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    private static final String DB_URL = "jdbc:mysql://localhost:9030/database";
    private static final String USER = "user";
    private static final String PASSWORD = "password";

    public static void main(String[] args) throws SQLException, IOException {
        // Parse command line arguments
        Args args = new Args(args);

        // Connect to the database
        Connection connection = DriverManager.getConnection(DB_URL, USER, PASSWORD);

        // Create a statement
        Statement statement = connection.createStatement();

        // Get the list of tables
        ResultSet tables = statement.executeQuery("show tables;");

        // Create a list to store the create table statements
        List<String> createTableStatements = new ArrayList<>();

        // Iterate over the tables
        while (tables.next()) {
            // Get the name of the table
            String tableName = tables.getString(1);

            // Get the create table statement for the table
            ResultSet createTableStatement = statement.executeQuery("show create table " + tableName + ";");

            // Add the create table statement to the list
            createTableStatements.add(createTableStatement.getString(1));
        }

        // Write the create table statements to a file
        File file = new File("table.txt");
        try (FileWriter fileWriter = new FileWriter(file)) {
            for (String createTableStatement : createTableStatements) {
                fileWriter.write(createTableStatement + "\n");
            }
        }

        // Close the connection
        connection.close();
    }

    private static class Args {

        private String host;
        private int port;
        private String user;
        private String password;
        private String database;

        public Args(String[] args) {
            for (int i = 0; i < args.length; i++) {
                if (args[i].equals("-h")) {
                    host = args[i + 1];
                } else if (args[i].equals("-P")) {
                    port = Integer.parseInt(args[i + 1]);
                } else if (args[i].equals("-u")) {
                    user = args[i + 1];
                } else if (args[i].equals("-p")) {
                    password = args[i + 1];
                } else if (args[i].equals("-d")) {
                    database = args[i + 1];
                }
            }
        }

        public String getHost() {
            return host;
        }

        public int getPort() {
            return port;
        }

        public String getUser() {
            return user;
        }

        public String getPassword() {
            return password;
        }

        public String getDatabase() {
            return database;
        }
    }
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

上一 1 … 15 16 17 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.