gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

月度归档6月 2023

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  2023   /  
  • 6月
Spark 6月 19,2023

spark dataframe left join另一个dataframe 空值异常的问题

当在Spark开发时,某个字段有空值会出现不少意想不到的情况。Spark SQL中对两个Dataframe使用join时,当作为连接的字段的值含有null值。由于null表示的含义是未知,既不知道有没有,在SQL中null值与任何其他值的比较(即使是null)永远不会为真。故在进行连接操作时null == null不为True,所以结果中不会出现该条记录,即左侧表格的这条记录对应右侧的值均为null。

解决方法一:

如果两个DataFrame进行left join时,多个字段的值有空值,那么结果就会为空。为了解决这个问题,我们可以先对这两个DataFrame进行处理,在处理的过程中将空值替换成一个特殊值,例如:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}

// 左边的DataFrame为leftDF,右边的DataFrame为rightDF
// 给定leftDF和rightDF共同进行left join的字段列
val joinColumns: Seq[String] = Seq("col1", "col2", "col3")

// 定义替换的特殊值
val specialValue: String = "__NULL__"

// 对leftDF和rightDF的joinColumns列进行空值替换
val leftJoinDF: DataFrame = replaceNullsWithSpecialValue(leftDF, joinColumns, specialValue)
val rightJoinDF: DataFrame = replaceNullsWithSpecialValue(rightDF, joinColumns, specialValue)

// 对leftJoinDF和rightJoinDF进行join操作
val joinedDF: DataFrame = leftJoinDF.join(rightJoinDF, joinColumns, "left")

// 定义空值替换函数
def replaceNullsWithSpecialValue(df: DataFrame, columns: Seq[String], replacement: String): DataFrame = {
  val columnsToReplace: Seq[Column] = columns.map(col(_))
  val columnsToKeep = df.columns.filterNot(columns.contains(_)).map(col)
  df.select((columnsToReplace ++ columnsToKeep):_*).na.fill(replacement, columnsToReplace)
}

在这里,我们使用na.fill()函数将DataFrame中的空值替换为特殊值。在处理完之后,我们就可以对两个DataFrame进行left join操作了。

解决方法二:

一种可能的解决方案是使用NULL safe equality operator(<=>),它可以在join条件中处理NULL值,使得NULL值与NULL值相等。例如,如果你有两个dataframe,df1和df2,你想要根据多个字段进行left join,你可以写成:

import org.apache.spark.sql.functions._
val joinedDF = df1.join(df2, df1("col1") <=> df2("col1") && df1("col2") <=> df2("col2"), "left")

这样,即使col1或col2中有NULL值,也不会影响join的结果。


关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书


作者 east
doris, Spark 6月 16,2023

Spark写数据到Doris报错node and exceeded the max retry times

用spark dataframe向doris写数据时,报下面错误:

Failed to load data on BE: http://192.168.50.10:18040/api/mydb/dwd_virtual_table/_stream_load? node and exceeded the max retry times.

发现表没写入成功。刚开始很困惑,后来发现是 dataFrame中的字段和目标表不一致 。

这种提示很不友好,有没有更好方式提示,方法是有的,可以用jdbc写入,发现错误时可以看到具体的提示。代码参考如下:

def writeByJDBC(dataframe: DataFrame, dorisTable: String): Unit = {
    dataframe.write.format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://" + DORIS_HOST + ":9030/" +DATABASE_NAME + "?rewriteBatchedStatements=false")
      .option("batchsize", "" + WRITE_BATCH_SIZE)
      .option("user", DORIS_USER)
      .option("password", DORIS_PASSWORD)
      .option("isolationLevel", "NONE")
      //  .option("doris.write.fields","case_id,defendant_name,finance_name,mediation_name,mediator_name,dt")
      .option("dbtable", dorisTable)
      .save()
  }

不过这种方式还是没有Spark Doris Connector的方式写效率高,可以用上面jdbc方式调试,没问题再切换 Spark Doris Connector 方式:

def writeByDoris(dataframe: DataFrame, dorisTable: String): Unit = {
dataframe.write.format(“doris”)
.option(“doris.table.identifier”, dorisTable)
.option(“doris.fenodes”, DORIS_HOST + “:” + DORIS_FE_HTTP_PORT)
.option(“user”, DORIS_USER)
.option(“password”, DORIS_PASSWORD)
.option(“sink.batch.size”, WRITE_BATCH_SIZE)
.option(“sink.max-retries”, 3)
.option(“doris.request.retries”, 6)
.option(“doris.request.retries”, 100)
.option(“doris.request.connect.timeout.ms”, 60000)
.save()
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 6月 16,2023

spark sql多个指标合并为1条sql

在 Spark SQL 指标开发中,使用一条 SQL 语句来计算多个指标是常见的做法。以下是几种常用的方法:

  1. 使用子查询:将多个指标的计算逻辑放在子查询中,再在外层 SELECT 语句中进行聚合。这种方式可以避免代码冗余,但可能会降低 SQL 语句的可读性。
  2. 使用 Window 函数:使用 Window 函数可以在一条 SQL 语句中计算多个指标,并且可以避免对同一个数据集进行多次扫描。具体操作是使用 over() 函数配合 sum、avg、max、min等聚合函数。这种方式相比子查询可以提升 SQL 的效率,但需要熟练掌握 Window 函数的使用方法。
  3. 使用 UDAF:如果需要计算的指标比较复杂,可以考虑开发自定义聚合函数(UDAF),这样可以将计算逻辑封装到统一的函数中,提高代码的可复用性和可维护性。

例子:

  1. 使用子查询进行多指标计算

假设我们有一个订单表 orders,包含以下字段:order_id, customer_id, order_date, amount。我们需要计算每个客户在 2022 年的订单数量和总订单金额。可以使用如下 SQL 语句实现:

SELECT 
  customer_id, 
  (SELECT COUNT(*) 
   FROM orders 
   WHERE customer_id = o.customer_id AND YEAR(order_date) = 2022) AS order_count, 
  (SELECT SUM(amount) 
   FROM orders 
   WHERE customer_id = o.customer_id AND YEAR(order_date) = 2022) AS total_amount 
FROM orders o 
GROUP BY customer_id;

这个 SQL 语句使用了两个子查询来计算每个客户在 2022 年的订单数量和总订单金额。子查询返回的结果会作为 select 语句中的一个列,因此可以使用 group by 对客户进行分组。

  1. 使用 Window 函数进行多指标计算

使用 Window 函数可以更方便地对查询结果进行分析。假设我们有一个销售表 sales,包含以下字段:sale_id, customer_id, product_id, sale_date, sale_amount。我们需要计算以下指标:每个客户的总销售额、每个客户的最大销售额、每个客户的销售额排名。可以使用如下 SQL 语句实现:

SELECT 
  customer_id, 
  SUM(sale_amount) OVER (PARTITION BY customer_id) AS total_sale, 
  MAX(sale_amount) OVER (PARTITION BY customer_id) AS max_sale, 
  RANK() OVER (PARTITION BY customer_id ORDER BY sale_amount DESC) AS sale_rank 
FROM sales;

这个 SQL 语句使用了三个 Window 函数,分别计算每个客户的总销售额、最大销售额和销售额排名。这里我们将结果按客户分组,然后使用了 Partition By 子句指定了客户维度。

  1. 使用自定义聚合函数进行多指标计算

自定义聚合函数(UDAF)可以针对特定的需求编写自己的聚合逻辑。以计算最小值、最大值和平均值为例,我们可以实现一个自定义平均值 UDAF:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
 
class CustomAvg extends UserDefinedAggregateFunction {
 
  // 输入参数的数据类型
  def inputSchema: StructType = new StructType().add("inputColumn", DoubleType)
 
  // 中间结果的数据类型
  def bufferSchema: StructType = new StructType().add("sum", DoubleType).add("count", LongType)
 
  // 输出结果的数据类型
  def dataType: DataType = DoubleType
 
  // 是否固定输入参数类型
  def deterministic: Boolean = true
 
  // 初始化中间结果缓存
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0 // sum
    buffer(1) = 0L // count
  }
 
  // 更新中间结果缓存
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
 
  // 合并中间结果缓存
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
 
  // 计算最终结果
  def evaluate(buffer: Row): Any = {
    buffer.getDouble(0) / buffer.getLong(1)
  }
}

这里我们实现了一个 CustomAvg UDAF,可以通过输入若干个 Double 类型的值,计算它们的平均值并返回。我们可以在 SparkSQL 中使用这个 UDAF 计算多个指标的平均值:

SELECT 
  CustomAvg(column1) AS avg1, 
  CustomAvg(column2) AS avg2, 
  CustomAvg(column3) AS avg3 
FROM table;

这个 SQL 语句使用了 CustomAvg UDAF 计算了三个列的平均值,可以根据实际需求进行扩展。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 6月 13,2023

Spark 写入数据到doris异常

用spark写入到doris,由于服务器配置不高, sink.batch.size 设置过大时,超出服务器内存限制,doris be端直接闪崩。如果 sink.batch.size 设置过小,则出现写入次数太频繁无法写入。 调整 write 的
sink.batch.size 参数大小,逐步增加 batchsize 大小,以达到更好的写入性能与内存占用的平衡。例如,可以逐渐将 batchsize 参数的值从 200 调整到 500,1000,2000,以找到最好的性能与内存占用平衡点。 刚开始设置10000时服务器be端闪崩,后来设置200时又写到中途报错,最后找到2000这个平衡点。

def writeDoris(dataframe: DataFrame, dorisTable: String): Unit = {
dataframe.write.format(“doris”)
.option(“doris.table.identifier”, dorisTable)
.option(“doris.fenodes”, DORIS_HOST + “:” + DORIS_FE_HTTP_PORT)
.option(“user”, DORIS_USER)
.option(“password”, DORIS_PASSWORD)
.option(“sink.batch.size”, 2000)
.save()
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 6月 8,2023

spark运行写入doris的任务报错park.DorisStreamLoad: Doris BE’s response cannot map to schema

在CDH的集群上跑spark任务,运行报错:
ERROR spark.DorisStreamLoad: Doris BE’s response cannot map to schema. res: “Access denied for user ‘default_cluster:hdfs@10.0.10.101’ (using password: YES)”
org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of org.apache.doris.spark.rest.models.BackendV2 (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value (‘Access denied for user ‘default_cluster:hdfs@10.0.10.101′ (using password: YES)’)
at [Source: (String)””Access denied for user ‘default_cluster:hdfs@10.0.10.101’

查了一下,连接doris的用户名和密码没错。后来发现,spark-submit的参数–conf spark.user=hdfs 影响了,把这个参数去掉就正常了。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
doris 6月 1,2023

spark jdbc模式写入异常

用spark jdbc模式写入doris报下面错误:

java.sql.BatchUpdateException: Insert has filtered data in strict mode, tracking_url=http://192.168.1.40:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:800)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:667)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Insert has filtered data in strict mode, tracking_url=http://10.0.80.54:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1046)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1371)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:746)
	... 17 more

点开上面的=http://192.168.1.40:8040/api/_load_error_log?file=__shard_49/error_log_insert_stmt_8384ff7a733e4ce5-bc24c049a085fdd2_8384ff7a733e4ce5_bc24c049a085fdd2,发现报下面的错误:

Reason: no partition for this tuple. tuple=

查看要写入doris对应的表,这个表是动态分区的,发现这个表没有插入数据的分区。后来匹配对应的doris表动态分区后,插入数据果然正常了。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.