Spark开发规范

规则

Spark应用中,需引入Spark的类

  • 对于Java开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.api.java.JavaSparkContext
// RDD操作时引入的类。
import org.apache.spark.api.java.JavaRDD
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf
  • 对于Scala开发语言,正确示例:
// 创建SparkContext时所需引入的类。
import org.apache.spark.SparkContext
// RDD操作时引入的类。
import org.apache.spark.SparkContext._
// 创建SparkConf时引入的类。
import org.apache.spark.SparkConf

Java与Scala函数有区别,在编写应用时,需要弄清楚每个函数的功能

RDD是不可改变的,也就是说,RDD的元素对象是不能更改的,因此,在用Java和Scala编写需要弄清楚每个函数的功能。下面举个例子。

场景:现有用户位置数据,按照时间排序生成用户轨迹。在Scala中,按时间排序的代码如下:

/* 函数实现的功能是得到某个用户的位置轨迹。
 * 参数trajectory:由两部分组成-用户名和位置点(时间,经度,维度)
 */
private def getTimesOfOneUser(trajectory: (String, Seq[(String, Float, Float)]), zone: Zone, arrive: Boolean): Int =
{
// 先将用户位置点按时间排序
    val sorted: Seq[(String, Float, Float)] = trajectory._2.sortBy(x => x._1);

}

若用java实现上述功能,则需要将trajectory._2重新生成对象,而不能直接对trajectory._2进行排序操作。原因是Collections.sort(trajectory._2)这个操作会改变了trajectory._2这个对象本身,这违背了RDD元素不可更改这条规则;而Scala代码之所以能够正常运行,是因为sortBy( )这个函数生成了一个新的对象,它并不对trajectory._2直接操作。下面分别列出java实现的正确示例和错误示例。

正确示例:

// 将用户的位置点从新生成一个对象。
List<Tuple3< String, Float, Float >> list = new ArrayList<Tuple3< String, Float, Float >>( trajectory._2);
// 对新对象进行排序。
Collections.sort(list);

错误示例:

// 直接对用户位置点按照时间排序。
Collections.sort(trajectory._2);

分布式模式下,应注意Driver和Executor之间的参数传递

在Spark编程时,总是有一些代码逻辑中需要根据输入参数来判断,这种时候往往会使用这种方式,将参数设置为全局变量,先给定一个空值(null),在main函数中,实例化SparkContext对象之前对这个变量赋值。然而,在分布式模式下,执行程序的jar包会被发送到每个Executor上执行。而该变量只在main函数的节点改变了,并未传给执行任务的函数中,因此Executor将会报空指针异常。

正确示例:

object Test
{
  private var testArg: String = null;
  def main(args: Array[String])
  {
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
    .map(x => testFun(x, testArg));
  }

  private def testFun(line: String, testArg: String): String =
  {
    testArg.split(…);
    return …; 
  }
}

错误示例:

//定义对象。
object Test
{
  // 定义全局变量,赋为空值(null);在main函数中,实例化SparkContext对象之前对这个变量赋值。
  private var testArg: String = null;
  // main函数
  def main(args: Array[String])
  {
    
    testArg = …;
    val sc: SparkContext = new SparkContext(…);

    sc.textFile(…)
      .map(x => testFun(x));
  }

  private def testFun(line: String): String =
  {
    testArg.split(...);
    return …; 
  }
}

运行错误示例,在Spark的local模式下能正常运行,而在分布式模式情况下,会在蓝色代码处报错,提示空指针异常,这是由于在分布式模式下,执行程序的jar包会被发送到每个Executor上执行,当执行到testFun函数时,需要从内存中取出testArg的值,但是testArg的值只在启动main函数的节点改变了,其他节点无法获取这些变化,因此它们从内存中取出的就是初始化这个变量时的值null,这就是空指针异常的原因。

应用程序结束之前必须调用SparkContext.stop

利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。

说明:

利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。

利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。

以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。

正确示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

//应用程序结束
sc.stop()

错误示例:

//提交spark作业
val sc = new SparkContext(conf)

//具体的任务
...

如果不添加SparkContext.stop,YARN界面会显示失败。同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627

发表评论

邮箱地址不会被公开。 必填项已用*标注