gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档datax

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  大数据开发
  • 分类归档: "datax"
datax, Hive 5月 8,2024

解决datax写入hdfs到hive查不到数据

datax写入到Hive表的过程中。datax日志显示成功,使用hdfs dfs命令可以查看到文件,但是在Hive中查询数据为空。这种情况可能有以下几个可能的原因和解决方案:

  1. 数据格式不匹配:
    • 原因:可能是由于数据格式不匹配导致Hive无法正确解析数据。
    • 解决方案:确保数据文件中的列分隔符与Hive表中定义的字段分隔符一致。在这里,配置中指定了字段分隔符为\t,而Hive表中也使用了相同的字段分隔符,这一点已经满足。
  2. 数据位置不正确:
    • 原因:数据文件存储的位置与Hive表的分区定义不匹配。
    • 解决方案:检查数据文件的存储路径是否与Hive表的分区定义一致。
  3. 分区信息未正确加载:
    • 原因:Hive可能没有正确加载数据文件所在的分区信息。
    • 解决方案:使用MSCK REPAIR TABLE命令来修复表的分区信息,让Hive重新加载分区信息。
  4. 数据文件权限问题:
    • 原因:数据文件的权限设置不正确,导致Hive无法读取数据。
    • 解决方案:确保数据文件对Hive用户具有读取权限,可以通过设置文件权限或者在Hive用户组中添加权限。
  5. 数据写入问题:
    • 原因:数据写入到Hive表时出现了错误,导致数据并未正确写入。
    • 解决方案:检查DataX任务的日志,确认数据是否成功写入到Hive表中。如果写入失败,根据错误信息进行排查并修复。

datax的json配置如下:

   "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS":"hdfs://nameservice1",
            "hadoopConfig":{
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "namenode1,namenode2",
              "dfs.namenode.rpc-address.nameservice1.namenode1": "cdh01:8020",
              "dfs.namenode.rpc-address.nameservice1.namenode2": "cdh09:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "fileType": "text",
            "path": "/user/hive/warehouse/test.db/tb_test",
            "fileName": "result",
            "column": [
              {
                "name": "pid",
                "type": "STRING"
              },
              {
                "name": "dqf",
                "type": "STRING"
              },
              {
                "name": "ptime",
                "type": "STRING"
              },
              {
                "name": "pvalue",
                "type": "STRING"
              },
              {
                "name": "ds",
                "type": "STRING"
              }
            ],
            "writeMode": "truncate",
            "fieldDelimiter": "\t"
          }
        }
      }

在hive表结构如下:

CREATE TABLE IF NOT EXISTS test.tb_test (
	pid STRING COMMENT '点号ID',
	dqf STRING COMMENT '数据质量码',
	ptime BIGINT COMMENT '时间',
	pvalue STRING COMMENT '数据值'
) COMMENT '昨日Po|rlfq数据历史表'
partitioned by (ds string COMMENT '日期')
row format delimited
fields terminated by "\t"
STORED AS TEXTFILE;

在这里,数据文件的存储路径为/user/hive/warehouse/test.db/tb_test,而Hive表定义的分区为partitioned by (ds string COMMENT '日期'),需要确认数据文件是否存储在/user/hive/warehouse/test.db/
tb_test
/ds=xxxx目录下。

把上面的表修改为非分区表,再次写入时果然有数据了。

作者 east
datax, 大数据开发, 运维 4月 14,2024

可视化ETL解决方案:Apache NiFi、DataX(加上DataX-Web)、Kettle这3个解决方案对比

1.Apache NiFi:

Apache NiFi是一个易于使用、功能强大的可视化ETL工具,它提供了一套直观的图形界面,让用户可以轻松地设计、管理和监控数据流。NiFi支持多种数据源和目标系统,具有强大的数据处理能力,如数据过滤、转换、聚合等。此外,NiFi还支持实时数据处理、批量数据处理以及两者的混合处理。

适用场景:

  • 数据源和目标系统种类繁多,需要灵活的数据处理能力 ,实现数据的抽取、转换和加载(ETL)工作,方便数据的迁移和同步 ;
  • 需要实时数据处理和监控的场景 , 可以通过监听数据源(如Kafka、Flume等)的实时数据流,实现数据的实时采集、处理和传输; 适用于需要实时数据处理的业务场景,如实时监控和报警系统 ;
  • 需要构建大规模、可扩展的数据流处理系统的场景
    ,如日志收集和分析、物联网数据处理等 。

支持的数据库类型:

  • 关系型数据库:如MySQL、PostgreSQL、Oracle、SQL Server等;
  • NoSQL数据库:如MongoDB、HBase等;
  • 列式存储数据库:如ClickHouse等;
  • 文件系统:如HDFS、本地文件系统等。

2. DataX(包括DataX-Web):

DataX是阿里巴巴开源的一款高性能、分布式、易用的数据同步工具,它支持多种数据源和目标系统,具有优秀的性能和稳定性。DataX-Web是DataX的Web版本,提供了可视化的操作界面,简化了数据同步任务的配置和管理。

适用场景:

  • 数据源和目标系统种类较多,但不需要像NiFi那样的复杂数据处理能力;
  • 需要进行大规模数据迁移和同步的场景;
  • 对于实时性要求不高,但需要保证数据一致性和可靠性的场景;
  • 需要简化数据同步任务配置和管理的场景。
  • 由于DataX的扩展性强,适合需要定制化数据同步任务的企业,可以通过编写自定义插件来满足特定的业务需求

支持的数据库类型:

  • 关系型数据库:如MySQL、PostgreSQL、Oracle、SQL Server等;
  • NoSQL数据库:如MongoDB、Cassandra等;
  • 列式存储数据库:如Infobright等;
  • 文件系统:如CSV文件、Excel文件等。

3. Kettle(Pentaho Data Integration):

Kettle是一款成熟、稳定的开源ETL工具,它提供了丰富的数据处理组件和可视化界面,支持多种数据源和目标系统。Kettle具有强大的调度和监控功能,可以满足复杂的数据处理需求。

适用场景:

  • 数据源和目标系统种类繁多,需要丰富的数据处理组件;
  • 需要进行复杂的ETL作业调度和监控的场景;
  • 对于实时性要求不高,但需要保证数据质量和一致性的场景;
  • 需要与其他Pentaho组件(如报表、数据挖掘等)集成的场景。

支持的数据库类型:

  • 关系型数据库:如MySQL、PostgreSQL、Oracle、SQL Server等;
  • NoSQL数据库:如MongoDB、Cassandra等;
  • 列式存储数据库:如Infobright等;
  • 文件系统:如CSV文件、Excel文件等。

总结:

  • 如果需要实时数据处理和监控,以及灵活的数据处理能力,可以选择Apache NiFi;
  • 如果需要进行大规模数据迁移和同步,以及简化数据同步任务配置和管理,可以选择DataX(包括DataX-Web);
  • 如果需要进行复杂的ETL作业调度和监控,以及与其他Pentaho组件集成,可以选择Kettle。
作者 east
datax 8月 30,2023

DataX对接数据脱敏数据的实例

datax对接mysql数据,对姓名只保留姓,名变成**。对这种简单的脱敏,可以不用修改datax源码,直接在配置文件上实现。

//要脱敏的字段在第2个,也就是record.getColumn(1)
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "name", // 姓名的字段
              "age"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "test"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "INT"
              },
              {
                "name": "name",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "INT"
              }
            ],
            // 省略其他参数
          }
        },
        // 添加transformer部分
        "transformer": [
                                 {
                                "name": "dx_groovy",
                                "parameter": {
                                  "code": "Column name = record.getColumn(1);def first = name.asString()[0];def last =null; last= \"*\" * (name.asString().length() - 1);def masked = null; masked = first + last; record.setColumn(1, new StringColumn(masked)); return record;"             
                                 }
                                }],
    // 省略其他部分
  }
}
作者 east
datax 8月 25,2023

DataX Core TransformerRegistry类详细解读

TransformerRegistry 类,用于注册、加载和管理数据转换器。以下是对各个部分的作用解释:

  • 首先,该类维护了一个名为 registedTransformer 的映射,用于存储已注册的转换器信息。
  • 在静态代码块中,内置了一些原生转换器实例,并注册到 registedTransformer 中。
  • loadTransformerFromLocalStorage 方法用于从本地存储加载转换器,可以选择加载指定的转换器。它遍历指定目录下的转换器文件,尝试加载

每个转换器,如果加载失败则记录错误日志。

  • loadTransformer 方法用于加载单个转换器。它根据转换器配置文件的路径加载配置,然后根据配置中的类名加载对应的类。根据类的类型(是否继承自 ComplexTransformer 或 Transformer),将转换器实例注册到 registedTransformer 中。
  • getTransformer 方法用于获取指定名称的转换器信息,从 registedTransformer 中查找,如果找不到则可能会从磁盘读取(TODO: 根据注释,这部分可能是未实现的功能)。
  • registTransformer 和 registComplexTransformer 方法用于注册转换器。它们会检查转换器名称是否满足命名规则,并将转换器信息构建成 TransformerInfo 实例后添加到 registedTransformer 中。
  • checkName 方法用于检查转换器名称是否满足命名规则,根据 isNative 参数判断是否需要以 “dx_” 开头。
  • buildTransformerInfo 方法用于构建 TransformerInfo 实例,其中包含了转换器的类加载器、是否为原生转换器以及实际的转换器实例。
  • getAllSuportTransformer 方法返回支持的所有转换器的名称列表。

这个类的主要作用是提供了转换器的注册、加载和管理功能,使得数据转换器可以被动态添加和使用。它在数据处理流程中,特别是数据抽取和转换阶段,起到了很重要的作用。

public class TransformerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        // 添加内置的一些原生转换器
        // 本地存储和从服务器加载的转换器将延迟加载
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new DigestTransformer());
    }

    // 从本地存储加载转换器(默认情况下加载所有转换器)
    public static void loadTransformerFromLocalStorage() {
        loadTransformerFromLocalStorage(null);
    }

    // 从本地存储加载转换器(可选加载特定转换器)
    public static void loadTransformerFromLocalStorage(List<String> transformers) {
        String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
        if (null == paths) {
            return;
        }

        for (final String each : paths) {
            try {
                if (transformers == null || transformers.contains(each)) {
                    loadTransformer(each);
                }
            } catch (Exception e) {
                LOG.error(String.format("跳过转换器(%s)的加载,loadTransformer 出现异常(%s)", each, e.getMessage()), e);
            }
        }
    }

    // 加载指定的转换器
    public static void loadTransformer(String each) {
        String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
        Configuration transformerConfiguration;
        try {
            transformerConfiguration = loadTransFormerConfig(transformerPath);
        } catch (Exception e) {
            LOG.error(String.format("跳过转换器(%s),加载 transformer.json 出错,路径 = %s", each, transformerPath), e);
            return;
        }

        String className = transformerConfiguration.getString("class");
        if (StringUtils.isEmpty(className)) {
            LOG.error(String.format("跳过转换器(%s),未配置 class,路径 = %s,配置 = %s", each, transformerPath, transformerConfiguration.beautify()));
            return;
        }

        String funName = transformerConfiguration.getString("name");
        if (!each.equals(funName)) {
            LOG.warn(String.format("转换器(%s) 的名称与 transformer.json 配置的名称[%s] 不匹配,将忽略 JSON 的名称,路径 = %s,配置 = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
        }
        JarLoader jarLoader = new JarLoader(new String[]{transformerPath});

        try {
            Class<?> transformerClass = jarLoader.loadClass(className);
            Object transformer = transformerClass.newInstance();
            if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
                ((ComplexTransformer) transformer).setTransformerName(each);
                registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
            } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
                ((Transformer) transformer).setTransformerName(each);
                registTransformer((Transformer) transformer, jarLoader, false);
            } else {
                LOG.error(String.format("加载 Transformer 类(%s) 出错,路径 = %s", className, transformerPath));
            }
        } catch (Exception e) {
            // 错误的转换器跳过
            LOG.error(String.format("跳过转换器(%s),加载 Transformer 类出错,路径 = %s ", each, transformerPath), e);
        }
    }

    private static Configuration loadTransFormerConfig(String transformerPath) {
        return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
    }

    public static TransformerInfo getTransformer(String transformerName) {
        TransformerInfo result = registedTransformer.get(transformerName);

        // 如果 result == null,则尝试从磁盘读取
        // TODO: 这部分可能是未实现的功能,待开发

        return result;
    }

    public static synchronized void registTransformer(Transformer transformer) {
        registTransformer(transformer, null, true);
    }

    public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {
        checkName(transformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(transformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
        }

        registedTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));
    }

    public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {
        checkName(complexTransformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
        }

        registedTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
    }

    private static void checkName(String functionName, boolean isNative) {
        boolean checkResult = true;
        if (isNative) {
            if (!functionName.startsWith("dx_")) {
                checkResult = false;
            }
        } else {
            if (functionName.startsWith("dx_")) {
                checkResult = false;
            }
        }

        if (!checkResult) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
        }
    }

    private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }

    public static List<String> getAllSuportTransformer() {
        return new ArrayList<String>(registedTransformer.keySet());
    }
}
作者 east

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?
  • C++ 中避免悬挂引用的企业策略有哪些?
  • 嵌入式电机:如何在低速和高负载状态下保持FOC(Field-Oriented Control)算法的电流控制稳定?
  • C++如何在插件式架构中使用反射实现模块隔离?
  • C++如何追踪内存泄漏(valgrind/ASan等)并定位到业务代码?
  • C++大型系统中如何组织头文件和依赖树?

文章归档

  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (43)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (4)
  • 大数据开发 (488)
    • CDH (6)
    • datax (4)
    • doris (30)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (40)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (99)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (70)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (6)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.