gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆
    • json格式化及任意折叠展开
    • PDF常用工具

Hive 查询介绍——它们是什么以及如何有效地编写它们

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 作者: east
  • ( 页面42 )
Hive 5月 6,2023

Hive 查询介绍——它们是什么以及如何有效地编写它们

在大数据领域,Hive 是一个大问题。精心编写和精心设计的 Hive 查询可加速从数据集中检索数据。 Hive 比 SQL 好得多,因为前者可以更有效地处理复杂数据。此外,Hive 查询有助于降低处理成本。这就是为什么为大数据分析用户和开发人员正确编写和优化 Hive 查询至关重要。

与其他可用的数据处理平台相比,完全优化的数据查询以更快的速度为您提供所需的数据。高效有效的 Hive 查询可以减少 50% 的执行时间。当您的数据处理框架运行得更快时,好处就会增加。

回答这个问题首先要准确理解 Hive 到底是什么。 Apache Hive 是一个在 Hadoop 之上开发的开源数据仓库平台,用于执行数据分析和分布式处理。 Facebook 创建了 Apache Hive 以减少编写 Java MapReduce 平台所需的工作。

大数据流程需要快速准确地处理大量不同的数据,以提供高度可行的见解。如果手动完成,这是一项不可能完成的任务。 Hive 的存在是为了简化大数据处理,并通过快速 Hive 查询将原始数据转化为可操作的内容。

使用 Hive 进行查询和数据分析比使用 MapReduce 框架更容易、更快,即使在处理大型数据集时也是如此。为简单起见,我们将重点关注 MapReduce 作为主要执行引擎,了解 Hive 还可以利用 Tez、Tez LLAP 和 Spark。 MapReduce 是一个低级平台,需要多个自定义程序才能运行。开发人员必须熟悉 Java,它已经是一个复杂的平台,才能充分利用 MapReduce。相比之下,您无需成为 Java 专家即可使用 Hive。

通常,Hive 查询只是对信息的请求。当在数据科学和计算机编程的上下文中使用时,Hive 查询是同一回事。不同之处在于信息直接来自数据库。

Hive 查询不仅仅是随机信息请求。您要检索的信息必须具体。因此,您可以使用一组预定义代码和数据库原生的编程语言来编写和优化 Hive 查询。一旦数据库收到并理解该指令,它就会收集查询中指定的所有信息并发布您请求的数据。

要真正从您的查询中获得最大价值,它们必须写得很好并且经过专业调整。但在此之前,让我们深入了解您需要了解的关于它们的其他信息。

用于创建数据库管理任务和流程的标准编程语言称为结构化查询语言 (SQL)。但是,SQL 并不是使用 Hive 执行查询和数据分析的唯一编程语言。 AQL、Datalog 和 DMX 也是流行的选择。

Hive 查询语言或 HiveQL 是一种类似于 SQL 的声明性语言。 HiveQL 所做的是将这些查询转换为 MapReduce 程序。它还使开发人员能够通过将复杂的 MapReduce 程序替换为 Hive 查询来处理和分析结构化和半结构化数据。

任何熟悉 SQL 命令的开发人员都会发现使用 Hive 查询语言创建请求很容易。

分区、表和桶的创建

您可以在 Hive 中创建查询,以将存储在 Hadoop 文件中的大型数据集分类到表、分区和存储桶中。在每个模型中,您根据分区或列键对相同类型的数据进行分组。可以有一个或多个分区键来帮助查明特定分区。分区数据集加速了对数据切片的查询。

ETL 功能

在将数据加载到其目标数据存储之前,您需要使用 ETL(提取、转换和加载)功能清理、准备和转换该数据。 Hive 查询可以做到这一点。数据通常从源中提取,然后存储在通用或兼容的存储中,例如 Azure Data Lake Storage 或 Azure Storage blob。然后一系列查询转换数据。在此之后,数据在 Apache Hive 中进行组织,然后再批量加载到其目标数据仓库中。

创建用于合并不同数据表的连接

Hive 查询可以包括连接,这是一种用于通过使用每个表共享的值来组合来自两个或多个表的特定字段或记录的功能。联接在速度方面以指数方式提高 Hive 查询的效率,具体取决于查询的编写方式。例如,当它们首先对最小表进行流式处理,最后对最大表进行流式传输时,带有连接子句的查询执行得更快,而不是相反。

有四种类型的连接,对每一种类型的深入了解将帮助用户选择正确的连接来使用——并编写正确的查询。这四种类型的连接是:

按查询排序

HiveQL 中的 ORDER BY 语法使用“SELECT”语句来帮助对数据进行排序。此语法遍历 Hive 表上的列,以按照“Order by”子句中的说明查找和筛选特定列值。查询只会选取 Order by 子句中提到的列名,并以升序或降序显示匹配的列值。

按查询分组

当 Hive 查询带有“GROUP BY”时,它会探索 Hive 表上的列并收集 group by 子句中提到的所有列值。查询将仅查看名称定义为“group by”子句的列,并将通过对特定和匹配的列值进行分组来显示结果。

按查询排序

当 Hive 查询带有“排序依据”子句时,它会遍历查询定义的名称下的列。执行后,查询会探索 Hive 表的列以对输出进行排序。如果您使用“DESC”指令按查询排序,您将按降序排序和显示结果。带有“ASC”的查询将执行升序排序并以类似方式显示结果。

按查询聚类

带有 CLUSTER BY 子句或命令的 Hive 查询通常部署在查询中以同时执行 DISTRIBUTE BY 和 SORT BY 的功能。此特定查询可确保对所有输出数据文件进行绝对排序或排序。

分配方式

DISTRIBUTE BY 指令确定输出如何在 MapReduce 作业中的缩减程序之间分配。 DISTRIBUTE BY 的功能类似于 GROUP BY 子句,因为它管理如何将数据行加载到 reducer 中进行处理。

查看另一篇博文,您将在其中了解如何通过 Hive 查询优化来充分利用 Hive 查询。

以下是您在管理 Hive 查询时会遇到的常见命令的简要介绍。

Hive 数据定义语言 (DDL) 命令

Apache Hive 备忘单是一个包含所有 Apache Hive 语法和函数的五部分摘要。为开发人员和大数据工程师提供参考或指导。

Apache Hive 备忘单分为以下部分:

Hive 查询会消耗资源。在大数据的世界里,资源就是金钱,资源的低效利用可能会阻碍任何组织。因此,Hive 查询优化对于将资源消耗保持在最低水平、运营成本可控以及整体性能处于最佳水平至关重要。

以下是优化 Hive 查询的一些有效方法:

针对我们所有的查询优化您的方法至关重要。 Pepperdata 为企业提供大数据优化解决方案,旨在为他们的查询、Query Spotlight 以及他们的所有堆栈、应用程序、流程等提供全面的可见性和可观察性。用户享受实时性能报告和数据驱动的建议,以优化其大数据基础架构及其所有组件的性能,同时显着降低运营成本。

作者 east
chatgpt 5月 6,2023

当机器人来的时候

像每个人和他们的兄弟一样,我一直在玩 ChatGPT 并向它提出生活中必不可少的问题。它自信地告诉我一些事情,其中​​一些事情非常有见地。然而,错误的人和聊天机器人经常自信地陈述事情。
我看到最近在 Mastodon 上发布的 Google Answer。在这个“答案”中,谷歌建议在海洋中处理用过的汽车电池有好处,包括为电鳗充电和为墨西哥湾流提供动力。 Bing 然后选择了这个答案,还提出了非法处理电池对环境的好处。我想知道 Google 是否意识到您可以使用机油为草坪施肥。 (作者注:请不要将电池扔进海里或用机油给草坪施肥。)
据推测,搜索此内容的人正在寻找这个笑话,但它提出了一个有趣的问题。随着越来越多的人使用 ChatGPT 并在网络上发布他们的结果,越来越多的 AI 问答系统选择他们自己和其他人生成的响应并对其进行训练——会发生什么?
对于一个他们不知道答案的问题,我做了任何人都会做的事情。我问 ChatGPT,“当人们发布 ChatGPT 的输出并且它根据自己的响应进行训练时会发生什么?”
机器人回复:
如果人们发布 ChatGPT 的输出并使用它根据自己的响应进一步训练模型,这被称为自我训练或自我监督学习。在这种情况下,模型将继续从自己的输出中学习,并可能随着时间的推移提高其性能。
但是,在使用自我训练时需要考虑一些潜在的风险。一个风险是模型可能会过度拟合其自身的输出,这意味着它变得过于专业化地回应它之前遇到的问题和陈述的类型。这可能会导致模型在新数据或未见过的数据上表现不佳。
另一个风险是,如果在自我训练期间没有得到适当的监控和监督,模型可能会开始产生无意义或不适当的反应。这可能是由于模型在其自身输出中发现了模式或偏差,并随着时间的推移而放大了它们。
因此,在自我训练像 ChatGPT 这样的语言模型时要谨慎,并仔细监控其性能和输出以确保它继续生成高质量的响应,这一点很重要。
这指出了一些非常可怕的社会后果,因为人们似乎相信 ChatGPT 并认为它神奇地正确(以及情感)。我想知道会有什么社会后果?我问,“人们依赖 ChatGPT 会产生什么社会影响,它开始产生无意义的结果,因为它是根据自己的信息进行训练的?”
机器人说:
如果人们过度依赖像 ChatGPT 这样的语言模型,并且它开始产生无意义的结果,因为它是根据自己的信息进行训练的,这可能会产生一些负面的社会影响。
首先,当 ChatGPT 对他们的问题或陈述提供荒谬或不恰当的回答时,人们可能会感到沮丧或困惑。这可能会导致沟通中断和对技术失去信任。
其次,如果人们依赖 ChatGPT 来提供重要信息或做出重要决策,那么模型产生无意义结果的后果可能会很严重。例如,如果基于 ChatGPT 的聊天机器人用于提供医疗建议或诊断健康状况,依赖无意义的响应可能会导致严重的健康风险。
第三,如果使用 ChatGPT 为社交媒体或其他在线平台生成内容,可能会导致错误信息或假新闻的传播。这可能会产生严重的社会和政治后果,因为虚假信息可能被用来操纵公众舆论或破坏民主进程。
一般来说,谨慎使用 ChatGPT 等语言模型并仔细监控其性能以确保它们继续产生准确和适当的响应非常重要。同样重要的是要认识到这些模型的局限性,而不是依赖它们来替代人类的智慧和专业知识。
哦,所以我们应该负责任地使用技术。我想知道 ChatGPT 认为我们应该怎么做?我问道,“鉴于人类几乎从未负责任地使用过新技术,这种情况下可能出现的结果是什么?”
它回答说:
虽然过去人类确实经常难以负责任地使用新技术,但重要的是要认识到每项新技术都是独一无二的,并且可能会带来一系列风险和挑战。因此,很难确定地预测涉及像 ChatGPT 这样的语言模型的场景的结果。
也就是说,可以采取一些措施来尽量减少这项技术的潜在负面影响。例如,ChatGPT 等语言模型的开发人员和用户可以努力识别和解决在技术开发和部署过程中可能出现的任何偏见或道德问题。这可能包括实施保护措施以防止错误信息或不当内容的传播,并确保模型是根据多样化和有代表性的数据进行训练的。
此外,政府和监管机构可以发挥作用,确保以负责任的方式使用 ChatGPT 等语言模型,并采取适当的保护措施来保护公众。这可能包括有关在特定行业或应用程序中使用语言模型的法规,以及对这些技术的开发和部署的透明度和问责制的要求。
总之,虽然使用 ChatGPT 等语言模型肯定存在风险,但可以通过负责任的开发和部署以及监管监督和公众意识来减轻这些风险。
哇哦,ChatGPT 认为它应该受到监管?我给它的创建者 OpenAI 发了邮件,问他们是否同意,如果不同意,那么他们知道什么是他们的语言模型不知道的?截至发稿时,他们尚未回复。
长期以来,我们一直在自我训练以“用谷歌搜索答案”,现在我们可能开始训练自己“只问 ChatGPT”。然而,重要的是要记住,这个工具只是吸取互联网的一部分并将其反哺给你。 ChatGPT 可以是一个不错的研究工具,但它是垃圾输入和垃圾输出。
像所有技术一样,这是 BYOB。带上你自己的大脑,不要给鳗鱼喂汽车电池。

作者 east
大数据开发 5月 4,2023

dolphinscheduler如何安装(分布式)

DolphinScheduler 是一个分布式任务调度系统,可以对任务或者工作流进行统一管理和调度。它支持丰富的任务调度方式,包括定时执行、依赖关系、补偿机制等。以下是 Dolphinscheduler 的安装教程:

  • 确认服务器配置:首先需要确认服务器的配置,包括 CPU 核心数、内存大小、磁盘空间等。Dolphinscheduler 需要运行在至少 2 台服务器上,其中一个需要作为 Master 节点,其他为 Worker 节点。
  • 下载安装包:从 Dolphinscheduler 的官网下载最新的稳定版本。通常情况下,推荐下载二进制包进行安装。下载完成后,将压缩包上传到服务器上,并解压到指定的目录。
  • 配置环境变量:需要配置环境变量,以便在任何位置启动或停止 DolphinScheduler。假设你解压到 /opt 目录下,可以修改 /etc/profile 文件,并加入以下内容:
export DOLPHINSCHEDULER_HOME=/opt/dolphinscheduler
export PATH=$DOLPHINSCHEDULER_HOME/bin:$PATH
  • 初始化数据库:在安装 DolphinScheduler 之前,需要初始化数据库并创建相应的用户和权限。你可以使用如下的命令进行初始化:
cd /opt/dolphinscheduler/bin
sh create-dolphin-db.sh
  • 修改配置文件:在 conf 目录下,有两个核心的配置文件:dolphinscheduler.properties 和 worker.properties。其中,dolphinscheduler.properties 主要配置 Master 节点相关的参数,而 worker.properties 则配置 Worker 节点相关参数。需要根据实际情况进行修改。
  • 启动 DolphinScheduler:在 Master 节点上,使用以下命令启动 DolphinScheduler:
cd /opt/dolphinscheduler
./bin/dolphinscheduler-daemon.sh start master

如果需要启动 Worker 节点,可以使用以下命令:

cd /opt/dolphinscheduler
./bin/dolphinscheduler-daemon.sh start worker
  • 访问 DolphinScheduler:启动完成后,在浏览器中输入 Master 节点的 IP 地址和端口号(默认为 12345),即可访问 DolphinScheduler 的 Web 界面进行任务管理和调度。

总体来说,DolphinScheduler 的安装比较简单,但需要注意配置文件和数据库的初始化,以便保证系统的正常运行。

作者 east
大数据开发 5月 4,2023

在CDH6.3.2的大数据集群中,怎样安装dolphinscheduler调度器

  • 安装MySQL (5.6.24+),JDK (1.8.0_292+),ZooKeeper (3.4.5+),Hadoop (3.0.0+)等基础组件。
  • 下载dolphinscheduler的二进制包,并解压到部署目录,如 /opt/module/dolphinscheduler。
  • 在每台部署机器上创建部署用户 dolphinscheduler,并配置sudo免密和ssh免密登录。
  • 在MySQL中创建数据库 dolphinscheduler,并授权给用户 dolphinscheduler。
  • 修改 conf 目录下的 datasource.properties 和 dolphinscheduler_env.sh 文件,配置数据库连接信息和环境变量。
  • 将 mysql-connector-java 驱动 jar 包复制到 lib 目录下。
  • 执行 script/create-dolphinscheduler.sh 脚本,创建表和导入基础数据。
  • 执行 bin/install.sh 脚本,一键部署。
  • 访问 http://localhost:12345/dolphinscheduler ,使用默认账号 admin 和密码 dolphinscheduler123 登录。


dolphinscheduler 下载地址

https://www.apache.org/dyn/closer.lua/dolphinscheduler/3.1.5/apache-dolphinscheduler-3.1.5-bin.tar.gz


dolphinscheduler 部署指南

https://dolphinscheduler.apache.org/zh-cn/docs/3.1.5/%E9%83%A8%E7%BD%B2%E6%8C%87%E5%8D%97_menu

作者 east
Java 4月 28,2023

用java写一个统计这个文件夹中所有csv总共有多少行

  • 创建一个BufferedReader对象,用来读取文件夹中的csv文件。
  • 使用一个循环,每次读取一行数据,直到文件结束。
  • 使用一个计数器变量,每次读取一行数据就加一。
  • 返回计数器变量的值,就是csv文件的行数。

以下是一个可能的代码示例:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class CSVLineCounter {

    // 定义一个方法,接受一个文件夹路径作为参数,返回该文件夹中所有csv文件的总行数
    public static int countLinesInFolder(String folderPath) throws IOException {
        // 创建一个File对象,表示文件夹
        File folder = new File(folderPath);
        // 创建一个数组,存储文件夹中的所有文件
        File[] files = folder.listFiles();
        // 创建一个计数器变量,初始化为0
        int count = 0;
        // 遍历文件数组
        for (File file : files) {
            // 判断文件是否是csv格式
            if (file.getName().endsWith(".csv")) {
                // 调用countLinesInFile方法,统计该文件的行数,并累加到计数器变量上
                count += countLinesInFile(file.getPath());
            }
        }
        // 返回计数器变量的值
        return count;
    }

    // 定义一个方法,接受一个文件路径作为参数,返回该文件的行数
    public static int countLinesInFile(String filePath) throws IOException {
        // 创建一个BufferedReader对象,用来读取文件
        BufferedReader br = new BufferedReader(new FileReader(filePath));
        // 创建一个计数器变量,初始化为0
        int count = 0;
        // 使用一个循环,每次读取一行数据,直到文件结束
        while (br.readLine() != null) {
            // 每次读取一行数据就加一
            count++;
        }
        // 关闭BufferedReader对象
        br.close();
        // 返回计数器变量的值
        return count;
    }

    public static void main(String[] args) throws IOException {
        // 测试代码,假设有一个名为test的文件夹,里面有若干个csv文件
        String folderPath = "test";
        // 调用countLinesInFolder方法,打印出总行数
        System.out.println("The total number of lines in the folder is: " + countLinesInFolder(folderPath));
    }
}
作者 east
Flink 4月 28,2023

CDH上Flink执行sql-client命令提示[ERROR] Could not execute SQL statement. Reason: java.net.ConnectException: 拒绝连接

在cdh6.3.2上执行sql-client的命令,提示:

[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: 拒绝连接

这可能是由于以下原因:

  • 您没有启动flink集群,或者flink集群不可用。您可以检查flink集群的状态,或者尝试重新启动flink集群。
  • 您没有正确地配置flink sql-client的环境,或者您的配置文件有误。您可以检查您的sql-client-defaults.yaml文件,或者参考官方文档来设置flink sql-client的环境。
  • 您的网络有问题,或者您的防火墙阻止了flink sql-client的连接。您可以检查您的网络连接,或者允许flink sql-client通过防火墙。

原来是需要启动本地集群:

./start-cluster.sh
cd /opt/cloudera/parcels/FLINK-1.14.4-BIN-SCALA_2.12/lib/flink/bin

然后再次启动sql客户端即可执行成功。

作者 east
doris 4月 28,2023

flink cdc写入到doris报错:[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Unsupported options found for ‘doris’.

在用flink的sql-client.sh或用Dinky执行flink cdc写入到doris时,报下面的错误:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Unsupported options found for 'doris'.

Unsupported options:

sink.batch.size

Supported options:

connector
doris.batch.size
doris.deserialize.arrow.async
doris.exec.mem.limit
doris.filter.query
doris.read.field
doris.request.connect.timeout.ms
doris.request.query.timeout.s
doris.request.read.timeout.ms
doris.request.retries
doris.request.retriesdoris.deserialize.queue.size
doris.request.tablet.size
fenodes
password
property-version
sink.batch.interval
sink.buffer-count
sink.buffer-size
sink.check-interval
sink.enable-2pc
sink.enable-delete
sink.label-prefix
sink.max-retries
source.use-old-api
table.identifier
username

刚开始以为是jar包问题,先后找了flink-doris-connector-1.14_2.12-1.1.1.jar和flink-doris-connector-1.14_2.12-1.1.0.jar放在flink的jar包里面,还是同样出现上面的。

让我不解的是,用代码来操作时,导入是同样的安装包,但没上面的问题。

后来问了New Bing,New Bing是这样回答的:

您好,根据您的错误信息,您可能需要修改您的Doris表的配置选项,以符合Flink Doris Connector的要求。您可以参考以下的建议:

  • 您可能需要将sink.batch.size改为doris.batch.size,这是一个控制每次写入Doris的数据量的选项。
  • 您可能需要检查您是否正确地设置了connector选项,这是一个指定连接器类型的选项,对于Doris表,应该设置为connector = ‘doris’。
  • 您可能需要检查您是否正确地设置了table.identifier选项,这是一个指定Doris表名的选项,应该设置为table.identifier = ‘db_name.table_name’。

这时注意到问题在“sink.batch.size”,而不是之前以为“Unsupported options found for ‘doris’”是相关的jar包没找到。原来自己用的例子是旧版本例子:

DROP TABLE IF EXISTS doris_sink;
-- 创建Doris表
CREATE TABLE doris_sink (
  id INT PRIMARY KEY,
  name STRING,
  age INT,
  modified_time STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '10.0.80.54:8030',
  'table.identifier' = 'db_test.test_cdc',
  'username' = 'root',
  'password' = '',
  'sink.batch.size' = '1000'
);

需要修改为

DROP TABLE IF EXISTS doris_sink;
-- 创建Doris表
CREATE TABLE doris_sink (
  id INT PRIMARY KEY,
  name STRING,
  age INT,
  modified_time STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '10.0.80.54:8030',
  'table.identifier' = 'db_test.test_cdc',
  'username' = 'root',
  'password' = ''
);
作者 east
doris 4月 26,2023

配置dinky的集群配置管理时,提示下面的错误:Handler dispatch failed; nested exception is java.lang.NoClassDefFoundError: org/apache/flink/configuration/GlobalConfiguration

Dinky是一个一站式的实时计算平台,基于Apache Flink构建,支持流批一体和数据湖数据仓库统一1。你在配置Dinky的集群配置管理时,遇到了java.lang.NoClassDefFoundError: org/apache/flink/configuration/GlobalConfiguration的错误,

这是因为Dinky 需要具备自身的 Flink 环境,该 Flink 环境的实现需要用户自己在 Dinky 根目录下 plugins/flink${FLINK_VERSION} 文件夹并上传相关的 Flink 依赖,例如在/data/dinky/plugins/flink1.14/dinky/

上传flink安装包lib的依赖,例如flink-dist_2.12-1.14.4.jar、flink-table_2.12-1.14.4.jar等安装包。

如果又出现
ERROR: org.springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/YarnConfiguration 的错误。

需要将 flink-shade-hadoop 上传到到 plugins 文件目录下,使用 flink-shade-hadoop-3 地址如下:

https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber?repo=cloudera-repos

作者 east
Flink 4月 25,2023

运行flink的word count提示: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation

写了一个flink简单word count的例子:


import org.apache.flink.api.scala._

object FlinkWordCount {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = List("hello", "world", "flink", "scala", "hello", "flink")

    // 为DataStream和DataSet的上下文提供隐式的类型信息
    implicit val typeInfo = createTypeInformation[WordCount]

    val wordDataSet = env.fromCollection(words)

    val wordCountDataSet = wordDataSet
      .map(word => WordCount(word, 1))
      .groupBy(0)
      .sum(1)

   
  }

  case class WordCount(word: String, count: Int)
}

提示could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation

各种修改后还是报错,后来想到scala版本问题。
Flink 1.10.0以上版本需要Scala 2.12.x,如果使用Scala 2.11.x版本,就会报错。请确认一下你的Scala版本是否为2.12.x,如果不是请升级Scala版本到2.12.x后再运行代码。

作者 east
Flink 4月 24,2023

Flink CDC获取mysql 主从分库,分库分表的binlog

Flink CDC可以获取MySQL主从分库,分库分表的binlog,但是需要注意以下几点:

  • Flink CDC需要配置MySQL的binlog模式为row,以及开启GTID(全局事务标识符),以便正确地识别和处理binlog事件
  • Flink CDC需要配置MySQL的主从复制关系,以及指定主库或从库的地址,以便正确地连接和读取binlog数据
  • Flink CDC需要配置MySQL的分库分表规则,以及指定分库分表的键,以便正确地合并和转换binlog数据

一个可能的Flink CDC获取MySQL主从分库,分库分表的binlog的示例如下:

-- 创建MySQL主从分库,分库分表的表对象
CREATE TABLE mysql_user (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'master-host', -- 主库地址
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'test', -- 分库名
  'table-name' = 'user_${0-9}', -- 分表名
  'server-id' = '5400', -- 从库ID
  'scan.startup.mode' = 'initial', -- 初始扫描模式
  'scan.split.size' = '8096', -- 扫描块大小
  'debezium.snapshot.locking.mode' = 'none', -- 快照锁定模式
  'debezium.gtid.source.includes' = '', -- GTID包含范围
  'debezium.gtid.source.excludes' = '' -- GTID排除范围
);

-- 使用Flink SQL或DataStream API将MySQL的增量数据写入到其他目标,如Hive、Doris等
INSERT INTO hive_user SELECT * FROM mysql_user;

-- DataStream API
DataStream<Row> stream = tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM mysql_user"), Row.class);
tEnv.createTemporaryView("stream", stream);
tEnv.executeSql("INSERT INTO doris_user SELECT * FROM stream");
作者 east
Flink 4月 24,2023

对比flink cdc和canal获取mysql binlog优缺点

Flink CDC和Canal都是用于获取MySQL binlog的工具,但是有以下几点优缺点对比:

  • Flink CDC是一个基于Flink的库,可以直接在Flink中使用,无需额外的组件或服务,而Canal是一个独立的服务,需要单独部署和运行,增加了系统的复杂度和成本
  • Flink CDC支持多种数据库的数据变化捕获,如MySQL、PostgreSQL、MongoDB等,而Canal只支持MySQL和MariaDB的数据变化捕获
  • Flink CDC支持Exactly-Once语义,保证数据的一致性和准确性,而Canal只支持At-Least-Once语义,可能会出现数据的重复或丢失
  • Flink CDC支持自动元数据更新,无需手动创建或维护表结构,而Canal需要手动创建或维护表结构,增加了开发和维护的工作量
  • Flink CDC支持Flink SQL和DataStream API的使用,方便进行数据处理和转换,而Canal只提供了Kafka、RocketMQ等消息队列的接口,需要额外的消费者进行数据处理和转换
作者 east
运维 4月 24,2023

能使用socket5协议,ssh海外vps服务器的软件

由于xshell有漏洞,公司不给使用,所以寻找一款替代软件。

除了 Xshell 之外,还有一些其他软件可以连接到国外 VPS 并设置 Socks5 代理。以下是几个比较常用的软件:

  1. PuTTY:PuTTY 是一款常用的 SSH 和 Telnet 客户端软件,可以在 Windows 平台上运行,并且可以连接到远程 SSH 服务器。PuTTY 同时也支持设置 Socks5 代理。
  2. Bitvise SSH Client:Bitvise SSH Client 也是一种常用的 SSH 客户端,可以用于在 Windows 上连接到远程 SSH 服务器。Bitvise SSH Client 提供了 Socks5 和 HTTP 代理配置选项,并且支持在 Windows 上设置全局 Socks5 代理。
  3. WinSCP:WinSCP 是用于在 Windows 平台上使用 SSH 的免费开源工具,它还具有文件传输和文件管理的功能,支持 SCP, SFTP, FTPS, FTP 等协议。WinSCP 支持使用 Socks5 代理连接到 SFTP 服务器。

经过自己试验,觉得 Bitvise SSH Client 使用方便,用代理速度很快。之前在centos装ss5软件,访问起来龟速,用 Bitvise SSH Client 代理访问很快。

作者 east

上一 1 … 41 42 43 … 93 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年12月
  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (497)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (39)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.