Kafka存在大量的延迟操作,比如延迟删除、延迟拉取等。Kafka基于时间轮概念自定义了一个用于延迟操作的定时器。
JDK自带的Timer和DelayQueue缺陷
Timer和DelayQueue都可以插入多个定时任务,它们都使用一个优先级队列来管理任务,复杂度为O(logn)。
Timer | 单线程,前置任务会阻塞后置任务,如果任务抛出异常,Timer会中断停止。 |
DelayQueue | 线程安全,可用于多线程,是一个无界阻塞队列。 |
表 Timer和DelayQueue的对比
public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");
}
时间轮结构
任务插入及删除O(logn)的复杂度不能满足Kafka高性能要求。时间轮时间复杂度为O(1)。
图 时间轮(TimingWheel)结构
时间轮类似于机械手表,秒针1s前进一次,分针在秒针前进一圈后前进1格。。
tick | 时间跨度。上图第一层tick=1s,第二次tick=10s。 |
wheelSize | 时间格数。每一层格数一样。每层时间周期interval=tick*wheelSize |
currentTick | 每层当前指向的时间格。 |
bucket | 桶,每个时间格中用于保存待执行任务的列表(TimerTaskList)。 |
表 时间轮中的基本概念
// 时间轮添加任务的伪代码:HashedWheelTimer 的 addTask 方法
public void addTask(TimerTask task, long delayMs) {if (delayMs < current.interval) {// 插入当前层int bucketIndex = (currentTick + delayMs / current.tickMs) % current.wheelSize;current.buckets[bucketIndex].addTask(task);} else {// 检查是否存在上层时间轮if (overflowTimer == null) {// 动态创建上层时间轮long nextTickMs = current.tickMs * current.wheelSize; // 上层 tickMs = 当前层总跨度overflowTimer = new HashedWheelTimer(nextTickMs, current.wheelSize);}// 递归调用:将剩余延迟传递给上层overflowTimer.addTask(task, delayMs - current.interval);}
}
每次创建上层时间轮时,该层的currentTick初始为0。
时间的推进
时间轮如果像机械手表那样,按照最底层时间跨度一格一格推进,那么将非常耗性能。Kafka使用DelayQueue来推进时间。桶中任务链表按照待执行时间进行排序,其中最快执行的任务放在头部。桶的TimerTaskList将作为DelayQueue一个元素插入,该元素的待执行时间为TimerTaskList的头部元素的时间。
当TimerTaskList被取出执行时,此时会维护各层的currentTick。同时会对列表中还有剩余时间的任务进行“时间轮降级”,将它们插入到对于的桶中。