gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Spark

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Spark"
  • ( 页面2 )
Spark 10月 7,2023

spark sql如何行转列

在数据仓库中,行转列通常称为”变形”(Pivoting) 或 “透视”(Pivoting),可使用Spark SQL的pivot语句实现。下面是一个简单的示例:

假设我们有如下表格:

+-------+-------+------+
| name  | brand | year |
+-------+-------+------+
| Alice | BMW   | 2017 |
| Bob   | Tesla | 2018 |
| Alice | Tesla | 2019 |
| Bob   | BMW   | 2020 |
+-------+-------+------+

我们想要把该表中的品牌列转换为4列,分别表示不同的品牌,而值则为对应品牌的年份。具体操作如下:

SELECT name, 
       COALESCE(BMW, 0) AS BMW, 
       COALESCE(Tesla, 0) AS Tesla 
FROM (
  SELECT name,
         brand,
         year 
  FROM my_table
) T
PIVOT (
  MAX(year) 
  FOR brand IN ('BMW', 'Tesla')
)

运行上面的代码,得到的结果如下所示:

+-------+------+------+
| name  | BMW  | Tesla|
+-------+------+------+
| Alice | 2017 | 2019 |
| Bob   | 2020 | 2018 |
+-------+------+------+

可以看到,原本的品牌列已经被转换成了两个新的列,并且对于每个人名,都有对应的品牌年份信息填充其中。这就是典型的行转列操作。其中,COALESCE函数用于处理可能存在的空值情况。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 30,2023

监控Spark运行超时及kill掉重跑

在用oozie的调度任务,用shell调度spark任务,在生产环境运行时,正常1-2个小时跑完的任务,有时出现跑了5、6个小时还没跑完,造成的原因很奇怪,有可能是数据倾斜,任务占用太多资源偶尔出错。为了监控这种现象,并设定阈值为3个小时,如果超过3小时没跑完就kill掉。可以结合oozie失败重试机制实现重跑。

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个Configuration对象,用于加载Hadoop和Yarn的配置文件
        Configuration conf = new Configuration();
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("yarn-site.xml");

        // 创建一个YarnClient对象,用于访问Yarn的api
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // 调用Yarn的api,获取所有正在运行的应用程序
        List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));

        // 遍历每个应用程序
        for (ApplicationReport app : apps) {
            // 获取应用程序的ID和名称
            ApplicationId appId = app.getApplicationId();
            String appName = app.getName();
            // 判断应用程序是否是由Oozie Shell命令启动的spark任务
            if (appName.startsWith("oozie:launcher")) {
                // 如果是,打印日志或者做其他操作
                System.out.println("Found Oozie Shell spark job: " + appId);
                // 获取应用程序的开始时间和当前时间
                long startTime = app.getStartTime();
                long currentTime = System.currentTimeMillis();
                // 计算应用程序的运行时间(毫秒)
                long jobDuration = currentTime - startTime;
                // 判断应用程序的运行时间是否超过阈值
                if (jobDuration > TIMEOUT_THRESHOLD) {
                    // 如果超过阈值,调用Yarn的api,终止应用程序
                    yarnClient.killApplication(appId);
                    // 打印日志或者做其他操作
                    System.out.println("Killed Oozie Shell spark job: " + appId);
                    // 重新运行应用程序或者做其他操作
                    // ...
                } else {
                    // 如果没有超过阈值,打印日志或者做其他操作
                    System.out.println("Job " + appId + " is running normally");
                }
            }
        }

        // 关闭YarnClient对象
        yarnClient.stop();
    }
}

如果要监控oozie的调度任务,也可以用下面的方法:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// 导入oozie的api相关的类
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

public class OozieJobMonitor {

    // 定义一个正则表达式,用于匹配作业的运行时间
    private static final Pattern DURATION_PATTERN = Pattern.compile("Duration\\s*:\\s*(\\d+) days, (\\d+) hours, (\\d+) minutes");

    // 定义一个常量,表示超时的阈值(3小时)
    private static final long TIMEOUT_THRESHOLD = 3 * 60 * 60 * 1000; // 3 hours in milliseconds

    public static void main(String[] args) throws Exception {
        // 创建一个OozieClient对象,用于调用oozie的api
        OozieClient oozieClient = new OozieClient("http://localhost:11000/oozie");
        // 调用oozie的api,查询所有正在运行的作业
        List<WorkflowJob> jobs = oozieClient.getJobsInfo("status=RUNNING");
        // 遍历每个作业
        for (WorkflowJob job : jobs) {
            // 获取作业的ID和信息
            String jobId = job.getId();
            String jobInfo = job.toString();
            // 解析作业的信息,获取作业的运行时间
            long jobDuration = parseJobDuration(jobInfo);
            // 判断作业的运行时间是否超过阈值
            if (jobDuration > TIMEOUT_THRESHOLD) {
                // 如果超过阈值,调用oozie的api,终止作业
                oozieClient.kill(jobId);
                // 打印日志或者做其他操作
                System.out.println("Job " + jobId + " is killed due to timeout");
                // 重新运行作业或者做其他操作
                // ...
            } else {
                // 如果没有超过阈值,打印日志或者做其他操作
                System.out.println("Job " + jobId + " is running normally");
            }
        }
    }

    // 定义一个方法,用于解析作业的信息,并返回作业的运行时间(毫秒)
    private static long parseJobDuration(String jobInfo) {
        // 创建一个Matcher对象,用于匹配正则表达式和作业信息
        Matcher matcher = DURATION_PATTERN.matcher(jobInfo);
        // 如果找到了匹配的结果,就从结果中提取天数、小时数和分钟数,并转换为毫秒
        if (matcher.find()) {
            int days = Integer.parseInt(matcher.group(1));
            int hours = Integer.parseInt(matcher.group(2));
            int minutes = Integer.parseInt(matcher.group(3));
            long duration = (days * 24 + hours) * 60 + minutes; // duration in minutes
            duration *= 60 * 1000; // duration in milliseconds
            return duration;
        } else {
            // 如果没有找到匹配的结果,就返回-1表示无法解析
            return -1;
        }
    }
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 24,2023

spark cacheTable的作用

DataFrame.sqlContext.cacheTable 是一种用于缓存 Spark DataFrame 中数据表的方法。它可以用来在内存中缓存数据,以提高查询性能和加速数据分析过程。

作用:

当你使用 DataFrame.sqlContext.cacheTable 方法缓存一个数据表时,Spark 会将该表的数据加载到内存中,以便在后续的查询和操作中快速访问数据,而不必每次都从磁盘读取。这可以带来以下好处:

  1. 加速查询性能: 缓存数据表可以显著提高查询性能,因为查询操作可以在内存中快速执行,而不需要再次访问磁盘。特别是对于复杂的查询或多次重复的查询,性能的提升会非常明显。
  2. 减少磁盘IO: 缓存表可以减少磁盘IO操作,从而减少整体的数据读取时间。这对于大规模数据集或需要频繁访问的数据表特别有用。
  3. 优化迭代计算: 在迭代计算中,如果同一数据表被多次使用,缓存该表可以避免重复加载和计算,加快迭代过程。
  4. 交互式分析: 在交互式分析环境中,缓存数据表可以让你更快地尝试不同的查询和分析操作,提高工作效率。

使用示例:

以下是一个示例,展示了如何使用 DataFrame.sqlContext.cacheTable 方法来缓存一个数据表:

python复制代码from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("CacheTableExample").getOrCreate()

# 读取数据并创建 DataFrame
data_df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 缓存数据表
data_df.createOrReplaceTempView("my_cached_table")
spark.sql("CACHE TABLE my_cached_table")

# 执行查询
result = spark.sql("SELECT * FROM my_cached_table WHERE age > 25")

# 关闭 SparkSession
spark.stop()

在这个示例中,数据表 “my_cached_table” 被缓存到内存中,然后可以执行快速的查询操作。需要注意的是,缓存数据表会占用一定的内存空间,因此在内存有限的情况下,需要根据实际需求进行权衡和管理。同时,缓存的数据在某些情况下可能会被自动清除,例如当内存不足时。

作者 east
Spark 8月 23,2023

spark sql对表某个字段拆分成多条记录

需求:数据库某个表,有个字段area,里面有多个省份,用逗号分隔,例如”广东省,浙江省,北京市“,要求分隔成多条记录。

要使用Spark SQL对表中的某个字段根据逗号拆分成多条记录,你可以使用split()函数来实现。以下是具体的Scala代码示例:

scala复制代码import org.apache.spark.sql.functions._

// 假设你已经创建了一个SparkSession对象,命名为spark

// 加载原始数据作为DataFrame
val df = spark.read.format("csv").option("header", "true").load("path_to_your_file.csv")

// 使用split函数将目标字段拆分成多列,并使用explode函数将拆分后的数组转换成多行
val explodedDF = df.withColumn("split_col", split(col("your_column"), ",")) 
                   .selectExpr(df.columns: _*)
                   .withColumn("exploded_col", explode(col("split_col")))

// 显示处理后的结果
explodedDF.show()

你需要将代码中的以下部分进行替换以适应你的场景:

  • path_to_your_file.csv:指定你的CSV文件的路径。
  • your_column:要拆分的字段的名称。

这段代码会将指定字段按逗号拆分成多个记录,并把拆分后的记录显示在控制台上。你可以根据自己的需求进一步处理和操作拆分后的数据。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 15,2023

优化spark dataframe left join速度

如果 Spark DataFrame 的左连接速度很慢,你可以考虑以下几种方法来提高速度:

  1. 分区优化: 确保参与左连接的两个表都进行了适当的分区,以便能够并行处理数据。可以使用 repartition 或者 partitionBy 方法来调整分区,将数据均匀地分布到集群的节点上。
  2. 广播小表: 如果左连接中的一个表很小,可以将其广播到每个节点上,以减少网络传输开销。使用 broadcast 方法可以将小表广播,以优化连接性能。
  3. 数据倾斜处理: 如果左连接的键存在数据倾斜,即某些键的数据量远大于其他键,这可能导致性能下降。可以考虑使用一些技术来处理数据倾斜,如改变连接键、使用随机前缀、或者使用coalesce 等方法重新平衡数据。
  4. 适当选择算法: Spark 会根据连接表的大小和分区情况选择不同的连接算法。可以尝试在连接操作前使用 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) 来强制 Spark 选择适当的连接算法。
  5. 调整资源配置: 确保集群的资源配置足够,以避免资源瓶颈。可以调整 Spark 配置参数,如内存、CPU 核心数等,以适应连接操作的需求。
  6. 内存优化: 使用合适的内存管理策略,确保内存不会出现溢出或者过度分配。可以通过合理设置 Spark 内存分配参数来进行优化。
  7. 索引优化: 如果连接的键有索引,可以优化连接速度。但注意,Spark 并不像传统数据库系统那样直接支持索引优化,因此需要结合其他优化方法来使用。
  8. 避免多次连接: 在同一流程中,避免多次连接相同的表。如果有多个连接操作,考虑将数据缓存起来或者将连接操作分阶段执行。
  9. 持久化中间结果: 如果有复杂的计算逻辑,可以在中间步骤持久化数据,以避免重复计算。
  10. 监控调优: 使用 Spark UI 和相关日志来监控连接操作的执行计划和性能。根据监控结果进行调优。

最终的优化策略可能会因数据量、数据分布、集群配置等因素而有所不同。建议根据实际情况进行测试和调整,以找到最适合你数据和环境的优化方法。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 3,2023

CDH Yarn WebUI没有显示最近的FINISHED Applications

在跑spark任务时,CDH6.3.2的Yarn WebUI没有显示最近的FINISHED Applications,想查看已跑完的spark看不到,非常不方便。

CDH(Cloudera Distribution for Hadoop)是一个大数据处理平台,其中的YARN(Yet Another Resource Negotiator)是用于资源管理和任务调度的核心组件。您提到在YARN的WEBUI中,”FINISHED Applications”(已完成的应用程序)没有显示最新的记录。这可能由多种原因导致。

  1. 刷新问题: 有时候,Web界面可能由于不正确的缓存或其他问题而未能及时刷新,导致显示的信息不是最新的。
  2. 日志滚动策略: YARN的日志滚动策略可能会导致已完成的应用程序的日志被压缩或删除,从而影响了Web界面的展示。
  3. 数据清理策略: 可能在CDH配置中设置了数据清理策略,这可能会导致一些旧的已完成应用程序被自动清理,从而在Web界面中不再显示。

解决方案:

以下是一些可能的解决方案,您可以尝试进行排查和解决问题:

  1. 手动刷新界面: 首先尝试手动刷新Web界面,看看是否能够获取到最新的数据。有时候,简单的刷新就可以解决问题。
  2. 检查日志设置: 查看CDH集群中YARN的日志滚动和保留策略。您可以调整这些设置,确保日志文件得到正确保留并且不会被过早清理。
  3. 调整数据清理策略: 如果CDH集群中配置了自动数据清理策略,您可以考虑调整清理策略,以便保留更多的已完成应用程序数据。
  4. 查看日志和错误信息: 查看YARN的日志文件以及相关的错误信息,看看是否有任何与已完成应用程序信息显示相关的问题。
  5. 升级和修复: 如果问题仍然存在,可能需要考虑升级CDH版本或者应用程序,以获取更稳定的Web界面功能。
  6. 联系支持: 如果您尝试了以上方法仍无法解决问题,可以考虑联系CDH的技术支持团队寻求进一步帮助。

看了Yarn的配置,觉得都正常,重启了Yarn组件,重新跑任务
FINISHED Applications 有显示最新任务了。仔细查看原来是之前没有用倒序,默认是顺序排列,最新的没有显示在第一页。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 8月 2,2023

spark dataframe 如何从Seq 集合select 所有的列

要从一个 Seq 集合中选择所有的列,你可以使用 Spark DataFrame 的 select 方法,并将 Seq 集合作为参数传递给该方法。假设你已经创建了一个名为 dataframe 的 Spark DataFrame,其中包含多个列,现在你想从一个 Seq 集合中选择所有的列,可以按照以下步骤进行操作:

import org.apache.spark.sql.functions.col 
// 假设你有一个 Seq 集合,其中包含你要选择的列名
val columnsToSelect = Seq("col1", "col2", "col3", ...)
// 使用 select 方法,并将 Seq 集合中的列名转换为对应的列对象
val selectedDataFrame = dataframe.select(columnsToSelect.map(col): _*)

上述代码中,col 是 Spark 的函数,用于将列名转换为列对象。通过将 Seq 集合中的列名映射为列对象,并在 select 方法中使用 : _* 来展开参数,你可以选择所有在 Seq 集合中指定的列。

现在,selectedDataFrame 中将包含来自 dataframe 中指定的所有列。

作者 east
Spark 7月 28,2023

使用日期字段来实现数仓每月算一次的功能

在数仓开发中,为了实现对某个宽表每月执行一次计算的功能,由于宽表是关联日期维度表,有字段 is_end_month 可以判断是否是月末。

为了使调度任务简单,可以每天执行一次,判断当天不是月末是不执行具体计算任务,只有是月末时才执行。

// 假设你已经创建了SparkSession对象,命名为spark
import org.apache.spark.sql.functions.col

// 假设你的DataFrame名为df,包含is_end_month字段
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("path_to_your_data.csv") // 替换为你的数据路径

// 获取is_end_month字段的第一条数据
val firstRow = df.select(col("is_end_month")).first()

// 从Row对象中获取is_end_month的值,假设该字段为整数类型
val isEndMonthValue = firstRow.getAs[Int]("is_end_month")

// 判断is_end_month的值并执行相应操作
if (isEndMonthValue == 0) {
    spark.stop() // 退出Spark
} else if (isEndMonthValue == 1) {
    // 继续执行其他代码
    // ...
}
注意替换代码中的数据路径为你实际的数据文件路径,同时根据实际的数据类型对getAs方法中的类型进行调整。此代码将首先从DataFrame中选择is_end_month字段的第一行数据,然后将该字段的值提取为整数并进行判断,如果值为0,则停止Spark并退出,否则继续执行其他代码。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 7月 27,2023

spark任务运行出错:Container [pid=6978,containerID=container_1688954893175_0080_04_000001] is running 11698176B beyond the ‘PHYSICAL’ memory limit. Current usage: 1.5 GB of 1.5 GB physical memory used; 3.3 GB of 3.1 GB virtual memory used. Killing container.

运行spark任务报错,查看错误日志如下:
Failing this attempt.Diagnostics: [2023-07-25 09:57:03.987]Container [pid=6978,containerID=container_1688954893175_0080_04_000001] is running 11698176B beyond the ‘PHYSICAL’ memory limit. Current usage: 1.5 GB of 1.5 GB physical memory used; 3.3 GB of 3.1 GB virtual memory used. Killing container. Dump of the process-tree for container_1688954893175_0080_04_000001 :

根据错误日志,可以看出 Spark 任务执行过程中出现了内存问题。错误信息显示容器 container_1688954893175_0080_04_000001 正在超出 ‘PHYSICAL’ 内存限制。当前使用情况显示已使用了 1.5 GB 的物理内存(PHYSICAL memory),而该容器的物理内存限制是 1.5 GB,虚拟内存(virtual memory)使用了 3.3 GB,虚拟内存限制是 3.1 GB。由于超过了物理内存限制,Spark 正在终止该容器。

这种情况通常出现在 Spark 任务运行时,需要更多的内存资源,但配置的内存资源不足以满足任务的需求。这可能由以下原因导致:

  1. 数据量较大:处理的数据量超过了所分配的内存资源,导致内存不足。
  2. 计算复杂度高:Spark 任务涉及复杂的计算逻辑或涉及大量的数据转换操作,导致内存需求增加。
  3. 资源配置不合理:Spark 配置中分配给执行器(Executor)的内存资源设置过小。

解决方案:

  1. 增加物理内存:如果可行,可以在执行 Spark 任务的机器上增加物理内存,这样有更多的内存资源供 Spark 使用。
  2. 优化数据处理:考虑对数据处理逻辑进行优化,减少不必要的计算和数据转换,以降低内存需求。
  3. 调整 Spark 配置:在 Spark 任务提交时,通过 --conf 参数来调整 Executor 的内存分配。可以尝试增加 spark.executor.memory 参数的值来提高每个 Executor 的内存,如果有多个 Executor,可以适当增加 Executor 的数量。
  4. 使用分区技术:合理地对数据进行分区,以减少单个任务需要处理的数据量,从而减少内存压力。
  5. 检查资源使用情况:查看其他正在运行的任务以及系统的资源使用情况,确保没有其他任务占用了过多的资源,导致 Spark 任务无法获取足够的资源。
  6. 调整虚拟内存限制:如果虚拟内存限制过小,可以尝试增加虚拟内存限制,但这不是主要解决方案,因为虚拟内存只是在物理内存不足时充当备用。

根据具体情况选择合适的解决方案,并确保 Spark 任务有足够的内存资源来执行。如果问题持续存在,可能需要进一步分析任务的执行计划和资源使用情况,以找出更深层次的原因并进行针对性的优化。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Spark 7月 17,2023

spark dataframe如何提取某一行的字段的值

有几种方法可以从Spark dataframe中提取某一行的字段的值,使用scala语言实现。我会给你一些例子和链接,你可以参考一下。

  • 一种方法是使用**row.getString(index)**方法,其中index是字段在行中的位置。例如,如果你想要获取第一行的第一个字段的值,你可以这样写:
val row = df.first() // 获取第一行
val value = row.getString(0) // 获取第一个字段的值

这个方法需要你知道字段的索引和类型。1

  • 另一种方法是使用**row.getAsT**方法,其中T是字段的类型,columnName是字段的名称。例如,如果你想要获取第一行的name字段的值,你可以这样写:
val row = df.first() // 获取第一行
val value = row.getAs[String]("name") // 获取name字段的值

这个方法不需要你知道字段的索引,但需要你知道字段的类型和名称。2

  • 还有一种方法是使用**df.select(columnName).collect()**方法,其中columnName是字段的名称。这个方法会返回一个包含所有行的字段值的数组。例如,如果你想要获取所有行的name字段的值,你可以这样写:
val values = df.select("name").collect() // 获取所有行的name字段的值

复制

这个方法不需要你知道字段的索引和类型,但需要你知道字段的名称。

作者 east
Spark 6月 19,2023

spark dataframe left join另一个dataframe 空值异常的问题

当在Spark开发时,某个字段有空值会出现不少意想不到的情况。Spark SQL中对两个Dataframe使用join时,当作为连接的字段的值含有null值。由于null表示的含义是未知,既不知道有没有,在SQL中null值与任何其他值的比较(即使是null)永远不会为真。故在进行连接操作时null == null不为True,所以结果中不会出现该条记录,即左侧表格的这条记录对应右侧的值均为null。

解决方法一:

如果两个DataFrame进行left join时,多个字段的值有空值,那么结果就会为空。为了解决这个问题,我们可以先对这两个DataFrame进行处理,在处理的过程中将空值替换成一个特殊值,例如:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}

// 左边的DataFrame为leftDF,右边的DataFrame为rightDF
// 给定leftDF和rightDF共同进行left join的字段列
val joinColumns: Seq[String] = Seq("col1", "col2", "col3")

// 定义替换的特殊值
val specialValue: String = "__NULL__"

// 对leftDF和rightDF的joinColumns列进行空值替换
val leftJoinDF: DataFrame = replaceNullsWithSpecialValue(leftDF, joinColumns, specialValue)
val rightJoinDF: DataFrame = replaceNullsWithSpecialValue(rightDF, joinColumns, specialValue)

// 对leftJoinDF和rightJoinDF进行join操作
val joinedDF: DataFrame = leftJoinDF.join(rightJoinDF, joinColumns, "left")

// 定义空值替换函数
def replaceNullsWithSpecialValue(df: DataFrame, columns: Seq[String], replacement: String): DataFrame = {
  val columnsToReplace: Seq[Column] = columns.map(col(_))
  val columnsToKeep = df.columns.filterNot(columns.contains(_)).map(col)
  df.select((columnsToReplace ++ columnsToKeep):_*).na.fill(replacement, columnsToReplace)
}

在这里,我们使用na.fill()函数将DataFrame中的空值替换为特殊值。在处理完之后,我们就可以对两个DataFrame进行left join操作了。

解决方法二:

一种可能的解决方案是使用NULL safe equality operator(<=>),它可以在join条件中处理NULL值,使得NULL值与NULL值相等。例如,如果你有两个dataframe,df1和df2,你想要根据多个字段进行left join,你可以写成:

import org.apache.spark.sql.functions._
val joinedDF = df1.join(df2, df1("col1") <=> df2("col1") && df1("col2") <=> df2("col2"), "left")

这样,即使col1或col2中有NULL值,也不会影响join的结果。


关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书


作者 east
doris, Spark 6月 16,2023

Spark写数据到Doris报错node and exceeded the max retry times

用spark dataframe向doris写数据时,报下面错误:

Failed to load data on BE: http://192.168.50.10:18040/api/mydb/dwd_virtual_table/_stream_load? node and exceeded the max retry times.

发现表没写入成功。刚开始很困惑,后来发现是 dataFrame中的字段和目标表不一致 。

这种提示很不友好,有没有更好方式提示,方法是有的,可以用jdbc写入,发现错误时可以看到具体的提示。代码参考如下:

def writeByJDBC(dataframe: DataFrame, dorisTable: String): Unit = {
    dataframe.write.format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://" + DORIS_HOST + ":9030/" +DATABASE_NAME + "?rewriteBatchedStatements=false")
      .option("batchsize", "" + WRITE_BATCH_SIZE)
      .option("user", DORIS_USER)
      .option("password", DORIS_PASSWORD)
      .option("isolationLevel", "NONE")
      //  .option("doris.write.fields","case_id,defendant_name,finance_name,mediation_name,mediator_name,dt")
      .option("dbtable", dorisTable)
      .save()
  }

不过这种方式还是没有Spark Doris Connector的方式写效率高,可以用上面jdbc方式调试,没问题再切换 Spark Doris Connector 方式:

def writeByDoris(dataframe: DataFrame, dorisTable: String): Unit = {
dataframe.write.format(“doris”)
.option(“doris.table.identifier”, dorisTable)
.option(“doris.fenodes”, DORIS_HOST + “:” + DORIS_FE_HTTP_PORT)
.option(“user”, DORIS_USER)
.option(“password”, DORIS_PASSWORD)
.option(“sink.batch.size”, WRITE_BATCH_SIZE)
.option(“sink.max-retries”, 3)
.option(“doris.request.retries”, 6)
.option(“doris.request.retries”, 100)
.option(“doris.request.connect.timeout.ms”, 60000)
.save()
}

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

上一 1 2 3 … 9 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.