gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档Spark

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "Spark"
  • ( 页面5 )
flume, Spark 2月 20,2022

大数据运维一些常见批量操作命令

在使用flume过程,由于故障停止采集,堆积文件很多,想迁移到新目录,但如果文件数目太多的话,想直接用mv 命令会报错。这时我们需要利用管道技术和xargs命令。

xargs(英文全拼: eXtended ARGuments)是给命令传递参数的一个过滤器,也是组合多个命令的一个工具。

xargs 可以将管道或标准输入(stdin)数据转换成命令行参数,也能够从文件的输出中读取数据。

常用操作1:把当前目录的文件移到新的目录newdir

find . -name '2106*.json' | xargs -i mv {} ../newdir

常用操作2:把当前目录json.1后缀的批量删除

find . -name '*.json.1' | xargs -i rm -f {}

常用操作3:shell批量kill掉java进程

ps aux | grep test.jar | grep -v grep | awk '{print $2}' | xargs kill -9
作者 east
Spark 1月 6,2022

Spark如何在生产环境调试

接手别人开发的spark程序,大概弄懂整个流程,但一些细节总是猜不透,在生产环境运行效果也达不到理想。

想去修改,遇到下面的问题:

一、由于生产环境是运行在linux服务上的,在华为HD Insight大数据平台上,在开发机不知怎样调试。

解决方式:后来发现其实在idea是可以远程调试的:

  1. 打开工程,在菜单栏中选择“Run > Edit Configurations”。
  2. 在弹出的配置窗口中用鼠标左键单击左上角的号,在下拉菜单中选择Remote,如图1所示。 图1 选择Remote

3. 选择对应要调试的源码模块路径,并配置远端调试参数Host和Port,如图2所示。
其中Host为Spark运行机器IP地址,Port为调试的端口号(确保该端口在运行机器上没被占用)。

说明: 当改变Port端口号时,For JDK1.4.x对应的调试命令也跟着改变,比如Port设置为5006,对应调试命令会变更为-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5006,这个调试命令在启动Spark程序时要用到。

4.执行以下命令,远端启动Spark运行SparkPi。 ./spark-submit –master yarn-client –driver-java-options “-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5006” –class org.apache.spark.examples.SparkPi /opt/FI-Client/Spark2x/spark/examples/jars/spark-examples_2.11-2.1.0.jar 用户调试时需要把–class和jar包换成自己的程序,-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5006需要换成3获取到的For JDK1.4.x对应的调试命令。

5.设置调试断点。 在IDEA代码编辑窗口左侧空白处单击鼠标左键设置相应代码行断点,如图4所示,在SparkPi.scala的29行设置断点。

6.启动调试。 在IDEA菜单栏中选择“Run > Debug ‘Unnamed’”开启调试窗口,接着开始SparkPi的调试,比如单步调试、查看调用栈、跟踪变量值等,如图5所示。

二、在spark executor执行的,如果看调试结果?

解决方式是在相应的rdd加上collect()方法,把结果传送到driver来看

作者 east
Spark 12月 6,2021

Spark Streaming多个输入流

由于业务需要,一个地方部署1个Spark Streaming程序,由于业务扩展部署了多个地方,导致大数据平台的yarn资源不足了,CPU和内存经常是100%的。而且多套只是配置不同的程序,一旦有修改,维护起来也不方便。于是想到提升Spark Streaming的并行度,同时接收多个Dstream的输入。

通过网络接收数据(如Kafka、Flume、套接字等)需要将数据反序列化并存储在Spark上,如果数据接收成为系统中的瓶颈,则需要并行接收数据。主要通过提升Receiver的并发度和调整Receiver的RDD数据分区时间隔。提升Receiver的并发度:在Worker节点上对每个输入DStream创建一个Receiver并运行,以接收一个数据流。通过创建多个输入DStream并配置从数据源接收不同分区的数据流,从而实现接收多数据流。例如,一个单Kafka输入DStream接收两个主题的数据,可以分成两个Kafka的输入流,每个仅仅接收一个主题。输入DStream运行在两个Worker节点的接收器上,从而能够并行接受并行,提高整体的吞吐量。多DStream可以通过联合(union)在一起从而创建一个DStream,这样一些应用在一个输入DStream的转换操作便可以用在联合后的DStream上。

JavaDstream<string> sources1=ssc.receiverstream(new JavacustomReceiver2(ip1, port, StorageLevel.MEMORY_ONLY-2()));

JavaDStream<String> sources2 = ssc.receiverStream(new JavaCustomReceiver2(ip2, port, StorageLevel.MEMORY_ONLY-2()));
JavaDStream<String> sources3 = ssc.receiverstream(new JavaCustomreceiver2(whip, port, StorageLeve1.MEMORY_ONLY-2()));

Javadstream<string> sources3 = ssc.socketTextstream(ip3, port, storagetevel.MEMORY ONLY2())); 
JavaDStream<String> sources = sources1.union(sources2).union(sources3);
作者 east
bug清单, Kafka, Spark 6月 10,2021

运行 Spark Streaming出现”Could not find KafkaClient entry in the JAAS configuration”

在使用FusionInsight HD大数据平台,用Spark Streaming来处理数据接入,kafka作为消费者,运行程序时出现”Could not find KafkaClient entry in the JAAS configuration”,当时怀疑是FusionInsight HD 的客户端相关配置有问题。

采用替换法思维,在另一台已经验证 FusionInsight HD 的客户端没问题的服务上运行,果然这个问题没做出现,仔细对比了这2台服务FusionInsight HD 的客户端 的配置,发现在spark配置文件(hd安装目录/Spark2x/spark/conf/Jaas.conf)要修改为下面的配置:

KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
principal="大数据平台账号"
useTicketCache=false
keyTab="user.keytab的路径"
storeKey=true;
};

作者 east
Spark 3月 3,2021

Spark Streaming调优实践

当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能。有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源。本章我们就来介绍一些能够提高应用性能的参数和配置。另外需要指出的是,优化本身是一个具体性很强的事情,不同的应用及落地场景会有不同的优化方式,并没有一个统一的优化标准。本章我们将一些常用的和在项目中踩过的“坑”总结一下,列举以下常见的优化方式。

数据序列化在分布式应用中,序列化(serialization)对性能的影响是显著的。如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。通常在Spark中,主要有如下3个方面涉及序列化:

● 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

● 将自定义的类型作为RDD的泛型类型时,所有自定义类型对象都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

● 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER), Spark会将RDD中的每个partition都序列化成一个大的字节数组。而Spark综合考量易用性和性能,提供了下面两种序列化库。

● Java序列化:默认情况下,Spark使用Java的对象输出流框架(ObjectOutputStream framework)来进行对象的序列化,并且可用在任意实现Java.io.Serializable接口的自定义类上。我们可以通过扩展Java.io.Externalizable来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果。

● Kryo序列化:Spark在2.0.0以上的版本可以使用Kryo库来非常快速地进行对象序列化,Kryo要比Java序列化更快、更紧凑(10倍),但是其不支持所有的Serializable类型,并且在使用自定义类之前必须先注册。

我们可以在初始化SparkConf时,调用conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)来使用Kryo。一旦进行了这个配置,Kryo序列化不仅仅会用在Shuffling操作时worker节点间的数据传递,也会用在RDDs序列化到硬盘的过程。Spark官方解释没有将Kryo作为默认序列化方式的唯一原因是,Kryo必须用户自己注册(注意如果我们不注册自定义类,Kryo也是可以正常运行的,但是它必须存储每个对象的完整类名,这是非常浪费的),但是其推荐在网络频繁传输的应用中使用Kryo。另外值得注意的是,在Spark 2.0.0之后,Spark已经默认将Kryo序列化作为简单类型(基本类型、基本类型的数组及string类型)RDD进行Shuffling操作时传输数据的对象序列化方式。Spark已经自动包含注册了绝大部分Scala的核心类,如果需要向Kryo注册自己的类别,可以使用registerKryoClasses方法。使用Kryo的代码框架如下:

如果我们的对象非常大,可能需要增加Spark.kryoserializer.buffer的配置。同样在Spark Streaming中,通过优化序列化格式可以缩减数据序列化的开销,而在Streaming中还会涉及以下两类数据的序列化。

● 输入数据:在4.4.1节中曾讲过,Spark Streaming中不同于RDD默认是以非序列化的形式存于内存当中,Streaming中由接收器(Receiver)接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中。而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中。而在Streaming中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为Spark本身的序列化格式。

● 由Streaming操作产生RDD的持久化:由流式计算产生的RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被反复使用,所以会持久化在内存当中。值得注意的是,不同于Spark核心默认使用非序列化的持久化方式(StorageLevel. MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在Spark还是在Spark Streaming中,使用Kryo序列化方式,都可以减少CPU和内存的开销。而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。

例如,我们使用很小的批处理时间间隔,并且没有基于窗口的操作,可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的CPU开销,但是潜在的增加了GC的开销。

2.广播大变量

我们可以看出,不论Spark还是Spark Streaming的应用,在集群节点间进行数据传输时,都会有序列化和反序列化的开销,而如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用Driver节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。另外,Spark在Master节点会打印每个任务的序列化对象大小,我们可以通过观察任务的大小,考虑是否需要广播某些大变量。通常一个任务的大小超过20KB,是值得去优化的。当我们将大型的配置查询表广播出去时,每个节点可以读取配置项进行任务计算,那么假设配置发生了动态改变时,如何通知各个子节点配置表更改了呢?(尤其是对于流式计算的任务,重启服务代价还是蛮大的。)

广播变量是只读的,也就是说广播出去的变量没法再修改,那么应该怎么解决这个问题呢?我们可以利用Spark中的unpersist()函数,Spark通常会按照LRU(leastRecently Used)即最近最久未使用原则对老数据进行删除,我们并不需要操作具体的数据,但如果是手动删除,可以使用unpersist()函数。

3.数据处理和接收时的并行度

作为分布式系统,增加接收和处理数据的并行度是提高整个系统性能的关键,也能够充分发挥集群机器资源。关于partition和parallelism。partition指的就是数据分片的数量,每一次Task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多Executor的计算能力无法充分利用;但是如果partition太大了则会导致分片太多,执行效率降低。在执行Action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及Shuffle,因此这个parallelism的参数没有影响)。由上述可得,partition和parallelism这两个概念密切相关,都是涉及数据分片,作用方式其实是统一的。通过Spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量,如reduceByKey和reduceByKeyAndWindow。

Spark Streaming接收Kafka数据的方式,这个过程有一个数据反序列化并存储到Spark的开销,如果数据接收成为了整个系统的瓶颈,那么可以考虑增加数据接收的并行度。每个输入DStream会创建一个单一的接收器(receiver在worker节点运行)用来接收一个单一的数据流。而对于接收多重数据的情况,可以创建多个输入DStream用来接收源数据流的不同分支(partitions)。如果我们利用Receiver的形式接收Kafka,一个单一的Kafka输入DStream接收了两个不同topic的数据流,我们为了提高并行度可以创建两个输入流,分别接收其中一个topic上的数据。这样就可以创建两个接收器来并行地接收数据,从而提高整体的吞吐量。而之后对于多个DStreams,可以通过union操作并为一个DStream,之后便可以在这个统一的输入DStream上进行操作,代码示例如下:

如果采用Direct连接方式,前面讲过Spark中的partition和Kafka中的partition是一一对应的,但一般默认设置为Kafka中partition的数量,这样来达到足够并行度以接收Kafka数据。

4.设置合理的批处理间隔

对于一个Spark Streaming应用,只有系统处理数据的速度能够赶上数据接收的速度,整个系统才能保持稳定,否则就会造成数据积压。换句话说,即每个batch的数据一旦生成就需要被尽快处理完毕。这一点我们可以通过Spark监控界面进行查看,比较批处理时间必须小于批处理间隔。通过设置合理的批处理大小(batch size),使得每批数据能够在接收后被尽快地处理完成(即数据处理的速度赶上数据生成的速度)。如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的batch的端到端延迟来观察,也可以看全局延迟来观察(可以在Spark log4j的日志里或者使用StreamingListener接口,也可以直接在UI界面查看)。如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以直接观察系统正在运行的Spark监控界面来判断系统的稳定性。

5. 内存优化内存

优化是在所有应用落地中必须经历的话题,虽然Spark在内存方面已经为开发者做了很多优化和默认设置,但是我们还是需要针对具体的情况进行调试。在优化内存的过程中需要从3个方面考虑这个问题:对象本身需要的内存;访问这些对象的内存开销;垃圾回收(GC garbagecollection)导致的开销。通常来说,对于Java对象而言,有很快的访问速度,但是很容易消耗原始数据2~5倍以上的内存空间,可以归结为以下几点原因:

● 每个独立的Java对象,都会有一个“对象头”,大约16个字节用来保存一些基本信息,如指向类的指针,对于一个只包含很少数据量在内的对象(如一个Int类型数据),这个开销是相对巨大的。

● Java的String对象会在原始数据的基础上额外开销40个字节,因为除了字符数组(Chars array)本身之外,还需要保存如字符串长度等额外信息,而且由于String内部存储字符时是按照UTF-16格式编码的,所以一个10字符的字符串开销很容易超过60个字符。

● 对于集合类(collection classes),如HashMap、LinkedList,通常使用链表的形式将数据结构链在一起,那么对于每一个节点(entry,如Map.Entry)都会有一个包装器(wrapper),而这个包装器对象不仅包含对象头,还会保存指向下一个节点的指针(每个8字节)。

● 熟悉Java的开发者应该知道,Java数据类型分为基本类型和包装类型,对于int、long等基本类型是直接在栈中分配空间,如果我们想将这些类型用在集合类中(如Map),需要使用对基本数据类型打包(当然这是Java的一个自动过程),而打包后的基本数据类型就会产生额外的开销。针对以上内存优化的基本问题,接下来首先介绍Spark中如何管理内存,之后介绍一些能够在具体应用中更加有效地使用内存的具体策略,例如,如何确定合适的内存级别,如何改变数据结构或将数据存储为序列化格式来节省内存等,也会从Spark的缓存及Java的垃圾回收方面进行分析,另外,也会对SparkStreaming进行分析。

5.1 内存管理

Spark对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于Shuffle类操作、join操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送。在Spark内部执行和存储分享同一片内存空间(M),当没有执行类内存被使用时,存储类内存可以使用全部的内存空间,反之亦然。执行类内存可以剥夺存储类内存的空间,但是有一个前提是,存储类内存所占空间不得低于某一个阈值R,也就是说R指定了M中的一块子空间块是永远不会被剥夺的。而另一方面由于实现上的复杂性,存储类内存是不可以剥夺执行类内存的。Spark的这种设计方式确保了系统一些很好的特性:首先,如果应用不需要缓存数据,那么所有的空间都可以用作执行类内存,可以一定程度上避免不必要的内存不够用时溢出到硬盘的情况;其次,如果应用需要使用缓存数据,会有最小的内存空间R能够保证这部分数据块免于被剥夺;最后,这种方式对于使用者而言是完全黑盒的,使用者不需要了解内部如何根据不同的任务负载来进行内存划分。Spark提供了两个相关的配置,但是大多数情况下直接使用默认值就能满足大部分负载情况:

● Spark Memory.Fraction表示M的大小占整个JVM(Java Virtue Machine)堆空间的比例(默认是0.6),剩余的空间(40%)被用来保存用户的数据结构及Spark内部的元数据(metadata),另一方面预防某些异常数据记录造成的OOM(Out of Memory)错误。

● Spark.Memory.StorageFraction表示R的大小占整个M的比例(默认是0.5), R是存储类内存在M中占用的空间,其中缓存的数据块不会被执行类内存剥夺。

5.2 优化策略

当我们需要初步判断内存的占用情况时,可以创建一个RDD,然后将其缓存(cache)起来,然后观察网页监控页面的存储页部分,就可以看出RDD占用了多少内存。而对于特殊的对象,我们可以调用SizeEstimator的estimate()方法来评估内存消耗,这对于实验不同数据层的内存消耗,以及判断广播变量在每个Executor堆上所占用的内存是非常有效的。当我们了解了内存的消耗情况后,发现占用内存过大,可以着手做一些优化,一方面可以在数据结构方面进行优化。首先需要注意的是,我们要避免本章开头提到的Java本身数据结构的头部开销,比如基于指针的数据结构或者包装器类型,有以下方式可以进行优化:

● 在设计数据结构时,优先使用基本数据类型及对象数组等,避免使用Java或者Scala标准库当中的集合类(如HashMap),在fastutil库中,为基本数据类型提供了方便的集合类接口,这些接口也兼容Java标准库。

● 尽可能避免在数据结构中嵌套大量的小对象和指针。

● 考虑使用数值类ID或者枚举对象来代替字符串类型作为主键(Key)。

● 如果我们的运行时内存小于32GB,可以加上JVM配置-XX:+UseCompressedOops将指针的占用空间由8个字节压缩到4个字节,我们也可以在Spark-env.sh中进行配置。

假设我们通过以上策略还是发现对象占用了过大的内存,可以用一个非常简单的方式来降低内存使用,就是将对象以序列化的形式(serialized form)存储,在RDD的持久化接口中使用序列化的存储级别,如MEMORY_ONLY_SER, Spark便会将每个RDD分区存储为一个很大的字节数组。而这种方式会使得访问数据的速度有所下降,因为每个对象访问时都需要有一个反序列化的过程。在7.1节中我们已经介绍过,优先使用Kryo序列化方式,其占用大小远低于Java本身的序列化方式。

5.3 垃圾回收(GC)优化

如果我们在应用中进行了频繁的RDD变动,那么JVM的垃圾回收会成为一个问题(也就是说,假设在程序中只创建了一个RDD,后续所有操作都围绕这个RDD,那么垃圾回收就不存在问题)。当Java需要通过删除旧对象来为新对象开辟空间时,它便会扫描我们曾创建的所有对象并找到不再使用的对象。所以垃圾回收的开销是和Java对象的个数成比例的,我们要尽可能地使用包含较少对象的数据结构(如使用Int数组代替LinkedList)来降低这部分开销。另外前面提到的用序列化形式存储也是一个很好的方法,序列化后每个对象在每个RDD分区下仅有一个对象(一个字节数组)。注意当GC开销成为瓶颈时,首先要尝试的便是序列化缓存(serialized caching)。在做GC优化时,我们首先需要了解GC发生的频率以及其所消耗的时间。这可以通过在Java选项中加入-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps来实现;之后当Spark任务运行后,便可以在Worker日志中看到GC发生时打印的信息。注意这些日志是打印在集群中的Worker节点上的(在工作目录的stdout文件中),而非Driver程序。为了进一步优化GC,首先简单介绍下Java虚拟机内部是如何进行内存管理的。

(1)Java对象是存储在堆空间内的,堆空间被分为两部分,即年轻区域(Young region)和老年区域(Old region),其中年轻代(Young generation)会用来存储短生命周期的对象,而老年代(Old generation)会用来存储较长生命周期的对象。

(2)年轻代的区域又被分为3个部分[Eden, Survivor1,Survivor2]。

(3)一个简单的GC流程大致是:当Eden区域满了,一次小型GC过程会将Eden和Survivor1中还存活的对象复制到Survivor2区域上,Survivor区域是可交换的(即来回复制),当一个对象存活周期已足够长或者Survivor2区域已经满时,那么它们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的GC过程。Java的这种GC机制主要是基于程序中创建的大多数对象,都会在创建后被很快销毁,只有极少数对象会存活下来,所以其分为年轻代和老年代两部分,而这两部分GC的方式也是不同的,其时间复杂度也是不同的,年轻代会更加快一些,感兴趣的读者可以进一步查阅相关资料。基于以上原因,Spark在GC方面优化的主要目标是:只有长生命周期的RDD会被存储在老年代上,而年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整GC流程。我们可以通过以下步骤来一步步优化:

(1)通过GC统计信息观察是否存在过于频繁的GC操作,如果在任务完成前,完整的GC操作被调用了多次,那么说明可执行任务并没有获得足够的内存空间。

(2)如果触发了过多的小型GC,而完整的GC操作并没有调用很多次,那么给Eden区域多分配一些内存空间会有所帮助。我们可以根据每个任务所需内存大小来预估Eden的大小,如果Eden设置大小为E,可以利用配置项-Xmn=4/3*E来对年轻代的区域大小进行设置(其中4/3的比例是考虑到survivor区域所需空间)。(3)如果我们观察GC打印的统计信息,发现老年代接近存满,那么就需要改变spark.memory.fraction来减少存储类内存(用于caching)的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,即通过-Xmn来进行配置,也可以通过JVM的NewRatio参数进行调整,大多数JVM的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于Spark.Memory.Fraction。

(4)通过加入-XX:+UserG1GC来使用G1GC垃圾回收器,这可以一定程度提高GC的性能。另外注意对于executor堆内存非常大的情况,一定通过-XX:G1HeapRegionSize来增加G1区域的大小。

针对以上步骤我们举一个例子,如果我们的任务是从HDFS当中读取数据,任务需要的内存空间可以通过从HDFS当中读取的数据块大小来进行预估,一般解压后的数据块大小会是原数据块的2~3倍,所以如果我们希望3、4个任务同时运行在工作空间中,假设每个HDFS块大小是128MB,那么需要将Eden大小设置为4×3×128MB。改动之后,我们可以监控GC的频率和时间消耗,看看有没有达到优化的效果。对于优化GC,主要还是从降低全局GC的频率出发,executor中对于GC优化的配置可以通过spark.executor.extraJavaOptions来配置。

5.4 Spark Streaming内存优化

前面介绍了Spark中的优化策略和关于GC方面的调优,对于Spark Streaming的应用程序,这些策略也都是适用的,除此之外还会有一些其他方面的优化点。对于Spark Streaming应用所需要的集群内存,很大程度上取决于要使用哪种类型的transformation操作。比如,假设我们想使用10分钟数据的窗口操作,那么我们的集群必须有足够的空间能够保存10分钟的全部数据;亦或,我们在大量的键值上使用了updateStateByKey操作,那么所需要的内存空间会较大。而如果我们仅仅使用简单的Map、Filter、Store操作,那么所需空间会较小。默认情况下,接收器接收来的数据会以StorageLevel.MEMORY_AND_DISK_SER_2的格式存储,那么如果内存不足时,数据就会序列化到硬盘上,这样会损失SparkStreaming应用的性能。所以通常建议为Spark Streaming应用分配充足的内存,可以在小规模数据集上进行测试和判断。另一方面与Spark程序有显著区别的是,Spark Streaming程序对实时性要求会较高,所以我们需要尽可能降低JVM垃圾回收所导致的延迟。基于此,我们可以通过以下几个参数对内存使用和GC开销进行优化调整。

● DStream的持久化级别:输入数据默认是持久化为字节流的,因为相较于反序列化的开销,其更会降低内存的使用并且减少GC的开销。所以优先使用Kryo序列化方式,可以大大降低序列化后的尺寸和内存开销。另外,如果需要更进一步减少内存开销,可以通过配置spark.rdd.compress进行更进一步的压缩(当然对于目前的集群机器,大多数内存都足够了)。

● 及时清理老数据:默认情况下所有的输入数据和由DStream的Transormation操作产生的持久RDD会被自动清理,即Spark Streaming会决定何时对数据进行清理。例如,假设我们使用10分钟的窗口操作,Spark Streaming会保存之前10分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过stremingContext.remember进行设置。

● CMS垃圾回收器:不同于之前我们在Spark中的建议,由于需要减少GC间的停顿,所以这里建议使用并发标记清除类的GC方式。即使并发GC会降低全局系统的生产吞吐量,但是使用这种GC可以使得每个Batch的处理时间更加一致(不会因为某个Batch处理时发生了GC,而导致处理时间剧增)。我们需要确保在Driver节点(在spark-submit中使用—driver-java-options)和Executor节点(在Spark配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了CMS GC方式。

● 其他减少GC开销的方式有:可以通过OFF_HEAP存储级别的RDD持久化方式,以及可以在Executor上使用更小的堆内存,从而降低每个JVM堆垃圾回收的压力。

作者 east
Kafka, Spark 3月 3,2021

SparkStreaming Direct方式读取kafka优缺点及示例(Redis保存offset)

在Spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有Receiver这一层,其会周期性地获取Kafka中每个topic(主题)的每个partition(分区)中的最新offsets(偏移量),之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图所示。

这种方法相较于Receiver方式的优势在于:

● 简化的并行。Direct方式中,Kafka中的partition与Spark内部的partition是一一对应的,这点使得我们可以很容易地通过增加Kafka中的partition来提高数据整体传输的并行度,而不像Receiver方式中还需要创建多个Receiver然后利用union再合并成统一的Dstream。

● 高效。Direct方式中,我们可以自由地根据offset来从Kafka中拉取想要的数据(前提是Kafka保留了足够长时间的数据),这对错误恢复提供了很好的灵活性。然而在Receiver的方式中,还需要将数据存入Write Ahead Log中,存在数据冗余的问题。

● 一次性接收精确的数据记录Direct方式中我们直接使用了低阶Kafka的API接口,offset默认会利用Spark Steaming的checkpoints来存储,同样也可以将其存到数据库等其他地方。然而在Receiver的方式中,由于使用了Kafka的高阶API接口,其默认是从ZooKeeper中拉取offset记录(通常Kafka取数据都是这样的),但是Spark Streaming消费数据的情况和ZooKeeper记录的情况是不同步的,当程序发生中断或者错误时,可能会造成数据重复消费的情况。

不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然Zookeeper就保存了当前消费的offset值,如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,是直接从Kafka来读数据,offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到ZooKeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到ZooKeeper的通用类中。下面示范用redis保存offset

object Demo {


  val IP_RANG: Array[String] = "91,92,93,94,95".split(",")
  val PORT_RANG: Array[String] = "22420,22421,22422,22423,22424,22425,22426,22427".split(",")
  val hosts = new util.HashSet[HostAndPort]()

  val sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def main(args: Array[String]) {

      val Array(checkPointDir, topic, brokers, groupId, cf, offset, dw_all_tn, dw_track_tn, dw_unique_tn, batchIntervel) = args

      login

      val client: JedisCluster = new JedisCluster(hosts, 5000)


      var topicPartitions: Map[TopicPartition, Long] = Map()

      if (client.exists(topic)) {
        val offsetMap: util.Map[String, String] = client.hgetAll(topic)
        val iterator: util.Iterator[String] = offsetMap.keySet().iterator()
        while (iterator.hasNext) {
          val key: String = iterator.next()
          val value: String = offsetMap.get(key)
          println(key + "------" + value)
          topicPartitions += (new TopicPartition(topic, key.toInt) -> value.toLong)
        }
      }
      client.close()

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "value.deserializer" -> classOf[StringDeserializer],
        "key.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "security.protocol" -> "SASL_PLAINTEXT",
        "sasl.kerberos.service.name" -> "kafka",
        "auto.offset.reset" -> offset,
        "kerberos.domain.name" -> "hadoop.hadoop.com",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )


      def functionToCreateContext(): StreamingContext = {

        //      val topicArr = topic.split(",")
        //      val topicSet = topicArr.toSet


        val locationStrategy = LocationStrategies.PreferConsistent
        //      val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

        val sparkConf: SparkConf = new SparkConf().setAppName("jingyi_xn_dw_all&track")

        val ssc = new StreamingContext(sparkConf, Seconds(batchIntervel.toInt))
        //      if (!"nocp".equals(checkPointDir)) {
        //        ssc.checkpoint(checkPointDir)
        //      }


        val config = HBaseConfiguration.create()
        val hbaseContext = new HBaseContext(ssc.sparkContext, config)

        val stream = KafkaUtils.createDirectStream[String, String](ssc,
          locationStrategy,
          //        consumerStrategy
          ConsumerStrategies.Assign[String, String](topicPartitions.keys.toList, kafkaParams, topicPartitions)
        )
    }
}

def setRedisHost: Unit ={
    for (host <- IP_RANG) {
      for (port <- PORT_RANG) {
        hosts.add(new HostAndPort("192.68.196." + host, port.toInt))
      }
    }
  }

       
作者 east
Spark 3月 2,2021

Spark开发规范

规则

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.api.java.JavaSparkContext
// RDD操作时引入的类。
import org.apache.spark.api.java.JavaRDD
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.SparkContext
// RDD操作时引入的类。
import org.apache.spark.SparkContext._
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf

Java与Scala函数有区别,在编写应用时,需要弄清楚每个函数的功能

RDD是不可改变的,也就是说,RDD的元素对象是不能更改的,因此,在用Java和Scala编写需要弄清楚每个函数的功能。下面举个例子。

场景:现有用户位置数据,按照时间排序生成用户轨迹。在Scala中,按时间排序的代码如下:

/* 函数实现的功能是得到某个用户的位置轨迹。
 * 参数trajectory:由两部分组成-用户名和位置点(时间,经度,维度)
 */
private def getTimesOfOneUser(trajectory: (String, Seq[(String, Float, Float)]), zone: Zone, arrive: Boolean): Int =
{
// 先将用户位置点按时间排序
    val sorted: Seq[(String, Float, Float)] = trajectory._2.sortBy(x => x._1);
    …
}

若用java实现上述功能,则需要将trajectory._2重新生成对象,而不能直接对trajectory._2进行排序操作。原因是Collections.sort(trajectory._2)这个操作会改变了trajectory._2这个对象本身,这违背了RDD元素不可更改这条规则;而Scala代码之所以能够正常运行,是因为sortBy( )这个函数生成了一个新的对象,它并不对trajectory._2直接操作。下面分别列出java实现的正确示例和错误示例。

正确示例:

// 将用户的位置点从新生成一个对象。
List<Tuple3< String, Float, Float >> list = new ArrayList<Tuple3< String, Float, Float >>( trajectory._2);
// 对新对象进行排序。
Collections.sort(list);

错误示例:

// 直接对用户位置点按照时间排序。
Collections.sort(trajectory._2);

分布式模式下,应注意Driver和Executor之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会使用这种方式,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在分布式模式下,执行程序的jar包会被发送到每个Executor上执行。而该变量只在main函数的节点改变了,并未传给执行任务的函数中,因此Executor将会报空指针异常。

正确示例:

object Test
{
  private var testArg: String = null;
  def main(args: Array[String])
  {
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
    .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(…);
    return …; 
  }
}

错误示例:

//定义对象。
object Test
{
  // 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
  private var testArg: String = null;
  // main函数
  def main(args: Array[String])
  {
    
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
      .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return …; 
  }
}

运行错误示例,在Spark的local模式下能正常运行,而在分布式模式情况下,会在蓝色代码处报错,提示空指针异常,这是由于在分布式模式下,执行程序的jar包会被发送到每个Executor上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

说明:

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,YARN界面会显示失败。同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

作者 east
Spark 2月 21,2021

Spark编程语言选择:scala对比python

为Apache Spark选择编程语言是一个主观问题,因为特定的数据科学家或数据分析师喜欢将Python或Scala用于Apache Spark的原因可能并不总是适用于其他人。根据独特的用例或要开发的特定类型的大数据应用程序-数据专家确定哪种语言更适合Apache Spark编程。对于数据科学家来说,学习Scala,Python,R和Java以便在Spark中进行编程并根据任务的功能解决方案的效率选择首选语言非常有用。让我们探索一些重要因素,然后再决定将Scala vs Python作为Apache Spark的主要编程语言。

Python vs Scala

Hadoop更快的表亲Apache Spark框架具有用于以各种语言(Java,Scala和Python)进行数据处理和分析的API。出于讨论的目的,我们将Java从大数据分析和处理的比较列表中删除,因为Java太冗长了。 Java不支持读取 – 评估 – 打印循环(REPL)选择编程语言,大数据处理时,这是一个重大的大忌。

Scala和Python都易于编程,可帮助数据专家快速提高生产率。数据科学家通常更喜欢同时学习Spark的Scala和Spark的Python,但是Python通常是Apache Spark第二受欢迎的语言,因为Scala最早出现在该语言中。但是,以下一些重要因素可以帮助数据科学家或数据工程师根据他们的要求选择最佳的编程语言:

Scala与Python进行Spark编程

1)Scala vs Python-性能
由于使用JVM,Scala编程语言的数据分析和处理速度比Python快10倍。当使用Python编程代码来调用Spark库时,性能是中等的,但是如果涉及的处理量比Python代码要慢得多,则它会比Scala等效代码慢得多。 Python解释器PyPy具有一个内置的JIT(即时)编译器,该编译器速度非常快,但不提供对各种Python C扩展的支持。在这种情况下,带有C扩展库的CPython解释器要优于PyPy解释器。

在Apache Spark上使用Python作为Scala的性能开销,但是重要性取决于您在做什么。当内核数量较少时,Scala比Python更快。随着内核数量的增加,Scala的性能优势开始减弱。

当使用大量内核时,性能不是选择Apache Spark编程语言的主要驱动因素。但是,当存在重要的处理逻辑时,性能是主要因素,而Scala肯定会比Python提供更好的性能,从而可以针对Spark进行编程。

2)Scala vs Python-学习曲线
使用Apache Spark进行编程时,Scala语言具有多种语法功能,因此在学习Scala for Spark时,大数据专业人员必须非常谨慎。程序员有时可能会疯狂地发现Scala用于在Spark中进行编程的语法。 Scala中的库很少,因此很难定义经验不足的程序员可以理解的随机符号运算符。使用Scala时,开发人员需要专注于代码的可读性。与Java或Python相比,Scala是一种语法灵活的复杂语言。对Scala开发人员的需求不断增长,因为大数据公司重视可以掌握Apache Spark中数据分析和处理的高效健壮编程语言的开发人员。

由于Java的语法和标准库,Python比较容易学习。但是,对于高并发和可扩展的系统(例如SoundCloud或Twitter),Python并不是理想的选择。

Learning Scala丰富了程序员对类型系统中各种新颖抽象,新颖的函数式编程功能和不可变数据的知识。

3)Scala vs Python –并发
大数据系统的复杂基础结构需要一种编程语言,该语言具有跨多个数据库和服务进行集成的能力。 Scala凭借Play框架赢得了这场比赛,该框架提供了许多异步库和反应式内核,可以轻松地与各种并发原语(例如Akka在大数据生态系统中的参与者)集成。 Scala允许开发人员编写高效,可读性和可维护性的服务,而无需将程序代码悬挂到不可读的回调蜘蛛网中。相反,Python确实使用uwsgi支持重量级的进程派生,但它不支持真正的多线程。

在将Python用于Spark时,无论进程具有多少线程,Python进程一次只能激活一个CPU。这有助于每个CPU内核处理一个进程,但是这样做的缺点是,每当要部署新代码时,都需要重新启动更多进程,这还需要额外的内存开销。在这些方面,Scala更加高效且易于使用。

4)Scala与Python – TypeSafety
使用Apache Spark进行编程时,开发人员需要根据不断变化的需求不断重构代码。 Scala是一种静态类型的语言,尽管由于经典的类型推断机制,它看起来像一种动态类型的语言。作为静态类型的语言,Scala仍然为编译器提供了捕获编译时错误的功能。

重构静态语言(例如Scala)的程序代码比重构动态语言(例如Python)要容易得多,而且没有麻烦。在修改Python程序代码后,开发人员通常会遇到困难,因为它比修复较旧的bug会产生更多的bug。 Python中的Typecheck实际上征服了Python的鸭子式哲学。使用Scala for Spark时要缓慢而安全,要比使用Python for Spark时要快而死。

对于较小的临时实验,Python是对抗Spark的有效选择,但对于生产中的大型软件工程,它无法像静态类型的语言Scala那样有效地扩展。

5)Scala vs Python –易于使用
Scala和Python语言在Spark上下文中具有同等的表现力,因此通过使用Scala或Python,可以实现所需的功能。无论哪种方式,程序员都会创建Spark内容并在其上调用函数。 Python是比Scala更用户友好的语言。 Python不太冗长,因此开发人员可以轻松地在Python中为Spark编写脚本。易于使用是一个主观因素,因为它取决于程序员的个人喜好。

6)Scala vs Python –高级功能
Scala编程语言具有多种存在性类型,宏和隐式。 Scala的神秘语法可能使尝试使用开发人员可能无法理解的高级功能变得困难。但是,Scala的优势在于在重要的框架和库中使用这些强大的功能。

话虽如此,Scala没有足够的数据科学工具和库(例如Python)用于机器学习和自然语言处理。 SparkMLib –机器学习库仅具有较少的ML算法,但它们是大数据处理的理想选择。 Scala缺乏良好的可视化和本地数据转换。 Scala绝对是Spark Streaming功能的最佳选择,因为Python Spark Streaming支持并不像Scala那样先进和成熟。

总结:针对Apache Spark的Scala与Python
“ Scala速度更快,并且易于使用,而Python速度较慢,但​​是非常易于使用。”

Apache Spark框架是用Scala编写的,因此了解Scala编程语言可以帮助大数据开发人员轻松地深入源代码(如果某些功能无法按预期运行)。使用Python会增加出现更多问题和bug的可能性,因为很难在2种不同语言之间进行翻译。使用Scala for Spark可以访问Spark框架的最新功能,因为它们首先在Scala中可用,然后移植到Python。

选择Scala vs Python for Spark取决于最适合项目需求的功能,因为每个功能各有优缺点。在选择用于使用Apache Spark进行编程的语言之前,开发人员必须学习Scala和Python以熟悉其功能。在学习了Python和Scala之后,就应该很容易决定何时将Scala用于Spark和何时将Python用于Spark。在Apache Spark中编程的语言选择纯粹取决于要解决的问题。

我们很想知道您对您选择哪种语言进行Apache Spark编程的意见。请在下面的评论中提及您的选择。

作者 east
Kafka, Spark 2月 21,2021

spark和kafka在数据流处理对比

2625 / 5000

在对Spark Streaming和Kafka Streaming进行比较并得出何时使用哪个比较之前,让我们首先对Data Streaming的基础知识有一个清晰的了解:它是如何出现的,流是什么,如何运行,其协议和用例。 。 数据流如何诞生? 从那时起,数据一直是操作的重要组成部分。数据构成了整个操作结构的基础,其中数据被进一步处理以在系统的不同实体模块中使用。这就是为什么它已成为IT领域的典型代表。 随着技术的发展,数据的重要性变得更加突出。数据处理中使用的方法已经发生了显着变化,以适应软件机构对数据输入的不断增长的需求。 随着时间的增长,数据处理的时间框架急剧缩短,以至于立即处理的输出有望满足最终用户的更高期望。随着人工智能的出现,人们强烈希望为看起来像人类的最终用户提供实时帮助。 此要求仅取决于数据处理强度。越快越好。因此,结果是处理数据的方式发生了变化。较早之前,在指定的延迟之后,有成批的输入被输入到系统中,从而将处理后的数据作为输出。 目前,这种延迟(延迟)是输入性能,处理时间和输出的结果,这已成为性能的主要标准之一。为了确保高性能,延迟必须最小化到几乎是实时的程度。 这就是数据流出现的方式。在数据流处理中,实时数据流作为输入传递,必须立即进行处理,并实时传递输出信息流。

什么是数据流?

数据流传输是一种方法,其中不按常规的批处理方式发送输入,而是以连续流的形式发布该流,并按原样使用算法进行处理。还以连续数据流的形式检索输出。 该数据流是使用数千个源生成的,这些源同时以小尺寸发送数据。这些文件背对背发送时形成连续的流程。这些可能是大量发送的日志文件以进行处理。 这种作为流出现的数据必须被顺序处理以满足(几乎)连续实时数据处理的要求。

为什么需要数据流?


随着企业在线人数的增加以及随之而来的对数据的依赖,人们已经意识到了数据的方式。数据科学和分析技术的出现导致大量数据的处理,为实时数据分析,复杂数据分析,实时流分析和事件处理提供了可能性。

当输入数据大小庞大时,需要进行数据流传输。我们需要先存储数据,然后再将其移动以进行批处理。由于数据以多批次的形式存储,因此涉及大量时间和基础架构。为了避免所有这些情况,信息以小数据包的形式连续流传输以进行处理。数据流提供超可伸缩性,这仍然是批处理的挑战。

使用数据流传输的另一个原因是要提供近乎实时的体验,其中最终用户在输入数据时会在几秒钟或几毫秒内获得输出流。

当数据源似乎无穷无尽且无法为批处理中断时,也需要进行数据流传输。 IoT传感器在此类别中发挥了重要作用,因为它们会生成连续的读数,需要对其进行处理以得出推论。

数据流如何发生?


为了通过实时处理数据做出即时决策,可以进行数据流传输。 根据系统的规模,复杂性,容错性和可靠性要求,您可以使用工具,也可以自己构建。

自行构建它意味着您需要在编码角色之前将事件放置在诸如Kafka之类的消息代理主题中。 这里的参与者是一段代码,旨在接收来自代理中的问题的事件(即数据流),然后将输出发布回代理。

Spark是第一代Streaming Engine,它要求用户编写代码并将其放置在actor中,他们可以进一步将这些actor连接在一起。 为了避免这种情况,人们经常使用Streaming SQL进行查询,因为它使用户可以轻松地查询数据而无需编写代码。 流SQL是对SQL的扩展支持,可以运行流数据。 此外,由于SQL在数据库专业人员中已得到很好的实践,因此执行流式SQL查询将更加容易,因为它基于SQL。

这是用例的流式SQL代码,在这种情况下,如果池中的温度在2分钟内下降了7度,则必须向用户发送警报邮件。

@App:name("Low Pool Temperature Alert")

@App: description('An application which detects an abnormal decrease in swimming pools temperature.')

@source(type='kafka',@map(type='json'),bootstrap.servers='localhost:9092',topic.list='inputStream',group.id='option_value',threading.option='single.thread')

define stream PoolTemperatureStream(pool string, temperature double);

@sink(type='email', @map(type='text'), ssl.enable='true',auth='true',content.type='text/html', username='sender.account', address='sender.account@gmail.com',password='account.password', subject="Low Pool Temperature Alert", to="receiver.account@gmail.com")

define stream EmailAlertStream(roomNo string, initialTemperature double, finalTemperature double);

--Capture a pattern where the temperature of a pool decreases by 7 degrees within 2 minutes

@info(name='query1')

from every( e1 = PoolTemperatureStream ) -> e2 = PoolTemperatureStream [e1.pool == pool and (e1.temperature + 7.0) >= temperature]

    within 2 min

select e1.pool, e1.temperature as initialTemperature, e2.temperature as finalTemperature

insert into EmailAlertStream;

Spark SQL提供DSL(特定于域的语言),这将有助于以不同的编程语言(例如Scala,Java,R和Python)操纵DataFrame。 它使您可以使用SQL或DataFrame API对Spark程序内部的结构化数据执行查询。 Kafka等新一代流引擎也支持Kafka SQL或KSQL形式的Streaming SQL。

尽管流处理的过程大致相同,但此处重要的是根据用例要求和可用的基础结构选择流引擎。 在得出结论之前,什么时候使用Spark Streaming和什么时候使用Kafka Streaming,让我们首先探索Spark Streaming和Kafka Streaming的基础知识,以更好地理解。

什么是Spark Streaming?

Spark Streaming是核心Spark API的扩展,可让其用户执行实时数据流的流处理。 它从Kafka,Flume,Kinesis或TCP套接字等来源获取数据。 可以使用复杂的算法对这些数据进行进一步处理,这些复杂的算法使用诸如map,reduce,join和window之类的高级功能表示。 最终输出(即处理后的数据)可以推送到诸如HDFS文件系统,数据库和实时仪表板之类的目标。

让我们仔细看看Spark Streaming的工作原理。 Spark Streaming从数据源以数据流的形式获取实时输入,并将其进一步分为几批,然后由Spark引擎处理以生成大量输出。 Spark Streaming允许您将机器学习和图形处理用于数据流以进行高级数据处理。它还提供了代表连续数据流的高级抽象。 数据流的这种抽象称为离散流或DStream。该DStream可以通过对Kafka,Flume和Kinesis等来源的数据流或其他DStream进行高级操作来创建。 这些DStream是RDD(弹性分布式数据集)的序列,RDD是分布在计算机集群上的多个只读数据集。这些RDD以容错方式进行维护,使其具有高度鲁棒性和可靠性。DStreams序列Spark Streaming使用Spark Core的快速数据调度功能来执行流分析。从诸如Kafka,Flume,Kinesis等之类的源中以迷你批的形式摄取的数据用于执行数据流处理所需的RDD转换。


Spark Streaming使您可以根据需要使用Scala,Java或Python编写程序来处理数据流(DStreams)。由于此处将用于批处理的代码用于流处理,因此使用Spark Streaming实现Lambda体系结构(将批处理和流处理混合在一起)变得容易得多。但这是以等于最小批处理持续时间的延迟为代价的。 Spark Streaming中的输入源 Spark支持主要来源,例如文件系统和套接字连接。另一方面,它也支持高级资源,例如Kafka,Flume,Kinesis。只有添加额外的实用程序类,才能获得这些出色的资源。 您可以使用以下工件链接Kafka,Flume和Kinesis。

kafka:spark-streaming-kafka-0-10_2.12

flume:spark-streaming-flume_2.12

Kinesis:spark-streaming-kinesis-asl_2.12

什么是Kafka流媒体?

Kafka Stream是一个客户端库,可让您处理和分析从Kafka接收的数据输入,并将输出发送到Kafka或其他指定的外部系统。 Kafka依赖于流处理概念,例如: 准确区分事件时间和处理时间 窗口支持 高效直接的应用程序状态管理 通过利用Kafka中的生产者和消费者库来利用Kafka的本机功能,从而简化了应用程序开发,从而使其更加直接和快捷。正是由于这种原生的Kafka潜力,使得Kafka流式传输可以提供数据并行性,分布式协调,容错性和操作简便性。 Kafka Streaming中的主要API是提供多个高级运算符的流处理DSL(特定于域的语言)。这些运算符包括:筛选器,映射,分组,窗口,聚合,联接和表的概念。 Kafka中的消息传递层对进一步存储和传输的数据进行分区。根据状态事件在Kafka流中对数据进行分区,以进行进一步处理。通过将拓扑划分为多个任务来缩放拓扑,其中为每个任务分配了输入流中的分区列表(Kafka主题),从而提供了并行性和容错能力。

Kafka可以进行状态转换,与Spark Streaming中的批处理不同。 它在其主题内存储状态,流处理应用程序将其用于存储和查询数据。 因此,其所有操作均受状态控制。 这些状态还用于连接主题以形成事件任务.Kafka中基于状态的操作 这是由于Kafka中基于状态的操作使其具有容错能力,并允许从本地状态存储中自动恢复。 Kafka Streaming中的数据流是使用表和KStreams的概念构建的,这有助于它们提供事件时间处理。

Spark Streaming与Kafka Streaming:

何时使用什么 Spark Streaming使您可以灵活地选择任何类型的系统,包括具有lambda架构的系统。但是,Spark Streaming的延迟范围从毫秒到几秒。 如果延迟不是一个重要的问题,并且您正在寻找在源兼容性方面的灵活性,那么Spark Streaming是最佳选择。可以在EC2,Hadoop YARN,Mesos或Kubernetes上使用独立的集群模式运行Spark Streaming。 它可以访问HDFS,Alluxio,Apache Cassandra,Apache HBase,Apache Hive和许多其他数据源中的数据。它提供了容错能力,还提供了Hadoop分发。 此外,在Spark流式传输的情况下,您不必为批处理和流式传输应用程序分别编写多个代码,在这种情况下,单个系统可以同时满足这两种情况。 另一方面,如果延迟是一个重要问题,并且必须坚持以短于毫秒的时间范围进行实时处理,则必须考虑使用Kafka Streaming。由于事件驱动处理,Kafka Streaming提供了高级的容错能力,但是与其他类型的系统的兼容性仍然是一个重要的问题。此外,在高可伸缩性要求的情况下,Kafka具有最佳的可伸缩性,因此非常适合。

如果您要处理从Kafka到Kafka的本机应用程序(输入和输出数据源都在Kafka中),则Kafka流式传输是您的理想选择。 虽然Kafka Streaming仅在Scala和Java中可用,但Spark Streaming代码可以用Scala,Python和Java编写。 结束语 随着技术的发展,数据也随着时间大量增长。处理此类海量数据的需求以及对实时数据处理的日益增长的需求导致了数据流的使用。通过几种数据流方法,尤其是Spark Streaming和Kafka Streaming,全面了解用例以做出最适合需求的最佳选择变得至关重要。 在用例中优先考虑需求对于选择最合适的流技术至关重要。鉴于事实,Spark Streaming和Kafka Streaming都是高度可靠的,并且广泛推荐作为Streaming方法,它在很大程度上取决于用例和应用程序,以确保最佳效果。 在本文中,我们指出了两种流传输方法的专业领域,以便为您提供更好的分类,这可以帮助您确定优先级并做出更好的决策。

作者 east
Spark 1月 4,2021

Idea配置Scala开发环境注意事项

使用maven方式,注意切注意spark与scala有版本对应关系, 详情参考Spark官网相关说明:https://spark.apache.org/docs/latest/index.htmlscala版本还要跟工程配置Library添加的Scala版本一致,才不会出现“Cannot find Main Class”在pom.xml中添加maven 依赖包时,我就发现不管是否用了翻墙,下载速度都好慢,就1M的东西能下半天,很是苦恼,于是到网上搜资料,然后让我查到了。说是使用阿里的maven镜像就可以了。我于是亲自试了下,速度快的飞起!!!右键项目选中maven选项,然后选择“open settings.xml”或者 “create settings.xml”,然后把如下代码粘贴进去就可以了。重启IDE,感受速度飞起来的感觉吧!!!

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
        <!-- mirror
         | Specifies a repository mirror site to use instead of a given repository. The repository that
         | this mirror serves has an ID that matches the mirrorOf element of this mirror. IDs are used
         | for inheritance and direct lookup purposes, and must be unique across the set of mirrors.
         |
        <mirror>
          <id>mirrorId</id>
          <mirrorOf>repositoryId</mirrorOf>
          <name>Human Readable Name for this Mirror.</name>
          <url>http://my.repository.com/repo/path</url>
        </mirror>
         -->

        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>uk</id>
            <mirrorOf>central</mirrorOf>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://uk.maven.org/maven2/</url>
        </mirror>

        <mirror>
            <id>CN</id>
            <name>OSChina Central</name>
            <url>http://maven.oschina.net/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>nexus</id>
            <name>internal nexus repository</name>
            <!-- <url>http://192.168.1.100:8081/nexus/content/groups/public/</url>-->
            <url>http://repo.maven.apache.org/maven2</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

    </mirrors>
</settings>
作者 east
Spark, 数据挖掘 11月 17,2020

Spark新闻App点击率预估实践案例

import org.apache.spark.sql.{SparkSession}

//action:userid~ docid ~behaivor(label)~time~ip
//160520092238579653~160704235940001~0~20160705000040909~1.49.185.165
//160520092238579653~160704235859003~0~20160705000040909~1.49.185.165
//define case class for action data
case class Action(docid: String, label:Int)

//document:docid ~ channelname ~ source ~ keyword:score
//160705131650005~科技~偏执电商~支付宝:0.17621 医疗:0.14105 复星:0.07106 动作:0.05235 邮局:0.04428
//160705024106002~体育~平大爷的刺~阿杜:0.23158 杜兰特:0.09447 巨头:0.08470 拯救者:0.06638 勇士:0.05453
//define case class for document data
case class Dccument(docid: String, channal: String, source: String, tags: String)

object GenTrainingData {
  def main(args: Array[String]): Unit = {

    //2rd_data/ch09/action.txt 2rd_data/ch09/document.txt output/ch11 local[2]
    val Array(actionPath, documentPath, output, mode) = args
    // 创建Spark实例
    val spark = SparkSession.builder
      .master(mode)
      .appName(this.getClass.getName)
      .getOrCreate()

    import spark.implicits._
    val ActionDF = spark.sparkContext.textFile(actionPath).map(_.split("~"))
      .map(x => Action(x(1).trim.toString, x(2).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    //ActionDF.createOrReplaceTempView("actiondf")

    val documentDF = spark.sparkContext.textFile(documentPath).map(_.split("~")).filter(_.length > 3)
      .map { case x =>
        val xtags = x(3).split(" ").filter(_.length > 0).map { b => b.substring(0, b.indexOf(":")) }.mkString("|")
        Dccument(x(0).trim.toString, x(1).trim.toString, x(2).trim.toString, xtags.toString)
      }
      .toDF()
    // Register the DataFrame as a temporary view
    //documentDF.createOrReplaceTempView("documentdf")

    // 将查询结果放到tempDF中,完成dataframe转化
    //val tempDF = spark.sql("select actiondf.docid,actiondf.label,documentdf.channal,documentdf.source,documentdf.tags from actiondf,documentdf where actiondf.docid = documentdf.docid")
    val tempDF = documentDF.join(ActionDF, documentDF("docid").equalTo(ActionDF("docid")))
    //tempDF.select($"tags").show(100)

    // 编码格式转换
    val minDF = tempDF.select($"tags").rdd
      .flatMap{ x => x.toString.replace("[","").replace("]","").split('|') }.distinct
    //minDF.coalesce(1).saveAsTextFile(output+"/tags")
    val indexes = minDF.collect().zipWithIndex.toMap
    println(indexes.toList.length) //23937
    //
    val libsvmDF = tempDF.select($"label", $"tags").map {
      x =>
        val label = x(0)
        val terms = x(1).toString.replace("[","").replace("]","")
          .split('|') //使用单引号
          .map(v => (indexes.get(v).getOrElse(-1)+1, 1)) //索引从0开始
          .sortBy(_._1) //libsvm 需要升序
          .map(x => x._1 + ":" + x._2)
          .mkString(" ")
        (label.toString + " " + terms)
    }
    libsvmDF.show(100)

    //保存模型时存在:Exception while deleting local spark dir,不影响结果生成,作为已知问题暂时搁置。
    //libsvmDF.coalesce(1).write.format("text").save(output+"/model")
    //libsvmDF.rdd.coalesce(1).saveAsTextFile(output+"/model")
    val Array(trainingdata, testdata) = libsvmDF.randomSplit(Array(0.7, 0.3))
    trainingdata.rdd.coalesce(1).saveAsTextFile(output+"/training")
    testdata.rdd.coalesce(1).saveAsTextFile(output+"/test")
    //
    //spark.stop()
  }
}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

object LRTrainAndTest {

  def main(args: Array[String]) {

    if (args.length < 8) {
      System.err.println("Usage:LRTrainAndTest <trainingPath> <testPath> <output> <numFeatures> <partitions> <RegParam> <NumIterations> <NumCorrections>")
      System.exit(1)
    }

    //2rd_data/ch11/test/part-00000 2rd_data/ch11/training/part-00000 output/ch11/label 23937 50 0.01 100 10
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("ADTest with logistic regression")
    val sc = new SparkContext(conf)
    val numFeatures = args(3).toInt //特征数23937
    val partitions = args(4).toInt //一般50-1000

    //label channal source tags
    //依次为:类别(是否点击,点击为1,没有点击为0)、频道、来源、关键词
    //样例:1 娱乐 腾讯娱乐 曲妖精|棉袄|王子文|老大爷|黑色

    // 导入训练样本和测试样本
    val training = MLUtils.loadLibSVMFile(sc,args(0),numFeatures,partitions)
    val test = MLUtils.loadLibSVMFile(sc,args(1),numFeatures,partitions)

    val lr = new LogisticRegressionWithLBFGS()

    //训练参数设置
    lr.optimizer.setRegParam(args(5).toDouble) //0.01
      .setNumIterations(args(6).toInt) //100
      .setNumCorrections(args(7).toInt) //10

    //训练
    val lrModel = lr.setNumClasses(2).run(training)//2分类
    lrModel.clearThreshold()

    //预测打分
    val predictionAndLabel = test.map(p=>(lrModel.predict(p.features),p.label))
    predictionAndLabel.map(x=>x._1+"\t"+x._2).repartition(1)
      .saveAsTextFile(args(2))
    val metrics = new BinaryClassificationMetrics(predictionAndLabel)

    //计算AUC
    val str = s"the value of auc is ${metrics.areaUnderROC()}"
    println(str)
  }
}
作者 east
Spark, 数据挖掘 11月 17,2020

Spark企业法人建模案例

数据格式如下:

字段含义参考上一节。

样例如下:

package com.koala.ch12

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression

object CreditModel {

  // 创建评分模型属性class,对字段进行命名
  // 0,1,37,10,0,3,18,7,4
  case class Credit(load_label:Double,gender:Double,age:Double,yearsmarried:Double,children:Double,housenumber:Double,captiallevel:Double,facarnumber:Double,pacarnumber:Double)

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length < 3){
      System.err.println("Usage:CreditModel <creaditInPath> <outPut> <model>")
      System.exit(1)
    }

    //2rd_data/ch12/creditdata.txt output/ch12/model local[2]
    val Array(creaditInPath,output,mode) = args

    // 创建Spark实例
    val spark = SparkSession.builder
      .master(mode)
      .appName("CreditModel Example")
      .getOrCreate()

    // 加载文本,并创建RDD数据源,将变量的名称赋予各个字段
    // Create an RDD of Credit objects from a text file, convert it to a Dataframe
    import spark.implicits._
    val creditDF = spark.sparkContext.textFile(creaditInPath).map(_.split(","))
      .map(attributes => Credit(attributes(0).trim.toDouble,attributes(1).trim.toDouble,attributes(2).trim.toDouble,attributes(3).trim.toDouble,attributes(4).trim.toDouble,attributes(5).trim.toDouble,attributes(6).trim.toDouble,attributes(7).trim.toDouble,attributes(8).trim.toDouble))
      .toDF()

    // Register the DataFrame as a temporary view
    // 创建临时视图
    creditDF.createOrReplaceTempView("creditdf")

    // 将查询结果放到sqlDF中,完成dataframe转化
    val sqlDF = spark.sql("select * from creditdf")
    sqlDF.show()

    // 自变量的列名
    val colArray2 = Array("gender","age","yearsmarried","children","housenumber","captiallevel","facarnumber","pacarnumber")
    // 设置DataFrame自变量集,并将这些变量统称为"features"
    val vecDF: DataFrame = new VectorAssembler().setInputCols(colArray2).setOutputCol("features").transform(sqlDF)

    // 按7:3划分成训练集和测试集,训练集为trainingDF,测试集为testDF
    val Array(trainingDF,testDF) = vecDF.randomSplit(Array(0.7, 0.3), seed=132) //seed随机算法从该数字开始生成随机数字

    // 建立逻辑回归模型,设置目标变量(标签)和自变量集,在训练集上训练
    val lrModel = new LogisticRegression().setLabelCol("load_label").setFeaturesCol("features").fit(trainingDF)
    // 输出逻辑回归的系数和截距
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // 惩罚项,如果是0,是L2惩罚,如果是0-1之间是混合,如果是1,则是L1惩罚,默认是L2
    lrModel.getElasticNetParam
    // 正则化的参数,一般大于等于0,默认是0
    lrModel.getRegParam
    // 拟合之前是否需要标准化,默认是true
    lrModel.getStandardization
    // 二分类中设置阈值,范围为[0,1],如果类标签的1的概率大于该阈值,则会判定为1,默认是0.5
    lrModel.getThreshold
    // 设置迭代的收敛容限,默认值为1e-6
    lrModel.getTol

    // 使用测试集进行预测,包括原始的字段,在加上综合的自变量集字段features,预测的原始值,转化的概率值,预测的类别
    lrModel.transform(testDF).show

    //具体的查看features,预测的原始值,转化的概率值,预测的类别
    lrModel.transform(testDF).select("features","rawPrediction","probability","prediction").show(30,false)

    //查看模型训练过程中损失的迭代情况
    val trainingSummary = lrModel.summary
    val objectiveHistory = trainingSummary.objectiveHistory
    objectiveHistory.foreach(loss => println(loss))
    //保存模型
    lrModel.save(output)
    //
    spark.close()
  }
}
作者 east

上一 1 … 4 5 6 … 9 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.