Spark Reports an "Unable to acquire" Exception

Symptom

When a Spark SQL statement is executed, a java.io.IOException: Unable to acquire […] bytes of memory exception occurs, as shown below:

WARN TaskSetManager: Lost task 578.2 in stage 30.0 (TID 228063, 8-5-203-1, 244): java.io.IOException: Unable to acquire 16777216 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:354)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:141)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:109)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:75)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:42)
    at org.apache.spark.scheduler.Task.run(Task.scala:90)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

With a certain probability, when the WARN above causes the same task to fail four consecutive times, the whole job fails, as shown below:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 537 in stage 30.0 failed 4 times, most recent failure: Lost task 537.3 in stage 30.0 (TID 228865, 8-5-202-7, 650): java.io.IOException: Unable to acquire 16777216 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:354)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:141)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:109)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:267)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:75)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:42)
    at org.apache.spark.scheduler.Task.run(Task.scala:90)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)               

Possible Causes

Spark's shuffle memory management currently has a flaw. In principle, when the ShuffleMemoryManager allocates memory to tasks, it dynamically splits the total allocatable memory according to the number of tasks running at that moment; when a task finishes, the number of running tasks drops, and the ShuffleMemoryManager re-splits the allocatable memory over the reduced task count. In some situations, the tasks that are already running take all of the memory before a new task starts, as the arithmetic sketch below illustrates.
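The following is a minimal, self-contained arithmetic sketch (not Spark's actual ShuffleMemoryManager code; the pool size and task counts are assumed purely for illustration) of how a task that starts late can find fewer free bytes than a single page, even though its nominal per-task share is far larger:

    object ShuffleMemorySplitSketch {
      def main(args: Array[String]): Unit = {
        val totalPool = 1024L * 1024 * 1024        // assume a 1 GiB shuffle memory pool
        val pageSize  = 16L * 1024 * 1024          // 16777216 bytes, the page size in the log above

        // Assume 8 tasks are already running and together hold almost the entire pool.
        val alreadyUsed = totalPool - 8L * 1024 * 1024

        // A ninth task starts: its nominal share is pool / N, but the memory that is
        // actually still free can be smaller than one page.
        val runningTasks = 9
        val perTaskShare = totalPool / runningTasks
        val freeBytes    = totalPool - alreadyUsed

        println(s"nominal per-task share = $perTaskShare bytes")
        println(s"actually free          = $freeBytes bytes, one page needs $pageSize bytes")
        if (freeBytes < pageSize) {
          println("acquireNewPage would fail: Unable to acquire 16777216 bytes of memory")
        }
      }
    }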

In that situation the new task cannot acquire memory. It then triggers the spill logic, spilling the memory held by its own UnsafeExternalSorter, and retries the allocation; but because the sorter itself holds 0 bytes, nothing is freed and the allocation still fails, so the exception above is thrown and the task fails. The sketch below outlines this acquire-spill-retry flow.
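Here is a minimal sketch of that flow (hypothetical names; this is not the real UnsafeExternalSorter or memory-manager API), showing why a sorter that owns no pages cannot recover any memory by spilling:

    import java.io.IOException

    object AcquireSpillRetrySketch {
      // tryAcquire stands in for the memory manager's grant function: it returns how
      // many bytes it could actually hand out, possibly fewer than requested.
      def acquireNewPage(pageSize: Long, ownedBytes: Long, tryAcquire: Long => Long): Long = {
        val first = tryAcquire(pageSize)
        if (first >= pageSize) {
          first
        } else {
          // Not enough memory: spill this sorter's own pages and retry. A sorter that
          // has just been created owns 0 bytes, so the spill releases nothing.
          val releasedBySpill = ownedBytes
          val second = tryAcquire(pageSize)
          if (second < pageSize) {
            throw new IOException(s"Unable to acquire $pageSize bytes of memory " +
              s"(spill released only $releasedBySpill bytes)")
          }
          second
        }
      }

      def main(args: Array[String]): Unit = {
        val freeInPool = 8L * 1024 * 1024                     // only 8 MiB left in the pool
        val grant: Long => Long = requested => math.min(requested, freeInPool)
        try {
          acquireNewPage(pageSize = 16L * 1024 * 1024, ownedBytes = 0L, tryAcquire = grant)
        } catch {
          case e: IOException => println(s"task would fail with: ${e.getMessage}")
        }
      }
    }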

The failed task is retried. If the other tasks release memory in time, the retry succeeds and the job does not fail; if they do not, the retry fails as well. Once the same task has failed four times in a row (the default spark.task.maxFailures), the job fails.

Troubleshooting Approach

None.

Procedure

Edit the Spark client configuration file "$Spark_Client/conf/spark-defaults.conf" and adjust the corresponding settings to work around this issue; an illustrative snippet follows the list below.

  • Method one: Set spark.executor.cores=1 to reduce the parallelism within a single executor to 1; this avoids the issue.
  • Method two: Increase spark.sql.shuffle.partitions; this lowers the probability of the exception.
  • Method three: Decrease spark.buffer.pageSize; this also lowers the probability of the exception.
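For example, a spark-defaults.conf fragment applying methods two and three could look like the following. The concrete values are illustrative assumptions rather than tuned recommendations, and the size-suffix form of the page size value assumes your Spark version accepts it; adjust both to your workload:

    # $Spark_Client/conf/spark-defaults.conf
    # Method two: more shuffle partitions, so each task handles less data and memory
    spark.sql.shuffle.partitions    400
    # Method three: smaller pages, so each single allocation request is smaller
    spark.buffer.pageSize           8m
    # Method one (alternative): run only one task at a time per executor
    # spark.executor.cores          1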

