Flink读取TDEngine数据实例,解决com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields错误
用flink读取TDEngine,运行报错:
com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields
这意味着 com.taosdata.jdbc.rs.RestfulDatabaseMetaData
类的对象无法被序列化,而 Flink 的作业中涉及到的某些操作需要将对象传递到不同的任务中,这就要求对象是可序列化的(即实现了 Serializable
接口)。在 Flink 中,所有要在分布式环境中传输或持久化的对象都必须是可序列化的。
RestfulDatabaseMetaData
是 TDengine JDBC 驱动中的一个类,它可能没有实现Serializable
接口,因此在需要将该类对象传输到其他机器时,Flink 无法进行序列化。
解决方法是
使用 transient
关键字避免对不可序列化对象进行传递。
通过标记 connection
、preparedStatement
和 resultSet
为 transient
,这些对象不会被 Flink 传递到 Task Manager。
完整可执行代码如下:
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class TDengineSourceFunction extends RichParallelSourceFunction<RunData> {
private transient Connection connection; // 使用 transient 避免序列化
private transient PreparedStatement preparedStatement;
private transient ResultSet resultSet;
private String query;
private volatile boolean isRunning = true;
private String jdbcUrl;
private String user;
private String password;
public TDengineSourceFunction(String jdbcUrl, String user, String password, String query) {
this.query = query;
this.jdbcUrl = jdbcUrl;
this.user = user;
this.password = password;
// JDBC连接参数在open()方法中初始化
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
// 在这里初始化数据库连接
this.connection = DriverManager.getConnection(jdbcUrl, user, password);
// 准备SQL查询语句
this.preparedStatement = connection.prepareStatement(query);
this.resultSet = preparedStatement.executeQuery();
}
@Override
public void run(SourceContext<RunData> sourceContext) throws Exception {
while (isRunning && resultSet.next()) {
// 从ResultSet中提取数据并转换为RunData对象
RunData data = convertResultSetToData(resultSet);
// 将数据发送到Flink的处理流中
if (data != null) {
sourceContext.collect(data);
}
}
}
@Override
public void cancel() {
isRunning = false;
// 关闭资源
try {
if (resultSet != null) resultSet.close();
if (preparedStatement != null) preparedStatement.close();
if (connection != null) connection.close();
} catch (SQLException e) {
// 处理关闭资源时的异常
e.printStackTrace();
}
}
private RunData convertResultSetToData(ResultSet resultSet) throws SQLException {
// 提取单行数据
// 将数据转换为 RunData 对象
// return new RunData(......);
return null;
}
}