gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

年度归档2023

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 2023
  • ( 页面4 )
云计算 8月 28,2023

Dremio Cloud 评论:AWS 上快速灵活的数据湖屋

数据仓库和数据湖都可以保存大量数据进行分析。您可能还记得,数据仓库包含经过整理的结构化数据,具有在写入数据时应用的预先设计的模式,需要大量 CPU、SSD 和 RAM 以提高速度,并且旨在供业务分析师使用。数据湖包含更多非结构化或结构化数据,最初以原始格式存储,通常使用廉价的旋转磁盘,在读取数据时应用模式,过滤和转换原始数据以供分析,并且旨在供使用最初由数据工程师和数据科学家提供,一旦数据经过整理,业务分析师就可以使用这些数据。
数据湖屋,例如本次审查的主题 Dremio,弥合了数据仓库和数据湖之间的差距。他们从数据湖开始,添加快速 SQL、更高效的列式存储格式、数据目录和分析。
Dremio 将其产品描述为一个数据湖屋平台,供了解和喜爱 SQL 的团队使用。

根据 Dremio 的说法,Snowflake、Azure Synapse 和 Amazon Redshift 等云数据仓库会产生锁定,因为数据在仓库内部。我不完全同意这一点,但我同意将大量数据从一个云系统转移到另一个云系统确实很困难。

同样根据 Dremio 的说法,Dremio 和 Spark 等云数据湖提供了更大的灵活性,因为数据存储在多个引擎可以使用的地方。这是真的。 Dremio 声称由此产生的三个优势:
Dremio 的竞争对手包括 Databricks Lakehouse Platform、Ahana Presto、Trino(以前称为 Presto SQL)、Amazon Athena 和开源 Apache Spark。不太直接的竞争对手是支持外部表的数据仓库,例如 Snowflake 和 Azure Synapse。
Dremio 将所有企业数据仓库描绘成他们的竞争对手,但我认为这是营销,如果不是真正的炒作的话。毕竟,数据湖和数据仓库满足不同的用例并服务于不同的用户,尽管数据湖屋至少部分地跨越了这两个类别。

Dremio 服务器软件是适用于 Linux 的 Java 数据湖库应用程序,可以部署在 Kubernetes 集群、AWS 和 Azure 上。 Dremio Cloud 基本上是作为 AWS 上的完全托管服务运行的 Dremio 服务器软件。

Dremio Cloud 的功能分为虚拟私有云(VPC)、Dremio 的和您的,如下图所示。 Dremio 的 VPC 充当控制平面。您的 VPC 充当执行平面。如果您在 Dremio Cloud 中使用多个云帐户,则每个 VPC 都充当一个执行平面。
执行平面拥有多个集群,称为计算引擎。控制平面使用 Sonar 查询引擎处理 SQL 查询,并通过引擎管理器发送它们,引擎管理器根据您的规则将它们分派到适当的计算引擎。

Dremio 声称具有“反射”的亚秒级响应时间,“反射”是源数据或查询的优化物化,类似于物化视图。得益于 Apache Arrow,一种标准化的面向列的内存格式,Dremio 声称其原始速度比 Trino(Presto SQL 引擎的一种实现)快 3 倍。 Dremio 还声称,在没有指定比较点的情况下,由于 SQL DML、dbt 和 Dremio 的语义层,数据工程师可以在很短的时间内摄取、转换和提供数据。
Dremio 本身没有商业智能、机器学习或深度学习功能,但它有支持 BI、ML 和 DL 软件的驱动程序和连接器,例如 Tableau、Power BI 和 Jupyter Notebooks。它还可以连接到 Lakehouse 存储和外部关系数据库中表中的数据源。

Dremio Cloud 分为两个 Amazon 虚拟私有云 (VPC)。 Dremio 的 VPC 托管控制平面,包括 SQL 处理。您的 VPC 托管包含计算引擎的执行平面。
Dremio Arctic 是 Apache Iceberg 的智能元存储,Apache Iceberg 是一种用于大型分析数据集的开放表格式,由原生 Apache Iceberg 目录 Nessie 提供支持。 Arctic 为 Hive Metastore 提供了一种现代的云原生替代方案,由 Dremio 提供永久免费服务。

Arctic 提供以下功能:
Dremio 的大部分性能和功能取决于所使用的磁盘和内存数据文件格式。
Apache Arrow 由 Dremio 创建并为开源做出了贡献,它为平面和分层数据定义了一种独立于语言的列式内存格式,组织起来用于在 CPU 和 GPU 等现代硬件上进行高效的分析操作。 Arrow 内存格式还支持零拷贝读取,以实现闪电般快速的数据访问,而无需序列化开销。
Gandiva 是 Apache Arrow 的基于 LLVM 的矢量化执行引擎。 Arrow Flight 在 Apache Arrow 上实现 RPC(远程过程调用),并建立在 gRPC 之上。 gRPC 是来自 Google 的现代、开源、高性能 RPC 框架,可以在任何环境中运行; gRPC 通常比 REST 消息传输快 7 到 10 倍。

Apache Iceberg 是一种用于大型分析表的高性能格式。 Iceberg 为大数据带来了 SQL 表的可靠性和简单性,同时使 Sonar、Spark、Trino、Flink、Presto、Hive 和 Impala 等引擎可以同时安全地处理相同的表。 Iceberg 支持灵活的 SQL 命令来合并新数据、更新现有行和执行有针对性的删除。

Apache Parquet 是一种开源的、面向列的数据文件格式,专为高效的数据存储和检索而设计。它提供高效的数据压缩和编码方案,具有增强的性能,可以批量处理复杂数据。
据 Dremio 介绍,Apache Iceberg 数据文件格式由 Netflix、Apple 和其他技术巨头创建,支持任何引擎的 INSERT/UPDATE/DELETE,在开源社区中势头强劲。相比之下,再次根据 Dremio 的说法,Delta Lake 数据文件格式是由 Databricks 创建的,当在 AWS 上的 Databricks 平台上运行时,支持使用 Spark 的 INSERT/UPDATE 和使用任何 SQL 查询引擎的 SELECT。
Dremio 指出了开源版本的 Delta Lake 和在 AWS 上的 Databricks 平台上运行的 Delta Lake 版本之间的一个重要技术差异。例如,有一个允许 Trino 读写开源 Delta Lake 文件的连接器,以及一个允许基于 Scala 和 Java 的项目(包括 Apache Flink、Apache Hive、Apache Beam 和 PrestoDB)读写的库开源 Delta Lake。但是,这些工具无法安全地写入 AWS 上 Databricks 平台上的 Delta Lake 文件。
除了源自所用文件格式的查询性能之外,Dremio 还可以使用柱状云缓存和数据反射来加速查询。
Columnar Cloud Cache (C3) 使 Dremio 通过使用内置于云计算实例(例如 Amazon EC2 和 Azure 虚拟机)中的 NVMe/SSD 在 Amazon S3、Azure Data Lake Storage 和 Google Cloud Storage 上实现 NVMe 级 I/O 性能. C3 仅缓存满足您的工作负载所需的数据,甚至可以缓存数据集中的单个微块。如果您的表有 1,000 列并且您只查询这些列的一个子集并过滤特定时间范围内的数据,那么 C3 将只缓存您的表的那部分。根据 Dremio 的说法,通过有选择地缓存数据,C3 还显着降低了云存储 I/O 成本,这可能占您运行的每个查询成本的 10% 到 15%。
Dremio 的列式云缓存 (C3) 功能通过使用云实例中的 NVMe SSD 缓存先前查询使用的数据来加速未来的查询。

数据反射支持亚秒级 BI 查询,无需在分析之前创建多维数据集和汇总。数据反射是一种数据结构,可以智能地预先计算聚合和其他数据操作,因此您不必即时进行复杂的聚合和向下钻取。反射对最终用户是完全透明的。用户无需连接到特定的具体化,而是查询所需的表和视图,Dremio 优化器会选择最佳反射来满足和加速查询。

Dremio 采用多引擎架构,因此您可以为组织中的各种工作负载创建多个大小合适、物理隔离的引擎。您可以轻松设置工作负载管理规则,将查询路由到您定义的引擎,这样您就不必再担心复杂的数据科学工作负载会阻止高管的仪表板加载。除了消除资源争用之外,引擎还可以快速调整大小以处理任何并发性和吞吐量的工作负载,并在您不运行查询时自动停止。
Dremio 引擎本质上是配置为执行程序的可扩展实例集群。规则有助于将查询分派到所需的引擎。

Dremio Cloud 入门指南涵盖
我不会向您展示本教程的每一步,因为您可以自己阅读并在自己的免费帐户中运行它。

作者 east
云计算 8月 28,2023

站点可靠性工程:当今企业 IT 的当务之急

站点可靠性工程 (SRE) 正迅速成为现代 IT 运营的一个重要方面,尤其是在高度扩展的大数据环境中。随着企业和行业转向数字化并采用新的 IT 基础设施和技术以保持运营和竞争力,IT 团队需要一种新方法来找到和管理发布新系统和功能与确保这些系统和功能直观、可靠、对最终用户的友好程度也有所提高。

在过去几年中,对站点可靠性工程及其相关领域的兴趣激增。根据 LinkedIn 最近的一项调查,网站可靠性工程师被列为过去五年内增长最快的 25 个职业之一。但站点可靠性工程到底是什么?它如何影响数字企业完全满足甚至超过其服务水平目标 (SLO) 并实现其业务目标的能力,即使在大规模环境中也是如此?尽管没有完美的技术这样的东西,但拥有正确的流程可能会使世界变得不同。继续阅读以了解有关站点可靠性工程以及如何实施最佳实践以确保所有系统以最高效率和可靠性运行的更多信息。

什么是站点可靠性工程?

站点可靠性工程从软件工程的角度看待和处理 IT 操作。任务是持续监控 IT 系统、工具和功能,主要是它们的可用性、延迟、性能和容量。

站点可靠性工程师依靠软件来管理系统、查明问题并自动执行各种操作任务。 SRE 获取历史上分配给运营团队并由运营团队手动执行的任务,并将它们移交给站点可靠性工程师。然后 SRE 承担任务并利用自动化和标准化来解决问题并进一步提高整个生产系统的可靠性。

SRE 现在被视为创建和管理可扩展且高度可靠的软件系统的关键部分。借助 SRE,IT 团队和系统管理员可以通过代码管理和操作更大的系统。这种做法使他们能够扩展和维护数千或数十万台机器。

站点可靠性工程师做什么?

SRE 负责最大限度地提高计算机系统的可靠性和效率。 SRE 了解所有与计算机系统交互的人对该系统的期望,并努力满足这些期望。因此,SRE 充当软件工程和 IT 运营之间的粘合剂。 SRE 经常描述他们的工作是创造性地填补空白,让人们开心,从开发人员到最终用户再到管理团队成员。当您可以理所当然地认为您的所有系统都以最高效率和可靠性运行时,您就知道您的 SRE 做得很好。

站点可靠性工程师通常与 IT 运营和软件开发团队协同工作。 SRE 团队帮助 IT 运营部门提高其生产系统的可靠性。最重要的是,SR 团队可能会帮助 IT、支持和开发团队减少花在支持票和升级上的时间,从而使他们能够专注、开发和推出新的和改进的功能和服务。

企业任务站点可靠性工程师主动创建和实施旨在促进 IT 运营和支持的软件和服务。这可以从监控功能到在生产过程中代码发生变化时发送通知。 SRE 团队通常从头开始使用自己开发的工具,因为这使他们能够有效地处理软件交付或事件管理中的问题。

还可以部署 SRE 团队来处理支持升级。然而,随着系统的成熟,它们变得可靠。这样一来,生产中的关键事件就会减少,从而转化为支持升级的次数也会减少。站点可靠性工程师在软件工程和 IT 运营方面积累了如此多的知识,以至于他们自己成为了强大的支持团队,帮助组织将问题转给合适的人。

由于涉及软件开发和 IT 的许多方面,站点可靠性工程师还参与了部落知识的文档编制。 SRE 团队还执行文档后工作,例如持续维护和运行手册,以保持知识的质量和完整性得到更新和完整。

站点可靠性工程师通常承担随叫随到的责任。鉴于他们接触过工程和 IT 的各个领域,SRE 团队不断协作以提高系统可靠性并优化随叫随到的流程。

大数据环境中的 SRE 最佳实践

没有完美的 SRE 策略。任何站点可靠性框架都需要不断完善,以确保满足运营需求。以下 SRE 原则和最佳实践将帮助大数据组织根据他们的要求执行和定制他们的 SRE 策略。

站点可靠性工程师与 DevOps 工程师与软件工程师

站点可靠性工程师是专注于开发的 IT 专业人员,他们致力于开发和实施解决可靠性、可用性和规模问题的解决方案。另一方面,DevOps 工程师是专注于解决开发管道问题的运维人员。虽然这两个职业之间存在分歧,但两组工程师都会定期跨越鸿沟,向对方提供他们的专业知识和意见,反之亦然。

站点可靠性工程师保持他们的服务运行并可供用户使用,DevOps 涵盖从端到端的产品生命周期,目标是基于敏捷技术使所有流程连续进行。在整个产品生命周期中提供连续性是加快上市时间和实施快速变更的关键。

虽然站点可靠性工程师和软件工程师的角色在一定程度上重叠,但这两个职业之间存在重大差异。软件工程师设计和编写软件解决方案。在大多数情况下,软件工程师会将部署成本以及应用程序更新和维护成本考虑在内。

SRE 不是对操作了解一两件事的开发人员,也不是编写代码的操作人员。对于您的开发团队来说,这是一门全新的独立学科。 SRE 带来了部署、配置管理、监控和指标方面的专业知识。 SRE 专注于提高应用程序性能,使开发人员能够专注于功能改进和 IT 运营,从而专注于管理基础设施。当 SRE 积极参与时,开发人员和 IT 运营人员可以自由地做他们最擅长的事情。

什么是 SRE 框架?

站点可靠性工程框架基于以下原则构建。

SRE 创建各种框架模块,作为为特定生产领域设计的解决方案的实施指南。 SRE 框架本质上指导工程师如何实现软件组件以及集成这些组件的规范方法。

SRE 框架在效率和一致性方面为工程师和开发人员提供了多种好处。一方面,它们使开发人员不必以特定于服务的临时方式查找、拼凑和配置各个组件。

这些框架为生产问题提供单一解决方案,可在各种服务中重复使用。框架用户使用通用的实施规则和最小的配置差异来执行他们的生产和其他流程。

Spark 大数据应用程序的另一个示例是调整以减少或消除数据倾斜。数据倾斜导致某些应用程序元素的工作时间超过它们应有的时间,而其他计算资源则闲置,未得到充分利用。 Spark 对数据倾斜高度敏感,对于高度分布式和瘫痪的应用程序,它可能具有很大的破坏性。

一旦对计算机系统进行了最佳调整,SRE 最终可能会说:“我们所有的应用程序都在无故障地运行,并且我们始终如一地满足 SLA。”为此,SRE 需要正确的可观察性工具来帮助他们确定内存利用率、数据倾斜和其他可能出现的问题。

作者 east
datax 8月 25,2023

DataX Core TransformerRegistry类详细解读

TransformerRegistry 类,用于注册、加载和管理数据转换器。以下是对各个部分的作用解释:

  • 首先,该类维护了一个名为 registedTransformer 的映射,用于存储已注册的转换器信息。
  • 在静态代码块中,内置了一些原生转换器实例,并注册到 registedTransformer 中。
  • loadTransformerFromLocalStorage 方法用于从本地存储加载转换器,可以选择加载指定的转换器。它遍历指定目录下的转换器文件,尝试加载

每个转换器,如果加载失败则记录错误日志。

  • loadTransformer 方法用于加载单个转换器。它根据转换器配置文件的路径加载配置,然后根据配置中的类名加载对应的类。根据类的类型(是否继承自 ComplexTransformer 或 Transformer),将转换器实例注册到 registedTransformer 中。
  • getTransformer 方法用于获取指定名称的转换器信息,从 registedTransformer 中查找,如果找不到则可能会从磁盘读取(TODO: 根据注释,这部分可能是未实现的功能)。
  • registTransformer 和 registComplexTransformer 方法用于注册转换器。它们会检查转换器名称是否满足命名规则,并将转换器信息构建成 TransformerInfo 实例后添加到 registedTransformer 中。
  • checkName 方法用于检查转换器名称是否满足命名规则,根据 isNative 参数判断是否需要以 “dx_” 开头。
  • buildTransformerInfo 方法用于构建 TransformerInfo 实例,其中包含了转换器的类加载器、是否为原生转换器以及实际的转换器实例。
  • getAllSuportTransformer 方法返回支持的所有转换器的名称列表。

这个类的主要作用是提供了转换器的注册、加载和管理功能,使得数据转换器可以被动态添加和使用。它在数据处理流程中,特别是数据抽取和转换阶段,起到了很重要的作用。

public class TransformerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        // 添加内置的一些原生转换器
        // 本地存储和从服务器加载的转换器将延迟加载
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new DigestTransformer());
    }

    // 从本地存储加载转换器(默认情况下加载所有转换器)
    public static void loadTransformerFromLocalStorage() {
        loadTransformerFromLocalStorage(null);
    }

    // 从本地存储加载转换器(可选加载特定转换器)
    public static void loadTransformerFromLocalStorage(List<String> transformers) {
        String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
        if (null == paths) {
            return;
        }

        for (final String each : paths) {
            try {
                if (transformers == null || transformers.contains(each)) {
                    loadTransformer(each);
                }
            } catch (Exception e) {
                LOG.error(String.format("跳过转换器(%s)的加载,loadTransformer 出现异常(%s)", each, e.getMessage()), e);
            }
        }
    }

    // 加载指定的转换器
    public static void loadTransformer(String each) {
        String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
        Configuration transformerConfiguration;
        try {
            transformerConfiguration = loadTransFormerConfig(transformerPath);
        } catch (Exception e) {
            LOG.error(String.format("跳过转换器(%s),加载 transformer.json 出错,路径 = %s", each, transformerPath), e);
            return;
        }

        String className = transformerConfiguration.getString("class");
        if (StringUtils.isEmpty(className)) {
            LOG.error(String.format("跳过转换器(%s),未配置 class,路径 = %s,配置 = %s", each, transformerPath, transformerConfiguration.beautify()));
            return;
        }

        String funName = transformerConfiguration.getString("name");
        if (!each.equals(funName)) {
            LOG.warn(String.format("转换器(%s) 的名称与 transformer.json 配置的名称[%s] 不匹配,将忽略 JSON 的名称,路径 = %s,配置 = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
        }
        JarLoader jarLoader = new JarLoader(new String[]{transformerPath});

        try {
            Class<?> transformerClass = jarLoader.loadClass(className);
            Object transformer = transformerClass.newInstance();
            if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
                ((ComplexTransformer) transformer).setTransformerName(each);
                registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
            } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
                ((Transformer) transformer).setTransformerName(each);
                registTransformer((Transformer) transformer, jarLoader, false);
            } else {
                LOG.error(String.format("加载 Transformer 类(%s) 出错,路径 = %s", className, transformerPath));
            }
        } catch (Exception e) {
            // 错误的转换器跳过
            LOG.error(String.format("跳过转换器(%s),加载 Transformer 类出错,路径 = %s ", each, transformerPath), e);
        }
    }

    private static Configuration loadTransFormerConfig(String transformerPath) {
        return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
    }

    public static TransformerInfo getTransformer(String transformerName) {
        TransformerInfo result = registedTransformer.get(transformerName);

        // 如果 result == null,则尝试从磁盘读取
        // TODO: 这部分可能是未实现的功能,待开发

        return result;
    }

    public static synchronized void registTransformer(Transformer transformer) {
        registTransformer(transformer, null, true);
    }

    public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {
        checkName(transformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(transformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
        }

        registedTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));
    }

    public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {
        checkName(complexTransformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
        }

        registedTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
    }

    private static void checkName(String functionName, boolean isNative) {
        boolean checkResult = true;
        if (isNative) {
            if (!functionName.startsWith("dx_")) {
                checkResult = false;
            }
        } else {
            if (functionName.startsWith("dx_")) {
                checkResult = false;
            }
        }

        if (!checkResult) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
        }
    }

    private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }

    public static List<String> getAllSuportTransformer() {
        return new ArrayList<String>(registedTransformer.keySet());
    }
}
作者 east
doris 8月 25,2023

DataX DorisWriter 插件DorisStreamLoadObserver类详细解读

DorisStreamLoadObserver 类是一个用于将数据加载到 Doris(以前称为 Palo)数据库中并监视加载过程的 Java 类。该类提供了一组方法,用于构建 HTTP 请求、处理 HTTP 响应以及监控数据加载的状态。以下是每个方法的具体作用:

  1. DorisStreamLoadObserver(Keys options): 这是类的构造函数,用于初始化加载数据所需的配置选项。
  2. void streamLoad(WriterTuple data) throws Exception: 该方法是数据加载的主要方法。它将给定的数据(WriterTuple 对象)加载到 Doris 数据库中。它构建了用于将数据发送到 Doris 的 HTTP 请求,并根据响应状态来确定加载是否成功。如果加载失败,它会抛出异常。
  3. private void checkStreamLoadState(String host, String label) throws IOException: 这个方法用于检查数据加载的状态。它会不断地轮询 Doris 服务器,以获取特定加载任务的最终状态。根据加载状态的不同,它可能会抛出异常或者在加载完成时返回。
  4. private byte[] addRows(List<byte[]> rows, int totalBytes): 此方法根据给定的数据行和总字节数,构建用于加载的字节数组。它根据配置中的数据格式(CSV 或 JSON)将数据行连接起来,并添加适当的分隔符。
  5. private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException: 该方法执行 HTTP PUT 请求,将数据加载到 Doris 数据库中。它构建了包含数据的请求实体,发送到指定的加载 URL,并解析响应以获取加载结果。
  6. private String getBasicAuthHeader(String username, String password): 此方法用于生成基本身份验证头部,以便在 HTTP 请求中进行身份验证。
  7. private HttpEntity getHttpEntity(CloseableHttpResponse response): 这是一个实用方法,用于从 HTTP 响应中提取实体内容。
  8. private String getLoadHost(): 该方法从配置选项中获取用于加载数据的主机地址列表,并尝试连接到这些主机以检查其可用性。它会返回第一个可用的主机地址。

DorisStreamLoadObserver 类主要用于处理数据加载任务,它负责构建适当的 HTTP 请求,将数据发送到 Doris 数据库,并监控加载任务的状态。通过这些方法,可以实现将数据从外部系统加载到 Doris 数据库中,并在加载过程中进行必要的状态检查和错误处理。

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DorisStreamLoadObserver {
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);

    private Keys options;

    private long pos;
    private static final String RESULT_FAILED = "Fail";
    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
    private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
    private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
    private static final String RESULT_LABEL_PREPARE = "PREPARE";
    private static final String RESULT_LABEL_ABORTED = "ABORTED";
    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";

    public DorisStreamLoadObserver(Keys options) {
        this.options = options;
    }

    // 数据写入 Doris 的主要方法
    public void streamLoad(WriterTuple data) throws Exception {
        String host = getLoadHost();
        if (host == null) {
            throw new IOException("load_url cannot be empty, or the host cannot connect. Please check your configuration.");
        }
        String loadUrl = new StringBuilder(host)
                .append("/api/")
                .append(options.getDatabase())
                .append("/")
                .append(options.getTable())
                .append("/_stream_load")
                .toString();
        LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
        Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
        LOG.info("StreamLoad response :{}", JSONValue.toJSONString(loadResult));
        final String keyStatus = "Status";
        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
            throw new IOException("Unable to flush data to Doris: unknown result status.");
        }
        LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
        if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
            throw new IOException(
                    new StringBuilder("Failed to flush data to Doris.\n").append(JSONValue.toJSONString(loadResult)).toString()
            );
        } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
            LOG.debug("StreamLoad response:{}", JSONValue.toJSONString(loadResult));
            checkStreamLoadState(host, data.getLabel());
        }
    }

    // 检查数据加载状态的方法
    private void checkStreamLoadState(String host, String label) throws IOException {
        int idx = 0;
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
            } catch (InterruptedException ex) {
                break;
            }
            try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
                HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
                httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
                httpGet.setHeader("Connection", "close");

                try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
                    HttpEntity respEntity = getHttpEntity(resp);
                    if (respEntity == null) {
                        throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                "could not get the final state of label[%s].\n", label), null);
                    }
                    Map<String, Object> result = (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
                    String labelState = (String) result.get("state");
                    if (null == labelState) {
                        throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
                    }
                    LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
                    switch (labelState) {
                        case LAEBL_STATE_VISIBLE:
                        case LAEBL_STATE_COMMITTED:
                            return;
                        case RESULT_LABEL_PREPARE:
                            continue;
                        case RESULT_LABEL_ABORTED:
                            throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, Error " +
                                    "label[%s] state[%s]\n", label, labelState), null, true);
                        case RESULT_LABEL_UNKNOWN:
                        default:
                            throw new IOException(String.format("Failed to flush data to Doris, Error " +
                                    "label[%s] state[%s]\n", label, labelState), null);
                    }
                }
            }
        }
    }

    // 根据格式将数据行拼接成字节数组
    private byte[] addRows(List<byte[]> rows, int totalBytes) {
        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
            byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
            for (byte[] row : rows) {
                bos.put(row);
                bos.put(lineDelimiter);
            }
            return bos.array();
        }

        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
            boolean isFirstElement = true;
            for (byte[] row : rows) {
                if (!isFirstElement) {
                    bos.put(jsonDelimiter);
                }
                bos.put(row);
                isFirstElement = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
    }

private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(120 * 1000)
                .setConnectTimeout(120 * 1000)
                .setConnectionRequestTimeout(120 * 1000)
                .build();
        try (CloseableHttpClient httpclient = HttpClientBuilder.create()
                .setDefaultRequestConfig(requestConfig)
                .setRedirectStrategy(new DefaultRedirectStrategy())
                .build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
            httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
            httpPut.setEntity(new ByteArrayEntity(data));
            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
                HttpEntity respEntity = getHttpEntity(resp);
                if (respEntity == null) {
                    throw new IOException("Failed to flush data to Doris, Error could not get the response entity.");
                }
                return (Map<String, Object>) JSONValue.parse(EntityUtils.toString(respEntity));
            }
        }
    }

    // 构造 HTTP 请求中的基本认证头部
    private String getBasicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        byte[] credentialsBytes = credentials.getBytes(StandardCharsets.UTF_8);
        String base64Credentials = Base64.encodeBase64String(credentialsBytes);
        return "Basic " + base64Credentials;
    }

    // 从 HTTP 响应中获取实体内容
    private HttpEntity getHttpEntity(CloseableHttpResponse response) {
        if (response != null) {
            return response.getEntity();
        }
        return null;
    }

    // 获取用于加载数据的主机地址
    private String getLoadHost() {
        List<String> hosts = options.getDorisStreamLoadUrls();
        for (String host : hosts) {
            try {
                HttpURLConnection connection = (HttpURLConnection) new URL(host).openConnection();
                connection.setRequestMethod("HEAD");
                int responseCode = connection.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    return host;
                }
            } catch (IOException e) {
                LOG.warn("Failed to connect to host: {}", host);
            }
        }
        return null;
    }
}
作者 east
doris 8月 25,2023

DataX DorisWriter 插件DorisWriterManager类详细解读

DorisWriterManager 的类,用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释:

  1. 导入必要的包和类: 代码开头导入了所需的包和类,包括日志记录、线程池、字符编码和其他相关工具类。
  2. 类成员变量定义: 下面是一些类的成员变量定义,这些变量在类的不同方法中使用:
    • LOG: 用于记录日志的 Logger 对象。
    • visitor: DorisStreamLoadObserver 类的实例,用于处理数据写入 Doris 的观察者。
    • options: Keys 类的实例,包含了一些配置选项。
    • buffer: 存储待写入 Doris 的数据。
    • batchCount: 当前批次中的记录数量。
    • batchSize: 当前批次中的数据大小。
    • closed: 标志位,表示是否已关闭写入。
    • flushException: 异步刷新数据时可能发生的异常。
    • flushQueue: 用于异步刷新数据的队列。
    • scheduler: 用于定期刷新数据的调度器。
    • scheduledFuture: 用于取消定时任务的句柄。
  3. 构造函数 DorisWriterManager: 构造函数接受一个 Keys 对象作为参数,设置了初始化的配置信息,并初始化了 visitor 和 flushQueue。接着,它调用 startScheduler() 启动定期刷新任务,以及 startAsyncFlushing() 启动异步刷新线程。
  4. startScheduler() 方法: 此方法负责启动定时刷新任务。它首先调用 stopScheduler() 停止之前的定时任务。然后,创建一个单线程的调度器(scheduler),并设置一个定时任务,定期触发数据刷新操作。在定时任务内部,它会检查是否关闭了写入操作,然后根据配置信息进行数据刷新。如果当前批次为空,重新启动定时任务,确保数据持续刷新。
  5. stopScheduler() 方法: 此方法用于停止定时任务。它会取消之前的定时任务并关闭调度器。
  6. writeRecord(String record) 方法: 该方法用于将记录写入缓冲区。它首先调用 checkFlushException() 方法检查是否存在刷新异常。然后,将记录转换成字节数组并添加到缓冲区中,同时更新批次计数和数据大小。如果当前批次的记录数量或数据大小超过了阈值,就会触发数据刷新。
  7. flush(String label, boolean waitUntilDone) 方法: 此方法用于手动触发数据刷新操作。它首先检查是否存在刷新异常,然后根据当前批次的情况决定是否执行刷新。如果当前批次为空,且 waitUntilDone 为真,它会等待之前的异步刷新操作完成。否则,它将当前批次的数据放入刷新队列,并根据 waitUntilDone 参数决定是否等待刷新操作完成。
  8. close() 方法: 此方法用于关闭 DorisWriterManager。它首先检查是否已经关闭,然后触发一次最终的数据刷新操作。如果当前批次有数据,会记录相应日志。最后,它检查是否有刷新异常并抛出相应异常。
  9. createBatchLabel() 方法: 此方法用于创建批次标签,用于标识一批数据。它根据配置的前缀和随机 UUID 生成标签。
  10. startAsyncFlushing() 方法: 此方法启动一个异步刷新线程。线程会循环调用 asyncFlush() 方法,将数据异步刷新到 Doris 中。
  11. waitAsyncFlushingDone() 方法: 该方法用于等待之前的异步刷新操作完成。它向刷新队列添加空的 WriterTuple,以确保之前的刷新操作完成。然后,它检查是否存在刷新异常。
  12. asyncFlush() 方法: 此方法用于异步刷新数据到 Doris。它从刷新队列中取出 WriterTuple,然后根据批次的标签执行数据刷新操作。如果发生异常,它会尝试多次,直到达到最大重试次数。如果需要重新创建批次标签,则生成新的标签。重试之间会休眠一段时间。成功后,重新启动定时任务。
  13. checkFlushException() 方法: 此方法用于检查是否存在刷新异常,如果存在则抛出异常。

这个 DorisWriterManager 类的目的是管理数据写入到 Doris 数据库的操作。它通过定时任务和异步刷新线程来控制数据的批量写入,同时处理异常情况,确保数据的稳定写入。

添加详细注释代码如下:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisWriterManager {

    private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);

    private final DorisStreamLoadObserver visitor;
    private final Keys options;
    private final List<byte[]> buffer = new ArrayList<>(); // 缓冲区,用于存储待写入 Doris 的数据
    private int batchCount = 0; // 当前批次中的记录数量
    private long batchSize = 0; // 当前批次中的数据大小
    private volatile boolean closed = false; // 标志位,表示是否已关闭
    private volatile Exception flushException; // 异步刷新数据时可能发生的异常
    private final LinkedBlockingDeque<WriterTuple> flushQueue; // 用于异步刷新数据的队列
    private ScheduledExecutorService scheduler; // 用于定期刷新数据的调度器
    private ScheduledFuture<?> scheduledFuture;

    public DorisWriterManager(Keys options) {
        this.options = options;
        this.visitor = new DorisStreamLoadObserver(options);
        flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
        this.startScheduler(); // 启动定期刷新调度器
        this.startAsyncFlushing(); // 启动异步刷新线程
    }

    // 启动定期刷新调度器
    public void startScheduler() {
        stopScheduler(); // 停止之前的调度器
        this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder()
                .namingPattern("Doris-interval-flush").daemon(true).build());
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (DorisWriterManager.this) {
                if (!closed) {
                    try {
                        String label = createBatchLabel();
                        LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));
                        if (batchCount == 0) {
                            startScheduler(); // 如果当前批次为空,重新启动定时任务
                        }
                        flush(label, false);
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        }, options.getFlushInterval(), TimeUnit.MILLISECONDS);
    }

    // 停止定期刷新调度器
    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    // 写入一条记录到缓冲区
    public final synchronized void writeRecord(String record) throws IOException {
        checkFlushException(); // 检查是否有刷新异常
        try {
            byte[] bts = record.getBytes(StandardCharsets.UTF_8);
            buffer.add(bts);
            batchCount++;
            batchSize += bts.length;
            if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
                String label = createBatchLabel();
                LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
                flush(label, false); // 当记录数量或数据大小超过阈值时触发刷新
            }
        } catch (Exception e) {
            throw new IOException("Writing records to Doris failed.", e);
        }
    }

    // 手动触发刷新缓冲区的数据
    public synchronized void flush(String label, boolean waitUntilDone) throws Exception {
        checkFlushException(); // 检查是否有刷新异常
        if (batchCount == 0) {
            if (waitUntilDone) {
                waitAsyncFlushingDone(); // 如果当前批次为空,等待之前的刷新操作完成
            }
            return;
        }
        flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer))); // 将数据放入刷新队列
        if (waitUntilDone) {
            waitAsyncFlushingDone(); // 等待刷新操作完成
        }
        buffer.clear();
        batchCount = 0;
        batchSize = 0;
    }

    // 关闭 DorisWriterManager,触发最后一次刷新操作
    public synchronized void close() {
        if (!closed) {
            closed = true;
            try {
                String label = createBatchLabel();
                if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));
                flush(label, true); // 关闭时触发刷新操作
            } catch (Exception e) {
                throw new RuntimeException("Writing records to Doris failed.", e);
            }
        }
        checkFlushException();
    }

    // 创建批次标签,通常用于标识一批数据
    public String createBatchLabel() {
        StringBuilder sb = new StringBuilder();
        if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
            sb.append(options.getLabelPrefix());
        }
        return sb.append(UUID.randomUUID().toString()).toString();
    }

    // 启动异步刷新线程
    private void startAsyncFlushing() {
        Thread flushThread = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        asyncFlush(); // 异步刷新数据
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        });
        flushThread.setDaemon(true);
        flushThread.start();
    }

    // 等待之前的刷新操作完成
    private void waitAsyncFlushingDone() throws InterruptedException {
        for (int i = 0; i <= options.getFlushQueueLength(); i++) {
            flushQueue.put(new WriterTuple("", 0L, null));
        }
        checkFlushException();
    }

    // 异步刷新数据到 Doris
    private void asyncFlush() throws Exception {
        WriterTuple flushData = flushQueue.take();
        if (Strings.isNullOrEmpty(flushData.getLabel())) {
            return;
        }
        stopScheduler(); // 停止定时任务
        LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
        for (int i = 0; i <= options.getMaxRetries(); i++) {
            try {
                // 利用 DorisStreamLoadObserver 进行数据刷新
                visitor.streamLoad(flushData);
                LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
                startScheduler(); // 
     break;
            } catch (Exception e) {
                LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
                if (i >= options.getMaxRetries()) {
                    throw new IOException(e);
                }
                if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
                    String newLabel = createBatchLabel();
                    LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
                    flushData.setLabel(newLabel);
                }
                try {
                    Thread.sleep(1000l * Math.min(i + 1, 10));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
    }

    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Writing records to Doris failed.", flushException);
        }
    }
}
作者 east
大数据开发 8月 24,2023

银行业数字化转型

大多数公司会愉快地谈论他们如何进行“数字化转型”。问高管这对他们的组织意味着什么,通常你会得到一份精心策划的词沙拉。这是因为在可以衡量之前,银行业的数字化转型是一个模糊的术语。数字部分相当简单。这是难以量化的变革方面。

推动数字化转型的是需要不断改进所有类别客户的用户体验。无论是零售银行客户、中小型商业实体、交易对手方,还是接受资金和金融服务的上市公司,他们都希望对每笔交易和查询做出比以往更好的高效和定制化响应。前一个。

对于一些公司来说,这是一个让自己脱颖而出并抢占更多市场份额的绝好机会,因为客户可以轻松地从一家供应商转移到另一家供应商。为了帮助读者踏上这段旅程,这篇文章强调了测量的概念,认为它是数字化转型中最实用、最可操作的方面之一。如果底层服务不处于持续优化的状态,客户体验就无法细化,无法衡量的东西就无法优化。

作为一个社会,我们已经从模拟转向数字。但从转型的角度来看,例如,通过传真机运行文档与扫描和通过电子邮件发送文档没有太大区别。这是因为您仍在分发相同的文档,尽管效率更高。

有人会认为机器人处理自动化 (RPA) 是将您推向数字化转型下一个领域的绝佳方式。但如果出于相同原因发送相同类型的文档,则情况并非如此,即使发送速度更快、规模更大。在这种情况下,您可以自豪地将运营效率添加到您的 LinkedIn 个人资料中,以及一系列节省成本和改进资源分配的好处。这是当之无愧的,因为它本身就是一个挑战,但它不是数字化转型。

那么什么是数字化转型?简单来说,就是将数字技术嵌入到业务运营的各个方面,从而导致思维、模式和行为发生变化(转变)。

这不仅仅是将技术应用于业务,因为公司已经这样做了几十年。数字化转型是对公司如何利用人工智能、机器学习和大数据等先进技术构建不断发展的自动化流程的根本性反思。自动化不再是人为驱动的任务,而是一个由人监督的自动化过程,其中每个决策和行动都是通过经验证据和对市场驱动事件的分析来精确确定的。

如果执行得当,就会启动新的业务模型,这些模型会应用从与越来越多的场景中越来越多的客户实时交互中获得的见解。

数字化转型的一个当代例子是汽车保险理赔处理。传统上,这是一项纸张密集型操作,遵循非常线性的检查和批准流程,一次一个文件。然而,对于一些创新型公司来说,它已经转变为一种低接触的全自动端到端流程。 “文书工作”不仅效率更高,而且处理索赔的方式也从根本上从成本控制功能转变为交叉销售和追加销售机会。理赔流程现在是一种方便的自助式体验,还可以建议额外的、定制的服务,这些服务与客户在那个时间点和地点的情绪状态完全相关。

为了满足客户的期望,该行业进行了数字化转型,这个例子在金融服务行业的所有领域都在发生。

早期的汽车保险应用程序允许客户拍摄事故照片,然后由具有多年经验的人手动查看。该人亲自审查了数千起事故的经验使他们能够仅通过查看图片来评估损失,从而得出成本估算。

在第一次变革迭代中,流程得到了技术的增强,而不是转变。但将数字图像上传到数据库的第一步为转型奠定了基础。多年来,同一家公司最终收集了大量数字化的汽车事故数据。通过将所有这些信息存储在大数据集群中,他们就有机会构建和训练复杂的人工智能模型。

一起运行这些模型的集合从而使完整的端到端索赔流程自动化,生成并确认您选择的车身修理厂的工作估算,将资金汇入适当的账户,并管理汽车租赁。一个漫长的高接触手动过程转变为具有实时操作和响应的低接触体验。这种便利会赢得客户的信任,并有更好的机会从他们那里获得更多的收入机会。

但这并不是银行业一夜之间的数字化转型。它经过多年酝酿,需要深入反思如何利用各自的技术应用开展业务。它也不是一个一次性的项目。随着越来越多的公司这样做,该流程需要不断优化和完善才能保持相关性。

虽然从客户的角度来看,上面的示例使它看起来很容易,但完全自动化和实时的流程是一项复杂的工作,需要利用许多公司内部领域的主题专业知识。这意味着您有多个团队在开发同一产品,每个团队同时采用多种技术和数据工作负载。一系列流程中任何时候的一个瓶颈都可能同时毁掉成千上万客户的整个体验。衡量高度复杂的相互依赖的工作负载的进展情况,并在持续实时的基础上优化各种环境中的每一次交互,对于数字化转型至关重要。

一切都必须考虑,从机器学习应用程序性能、网络延迟和第三方支付 API,到管理大量非结构化图像数据和大量其他类型的工作负载。它需要一组专门的软件工具来查看这些组件、生成指标并实时推荐最佳优化路径。

虽然客户满意度有很多变数,但性能和响应时间是客户感知用户体验的重要组成部分。使用上述类型的复杂系统来解决和管理客户期望是一场持久战。

作者 east
Spark 8月 24,2023

spark cacheTable的作用

DataFrame.sqlContext.cacheTable 是一种用于缓存 Spark DataFrame 中数据表的方法。它可以用来在内存中缓存数据,以提高查询性能和加速数据分析过程。

作用:

当你使用 DataFrame.sqlContext.cacheTable 方法缓存一个数据表时,Spark 会将该表的数据加载到内存中,以便在后续的查询和操作中快速访问数据,而不必每次都从磁盘读取。这可以带来以下好处:

  1. 加速查询性能: 缓存数据表可以显著提高查询性能,因为查询操作可以在内存中快速执行,而不需要再次访问磁盘。特别是对于复杂的查询或多次重复的查询,性能的提升会非常明显。
  2. 减少磁盘IO: 缓存表可以减少磁盘IO操作,从而减少整体的数据读取时间。这对于大规模数据集或需要频繁访问的数据表特别有用。
  3. 优化迭代计算: 在迭代计算中,如果同一数据表被多次使用,缓存该表可以避免重复加载和计算,加快迭代过程。
  4. 交互式分析: 在交互式分析环境中,缓存数据表可以让你更快地尝试不同的查询和分析操作,提高工作效率。

使用示例:

以下是一个示例,展示了如何使用 DataFrame.sqlContext.cacheTable 方法来缓存一个数据表:

python复制代码from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("CacheTableExample").getOrCreate()

# 读取数据并创建 DataFrame
data_df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 缓存数据表
data_df.createOrReplaceTempView("my_cached_table")
spark.sql("CACHE TABLE my_cached_table")

# 执行查询
result = spark.sql("SELECT * FROM my_cached_table WHERE age > 25")

# 关闭 SparkSession
spark.stop()

在这个示例中,数据表 “my_cached_table” 被缓存到内存中,然后可以执行快速的查询操作。需要注意的是,缓存数据表会占用一定的内存空间,因此在内存有限的情况下,需要根据实际需求进行权衡和管理。同时,缓存的数据在某些情况下可能会被自动清除,例如当内存不足时。

作者 east
Spark 8月 23,2023

spark sql对表某个字段拆分成多条记录

需求:数据库某个表,有个字段area,里面有多个省份,用逗号分隔,例如”广东省,浙江省,北京市“,要求分隔成多条记录。

要使用Spark SQL对表中的某个字段根据逗号拆分成多条记录,你可以使用split()函数来实现。以下是具体的Scala代码示例:

scala复制代码import org.apache.spark.sql.functions._

// 假设你已经创建了一个SparkSession对象,命名为spark

// 加载原始数据作为DataFrame
val df = spark.read.format("csv").option("header", "true").load("path_to_your_file.csv")

// 使用split函数将目标字段拆分成多列,并使用explode函数将拆分后的数组转换成多行
val explodedDF = df.withColumn("split_col", split(col("your_column"), ",")) 
                   .selectExpr(df.columns: _*)
                   .withColumn("exploded_col", explode(col("split_col")))

// 显示处理后的结果
explodedDF.show()

你需要将代码中的以下部分进行替换以适应你的场景:

  • path_to_your_file.csv:指定你的CSV文件的路径。
  • your_column:要拆分的字段的名称。

这段代码会将指定字段按逗号拆分成多个记录,并把拆分后的记录显示在控制台上。你可以根据自己的需求进一步处理和操作拆分后的数据。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 22,2023

Flink SQL 配方:窗口 Top-N 和连续 Top-N

Flink SQL 已成为低代码流分析的标准,并设法统一批处理和流处理,同时保持 SQL 标准。此外,它还提供了一组丰富的实时数据分析高级功能。简而言之,Flink SQL 是两全其美的:它使您能够使用 SQL 处理流数据,但它还支持批处理。

Ververica Platform 使 Flink SQL 更易于跨团队访问和高效扩展。该平台附带了用于开发 SQL 脚本、管理用户定义函数 (UDF)、目录和连接器以及操作生成的长时间运行查询的附加工具。

我们已经看到了 Flink SQL 的许多用例,我们很高兴向您展示你可以用它构建什么。在本系列博文中,我们将探讨如何使用 Flink SQL 以多种方式处理数据。这篇文章将特别关注两个查询:Window Top-N 和 Continuous Top-N。

提示:访问我们的案例研究,探索其他人如何使用 Apache Flink。

什么是 Window Top-N 和 Continuous Top-N 查询?

Window Top-N 和 Continuous Top-N 是两种相似但略有不同的数据处理方式。在这两种情况下,我们都希望找到数据流中的前 N 项,但存在一些关键差异:

在 Window Top-N 中,我们在固定大小的窗口中处理数据。例如,我们可能希望每分钟找到前 10 个项目。

在 Continuous Top-N 中,我们连续处理数据。我们不使用窗口,而是在数据到达时对其进行处理。连续 Top-N 比窗口 Top-N 更难实现,但它有一些优点。例如,它可以更快地为我们提供结果,因为我们不必等待窗口关闭才能看到结果。

窗口 Top-N 和连续 Top-N 查询的常见用例

窗口 Top-N 和连续 Top-N 查询对于各种任务都很有用。例如,它们可以用于:

  • 欺诈检测:在金融交易流中,我们可能希望找到每分钟按金额排名前 10 的交易。它可以帮助我们识别可疑活动。
  • 用户交互流:在用户交互流中,我们可能希望找到正在查看或购买的前 10 件商品。它可以帮助我们向用户提出建议。
  • 异常检测:在传感器读数流中,我们可能希望找到读数最高的前 10 个传感器。它可以帮助我们识别监控中出现故障的传感器。
  • 日志消息流:在日志消息流中,我们可能希望按数量查找前 10 条日志消息。它可以帮助我们识别系统问题。

如何使用 Flink SQL 编写 Window Top-N 查询

首先我们来看看如何使用 Flink SQL 编写 Window Top-N 查询。我们将向您展示如何计算每个翻滚 5 分钟窗口中销售额最高的前 3 名供应商。

sql复制代码
CREATE TABLE orders (
  bidtime TIMESTAMP(3),
  price DOUBLE,
  item STRING,
  supplier STRING,
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS
) WITH (
  'connector' = 'faker',
  'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
  'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
  'fields.item.expression' = '#{Commerce.productName}',
  'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
  'rows-per-second' = '100'
);

SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
  FROM (
    SELECT window_start, window_end, supplier, SUM(price) as price, COUNT(*) as cnt
    FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(bidtime), INTERVAL '5' MINUTE))
    GROUP BY window_start, window_end, supplier
  )
) WHERE rownum <= 3;

源表(orders)由 thefaker 连接器支持,该连接器根据 Java Faker 表达式在内存中不断生成行。注意:此示例利用 Window Top-N 功能来显示排名前 3 的供应商每 5 分钟最高销售额。

如何使用 Flink SQL 编写连续 Top-N 查询

编写连续 Top-N 查询比编写 Window Top-N 查询更困难。这样做的原因是,在 Continuous Top-N 中,我们在数据到达时对其进行处理,而不是使用窗口。这个示例将带我们进入神奇的领域,因为流处理通常被外行认为是这样。然而,它实际上只是在数据流上执行的一组指令。

我们将展示如何使用 OVER window 和 ROW_NUMBER() 函数根据给定属性连续计算“Top-N”行。源表 (spells_cast) 由 thefaker 连接器支持,该连接器基于 Java Faker 在内存中连续生成行。

sql复制代码
CREATE TABLE spells_cast (
  wizard STRING,
  spell STRING
) WITH (
  'connector' = 'faker',
  'fields.wizard.expression' = '#{harry_potter.characters}',
  'fields.spell.expression' = '#{harry_potter.spells}'
);

SELECT wizard, spell, COUNT(*) AS times_cast
FROM spells_cast
GROUP BY wizard, spell;

此结果可以在 OVER 窗口中用于计算 Top-N。使用 wizard 列对行进行分区,然后根据施法次数(times_cast DESC)进行排序。内置函数 ROW_NUMBER() 根据排序顺序为每个分区的行分配序号。通过筛选序号小于等于 N 的行,我们可以获得每个巫师施法次数前 N 的法术。

以下是一个示例查询:

sql复制代码
SELECT wizard, spell, times_cast
FROM (
  SELECT wizard, spell, times_cast,
         ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num
  FROM (
    SELECT wizard, spell, COUNT(*) AS times_cast
    FROM spells_cast
    GROUP BY wizard, spell
  )
)
WHERE row_num <= 3;

在此查询中,我们首先计算每个巫师施法次数的统计信息。然后,我们在内部查询中使用 ROW_NUMBER() 函数对每个巫师的法术按照施法次数进行降序排列,为每个法术分配行号。最后,在外部查询中,我们筛选出行号小于等于 3 的记录,以获取每个巫师施法次数前 3 的法术。

这就是如何使用 Flink SQL 编写连续 Top-N 查询的方式。通过以上方法,您可以处理实时数据流并获取持续更新的 Top-N 数据。

请注意,上述示例中的 SQL 查询是根据您提供的上下文进行的翻译和还原,可能会因为特定上下文的变化而略有不同。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
python 8月 22,2023

python自动发文章到微信公众号4-群发消息

使用Python发送群发消息。具体的实现步骤如下:

  1. 导入必要的库:
pythonCopy Codeimport requests
  1. 构建请求的URL和参数:
pythonCopy Codeurl = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token=ACCESS_TOKEN"

需要替换ACCESS_TOKEN为有效的访问令牌。

  1. 构建请求的数据,这里以发送图文消息为例:
pythonCopy Codedata = {
   "filter": {
      "is_to_all": False,
      "tag_id": 2
   },
   "mpnews": {
      "media_id": "123dsdajkasd231jhksad"
   },
   "msgtype": "mpnews",
   "send_ignore_reprint": 0
}

其中,filter用于设置接收者,mpnews用于设置即将发送的图文消息,media_id是需要群发的消息的媒体ID。

  1. 发送POST请求:
pythonCopy Coderesponse = requests.post(url, json=data)
  1. 处理响应:
pythonCopy Coderesult = response.json() if result["errcode"] == 0:     print("群发消息发送成功!")     print("消息ID:", result["msg_id"]) else:     print("群发消息发送失败,错误信息:", result["errmsg"])

微信API文档如下:


根据标签进行群发【订阅号与服务号认证后均可用】
接口调用请求说明
http请求方式: POST https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token=ACCESS_TOKEN
POST数据示例如下:
图文消息(注意图文消息的media_id需要通过上述方法,或通过 “草稿箱 / 新建草稿” 接口来得到,海外微信公众号仅支持发送图文(mpnews)消息):
{ "filter":{ "is_to_all":false, "tag_id":2 }, "mpnews":{ "media_id":"123dsdajkasd231jhksad" }, "msgtype":"mpnews", "send_ignore_reprint":0 }


参数说明

参数是否必须说明
filter是用于设定图文消息的接收者
is_to_all否用于设定是否向全部用户发送,值为true或false,选择true该消息群发给所有用户,选择false可根据tag_id发送给指定群组的用户
tag_id否群发到的标签的tag_id,参见用户管理中用户分组接口,若is_to_all值为true,可不填写tag_id
mpnews是用于设定即将发送的图文消息
media_id是用于群发的消息的media_id
recommend否推荐语,不填则默认为“分享图片”
msgtype是群发的消息类型,图文消息为mpnews,文本消息为text,语音为voice,音乐为music,图片为image,视频为video,卡券为wxcard
title否消息的标题
description否消息的描述
thumb_media_id是视频缩略图的媒体ID
send_ignore_reprint是图文消息被判定为转载时,是否继续群发。 1为继续群发(转载),0为停止群发。 该参数默认为0。

返回说明

参数说明
type媒体文件类型,分别有图片(image)、语音(voice)、视频(video)和缩略图(thumb),图文消息为news
errcode错误码
errmsg错误信息
msg_id消息发送任务的ID
msg_data_id消息的数据ID,该字段只有在群发图文消息时,才会出现。可以用于在图文分析数据接口中,获取到对应的图文消息的数据,是图文分析数据接口中的msgid字段中的前半部分,详见图文分析数据接口中的msgid字段的介绍。

请注意:在返回成功时,意味着群发任务提交成功,并不意味着此时群发已经结束,所以,仍有可能在后续的发送过程中出现异常情况导致用户未收到消息,如消息有时会进行审核、服务器不稳定等。此外,群发任务一般需要较长的时间才能全部发送完毕,请耐心等待。

返回数据示例(正确时的JSON返回结果):

{
   "errcode":0,
   "errmsg":"send job submission success",
   "msg_id":34182, 
   "msg_data_id": 206227730
}

错误码

返回码含义
41040字数≥300字,才能声明文字原创

完整流程请看: 自动发文章到微信公众号

相关视频:
如何自动发表文章到微信公众号实现躺赚1-思路流程篇

代码拿走即用!如何自动发表文章到微信公众号实现躺赚实现篇

作者 east
Flink 8月 21,2023

磁盘对 Flink 中 RocksDB 状态后端的影响:案例研究

RocksDB 在 Flink 中的性能问题分析

正如最近的博客文章所述,RocksDB 是 Flink 中的一个状态后端,它允许作业的状态大于可用内存量,因为状态后端可以将状态溢出到本地磁盘。这意味着磁盘性能可能会对使用 RocksDB 的 Flink 作业的性能产生影响。通过一个案例研究,这篇博文说明了使用 RocksDB 的 Flink 作业的吞吐量下降问题,并演示了我们如何将底层磁盘的性能确定为根本原因。

背景和问题描述

我们正在处理一个典型的物联网 (IoT) 作业,该作业处理从数百万台设备发出的事件流。每个事件都包含设备标识符 (ID)、事件类型以及事件生成时的时间戳。该作业根据设备 ID 对流进行分区,并在状态中存储从每个事件类型到接收到该类型事件时的最新时间戳的映射。事件类型可能有数百种。对于每个传入事件,作业需要从接收事件类型的状态读取时间戳,并将其与传入事件进行比较。如果传入的时间戳较新,它会更新状态中存储的时间戳。

该作业在使用官方 AWS 命令​​行工具 eksctl 创建的 Amazon Elastic Kubernetes Service (EKS) 集群上运行,并具有所有默认设置。 Flink TaskManager 分配有 1.5 个 CPU 核和 4 GB 内存。该作业使用 RocksDB 状态后端,该后端配置为使用 Flink 的托管内存。state.backend.rocksdb.localdir 配置选项未显式设置,因此默认情况下底层 EC2 实例根卷上的 /tmp 目录用于 RocksDB 运行状态(即工作状态)。

吞吐量下降问题的观察

这篇博文指出,该作业最初在 EKS 上运行良好。但一段时间后(几小时或几天,具体取决于传入事件)作业吞吐量突然大幅下降。该下降可以很容易地再现。吞吐量指标图表显示,在某一天的 23:50 后不久,从每秒超过 10k 个事件下降到每秒几百个事件。此外,使用保存点停止作业然后从中恢复并没有帮助:重启后作业吞吐量仍然很低。尽管当作业从空状态重新启动时恢复了高吞吐量,但这不是一个选择,因为(1)作业状态会丢失,(2)作业吞吐量会在较短的时间后再次下降。

性能问题的定位

通过检查CPU指标,我们发现当吞吐量下降时,TaskManager 容器的 CPU 利用率也会降低。由于TaskManager容器可能会使用更多的CPU资源,因此CPU使用率的减少在这里只是一个症状。TaskManager容器的内存使用率在吞吐量下降之前很长时间就达到了分配限制,并且在 23:50 左右没有明显变化。

为了进一步调查性能问题,我们启用了 TaskManager 的 JMX 监控,并使用 VisualVM 进行 CPU 采样。结果显示,93% 的 CPU 时间都被 threadUpdateState 消耗了,这是运行 operatorUpdateState 的线程,该线程读取并更新 RocksDB 中的状态。几乎所有的CPU时间都被本机方法 org.rocksdb.RocksDB.get() 占用。这表明作业在从 RocksDB 读取状态时遇到了瓶颈。

磁盘性能分析

为了深入了解 RocksDB 的性能问题,我们启用了 Flink RocksDB 指标。块缓存是在内存中缓存数据以供读取的地方。块缓存在作业启动后的前几分钟内迅速被填满,主要是状态条目。然而,这并不能完全解释在 23:50 左右吞吐量下降的原因。

我们继续检查根卷的磁盘指标。读取吞吐量下降至每秒约 230 次,写入吞吐量也出现类似的下降。检查磁盘每秒输入/输出操作数 (IOPS) 容量,我们发现默认情况下,使用 eksctl 创建的 EKS 集群中的每个 EC2 实例都是 am5.large 实例,并带有一个通用 (gp2) 弹性块存储 (EBS) 根卷。根卷的大小为 80GB,提供 240 IOPS 的基准速率。这表明作业在磁盘 IO 上遇到了瓶颈。一开始能够实现更高 IOPS 的原因是 AWS 为每个 gp2 卷提供了初始 I/O 信用来维持突发 IO 请求。然而,初始 I/O 积分耗尽后,问题就出现了。

解决方案

为了解决性能问题,我们建议附加具有高 IOPS 率的专用卷,如 gp3 或 io1/io2 卷,并将 Flink 配置 state.backend.rocksdb.localdir 设置为该卷上的目录。需要注意的是,RocksDB 本机指标在默认情况下处于禁用状态,因为它们可能会对作业性能产生负面影响。但是在你面临性能问题时,启用这些指标可以帮助你更好地了解 RocksDB 的内部行为,以便更好地诊断和优化问题。

要实施这个解决方案,你可以按照以下步骤进行操作:

  1. 创建高性能的磁盘卷:
    • 使用 AWS 控制台或 AWS 命令行工具创建一个 gp3 或 io1/io2 卷,它提供足够的 IOPS 来支持你的作业需求。你可以根据作业的负载情况来选择适当的磁盘类型和大小。
  2. 将 RocksDB 目录配置到新卷上:
    • 在 Flink 配置中,将 state.backend.rocksdb.localdir 配置选项设置为新创建的高性能卷的挂载路径。这将使 RocksDB 在新卷上运行,并获得更高的磁盘性能。
  3. 启用 RocksDB 指标(可选):
    • 如果你想深入了解 RocksDB 的性能状况,你可以在 Flink 配置中启用 RocksDB 本机指标。这些指标将提供更多关于 RocksDB 内部运行情况的信息,帮助你更好地监视和优化作业。
  4. 重新部署作业:
    • 在进行了上述更改后,重新部署你的 Flink 作业。确保作业配置正确地指向新的 RocksDB 目录,并验证作业在新磁盘上运行。
  5. 监控和调整:
    • 监控你的作业性能,特别是 CPU 利用率、磁盘 IOPS 和延迟等指标。根据观察到的情况,你可能需要调整作业配置、磁盘类型或作业规模来进一步优化性能。

总之,通过将 RocksDB 目录配置到高性能的磁盘卷上,你可以显著改善 Flink 作业的性能,并避免在处理大量数据时出现吞吐量下降的问题。同时,启用 RocksDB 指标可以让你更深入地了解 RocksDB 在作业中的行为,从而更好地优化和监控作业性能。

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east
Flink 8月 21,2023

Flink SQL 连接 – 第 1 部分

Flink SQL 已成为低代码数据分析的事实标准。它成功地统一了批处理和流处理,同时保持了 SQL 标准。此外,它还为实时用例提供了一组丰富的高级功能。简而言之,Flink SQL 提供了两全其美的功能:它使您能够使用 SQL 处理流数据,但它还支持批处理。由于 Flink SQL 始终遵循 ANSI-SQL 2011 标准,因此所有功能都来自兼容的数据库应该管用。这包括内部联接和外部联接,以及 SQL 标准中描述的所有其他联接类型。

常规连接、间隔连接和查找连接

在这个由三部分组成的博客文章系列中,我们将向您展示 Flink SQL 中不同类型的联接以及如何使用它们以各种方式处理数据。这篇文章将重点介绍常规连接、间隔连接和查找连接。

常规连接

常规连接在 SQL 中用于组合两个或多个表中的数据。使用联接时,您可以指定每个表中要用于创建新表的列。您还可以使用联接来创建包含多个表中的数据的单个表。例如,如果您有一个包含客户信息的表和另一个包含订单信息的表,则可以使用联接创建一个同时包含客户信息和订单信息的表。

sql复制代码
CREATE TABLE NOC (
    agent_id STRING,
    codename STRING
);

CREATE TABLE RealNames (
    agent_id STRING,
    name STRING
);

SELECT
    name,
    codename
FROM NOC
INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;

间隔连接

间隔连接用于比较相隔一定时间的两组数据。每组数据被分为多个区间,每个区间由开始时间和结束时间定义。间隔连接在处理具有时间上下文的事件时非常有用。例如,您可以将销售数据按小时间隔与客户数据按天间隔连接起来。

sql复制代码
CREATE TABLE orders (
    id INT,
    order_time TIMESTAMP
);

CREATE TABLE shipments (
    id INT,
    order_id INT,
    shipment_time TIMESTAMP
);

SELECT
    o.id AS order_id,
    o.order_time,
    s.shipment_time,
    TIMESTAMPDIFF(DAY, o.order_time, s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;

查找连接

查找连接用于在公共键上连接两个数据集,其中一个数据集是静态的,不会随时间变化。通过查找连接,您可以在流数据中丰富外部参考数据表中的信息。这对于实时数据分析非常有用。

sql复制代码
CREATE TABLE subscriptions (
    id STRING,
    user_id INT,
    type STRING,
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    payment_expiration TIMESTAMP,
    proc_time AS PROCTIME()
);

CREATE TABLE users (
    user_id INT PRIMARY KEY,
    username VARCHAR(255) NOT NULL,
    age INT NOT NULL
);

SELECT
    s.id AS subscription_id,
    s.type AS subscription_type,
    u.age,
    CASE WHEN u.age < 18 THEN 1 ELSE 0 END AS is_minor
FROM subscriptions s
JOIN users FOR SYSTEM_TIME AS OF s.proc_time AS u ON s.user_id = u.user_id;

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

作者 east

上一 1 … 3 4 5 … 19 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.