Spark Streaming调优实践

当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能。有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源。本章我们就来介绍一些能够提高应用性能的参数和配置。另外需要指出的是,优化本身是一个具体性很强的事情,不同的应用及落地场景会有不同的优化方式,并没有一个统一的优化标准。本章我们将一些常用的和在项目中踩过的“坑”总结一下,列举以下常见的优化方式。

数据序列化在分布式应用中,序列化(serialization)对性能的影响是显著的。如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。通常在Spark中,主要有如下3个方面涉及序列化:

● 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

● 将自定义的类型作为RDD的泛型类型时,所有自定义类型对象都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

● 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER), Spark会将RDD中的每个partition都序列化成一个大的字节数组。而Spark综合考量易用性和性能,提供了下面两种序列化库。

● Java序列化:默认情况下,Spark使用Java的对象输出流框架(ObjectOutputStream framework)来进行对象的序列化,并且可用在任意实现Java.io.Serializable接口的自定义类上。我们可以通过扩展Java.io.Externalizable来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果。

● Kryo序列化:Spark在2.0.0以上的版本可以使用Kryo库来非常快速地进行对象序列化,Kryo要比Java序列化更快、更紧凑(10倍),但是其不支持所有的Serializable类型,并且在使用自定义类之前必须先注册。

我们可以在初始化SparkConf时,调用conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)来使用Kryo。一旦进行了这个配置,Kryo序列化不仅仅会用在Shuffling操作时worker节点间的数据传递,也会用在RDDs序列化到硬盘的过程。Spark官方解释没有将Kryo作为默认序列化方式的唯一原因是,Kryo必须用户自己注册(注意如果我们不注册自定义类,Kryo也是可以正常运行的,但是它必须存储每个对象的完整类名,这是非常浪费的),但是其推荐在网络频繁传输的应用中使用Kryo。另外值得注意的是,在Spark 2.0.0之后,Spark已经默认将Kryo序列化作为简单类型(基本类型、基本类型的数组及string类型)RDD进行Shuffling操作时传输数据的对象序列化方式。Spark已经自动包含注册了绝大部分Scala的核心类,如果需要向Kryo注册自己的类别,可以使用registerKryoClasses方法。使用Kryo的代码框架如下:

如果我们的对象非常大,可能需要增加Spark.kryoserializer.buffer的配置。同样在Spark Streaming中,通过优化序列化格式可以缩减数据序列化的开销,而在Streaming中还会涉及以下两类数据的序列化。

● 输入数据:在4.4.1节中曾讲过,Spark Streaming中不同于RDD默认是以非序列化的形式存于内存当中,Streaming中由接收器(Receiver)接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中。而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中。而在Streaming中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为Spark本身的序列化格式。

● 由Streaming操作产生RDD的持久化:由流式计算产生的RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被反复使用,所以会持久化在内存当中。值得注意的是,不同于Spark核心默认使用非序列化的持久化方式(StorageLevel. MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在Spark还是在Spark Streaming中,使用Kryo序列化方式,都可以减少CPU和内存的开销。而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。

例如,我们使用很小的批处理时间间隔,并且没有基于窗口的操作,可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的CPU开销,但是潜在的增加了GC的开销。

2.广播大变量

我们可以看出,不论Spark还是Spark Streaming的应用,在集群节点间进行数据传输时,都会有序列化和反序列化的开销,而如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用Driver节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。另外,Spark在Master节点会打印每个任务的序列化对象大小,我们可以通过观察任务的大小,考虑是否需要广播某些大变量。通常一个任务的大小超过20KB,是值得去优化的。当我们将大型的配置查询表广播出去时,每个节点可以读取配置项进行任务计算,那么假设配置发生了动态改变时,如何通知各个子节点配置表更改了呢?(尤其是对于流式计算的任务,重启服务代价还是蛮大的。)

广播变量是只读的,也就是说广播出去的变量没法再修改,那么应该怎么解决这个问题呢?我们可以利用Spark中的unpersist()函数,Spark通常会按照LRU(leastRecently Used)即最近最久未使用原则对老数据进行删除,我们并不需要操作具体的数据,但如果是手动删除,可以使用unpersist()函数。

3.数据处理和接收时的并行度

作为分布式系统,增加接收和处理数据的并行度是提高整个系统性能的关键,也能够充分发挥集群机器资源。关于partition和parallelism。partition指的就是数据分片的数量,每一次Task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多Executor的计算能力无法充分利用;但是如果partition太大了则会导致分片太多,执行效率降低。在执行Action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及Shuffle,因此这个parallelism的参数没有影响)。由上述可得,partition和parallelism这两个概念密切相关,都是涉及数据分片,作用方式其实是统一的。通过Spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量,如reduceByKey和reduceByKeyAndWindow。

Spark Streaming接收Kafka数据的方式,这个过程有一个数据反序列化并存储到Spark的开销,如果数据接收成为了整个系统的瓶颈,那么可以考虑增加数据接收的并行度。每个输入DStream会创建一个单一的接收器(receiver在worker节点运行)用来接收一个单一的数据流。而对于接收多重数据的情况,可以创建多个输入DStream用来接收源数据流的不同分支(partitions)。如果我们利用Receiver的形式接收Kafka,一个单一的Kafka输入DStream接收了两个不同topic的数据流,我们为了提高并行度可以创建两个输入流,分别接收其中一个topic上的数据。这样就可以创建两个接收器来并行地接收数据,从而提高整体的吞吐量。而之后对于多个DStreams,可以通过union操作并为一个DStream,之后便可以在这个统一的输入DStream上进行操作,代码示例如下:

如果采用Direct连接方式,前面讲过Spark中的partition和Kafka中的partition是一一对应的,但一般默认设置为Kafka中partition的数量,这样来达到足够并行度以接收Kafka数据。

4.设置合理的批处理间隔

对于一个Spark Streaming应用,只有系统处理数据的速度能够赶上数据接收的速度,整个系统才能保持稳定,否则就会造成数据积压。换句话说,即每个batch的数据一旦生成就需要被尽快处理完毕。这一点我们可以通过Spark监控界面进行查看,比较批处理时间必须小于批处理间隔。通过设置合理的批处理大小(batch size),使得每批数据能够在接收后被尽快地处理完成(即数据处理的速度赶上数据生成的速度)。如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的batch的端到端延迟来观察,也可以看全局延迟来观察(可以在Spark log4j的日志里或者使用StreamingListener接口,也可以直接在UI界面查看)。如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以直接观察系统正在运行的Spark监控界面来判断系统的稳定性。

5. 内存优化内存

优化是在所有应用落地中必须经历的话题,虽然Spark在内存方面已经为开发者做了很多优化和默认设置,但是我们还是需要针对具体的情况进行调试。在优化内存的过程中需要从3个方面考虑这个问题:对象本身需要的内存;访问这些对象的内存开销;垃圾回收(GC garbagecollection)导致的开销。通常来说,对于Java对象而言,有很快的访问速度,但是很容易消耗原始数据2~5倍以上的内存空间,可以归结为以下几点原因:

● 每个独立的Java对象,都会有一个“对象头”,大约16个字节用来保存一些基本信息,如指向类的指针,对于一个只包含很少数据量在内的对象(如一个Int类型数据),这个开销是相对巨大的。

● Java的String对象会在原始数据的基础上额外开销40个字节,因为除了字符数组(Chars array)本身之外,还需要保存如字符串长度等额外信息,而且由于String内部存储字符时是按照UTF-16格式编码的,所以一个10字符的字符串开销很容易超过60个字符。

● 对于集合类(collection classes),如HashMap、LinkedList,通常使用链表的形式将数据结构链在一起,那么对于每一个节点(entry,如Map.Entry)都会有一个包装器(wrapper),而这个包装器对象不仅包含对象头,还会保存指向下一个节点的指针(每个8字节)。

● 熟悉Java的开发者应该知道,Java数据类型分为基本类型和包装类型,对于int、long等基本类型是直接在栈中分配空间,如果我们想将这些类型用在集合类中(如Map),需要使用对基本数据类型打包(当然这是Java的一个自动过程),而打包后的基本数据类型就会产生额外的开销。针对以上内存优化的基本问题,接下来首先介绍Spark中如何管理内存,之后介绍一些能够在具体应用中更加有效地使用内存的具体策略,例如,如何确定合适的内存级别,如何改变数据结构或将数据存储为序列化格式来节省内存等,也会从Spark的缓存及Java的垃圾回收方面进行分析,另外,也会对SparkStreaming进行分析。

5.1 内存管理

Spark对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于Shuffle类操作、join操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送。在Spark内部执行和存储分享同一片内存空间(M),当没有执行类内存被使用时,存储类内存可以使用全部的内存空间,反之亦然。执行类内存可以剥夺存储类内存的空间,但是有一个前提是,存储类内存所占空间不得低于某一个阈值R,也就是说R指定了M中的一块子空间块是永远不会被剥夺的。而另一方面由于实现上的复杂性,存储类内存是不可以剥夺执行类内存的。Spark的这种设计方式确保了系统一些很好的特性:首先,如果应用不需要缓存数据,那么所有的空间都可以用作执行类内存,可以一定程度上避免不必要的内存不够用时溢出到硬盘的情况;其次,如果应用需要使用缓存数据,会有最小的内存空间R能够保证这部分数据块免于被剥夺;最后,这种方式对于使用者而言是完全黑盒的,使用者不需要了解内部如何根据不同的任务负载来进行内存划分。Spark提供了两个相关的配置,但是大多数情况下直接使用默认值就能满足大部分负载情况:

● Spark Memory.Fraction表示M的大小占整个JVM(Java Virtue Machine)堆空间的比例(默认是0.6),剩余的空间(40%)被用来保存用户的数据结构及Spark内部的元数据(metadata),另一方面预防某些异常数据记录造成的OOM(Out of Memory)错误。

● Spark.Memory.StorageFraction表示R的大小占整个M的比例(默认是0.5), R是存储类内存在M中占用的空间,其中缓存的数据块不会被执行类内存剥夺。

5.2 优化策略

当我们需要初步判断内存的占用情况时,可以创建一个RDD,然后将其缓存(cache)起来,然后观察网页监控页面的存储页部分,就可以看出RDD占用了多少内存。而对于特殊的对象,我们可以调用SizeEstimator的estimate()方法来评估内存消耗,这对于实验不同数据层的内存消耗,以及判断广播变量在每个Executor堆上所占用的内存是非常有效的。当我们了解了内存的消耗情况后,发现占用内存过大,可以着手做一些优化,一方面可以在数据结构方面进行优化。首先需要注意的是,我们要避免本章开头提到的Java本身数据结构的头部开销,比如基于指针的数据结构或者包装器类型,有以下方式可以进行优化:

● 在设计数据结构时,优先使用基本数据类型及对象数组等,避免使用Java或者Scala标准库当中的集合类(如HashMap),在fastutil库中,为基本数据类型提供了方便的集合类接口,这些接口也兼容Java标准库。

● 尽可能避免在数据结构中嵌套大量的小对象和指针。

● 考虑使用数值类ID或者枚举对象来代替字符串类型作为主键(Key)。

● 如果我们的运行时内存小于32GB,可以加上JVM配置-XX:+UseCompressedOops将指针的占用空间由8个字节压缩到4个字节,我们也可以在Spark-env.sh中进行配置。

假设我们通过以上策略还是发现对象占用了过大的内存,可以用一个非常简单的方式来降低内存使用,就是将对象以序列化的形式(serialized form)存储,在RDD的持久化接口中使用序列化的存储级别,如MEMORY_ONLY_SER, Spark便会将每个RDD分区存储为一个很大的字节数组。而这种方式会使得访问数据的速度有所下降,因为每个对象访问时都需要有一个反序列化的过程。在7.1节中我们已经介绍过,优先使用Kryo序列化方式,其占用大小远低于Java本身的序列化方式。

5.3 垃圾回收(GC)优化

如果我们在应用中进行了频繁的RDD变动,那么JVM的垃圾回收会成为一个问题(也就是说,假设在程序中只创建了一个RDD,后续所有操作都围绕这个RDD,那么垃圾回收就不存在问题)。当Java需要通过删除旧对象来为新对象开辟空间时,它便会扫描我们曾创建的所有对象并找到不再使用的对象。所以垃圾回收的开销是和Java对象的个数成比例的,我们要尽可能地使用包含较少对象的数据结构(如使用Int数组代替LinkedList)来降低这部分开销。另外前面提到的用序列化形式存储也是一个很好的方法,序列化后每个对象在每个RDD分区下仅有一个对象(一个字节数组)。注意当GC开销成为瓶颈时,首先要尝试的便是序列化缓存(serialized caching)。在做GC优化时,我们首先需要了解GC发生的频率以及其所消耗的时间。这可以通过在Java选项中加入-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps来实现;之后当Spark任务运行后,便可以在Worker日志中看到GC发生时打印的信息。注意这些日志是打印在集群中的Worker节点上的(在工作目录的stdout文件中),而非Driver程序。为了进一步优化GC,首先简单介绍下Java虚拟机内部是如何进行内存管理的。

(1)Java对象是存储在堆空间内的,堆空间被分为两部分,即年轻区域(Young region)和老年区域(Old region),其中年轻代(Young generation)会用来存储短生命周期的对象,而老年代(Old generation)会用来存储较长生命周期的对象。

(2)年轻代的区域又被分为3个部分[Eden, Survivor1,Survivor2]。

(3)一个简单的GC流程大致是:当Eden区域满了,一次小型GC过程会将Eden和Survivor1中还存活的对象复制到Survivor2区域上,Survivor区域是可交换的(即来回复制),当一个对象存活周期已足够长或者Survivor2区域已经满时,那么它们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的GC过程。Java的这种GC机制主要是基于程序中创建的大多数对象,都会在创建后被很快销毁,只有极少数对象会存活下来,所以其分为年轻代和老年代两部分,而这两部分GC的方式也是不同的,其时间复杂度也是不同的,年轻代会更加快一些,感兴趣的读者可以进一步查阅相关资料。基于以上原因,Spark在GC方面优化的主要目标是:只有长生命周期的RDD会被存储在老年代上,而年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整GC流程。我们可以通过以下步骤来一步步优化:

(1)通过GC统计信息观察是否存在过于频繁的GC操作,如果在任务完成前,完整的GC操作被调用了多次,那么说明可执行任务并没有获得足够的内存空间。

(2)如果触发了过多的小型GC,而完整的GC操作并没有调用很多次,那么给Eden区域多分配一些内存空间会有所帮助。我们可以根据每个任务所需内存大小来预估Eden的大小,如果Eden设置大小为E,可以利用配置项-Xmn=4/3*E来对年轻代的区域大小进行设置(其中4/3的比例是考虑到survivor区域所需空间)。(3)如果我们观察GC打印的统计信息,发现老年代接近存满,那么就需要改变spark.memory.fraction来减少存储类内存(用于caching)的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,即通过-Xmn来进行配置,也可以通过JVM的NewRatio参数进行调整,大多数JVM的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于Spark.Memory.Fraction。

(4)通过加入-XX:+UserG1GC来使用G1GC垃圾回收器,这可以一定程度提高GC的性能。另外注意对于executor堆内存非常大的情况,一定通过-XX:G1HeapRegionSize来增加G1区域的大小。

针对以上步骤我们举一个例子,如果我们的任务是从HDFS当中读取数据,任务需要的内存空间可以通过从HDFS当中读取的数据块大小来进行预估,一般解压后的数据块大小会是原数据块的2~3倍,所以如果我们希望3、4个任务同时运行在工作空间中,假设每个HDFS块大小是128MB,那么需要将Eden大小设置为4×3×128MB。改动之后,我们可以监控GC的频率和时间消耗,看看有没有达到优化的效果。对于优化GC,主要还是从降低全局GC的频率出发,executor中对于GC优化的配置可以通过spark.executor.extraJavaOptions来配置。

5.4 Spark Streaming内存优化

前面介绍了Spark中的优化策略和关于GC方面的调优,对于Spark Streaming的应用程序,这些策略也都是适用的,除此之外还会有一些其他方面的优化点。对于Spark Streaming应用所需要的集群内存,很大程度上取决于要使用哪种类型的transformation操作。比如,假设我们想使用10分钟数据的窗口操作,那么我们的集群必须有足够的空间能够保存10分钟的全部数据;亦或,我们在大量的键值上使用了updateStateByKey操作,那么所需要的内存空间会较大。而如果我们仅仅使用简单的Map、Filter、Store操作,那么所需空间会较小。默认情况下,接收器接收来的数据会以StorageLevel.MEMORY_AND_DISK_SER_2的格式存储,那么如果内存不足时,数据就会序列化到硬盘上,这样会损失SparkStreaming应用的性能。所以通常建议为Spark Streaming应用分配充足的内存,可以在小规模数据集上进行测试和判断。另一方面与Spark程序有显著区别的是,Spark Streaming程序对实时性要求会较高,所以我们需要尽可能降低JVM垃圾回收所导致的延迟。基于此,我们可以通过以下几个参数对内存使用和GC开销进行优化调整。

● DStream的持久化级别:输入数据默认是持久化为字节流的,因为相较于反序列化的开销,其更会降低内存的使用并且减少GC的开销。所以优先使用Kryo序列化方式,可以大大降低序列化后的尺寸和内存开销。另外,如果需要更进一步减少内存开销,可以通过配置spark.rdd.compress进行更进一步的压缩(当然对于目前的集群机器,大多数内存都足够了)。

● 及时清理老数据:默认情况下所有的输入数据和由DStream的Transormation操作产生的持久RDD会被自动清理,即Spark Streaming会决定何时对数据进行清理。例如,假设我们使用10分钟的窗口操作,Spark Streaming会保存之前10分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过stremingContext.remember进行设置。

● CMS垃圾回收器:不同于之前我们在Spark中的建议,由于需要减少GC间的停顿,所以这里建议使用并发标记清除类的GC方式。即使并发GC会降低全局系统的生产吞吐量,但是使用这种GC可以使得每个Batch的处理时间更加一致(不会因为某个Batch处理时发生了GC,而导致处理时间剧增)。我们需要确保在Driver节点(在spark-submit中使用—driver-java-options)和Executor节点(在Spark配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了CMS GC方式。

● 其他减少GC开销的方式有:可以通过OFF_HEAP存储级别的RDD持久化方式,以及可以在Executor上使用更小的堆内存,从而降低每个JVM堆垃圾回收的压力。

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注