提示:ChangeEventQueue是 Debezium 中的一个关键类,ChangeEventQueue的作用是用于存储和分发变更事件。
文章目录
- 前言
- 一、核心功能
- 二、代码分析
- 总结
前言
提示:
ChangeEventQueue是一个用于处理变更事件的队列,它允许将记录加入队列中,以便通过特定的方法来获取和处理这些记录。当队列满时,该方法会阻塞,确保所有加入的记录都能被正确处理。这种队列机制适用于需要处理大量变更事件的情况,能够有效地管理和分发这些事件,确保每个事件都能被适当地处理。
提示:以下是本篇文章正文内容
一、核心功能
核心功能详细说明
-
生产者向队列添加记录:
- 使用
signalAll()
方法通知所有等待的消费者线程,告知它们可以尝试消费队列中的记录。 - 使用
await()
方法使生产者线程等待,直到队列有足够的空间或者被其他线程唤醒。 - 记录被添加到队列中。
- 如果启用了按字节大小控制队列的功能,还会更新相关的统计信息。
- 当队列达到批处理大小或字节大小阈值时,再次使用
signalAll()
方法通知消费者线程开始处理。
- 使用
-
消费者从队列中获取记录:
- 加锁以保证并发安全性。
- 初始化一个列表用于存储从队列中获取的记录,并预分配空间以优化性能。
- 检查并可能抛出之前发生的生产者异常。
- 使用循环调用
drainRecords()
方法来从队列中获取记录,直到达到批次大小限制或队列大小上限,或者等待超时。 - 在等待过程中,使用
signalAll()
方法通知生产者线程可以添加更多的记录。 - 使用
await()
方法使消费者线程等待,直到队列中有新的记录或者等待超时。 - 完成后,再次使用
signalAll()
方法通知生产者线程可以添加更多的记录。 - 返回收集到的记录列表。
-
辅助方法:
drainRecords()
方法用于从队列中移除并返回指定数量的记录,并更新队列的字节大小统计。producerException()
方法用于记录生产者抛出的异常。throwProducerExceptionIfPresent()
方法用于检查并抛出记录的生产者异常。- 其他方法提供了队列状态的查询,例如总容量、剩余容量、最大队列大小(按字节数)、当前队列大小(按字节数)等。
二、代码分析
/*** 将记录添加到队列中。如果队列已满,将等待直到队列有空间后再进行添加。** @param record 要添加到队列中的数据记录。* @throws InterruptedException 如果当前线程在等待时被中断。*/
protected void doEnqueue(T record) throws InterruptedException {// 如果启用了追踪日志记录,则记录入队操作if (LOGGER.isTraceEnabled()) {LOGGER.trace("正在入队源记录 '{}'", record);}try {// 获取锁,以确保线程安全this.lock.lock();// 当队列达到大小或字节大小阈值时,等待并通知可能的poll()操作消费队列while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {this.isFull.signalAll(); // 通知消费线程尝试消费队列this.isNotFull.await(pollInterval.toMillis(), TimeUnit.MILLISECONDS); // 等待队列有空间或被唤醒}// 添加记录到队列queue.add(record);// 如果设置了按字节控制队列大小的功能,则更新相关统计if (maxQueueSizeInBytes > 0) {long messageSize = record.objectSize();sizeInBytesQueue.add(messageSize);currentQueueSizeInBytes += messageSize;}// 达到批处理大小或队列字节大小阈值时,立即通知消费线程开始处理if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {this.isFull.signalAll(); // 通知消费线程}}finally {// 无论是否成功,都要释放锁以避免死锁this.lock.unlock();}
}public List<T> poll() throws InterruptedException {// 保存当前日志上下文。LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();try {// 记录日志,表示正在轮询记录...LOGGER.debug("polling records...");// 初始化计时器,用于控制最大等待时间。final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.min(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));try {// 加锁以确保并发安全。this.lock.lock();// 初始化用于存储记录的列表,预分配空间以优化性能。List<T> records = new ArrayList<>(Math.min(maxBatchSize, queue.size()));// 检查并抛出生产者异常(如果存在)throwProducerExceptionIfPresent();// 循环尝试 drained 记录,直到达到批次大小、队列大小上限或等待超时while (drainRecords(records, maxBatchSize - records.size()) < maxBatchSize&& (maxQueueSizeInBytes == 0 || currentQueueSizeInBytes < maxQueueSizeInBytes)&& !timeout.expired()) {// 再次检查并抛出生产者异常(如果存在)。throwProducerExceptionIfPresent();// 如果没有更多记录或未达到批次大小,记录日志并短暂等待。LOGGER.debug("no records available or batch size not reached yet, sleeping a bit...");long remainingTimeoutMills = timeout.remaining().toMillis();if (remainingTimeoutMills > 0) {// signal doEnqueue() to add more records// 通知生产者可以添加更多记录。this.isNotFull.signalAll();// no records available or batch size not reached yet, so wait a bit// 等待一段时间或直至超时。this.isFull.await(remainingTimeoutMills, TimeUnit.MILLISECONDS);}// 继续检查是否有更多记录。LOGGER.debug("checking for more records...");}// signal doEnqueue() to add more records// 最后一次通知生产者可以添加更多记录。this.isNotFull.signalAll();// 返回收集到的记录列表。return records;}finally {// 解锁以允许其他线程访问。this.lock.unlock();}}finally {// 还原之前的日志上下文。previousContext.restore();}
}/*** 从队列中取出指定数量的元素并添加到指定列表中* 此方法旨在有效地清空队列,以避免内存溢出或性能下降的问题* * @param records 接收从队列中取出的元素的列表* @param maxElements 单次操作允许取出的最大元素数量* @return 返回清空队列后,records列表的大小*/
private long drainRecords(List<T> records, int maxElements) {// 获取当前队列中的元素数量int queueSize = queue.size();// 如果队列为空,则直接返回records列表的大小if (queueSize == 0) {return records.size();}// 计算本次操作需要取出的元素数量,保证不超过队列当前的大小和允许的最大取出数量int recordsToDrain = Math.min(queueSize, maxElements);// 创建一个临时数组,用于存储即将从队列中取出的元素T[] drainedRecords = (T[]) new Sizeable[recordsToDrain];// 从队列中取出元素,并存入临时数组中for (int i = 0; i < recordsToDrain; i++) {T record = queue.poll();drainedRecords[i] = record;}// 如果设置了最大队列大小(以字节为单位),则相应地更新当前队列大小if (maxQueueSizeInBytes > 0) {for (int i = 0; i < recordsToDrain; i++) {Long objectSize = sizeInBytesQueue.poll();currentQueueSizeInBytes -= (objectSize == null ? 0L : objectSize);}}// 将临时数组中的元素添加到records列表中records.addAll(Arrays.asList(drainedRecords));// 返回操作完成后records列表的大小return records.size();
}/*** 将记录加入队列,以便通过{@link #poll()}方法获取。如果队列已满,此方法将阻塞。** @param record 要加入队列的记录* @throws InterruptedException 如果当前线程已被中断,则抛出此异常*/
public void enqueue(T record) throws InterruptedException {// 如果记录为null,则直接返回,不进行后续操作if (record == null) {return;}// 如果当前线程已被中断,则抛出InterruptedException异常,中止操作if (Thread.interrupted()) {throw new InterruptedException();}// 如果启用了缓冲,则使用bufferedEvent进行记录的临时存储和替换if (buffering) {// 用新的记录替换bufferedEvent中旧的记录,如果bufferedEvent为空,则说明是第一个到来的事件,直接返回record = bufferedEvent.getAndSet(record);if (record == null) {// 只有第一个到来的事件会遇到的情况,直接返回return;}}// 实际执行将记录加入队列的操作doEnqueue(record);
}
总结
-
队列管理:
- 管理一个内部队列,用于存储生产者产生的记录。
- 提供方法来添加记录到队列 (
doEnqueue
) 和从队列中获取记录 (poll
)。
-
生产者行为:
- 生产者线程通过调用
doEnqueue
方法向队列添加记录。 - 在队列满时等待,直到队列有足够的空间。
- 在队列达到批处理大小或字节大小阈值时通知消费者线程开始处理。
- 生产者线程通过调用
-
消费者行为:
- 消费者线程通过调用
poll
方法从队列中获取记录。 - 在队列为空时等待,直到队列中有新的记录。
- 在获取到记录后进行处理。
- 消费者线程通过调用
-
同步机制:
- 使用
signalAll()
方法通知所有等待的线程。 - 使用
await()
方法使线程进入等待状态,直到被其他线程唤醒或等待超时。
- 使用
-
异常处理:
- 提供方法记录生产者抛出的异常,并在适当的时候抛出这些异常给消费者线程。
-
队列状态查询:
- 提供方法查询队列的总容量、剩余容量、最大队列大小(按字节数)、当前队列大小(按字节数)等。
-
按字节数控制队列大小:
- 支持按记录数量和字节数控制队列的最大容量,以实现更细粒度的资源管理。
-
并发控制:
- 使用锁机制确保多线程环境下的数据一致性。
ChangeEventQueue的设计考虑了性能和效率,通过设置最大队列大小和最大批处理大小等参数,优化了事件的处理和分发过程。这些参数的设置有助于控制队列的容量和处理速度,避免因事件过多而导致处理延迟或资源耗尽的问题。
总的来说,ChangeEventQueue通过其设计机制,有效地管理和分发变更事件,确保了事件处理的效率和正确性,是处理大量变更事件时的有效工具