TransformerRegistry
类,用于注册、加载和管理数据转换器。以下是对各个部分的作用解释:
- 首先,该类维护了一个名为
registedTransformer
的映射,用于存储已注册的转换器信息。 - 在静态代码块中,内置了一些原生转换器实例,并注册到
registedTransformer
中。 loadTransformerFromLocalStorage
方法用于从本地存储加载转换器,可以选择加载指定的转换器。它遍历指定目录下的转换器文件,尝试加载
每个转换器,如果加载失败则记录错误日志。
loadTransformer
方法用于加载单个转换器。它根据转换器配置文件的路径加载配置,然后根据配置中的类名加载对应的类。根据类的类型(是否继承自 ComplexTransformer
或 Transformer
),将转换器实例注册到 registedTransformer
中。getTransformer
方法用于获取指定名称的转换器信息,从 registedTransformer
中查找,如果找不到则可能会从磁盘读取(TODO: 根据注释,这部分可能是未实现的功能)。registTransformer
和 registComplexTransformer
方法用于注册转换器。它们会检查转换器名称是否满足命名规则,并将转换器信息构建成 TransformerInfo
实例后添加到 registedTransformer
中。checkName
方法用于检查转换器名称是否满足命名规则,根据 isNative
参数判断是否需要以 “dx_” 开头。buildTransformerInfo
方法用于构建 TransformerInfo
实例,其中包含了转换器的类加载器、是否为原生转换器以及实际的转换器实例。getAllSuportTransformer
方法返回支持的所有转换器的名称列表。
这个类的主要作用是提供了转换器的注册、加载和管理功能,使得数据转换器可以被动态添加和使用。它在数据处理流程中,特别是数据抽取和转换阶段,起到了很重要的作用。
public class TransformerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();
static {
// 添加内置的一些原生转换器
// 本地存储和从服务器加载的转换器将延迟加载
registTransformer(new SubstrTransformer());
registTransformer(new PadTransformer());
registTransformer(new ReplaceTransformer());
registTransformer(new FilterTransformer());
registTransformer(new GroovyTransformer());
registTransformer(new DigestTransformer());
}
// 从本地存储加载转换器(默认情况下加载所有转换器)
public static void loadTransformerFromLocalStorage() {
loadTransformerFromLocalStorage(null);
}
// 从本地存储加载转换器(可选加载特定转换器)
public static void loadTransformerFromLocalStorage(List<String> transformers) {
String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
if (null == paths) {
return;
}
for (final String each : paths) {
try {
if (transformers == null || transformers.contains(each)) {
loadTransformer(each);
}
} catch (Exception e) {
LOG.error(String.format("跳过转换器(%s)的加载,loadTransformer 出现异常(%s)", each, e.getMessage()), e);
}
}
}
// 加载指定的转换器
public static void loadTransformer(String each) {
String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
Configuration transformerConfiguration;
try {
transformerConfiguration = loadTransFormerConfig(transformerPath);
} catch (Exception e) {
LOG.error(String.format("跳过转换器(%s),加载 transformer.json 出错,路径 = %s", each, transformerPath), e);
return;
}
String className = transformerConfiguration.getString("class");
if (StringUtils.isEmpty(className)) {
LOG.error(String.format("跳过转换器(%s),未配置 class,路径 = %s,配置 = %s", each, transformerPath, transformerConfiguration.beautify()));
return;
}
String funName = transformerConfiguration.getString("name");
if (!each.equals(funName)) {
LOG.warn(String.format("转换器(%s) 的名称与 transformer.json 配置的名称[%s] 不匹配,将忽略 JSON 的名称,路径 = %s,配置 = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
}
JarLoader jarLoader = new JarLoader(new String[]{transformerPath});
try {
Class<?> transformerClass = jarLoader.loadClass(className);
Object transformer = transformerClass.newInstance();
if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
((ComplexTransformer) transformer).setTransformerName(each);
registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
} else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
((Transformer) transformer).setTransformerName(each);
registTransformer((Transformer) transformer, jarLoader, false);
} else {
LOG.error(String.format("加载 Transformer 类(%s) 出错,路径 = %s", className, transformerPath));
}
} catch (Exception e) {
// 错误的转换器跳过
LOG.error(String.format("跳过转换器(%s),加载 Transformer 类出错,路径 = %s ", each, transformerPath), e);
}
}
private static Configuration loadTransFormerConfig(String transformerPath) {
return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
}
public static TransformerInfo getTransformer(String transformerName) {
TransformerInfo result = registedTransformer.get(transformerName);
// 如果 result == null,则尝试从磁盘读取
// TODO: 这部分可能是未实现的功能,待开发
return result;
}
public static synchronized void registTransformer(Transformer transformer) {
registTransformer(transformer, null, true);
}
public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {
checkName(transformer.getTransformerName(), isNative);
if (registedTransformer.containsKey(transformer.getTransformerName())) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
}
registedTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));
}
public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {
checkName(complexTransformer.getTransformerName(), isNative);
if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
}
registedTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
}
private static void checkName(String functionName, boolean isNative) {
boolean checkResult = true;
if (isNative) {
if (!functionName.startsWith("dx_")) {
checkResult = false;
}
} else {
if (functionName.startsWith("dx_")) {
checkResult = false;
}
}
if (!checkResult) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
}
}
private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
TransformerInfo transformerInfo = new TransformerInfo();
transformerInfo.setClassLoader(classLoader);
transformerInfo.setIsNative(isNative);
transformerInfo.setTransformer(complexTransformer);
return transformerInfo;
}
public static List<String> getAllSuportTransformer() {
return new ArrayList<String>(registedTransformer.keySet());
}
}