RDD要获取Driver端的Map、List,或者Spark UDF 需要加载外部资源(如配置参数、白名单)初始化它们的实例。 Spark UDF的输入参数必须是数据列column,在UDF中进行如Redis查询、白/黑名单过滤前,若它们都能被序列化,从Driver端初始化+broadcast的方式可以完成构建。直接传递变量时,在spark用local方式时正常,用yarn-cluster时出现问题。
例如像下面这样:
class MyUDF implements UDF1<Long, String> {
private Map<Long, String> broadCastMap;
public MyUDF(Broadcast<Map<Long, String>> broadCastMap) {
this.broadCastMap = broadCastMap.value();
}
public String call(Long id) {
return id +" -> " + broadCastMap.getOrDefault(id, "No mapping");
}
}
Map<Long, String> map = new HashMap<>();
map.put(1L, "b");
map.put(2L, "c");
Broadcast<Map&<Long, String>> broadCastMap = new JavaSparkContext(spark.sparkContext()).broadcast(map);
UserDefinedFunction myUdf = udf(new MyUDF(broadCastMap), DataTypes.StringType);
spark.sqlContext().udf().register("myUdf", myUdf);
如果传递的是类,必须 实现了serializable接口
- 反序列化时serializable版本号不一致时会导致不能反序列化。
- 子类中实现了serializable接口,父类中没有实现,父类中的变量不能被序列化,序列化后父类中的变量会得到null。 注意:父类实现serializable接口,子类没有实现serializable接口时,子类可以正常序列化
- 被关键字transient修饰的变量不能被序列化。
- 静态变量不能被序列化,属于类,不属于方法和对象,所以不能被序列化。
例如像下面这样:
import java.io.Serializable;
import java.util.*;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class AtKwdBo implements Serializable {
private Set<String> keywords;
private Set<String> stopwords;
/**
* just for test
* @return
*/
public static List<AtKwdBo> generateKeyWord() {
// Keyword
List<AtKwdBo> atKwdBos = new ArrayList<>();
AtKwdBo atKwdBo = new AtKwdBo();
Set<String> keywords = new HashSet<>();
keywords.add("小米手机");
keywords.add("雷军");
keywords.add("小米10周年");
atKwdBo.setKeywords(keywords);
Set<String> stopwords = new HashSet<>();
stopwords.add("华为手机");
atKwdBo.setStopwords(stopwords);
atKwdBos.add(atKwdBo);
return atKwdBos;
}
}