DataX DorisWriter 插件DorisWriterManager类详细解读
DorisWriterManager
的类,用于将数据写入到 Doris 中。以下是代码的具体作用和功能解释:
- 导入必要的包和类: 代码开头导入了所需的包和类,包括日志记录、线程池、字符编码和其他相关工具类。
- 类成员变量定义: 下面是一些类的成员变量定义,这些变量在类的不同方法中使用:
LOG
: 用于记录日志的 Logger 对象。visitor
:DorisStreamLoadObserver
类的实例,用于处理数据写入 Doris 的观察者。options
:Keys
类的实例,包含了一些配置选项。buffer
: 存储待写入 Doris 的数据。batchCount
: 当前批次中的记录数量。batchSize
: 当前批次中的数据大小。closed
: 标志位,表示是否已关闭写入。flushException
: 异步刷新数据时可能发生的异常。flushQueue
: 用于异步刷新数据的队列。scheduler
: 用于定期刷新数据的调度器。scheduledFuture
: 用于取消定时任务的句柄。
- 构造函数
DorisWriterManager
: 构造函数接受一个Keys
对象作为参数,设置了初始化的配置信息,并初始化了visitor
和flushQueue
。接着,它调用startScheduler()
启动定期刷新任务,以及startAsyncFlushing()
启动异步刷新线程。 startScheduler()
方法: 此方法负责启动定时刷新任务。它首先调用stopScheduler()
停止之前的定时任务。然后,创建一个单线程的调度器(scheduler),并设置一个定时任务,定期触发数据刷新操作。在定时任务内部,它会检查是否关闭了写入操作,然后根据配置信息进行数据刷新。如果当前批次为空,重新启动定时任务,确保数据持续刷新。stopScheduler()
方法: 此方法用于停止定时任务。它会取消之前的定时任务并关闭调度器。writeRecord(String record)
方法: 该方法用于将记录写入缓冲区。它首先调用checkFlushException()
方法检查是否存在刷新异常。然后,将记录转换成字节数组并添加到缓冲区中,同时更新批次计数和数据大小。如果当前批次的记录数量或数据大小超过了阈值,就会触发数据刷新。flush(String label, boolean waitUntilDone)
方法: 此方法用于手动触发数据刷新操作。它首先检查是否存在刷新异常,然后根据当前批次的情况决定是否执行刷新。如果当前批次为空,且waitUntilDone
为真,它会等待之前的异步刷新操作完成。否则,它将当前批次的数据放入刷新队列,并根据waitUntilDone
参数决定是否等待刷新操作完成。close()
方法: 此方法用于关闭DorisWriterManager
。它首先检查是否已经关闭,然后触发一次最终的数据刷新操作。如果当前批次有数据,会记录相应日志。最后,它检查是否有刷新异常并抛出相应异常。createBatchLabel()
方法: 此方法用于创建批次标签,用于标识一批数据。它根据配置的前缀和随机 UUID 生成标签。startAsyncFlushing()
方法: 此方法启动一个异步刷新线程。线程会循环调用asyncFlush()
方法,将数据异步刷新到 Doris 中。waitAsyncFlushingDone()
方法: 该方法用于等待之前的异步刷新操作完成。它向刷新队列添加空的WriterTuple
,以确保之前的刷新操作完成。然后,它检查是否存在刷新异常。asyncFlush()
方法: 此方法用于异步刷新数据到 Doris。它从刷新队列中取出WriterTuple
,然后根据批次的标签执行数据刷新操作。如果发生异常,它会尝试多次,直到达到最大重试次数。如果需要重新创建批次标签,则生成新的标签。重试之间会休眠一段时间。成功后,重新启动定时任务。checkFlushException()
方法: 此方法用于检查是否存在刷新异常,如果存在则抛出异常。
这个 DorisWriterManager
类的目的是管理数据写入到 Doris 数据库的操作。它通过定时任务和异步刷新线程来控制数据的批量写入,同时处理异常情况,确保数据的稳定写入。
添加详细注释代码如下:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DorisWriterManager {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
private final DorisStreamLoadObserver visitor;
private final Keys options;
private final List<byte[]> buffer = new ArrayList<>(); // 缓冲区,用于存储待写入 Doris 的数据
private int batchCount = 0; // 当前批次中的记录数量
private long batchSize = 0; // 当前批次中的数据大小
private volatile boolean closed = false; // 标志位,表示是否已关闭
private volatile Exception flushException; // 异步刷新数据时可能发生的异常
private final LinkedBlockingDeque<WriterTuple> flushQueue; // 用于异步刷新数据的队列
private ScheduledExecutorService scheduler; // 用于定期刷新数据的调度器
private ScheduledFuture<?> scheduledFuture;
public DorisWriterManager(Keys options) {
this.options = options;
this.visitor = new DorisStreamLoadObserver(options);
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
this.startScheduler(); // 启动定期刷新调度器
this.startAsyncFlushing(); // 启动异步刷新线程
}
// 启动定期刷新调度器
public void startScheduler() {
stopScheduler(); // 停止之前的调度器
this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder()
.namingPattern("Doris-interval-flush").daemon(true).build());
this.scheduledFuture = this.scheduler.schedule(() -> {
synchronized (DorisWriterManager.this) {
if (!closed) {
try {
String label = createBatchLabel();
LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));
if (batchCount == 0) {
startScheduler(); // 如果当前批次为空,重新启动定时任务
}
flush(label, false);
} catch (Exception e) {
flushException = e;
}
}
}
}, options.getFlushInterval(), TimeUnit.MILLISECONDS);
}
// 停止定期刷新调度器
public void stopScheduler() {
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
}
// 写入一条记录到缓冲区
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException(); // 检查是否有刷新异常
try {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
buffer.add(bts);
batchCount++;
batchSize += bts.length;
if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
String label = createBatchLabel();
LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
flush(label, false); // 当记录数量或数据大小超过阈值时触发刷新
}
} catch (Exception e) {
throw new IOException("Writing records to Doris failed.", e);
}
}
// 手动触发刷新缓冲区的数据
public synchronized void flush(String label, boolean waitUntilDone) throws Exception {
checkFlushException(); // 检查是否有刷新异常
if (batchCount == 0) {
if (waitUntilDone) {
waitAsyncFlushingDone(); // 如果当前批次为空,等待之前的刷新操作完成
}
return;
}
flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer))); // 将数据放入刷新队列
if (waitUntilDone) {
waitAsyncFlushingDone(); // 等待刷新操作完成
}
buffer.clear();
batchCount = 0;
batchSize = 0;
}
// 关闭 DorisWriterManager,触发最后一次刷新操作
public synchronized void close() {
if (!closed) {
closed = true;
try {
String label = createBatchLabel();
if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));
flush(label, true); // 关闭时触发刷新操作
} catch (Exception e) {
throw new RuntimeException("Writing records to Doris failed.", e);
}
}
checkFlushException();
}
// 创建批次标签,通常用于标识一批数据
public String createBatchLabel() {
StringBuilder sb = new StringBuilder();
if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
sb.append(options.getLabelPrefix());
}
return sb.append(UUID.randomUUID().toString()).toString();
}
// 启动异步刷新线程
private void startAsyncFlushing() {
Thread flushThread = new Thread(new Runnable() {
public void run() {
while (true) {
try {
asyncFlush(); // 异步刷新数据
} catch (Exception e) {
flushException = e;
}
}
}
});
flushThread.setDaemon(true);
flushThread.start();
}
// 等待之前的刷新操作完成
private void waitAsyncFlushingDone() throws InterruptedException {
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
flushQueue.put(new WriterTuple("", 0L, null));
}
checkFlushException();
}
// 异步刷新数据到 Doris
private void asyncFlush() throws Exception {
WriterTuple flushData = flushQueue.take();
if (Strings.isNullOrEmpty(flushData.getLabel())) {
return;
}
stopScheduler(); // 停止定时任务
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
for (int i = 0; i <= options.getMaxRetries(); i++) {
try {
// 利用 DorisStreamLoadObserver 进行数据刷新
visitor.streamLoad(flushData);
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
startScheduler(); //
break;
} catch (Exception e) {
LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
if (i >= options.getMaxRetries()) {
throw new IOException(e);
}
if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
String newLabel = createBatchLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
flushData.setLabel(newLabel);
}
try {
Thread.sleep(1000l * Math.min(i + 1, 10));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
}
}
}
}
private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to Doris failed.", flushException);
}
}
}