解决flink报错:org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] cannot be applied to (com.chuneng.saas.dao.SinkToTDengine) iotStream.sinkTo(new SinkToTDengine)
在scala用下面的代码,
iotStream.sinkTo(new SinkToTDengine)
出现报错:
overloaded method value sinkTo with alternatives:
(sink: org.apache.flink.api.connector.sink2.Sink[com.chuneng.saas.vo.IotData])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData] <and>
(sink: org.apache.flink.api.connector.sink.Sink[com.chuneng.saas.vo.IotData, _, _, _])org.apache.flink.streaming.api.datastream.DataStreamSink[com.chuneng.saas.vo.IotData]
cannot be applied to (com.chuneng.saas.dao.SinkToTDengine)
iotStream.sinkTo(new SinkToTDengine)
遇到的错误是由于 sinkTo
方法期望的参数类型与提供的 SinkToTDengine
类型不匹配。具体来说,sinkTo
方法期望的是一个实现了 Sink
接口的类型,而您的 SinkToTDengine
类继承自 RichSinkFunction
,这导致类型不兼容。
错误原因分析
这表明 sinkTo
方法期望的是 Sink[IotData]
或 Sink[IotData, _, _, _]
类型,而您传递的是 SinkToTDengine
,它继承自 RichSinkFunction<IotData>
,因此类型不匹配。
解决方案
要解决这个问题,您需要将 SinkToTDengine
转换为 Flink 支持的 Sink
类型。
使用 addSink
方法
Flink 提供了 addSink
方法,可以直接接受实现了 SinkFunction
的自定义 Sink。这是最直接和常用的方式。
修改后的代码示例:
iotStream
.map(rd => {
val iotData = new IotData()
iotData.setPid(rd.getDeviceCode.replaceAll(".", "-"))
iotData.setVal(rd.getCompensationMW.toString)
// 将13位时间戳转换为%Y-%m-%d %H:%M:%S.%f格式,毫秒保留3位小数
val timestamp = rd.getTime // 假设rd.getTime()返回的是13位时间戳(毫秒级)
val instant = Instant.ofEpochMilli(timestamp)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withLocale(Locale.CHINA)
.withZone(ZoneId.systemDefault())
val formattedTime = formatter.format(instant)
iotData.setTs(formattedTime)
iotData
})
.print()
.addSink(new SinkToTDengine()) // 使用 addSink 方法