操作指南:使用 Flink CDC 同步 MySQL 分库分表

线事务处理(OLTP)系统中,为了解决单表数据量大的问题,通常采用分库分表的方法对单张大表进行拆分,以提高系统的吞吐量。但为了方便数据分析,在同步到数据仓库或数据湖时,一般需要将分库分表的数据合并成一张大表。本教程将向您展示如何使用 Flink CDC 为上述场景构建实时数据湖。本文中的示例将全部基于 Docker 并使用 Flink SQL。无需一行 Java/Scala 代码或安装 IDE。本指南的全部内容包含 docker-compose 文件。整个过程将通过从 MySQL 同步数据到 Iceberg 来展示,如下图所示。

步骤1:创建 docker-compose.yml 文件 创建一个 Docker Compose 文件(docker-compose.yml),内容如下:

Version: ‘2.1’

Services:

sql-client: user: flink

image: yuxialuo/flink-sql-client:1.13.2.v1

depends_on:

– jobmanager

– mysql

environment: FLINK_JOBMANAGER_HOST: jobmanager MYSQL_HOST: mysql volumes: – shared tmpfs:/tmp/iceberg jobmanager: user: flink image: flink:1.13.2-scala_2.11 ports: – “8081:8081” command: jobmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: – shared tmpfs:/tmp/iceberg taskmanager: user: flink image: flink:1.13.2-scala_2.11 depends_on: – jobmanager command: taskmanager environment: – | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 2 volumes: – shared tmpfs:/tmp/iceberg mysql: image: debezium/example-mysql:1.1 ports: – “3306:3306” environment: – MYSQL_ROOT_PASSWORD=123456 – MYSQL_USER=mysql用户 – MYSQL_PASSWORD=mysqlpw volumes: – shared tmpfs driver-options: type: “tmpfs” device: “tmpfs”

这个 docker-compose 文件中的容器包括:

  • SQL-Client:Flink SQL Client,用于提交 SQL 查询和查看 SQL 执行结果
  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用于执行 Flink SQL
  • MySQL:作为数据源分库分表,存储用户表

注意:如果您想在自己的 Flink 环境中运行本指南,您需要下载下面列出的包并将其放在 Flink 目录的 lib 目录中,即 FLINK_HOME/lib/。

  • flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
  • iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

步骤 2:准备 MySQL 数据库中的数据 进入 MySQL 容器,执行以下命令:

shell复制代码
docker-compose exec mysql mysql -uroot -p123456

然后在 MySQL 中创建数据、表,并填充数据:

sql复制代码
CREATE DATABASE db_1;
USE db_1;
CREATE TABLE user_1 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234","user_110@foo.com");

CREATE TABLE user_2 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (120,"user_120","上海","123567891234","user_120@foo.com");

CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","上海","123567891234", NULL);

CREATE TABLE user_2 (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone VARCHAR(512),
  email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","上海","123567891234","user_220@foo.com");

步骤3:使用 Flink DDL 和 Flink SQL CLI 创建表 进入 Flink SQL CLI 容器,执行以下命令:

shell复制代码
docker-compose exec sql-client ./sql-client

在 Flink SQL CLI 中,执行以下命令:

sql复制代码
-- Flink SQL
SET execution.checkpointing.interval = 3s;

-- 创建源表 user_source 来捕获 MySQL 中所有数据库和表的数据并使用正则表达式来匹配这些数据库和表的配置项中使用的表。
-- 而且表还定义了一个元数据列来区分数据来自哪个数据库和表。
CREATE TABLE user_source(
  database_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,
  `id` DECIMAL(20, 0) NOT NULL,
  name STRING,
  address STRING,
  phone STRING,
  email STRING,
  PRIMARY KEY (`id`)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'db_[0-9]+',
  'table-name' = 'user_[0-9]+'
);

关注公众号“大模型全栈程序员”回复“大数据面试”获取800页左右大数据面试宝典 ,回复“大数据”获取多本大数据电子书

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。更多免费资源在http://www.gitweixin.com/?p=2627