Spark Shuffle的优化

Spark Shuffle 是连接不同 Stage 的关键环节,也是 Spark 作业中最容易产生性能瓶颈的地方之一。它涉及大量磁盘 I/O、网络传输和内存使用。优化 Shuffle 对提升作业性能和稳定性至关重要。以下是一些关键的 Spark Shuffle 优化策略:

核心目标: 减少 Shuffle 数据量、降低 I/O 开销、提升网络传输效率、优化内存使用、处理数据倾斜。

主要优化策略:

  1. 减少 Shuffle 数据量 (根本之道):
    • Map-Side 预聚合/Combining: 在 Shuffle 写之前,尽可能在 Mapper 端对数据进行聚合(combineByKeyreduceByKeyaggregateByKey)。这能显著减少需要传输的键值对数量。优先使用 reduceByKey/aggregateByKey 而不是 groupByKey
    • 选择更高效的算子: reduceByKey 优于 groupByKey + reducetreeReduce/treeAggregate 在聚合深度大时优于 reduce/aggregate(减少网络轮次)。
    • 过滤数据: 尽早使用 filter 过滤掉不需要参与 Shuffle 的数据。
    • 列裁剪: 只选择 Shuffle 后真正需要的列(尤其是在使用 DataFrame/Dataset API 时)。避免传输整个对象。
    • 避免不必要的 distinct distinct 会触发 Shuffle。考虑是否可以用其他方式(如 Map 端去重)或在更小的数据集上使用。
    • 广播小表: 如果 Join 操作中一个表很小,使用 broadcast 将其分发到所有 Executor,避免大表 Shuffle。spark.sql.autoBroadcastJoinThreshold 控制自动广播的阈值。
  2. 优化 Shuffle 写:
    • 调整 spark.shuffle.file.buffer 增加 Shuffle 写过程中每个分区文件的内存缓冲区大小(默认 32K)。增大此值(如 64K, 128K)可以减少磁盘 I/O 次数,但会增加内存压力。需权衡。
    • 调整 spark.shuffle.spill.diskWriteBufferSize 增大溢出到磁盘时使用的缓冲区大小(默认 1024K),同样减少写磁盘次数。
    • 启用 spark.shuffle.unsafe.file.output.buffer 对于使用 Tungsten 的 Shuffle,设置这个直接内存缓冲区大小(默认 32K),作用类似 spark.shuffle.file.buffer
    • 优化 spark.shuffle.spill 确保 spark.shuffle.memoryFraction 或 spark.memory.fraction 设置合理,为 Shuffle 分配足够内存,减少溢出次数。监控 GC 和溢出情况。
    • 选择高效的 Shuffle 实现:
      • Sort Shuffle (sort): Spark 1.2+ 的默认方式。对每个分区排序并合并小文件。通常最稳定高效。
      • Tungsten-Sort (tungsten-sort): 基于 Project Tungsten,使用堆外内存和更高效的编码。在 Spark 1.4+ 可用,有时性能更好(尤其处理原始类型时)。通常当 spark.shuffle.manager=sort 且满足条件(序列化器支持重定位、非聚合 Shuffle 等)时会自动使用。
    • 文件合并 (spark.shuffle.consolidateFiles): (在较新 Spark 版本中已被优化或默认行为替代) 在老版本中启用此选项可以让多个 Reduce Task 共享同一个 Mapper 输出的合并文件,减少小文件数量。新版本 Sort Shuffle 本身已优化文件数量。
  3. 优化 Shuffle 读:
    • 调整 spark.reducer.maxSizeInFlight 控制每次 Reduce Task 从远程 Executor 拉取数据的最大大小(默认 48M)。增大此值(如 96M)可以提高吞吐量,但会增加内存使用。需监控网络和内存。
    • 调整 spark.shuffle.io.maxRetries 和 spark.shuffle.io.retryWait 网络不稳定时,增加重试次数和等待时间以避免 Fetch Failed 错误。但过度重试会拖慢作业。
    • 调整 spark.shuffle.io.numConnectionsPerPeer 如果集群节点很多且网络是瓶颈,适当增加此值(默认 1)可以提升并发连接数。
    • 启用 spark.shuffle.compress (默认开启) 压缩 Shuffle 数据(写和读)。使用高效的压缩算法:
      • spark.io.compression.codec 推荐 lz4(速度快)或 zstd(压缩率高,速度也不错)。snappy 是默认值,也是不错的选择。避免使用低效的 lzf
    • 调整 spark.shuffle.service.enabled 启用 External Shuffle Service。这允许 Executor 在退出后(如动态资源分配下)Shuffle 文件仍能被访问,提高稳定性。通常在生产环境推荐启用。
  4. 调整分区数量:
    • 关键参数 spark.sql.shuffle.partitions (SQL/DataFrame/Dataset): 控制 Shuffle 后(如 Join, Aggregation)的分区数(默认 200)。这是最重要的优化点之一。
      • 分区过少: 每个分区数据量过大 -> 可能导致 OOM、GC 时间长、Task 执行慢、无法充分利用集群资源。
      • 分区过多: 每个分区数据量过小 -> Task 调度开销增大、产生大量小文件、网络请求次数增多(影响 Shuffle 读)。
      • 如何调整: 根据集群总核心数和数据量估算。经验值通常是集群总核心数的 2-3 倍。例如,集群有 100 个 Executor,每个 4 核,总核心数 400,可设置为 800 – 1200。需要根据实际作业数据量和执行情况(查看 Spark UI 中的 Shuffle Read Size/Records)反复调整测试。 数据量极大时可设置更高。
    • RDD API: repartition/coalesce: 在 RDD 操作中显式控制分区数。
  5. 处理数据倾斜:
    • 识别倾斜: 通过 Spark UI 查看各 Stage 中 Task 的执行时间分布。执行时间显著长于其他 Task 的通常处理了倾斜的分区。查看 Shuffle Read Size 差异。
    • 缓解策略:
      • 过滤倾斜 Key: 如果极少数倾斜 Key 可以单独处理或过滤掉。
      • 加盐打散: 给倾斜的 Key 添加随机前缀(扩容),在局部聚合后去掉前缀再全局聚合。
      • 提高并行度: 增加 spark.sql.shuffle.partitions,让倾斜 Key 分散到更多分区(对于单个 Key 数据量特别大的情况效果有限)。
      • 使用 skew join 优化 (Spark 3.0+ AQE): 自适应查询执行 (AQE) 能自动检测倾斜 Join 并将倾斜的分区分裂成更小的子分区进行处理。强烈推荐启用 AQE (spark.sql.adaptive.enabled=true)。
      • 特定算子: reduceByKey 比 groupByKey 更能容忍一定程度的倾斜(因为 Map 端合并了)。对于 Join 倾斜,考虑广播小表或使用 SortMergeJoin/ShuffleHashJoin 的替代方案。
  6. 利用自适应查询执行:
    • 启用 AQE (spark.sql.adaptive.enabled=true): Spark 3.0+ 的核心优化特性。
      • 动态合并 Shuffle 分区: 根据 Shuffle 后实际数据大小,自动将过小的分区合并,避免大量小 Task 的开销。
      • 动态调整 Join 策略: 在运行时根据统计信息将 SortMergeJoin 切换为 BroadcastJoin(如果发现小表符合广播条件)。
      • 动态优化倾斜 Join: 自动检测并处理 Shuffle Join 中的数据倾斜问题。
    • 相关参数: spark.sql.adaptive.coalescePartitions.enabledspark.sql.adaptive.coalescePartitions.minPartitionNumspark.sql.adaptive.advisoryPartitionSizeInBytesspark.sql.adaptive.skewJoin.enabledspark.sql.adaptive.skewJoin.skewedPartitionFactorspark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 等。强烈建议在生产环境中启用并适当配置 AQE。
  7. 硬件与集群配置:
    • 使用 SSD: 将 Shuffle 溢出目录 (spark.local.dir) 配置到 SSD 磁盘上能极大提升 Shuffle 写/读的 I/O 性能。这是非常有效的优化。
    • 充足内存: 确保 Executor 有足够内存(spark.executor.memory),特别是当 spark.memory.fraction 分配给 Storage 和 Execution(包含 Shuffle)时。减少溢出到磁盘的次数。
    • 高速网络: 万兆甚至更高带宽、低延迟的网络能显著加速 Shuffle 数据传输。
    • 合理 CPU 核数: 避免单个 Executor 分配过多 CPU 核(如 > 5),因为多个 Task 竞争磁盘/网络 I/O 可能成为瓶颈。通常每个 Executor 配置 3-5 核是一个较好的平衡点。

优化流程建议:

  1. 监控与诊断: 使用 Spark Web UI 仔细分析作业运行情况。重点关注:
    • Shuffle Read/Write 的总数据量和在各 Stage/Task 上的分布。
    • Task 的执行时间分布(识别倾斜)。
    • GC 时间。
    • 是否有溢出到磁盘 (Spill (Memory)Spill (Disk))。
    • Shuffle Write Time / Shuffle Read Time
    • 日志中的 WARN/ERROR 信息(如 FetchFailed, OOM)。
  2. 定位瓶颈: 根据监控信息判断是数据量太大、I/O 慢、网络慢、内存不足还是数据倾斜。
  3. 应用策略: 针对性地选择上述优化策略进行调整。优先考虑减少数据量和调整分区数。
  4. 迭代测试: 修改配置或代码后,在小规模数据或测试集群上运行测试,观察效果。每次最好只修改一个主要配置,以便定位效果。
  5. 利用 AQE: 确保在 Spark 3.x 环境中启用并配置好 AQE,它能自动处理很多棘手的优化问题(小分区合并、倾斜 Join)。

Spark Shuffle 优化原则

类别优化建议
算子选择使用 reduceByKey 代替 groupByKey,选择合适的 Join
分区策略控制合理的并发度、分区数,避免极端数据倾斜
参数调优内存、缓冲区、网络传输参数细致设置
数据倾斜通过打散、随机 key、局部聚合等方式规避热点 key
AQE开启 Spark SQL 的自适应执行,自动处理 Join/倾斜问题
文件合并启用 consolidateFiles 降低磁盘负担

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注