Spark Throws an "Unable to acquire" Exception
Symptom
When a Spark SQL statement is executed, a java.io.IOException: Unable to acquire […] bytes of memory exception occurs, as shown below:
WARN TaskSetManager: Lost task 578.2 in stage 30.0 (TID 228063, 8-5-203-1, 244): java.io.IOException: Unable to acquire 16777216 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:354)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:141)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:109)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:75)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:42)
    at org.apache.spark.scheduler.Task.run(Task.scala:90)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
With a certain probability, when the preceding WARN causes the same task to fail four consecutive times, the failure is escalated to the job level, as shown below:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 537 in stage 30.0 failed 4 times, most recent failure: Lost task 537.3 in stage 30.0 (TID 228865, 8-5-202-7, 650): java.io.IOException: Unable to acquire 16777216 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:354)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:141)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:109)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:75)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:42)
    at org.apache.spark.scheduler.Task.run(Task.scala:90)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Possible Causes
Spark's shuffle memory management currently has a defect. In principle, when the ShuffleMemoryManager allocates memory to tasks, it splits the total available memory dynamically according to the number of tasks running at that moment. When a task finishes, the number of running tasks decreases, and the ShuffleMemoryManager re-splits the available memory among the remaining tasks. In some cases, the tasks that are already running take up all of the memory before a new task starts.
In this scenario, the new task cannot acquire any memory. It then triggers the spill logic, which spills the memory held by its own UnsafeExternalSorter, and retries the allocation. However, because that sorter itself holds 0 bytes, spilling frees nothing, the retry still fails to obtain memory, and the exception above is thrown, failing the task.
The failed task is retried. If the other tasks release memory in time, the retry succeeds and the job does not fail. If they do not release memory in time, the retry fails as well; once the same task has failed four consecutive times, the job fails.
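To make the failure mode concrete, the following is a minimal toy model (not Spark source code) of the fair-share policy described above: with N running tasks, each task may hold at most 1/N of the shuffle memory pool, and a request can only be granted from memory not already held by other tasks. The class and function names, the 64 MB pool size, and the task IDs are all illustrative.

```python
class ShuffleMemoryPool:
    """Toy model of a fair-share shuffle memory pool: with N running
    tasks, each task may hold at most total/N bytes."""

    def __init__(self, total):
        self.total = total
        self.held = {}  # task_id -> bytes currently held

    def try_to_acquire(self, task_id, requested):
        self.held.setdefault(task_id, 0)
        n = len(self.held)                       # number of running tasks
        cap = self.total // n                    # per-task cap: 1/N of pool
        free = self.total - sum(self.held.values())
        grant = max(0, min(requested, cap - self.held[task_id], free))
        self.held[task_id] += grant
        return grant

    def release_all(self, task_id):
        self.held.pop(task_id, None)


def acquire_page(pool, task_id, page=16 << 20):
    """All-or-nothing page allocation, like UnsafeExternalSorter.acquireNewPage:
    a partial grant is given back and the allocation fails."""
    got = pool.try_to_acquire(task_id, page)
    if got < page:
        pool.held[task_id] -= got
        raise IOError("Unable to acquire %d bytes of memory" % page)
    return got


pool = ShuffleMemoryPool(total=64 << 20)         # 64 MB shuffle pool

# Four running tasks each take their full 16 MB (1/4) share.
for t in range(4):
    acquire_page(pool, t)

# Task 0 finishes; before any new task starts, the three surviving
# tasks soak up the freed memory under their raised 1/3 caps.
pool.release_all(0)
for t in (1, 2, 3):
    pool.try_to_acquire(t, 16 << 20)

# A new task now needs its first 16 MB page, but almost nothing is
# free, and spilling its own empty sorter would release 0 bytes.
try:
    acquire_page(pool, 99)
except IOError as e:
    print(e)    # Unable to acquire 16777216 bytes of memory
```

The simulation mirrors the failure sequence described above: the re-split after task 0 exits lets the old tasks drain the pool, so the newcomer's all-or-nothing page request fails exactly like the exception in the symptom.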
Fault Locating
None.
Procedure
Modify the corresponding settings in the Spark client configuration file "$Spark_Client/conf/spark-defaults.conf" to work around this issue.
- Method 1: Set spark.executor.cores=1. Reducing the parallelism within a single executor to 1 avoids this issue.
- Method 2: Increase spark.sql.shuffle.partitions. This reduces the probability of the exception.
- Method 3: Decrease spark.buffer.pageSize. This reduces the probability of the exception.
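As an illustration, the three workarounds would be written in spark-defaults.conf roughly as follows. Pick one; the concrete values for Methods 2 and 3 are only examples and should be tuned for the cluster and workload.

```properties
# Workarounds for "Unable to acquire ... bytes of memory" -- pick ONE.

# Method 1: one task per executor removes the memory contention entirely.
spark.executor.cores          1

# Method 2: more, smaller shuffle partitions lower each task's memory
# demand (example value; the default is 200).
spark.sql.shuffle.partitions  400

# Method 3: smaller Tungsten pages make each allocation request smaller
# than the 16 MB seen in the error (example value).
spark.buffer.pageSize         2m
```

After editing the file, resubmit the job from the same client so the new settings take effect.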