解决Flink SQL:Exception in thread “main” org.apache.flink.table.api.ValidationException: Rowtime attribute ‘ptime’ must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type ‘BIGINT’.
在开发Flink SQL时报错:
在flink 1.16版本中执行报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: Rowtime attribute 'ptime' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type 'BIGINT'.
at org.apache.flink.table.api.TableSchema.validateColumnsAndWatermarkSpecs(TableSchema.java:535)
at org.apache.flink.table.api.TableSchema.access$100(TableSchema.java:73)
at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:802)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.build(MergeTableLikeUtil.java:534)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:154)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:171)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:74)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:330)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:758)
at com.chuneng.saas.doris.FlinkDorisExtremeValueCalculation.main(FlinkDorisExtremeValueCalculation.java:44)
原因分析
错误的核心在于:Flink 要求用于定义 WATERMARK
的字段必须是 TIMESTAMP
或 TIMESTAMP_LTZ
类型,但你的 ptime
字段被定义为 BIGINT
类型。尽管你在 WATERMARK
中尝试将 ptime
转换为 TIMESTAMP
,但 Flink 的 WATERMARK
语法要求直接引用一个已经存在的 TIMESTAMP
类型字段,而不是在 WATERMARK
定义中动态转换类型。
解决方案
你需要通过 计算列(Computed Column) 将 ptime
的 BIGINT
类型转换为 TIMESTAMP
类型,然后将 WATERMARK
绑定到计算列上。
例如像下面这样:
String sourceDDL = "CREATE TABLE ods_t_iot_data (" +
"pid STRING," +
"pvalue DECIMAL(13,3)," +
"ptime BIGINT," +
"ds DATE," +
// 定义计算列,将 BIGINT 类型的 ptime 转换为 TIMESTAMP
"event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ptime / 1000))," +
// 绑定 WATERMARK 到计算列 event_time
"WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
"'connector' = 'doris'," +
"'fenodes' = '10.0.0.1:8030'," +
"'table.identifier' = 'cnsaas.ods_t_iot'," +
"'username' = 'root'," +
"'password' = ''" +
")";