Flink读取TDEngine数据实例,解决com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields错误
用flink读取TDEngine,运行报错: 
com.taosdata.jdbc.rs.RestfulDatabaseMetaData@38af9828 is not serializable. The object probably contains or references non serializable fields 
 这意味着 com.taosdata.jdbc.rs.RestfulDatabaseMetaData 类的对象无法被序列化,而 Flink 的作业中涉及到的某些操作需要将对象传递到不同的任务中,这就要求对象是可序列化的(即实现了 Serializable 接口)。在 Flink 中,所有要在分布式环境中传输或持久化的对象都必须是可序列化的。 
- RestfulDatabaseMetaData是 TDengine JDBC 驱动中的一个类,它可能没有实现- Serializable接口,因此在需要将该类对象传输到其他机器时,Flink 无法进行序列化。
解决方法是
使用 transient 关键字避免对不可序列化对象进行传递。
通过标记 connection、preparedStatement 和 resultSet 为 transient,这些对象不会被 Flink 传递到 Task Manager。
完整可执行代码如下:
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class TDengineSourceFunction extends RichParallelSourceFunction<RunData> {
    private transient Connection connection;        // 使用 transient 避免序列化
    private transient PreparedStatement preparedStatement;
    private transient ResultSet resultSet;
    private String query;
    private volatile boolean isRunning = true;
    private String jdbcUrl;
    private String user;
    private String password;
    public TDengineSourceFunction(String jdbcUrl, String user, String password, String query) {
        this.query = query;
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
        // JDBC连接参数在open()方法中初始化
    }
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
        // 在这里初始化数据库连接
        this.connection = DriverManager.getConnection(jdbcUrl, user, password);
        // 准备SQL查询语句
        this.preparedStatement = connection.prepareStatement(query);
        this.resultSet = preparedStatement.executeQuery();
    }
    @Override
    public void run(SourceContext<RunData> sourceContext) throws Exception {
        while (isRunning && resultSet.next()) {
            // 从ResultSet中提取数据并转换为RunData对象
            RunData data = convertResultSetToData(resultSet);
            // 将数据发送到Flink的处理流中
            if (data != null) {
                sourceContext.collect(data);
            }
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
        // 关闭资源
        try {
            if (resultSet != null) resultSet.close();
            if (preparedStatement != null) preparedStatement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            // 处理关闭资源时的异常
            e.printStackTrace();
        }
    }
    private RunData convertResultSetToData(ResultSet resultSet) throws SQLException {
        // 提取单行数据
      
        // 将数据转换为 RunData 对象
      //  return new RunData(......);
        return null;
    }
}
