线事务处理(OLTP)系统中,为了解决单表数据量大的问题,通常采用分库分表的方法对单张大表进行拆分,以提高系统的吞吐量。但为了方便数据分析,在同步到数据仓库或数据湖时,一般需要将分库分表的数据合并成一张大表。本教程将向您展示如何使用 Flink CDC 为上述场景构建实时数据湖。本文中的示例将全部基于 Docker 并使用 Flink SQL。无需一行 Java/Scala 代码或安装 IDE。本指南的全部内容包含 docker-compose 文件。整个过程将通过从 MySQL 同步数据到 Iceberg 来展示,如下图所示。
步骤1:创建 docker-compose.yml 文件 创建一个 Docker Compose 文件(docker-compose.yml),内容如下:
Version: ‘2.1’
Services:
sql-client: user: flink
image: yuxialuo/flink-sql-client:1.13.2.v1
depends_on:
– jobmanager
– mysql
environment: FLINK_JOBMANAGER_HOST: jobmanager MYSQL_HOST: mysql volumes: – shared tmpfs:/tmp/iceberg jobmanager: user: flink image: flink:1.13.2-scala_2.11 ports: – “8081:8081” command: jobmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: – shared tmpfs:/tmp/iceberg taskmanager: user: flink image: flink:1.13.2-scala_2.11 depends_on: – jobmanager command: taskmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 2 volumes: – shared tmpfs:/tmp/iceberg mysql: image: debezium/example-mysql:1.1 ports: – “3306:3306” environment: – MYSQL_ROOT_PASSWORD=123456 – MYSQL_USER=mysql用户 – MYSQL_PASSWORD=mysqlpw volumes: – shared tmpfs driver-options: type: “tmpfs” device: “tmpfs”
这个 docker-compose 文件中的容器包括:
- SQL-Client:Flink SQL Client,用于提交 SQL 查询和查看 SQL 执行结果
- Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用于执行 Flink SQL
- MySQL:作为数据源分库分表,存储用户表
注意:如果您想在自己的 Flink 环境中运行本指南,您需要下载下面列出的包并将其放在 Flink 目录的 lib 目录中,即 FLINK_HOME/lib/。
- flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
- flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
- iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar
步骤 2:准备 MySQL 数据库中的数据 进入 MySQL 容器,执行以下命令:
docker-compose exec mysql mysql -uroot -p123456
然后在 MySQL 中创建数据、表,并填充数据:
CREATE DATABASE db_1;
USE db_1;
CREATE TABLE user_1 (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234","user_110@foo.com");
CREATE TABLE user_2 (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_2 VALUES (120,"user_120","上海","123567891234","user_120@foo.com");
CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234", NULL);
CREATE TABLE user_2 (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","上海","123567891234","user_220@foo.com");
步骤3:使用 Flink DDL 和 Flink SQL CLI 创建表 进入 Flink SQL CLI 容器,执行以下命令:
docker-compose exec sql-client ./sql-client
在 Flink SQL CLI 中,执行以下命令:
SET execution.checkpointing.interval = 3s;
CREATE TABLE user_source(
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone STRING,
email STRING,
PRIMARY KEY (`id`)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'db_[0-9]+',
'table-name' = 'user_[0-9]+'
);
关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书