运行Spark Streaming应用时出现内存不足的问题

运行Spark Streaming应用时出现内存不足的问题

现象描述

某Spark Streaming应用对每个批次不大于3000M的数据进行wordcount,即使每个executor给予30G内存,执行一段时间后还是会发生内存不足。

日志信息如下:

2016-02-04 20:19:43,458 | ERROR | [Thread-29] | Uncaught exception in thread Thread[Thread-29,5,main] | org.apache.spark.Logging$class.logError(Logging.scala:96)
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
        at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
        at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
        at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:173)

可能原因

Spark Streaming从Kafka接收数据的方式有两种:

  • Receiver-based Approach
  • Direct Approach (No Receivers)

上述问题只有Receiver-based的方式会出现,Direct的方式不会出现该问题。

在Spark Streaming应用中,每一个批次会生成一个job。如果job的处理时间大于批次的时间间隔(批次时间间隔在Spark Streaming应用中定义),则从数据源(即Kafka)接收的数据就会累积,最后造成任务的不断积压,导致executor端内存溢出。

定位思路

无。

处理步骤

当出现如上问题时,建议可采用如下两种方法进行调整,两种方法可同时使用:

  • 适当缩短批次的时间,使得接收到的数据量不要太大。
  • 根据任务量增大内存,使得job的处理时间加快,保证job的处理时间比批次的时间短。

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注