Spark: Exception When Collecting a Very Large Result Set to the Driver


Symptom

Collecting a very large result set to the Driver can fail with either of the following two errors:

  • An OOM error occurs. Log excerpt:

    java.lang.OutOfMemoryError: GC overhead limit exceeded
    16/01/25 12:08:56 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveBroadcast(69,true)] in 1 attempts
    org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#366390194]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
        at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
        at scala.concurrent.Future$class.recover(Future.scala:324)
        at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
        at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:319)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
  • When the result set is skewed so that some data blocks exceed 2 GB, and Kryo serialization is in use, a NegativeArraySizeException is thrown. Log excerpt:

    16/02/16 16:55:13 WARN TaskSetManager: Lost task 750.0 in stage 66.0 (TID 33887, datasight-192): com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
    Serialization trace:
    values (org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:260)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NegativeArraySizeException
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)

Possible Causes

  • Driver-side OOM. Collecting results to the Driver and printing them involves two main steps: first, a single array stores the results gathered from every node; second, the results are converted into a printable format and written to the screen. The result set is held in memory as Java objects, which have a large footprint, and the format conversion generates many intermediate arrays. Together these make the Driver's memory consumption very high, so an OOM error occurs easily.
  • NegativeArraySizeException from Kryo serialization. Spark limits how much data Kryo can serialize in a single pass to 2 GB. Exceeding this limit produces the error shown above.
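To make the failure mode above concrete, the sketch below shows the collect-to-Driver pattern that triggers the OOM. It is a minimal illustration, not taken from the original text: the table name "events" and the local master setting are assumptions, and running it requires a Spark installation.

```scala
import org.apache.spark.sql.SparkSession

object CollectOomDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("collect-oom-demo")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()

    // "events" is a hypothetical large table.
    val df = spark.read.table("events")

    // Step 1: collect() materializes every row as a Java object
    // inside one Array on the Driver.
    // Step 2: formatting the rows for display creates many
    // intermediate arrays and strings, multiplying Driver memory use.
    val rows = df.collect() // risks OOM when the result set is huge
    rows.foreach(println)

    spark.stop()
  }
}
```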

Troubleshooting Approach

None.

Resolution

When the problems above occur, the following adjustments are recommended.

  • When the result set is very large, do not pull it to the Driver. Write the result set to disk instead, which avoids the OOM error.
  • If you have avoided the OOM error as above, the NegativeArraySizeException will not occur either. If you choose not to follow that recommendation, you can instead set the serializer in the Spark client configuration file "spark-defaults.conf" to spark.serializer = org.apache.spark.serializer.JavaSerializer, which avoids the NegativeArraySizeException.
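The first recommendation can be sketched as follows. The output path and preview size are illustrative assumptions, not values from the original text:

```scala
// Instead of collect(), persist the result set to disk; only small
// previews or aggregates are brought back to the Driver.
df.write.mode("overwrite").parquet("/tmp/result") // path is an example

// If a small on-screen preview is genuinely needed:
df.limit(100).collect().foreach(println)
```

For the second recommendation, the entry in "spark-defaults.conf" would look like the following (this matches the key/value given above; note that switching away from Kryo may slow serialization for workloads that otherwise benefit from it):

```
spark.serializer = org.apache.spark.serializer.JavaSerializer
```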

