解决Flink SQL报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”
在开发Flink SQL时报错”Exception in thread “main” java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record”
这个错误表明您的Flink作业涉及更新或删除操作(如CDC场景),而JDBC Sink表没有定义主键。以下是详细分析和解决方案:
错误原因分析
当Flink作业涉及UPDATE/DELETE操作时(常见于CDC场景或使用Retract流),JDBC Sink表必须显式声明主键,因为:
- 主键用于定位目标数据库中需要更新的记录
- 没有主键时,Flink无法确定如何执行更新/删除操作
解决方案
在表定义中添加 PRIMARY KEY
声明(需与目标数据库表主键一致),例如:
val sinkDDL =
"""
|CREATE TABLE dws_iot_extreme_info_min (
| device_code STRING,
| `day` DATE,
| ptime BIGINT,
| station_sn STRING,
| unit STRING,
| cu_device_code STRING,
| max_u_value DECIMAL(5,3),
| min_u_value DECIMAL(5,3),
| max_t_value DECIMAL(5,2),
| min_t_value DECIMAL(5,2),
| max_soc_value DECIMAL(5,2),
| min_soc_value DECIMAL(5,2),
| max_u_time BIGINT,
| min_u_time BIGINT,
| max_t_time BIGINT,
| min_t_time BIGINT,
| max_soc_time BIGINT,
| min_soc_time BIGINT,
| dt DATE,
| PRIMARY KEY (device_code, `day`) NOT ENFORCED -- 添加主键声明
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://10.0.2.2:3306/cnsaas',
| 'table-name' = 'dws_bigdata_device_extreme_info_min',
| 'driver' = 'com.mysql.cj.jdbc.Driver',
| 'username' = 'root',
| 'password' = '',
| 'sink.buffer-flush.max-rows' = '1000',
| 'sink.buffer-flush.interval' = '1s',
| 'sink.max-retries' = '3'
|)
|""".stripMargin
关键修改点说明
- 主键声明:PRIMARY KEY (device_code, `day`) NOT ENFORCED
- 主键字段需与目标数据库表的主键一致
NOT ENFORCED
表示Flink不会校验数据主键约束,由数据库负责
- 目标表要求:
- MySQL数据库中
dws_iot_extreme_info_min
表必须有相同的主键定义 - 可通过以下SQL确保主键存在:
- ALTER TABLE dws_iot_extreme_info_min ADD PRIMARY KEY (device_code, day);
- MySQL数据库中