欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > 利用单线程池实现多线程并发顺序消费消息

利用单线程池实现多线程并发顺序消费消息

2024/10/25 14:25:03 来源:https://blog.csdn.net/Xiaowu_First/article/details/139843474  浏览:    关键词:利用单线程池实现多线程并发顺序消费消息

1 背景

在某些场景下,我们需要保证消费消息的顺序性,可能要使用单线程处理任务。
这个在消息数量较少时,还是一个可行的方案,但在大量的数据消息情况下,单线程就显得力不从心了,所以这时候需要引入多线程。

2 方案

引入多线程又要保证顺序性,那么只能让同类事件路由到同一个线程中去执行。
这时一般需要使用哈希取余得到哈希槽位数组的下标。
为了提高可扩展可定制化,提供一个哈希函数的接口定义,供使用者实现自己的哈希算法。

    /*** 可哈希计算的任务*/public interface HashableTask extends Runnable {/*** 哈希码计算所需key** @return key值*/Object getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(Object key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}

另外为提高取余计算的效率一般使用位计算and,这要求哈希槽位数是2的幂次方,即如下:

		//校验线程数是否是2的幂次方if (Integer.bitCount(threadCount) != 1) {throw new IllegalArgumentException("threadCount  must be a power of 2");}//计算哈希槽位数组下标int hashcode = task.hash(task.getKey());int index = hashcode & executors.length-1;

自己实现满足顺序消费的线程池,需要实现每个线程对应一个队列,需要维护阻塞队列、线程挂掉、启动新线程,还有优雅停机问题。这些实现都不是那么简单,我们其实可以换个角度思考问题,将线程池线程来使用。这咋理解呢?就是一个线程池只有一个线程,这就实现了所谓的线程池线程的平替。即每个线程池应该这种定义

 ExecutorService executor= new ThreadPoolExecutor(1, 1,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(size), threadFactory);

下面附上完整代码。

public class PartitionThreadPool {private final ExecutorService[] executors;private final int indexMask;public PartitionThreadPool(int threadCount, int singleQueueCapacity, ThreadFactory threadFactory){if (Integer.bitCount(threadCount) != 1) {throw new IllegalArgumentException("threadCount  must be a power of 2");}indexMask = threadCount - 1;executors = new ExecutorService[threadCount];for (int i = 0; i < threadCount; i++) {executors[i] = new ThreadPoolExecutor(1, 1,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(singleQueueCapacity), threadFactory);}}public Future<?> submit(HashableTask task){int hashcode = task.hash(task.getKey());int index = hashcode & indexMask;return executors[index].submit(task);}public void shutdown(){for (ExecutorService executor : executors) {executor.shutdownNow();}}/*** 可哈希计算的任务*/public interface HashableTask extends Runnable {/*** 哈希码计算所需key** @return key值*/Object getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(Object key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}}

上面的代码还有些问题,你们发现了么?

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com