1 背景
在某些场景下,我们需要保证消费消息的顺序性,可能要使用单线程处理任务。
这个在消息数量较少时,还是一个可行的方案,但在大量的数据消息情况下,单线程就显得力不从心了,所以这时候需要引入多线程。
2 方案
引入多线程又要保证顺序性,那么只能让同类事件路由到同一个线程中去执行。
这时一般需要使用哈希取余得到哈希槽位数组的下标。
为了提高可扩展可定制化,提供一个哈希函数的接口定义,供使用者实现自己的哈希算法。
/*** 可哈希计算的任务*/public interface HashableTask extends Runnable {/*** 哈希码计算所需key** @return key值*/Object getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(Object key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}
另外为提高取余计算的效率一般使用位计算and
,这要求哈希槽位数是2的幂次方,即如下:
//校验线程数是否是2的幂次方if (Integer.bitCount(threadCount) != 1) {throw new IllegalArgumentException("threadCount must be a power of 2");}//计算哈希槽位数组下标int hashcode = task.hash(task.getKey());int index = hashcode & executors.length-1;
自己实现满足顺序消费的线程池,需要实现每个线程对应一个队列,需要维护阻塞队列、线程挂掉、启动新线程,还有优雅停机问题。这些实现都不是那么简单,我们其实可以换个角度思考问题,将线程池
当线程
来使用。这咋理解呢?就是一个线程池只有一个线程,这就实现了所谓的线程池
对线程
的平替。即每个线程池应该这种定义
ExecutorService executor= new ThreadPoolExecutor(1, 1,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(size), threadFactory);
下面附上完整代码。
public class PartitionThreadPool {private final ExecutorService[] executors;private final int indexMask;public PartitionThreadPool(int threadCount, int singleQueueCapacity, ThreadFactory threadFactory){if (Integer.bitCount(threadCount) != 1) {throw new IllegalArgumentException("threadCount must be a power of 2");}indexMask = threadCount - 1;executors = new ExecutorService[threadCount];for (int i = 0; i < threadCount; i++) {executors[i] = new ThreadPoolExecutor(1, 1,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(singleQueueCapacity), threadFactory);}}public Future<?> submit(HashableTask task){int hashcode = task.hash(task.getKey());int index = hashcode & indexMask;return executors[index].submit(task);}public void shutdown(){for (ExecutorService executor : executors) {executor.shutdownNow();}}/*** 可哈希计算的任务*/public interface HashableTask extends Runnable {/*** 哈希码计算所需key** @return key值*/Object getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(Object key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}}
上面的代码还有些问题,你们发现了么?