在用flink的sql-client.sh或用Dinky执行flink cdc写入到doris时,报下面的错误:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Unsupported options found for 'doris'.
Unsupported options:
sink.batch.size
Supported options:
connector
doris.batch.size
doris.deserialize.arrow.async
doris.exec.mem.limit
doris.filter.query
doris.read.field
doris.request.connect.timeout.ms
doris.request.query.timeout.s
doris.request.read.timeout.ms
doris.request.retries
doris.request.retriesdoris.deserialize.queue.size
doris.request.tablet.size
fenodes
password
property-version
sink.batch.interval
sink.buffer-count
sink.buffer-size
sink.check-interval
sink.enable-2pc
sink.enable-delete
sink.label-prefix
sink.max-retries
source.use-old-api
table.identifier
username
刚开始以为是jar包问题,先后找了flink-doris-connector-1.14_2.12-1.1.1.jar和flink-doris-connector-1.14_2.12-1.1.0.jar放在flink的jar包里面,还是同样出现上面的。
让我不解的是,用代码来操作时,导入是同样的安装包,但没上面的问题。
后来问了New Bing,New Bing是这样回答的:
您好,根据您的错误信息,您可能需要修改您的Doris表的配置选项,以符合Flink Doris Connector的要求。您可以参考以下的建议:
- 您可能需要将sink.batch.size改为doris.batch.size,这是一个控制每次写入Doris的数据量的选项。
- 您可能需要检查您是否正确地设置了connector选项,这是一个指定连接器类型的选项,对于Doris表,应该设置为connector = ‘doris’。
- 您可能需要检查您是否正确地设置了table.identifier选项,这是一个指定Doris表名的选项,应该设置为table.identifier = ‘db_name.table_name’。
这时注意到问题在“sink.batch.size”,而不是之前以为“Unsupported options found for ‘doris’”是相关的jar包没找到。原来自己用的例子是旧版本例子:
DROP TABLE IF EXISTS doris_sink;
-- 创建Doris表
CREATE TABLE doris_sink (
id INT PRIMARY KEY,
name STRING,
age INT,
modified_time STRING
) WITH (
'connector' = 'doris',
'fenodes' = '10.0.80.54:8030',
'table.identifier' = 'db_test.test_cdc',
'username' = 'root',
'password' = '',
'sink.batch.size' = '1000'
);
需要修改为
DROP TABLE IF EXISTS doris_sink;
-- 创建Doris表
CREATE TABLE doris_sink (
id INT PRIMARY KEY,
name STRING,
age INT,
modified_time STRING
) WITH (
'connector' = 'doris',
'fenodes' = '10.0.80.54:8030',
'table.identifier' = 'db_test.test_cdc',
'username' = 'root',
'password' = ''
);