欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > Java实现自定义线程池

Java实现自定义线程池

2025/4/12 3:43:56 来源:https://blog.csdn.net/weixin_46294086/article/details/142101350  浏览:    关键词:Java实现自定义线程池

Java实现自定义线程池

image-20240910154502115

ThreadPool

/**
 * Contract of a minimal thread pool: tasks are handed in through
 * {@link #execute(Runnable)} and the pool exposes its sizing parameters and
 * current state through read-only accessors.
 */
public interface ThreadPool {

    // ---- lifecycle -------------------------------------------------------

    /**
     * Submits a task to the pool.
     *
     * @param runnable the task to run
     */
    void execute(Runnable runnable);

    /** Closes the pool. */
    void shutdown();

    /**
     * @return {@code true} once the pool has been shut down
     */
    boolean isShutdown();

    // ---- sizing parameters -----------------------------------------------

    /**
     * @return the initial number of worker threads
     */
    int getInitSize();

    /**
     * @return the core number of worker threads
     */
    int getCoreSize();

    /**
     * @return the maximum number of worker threads
     */
    int getMaxSize();

    // ---- runtime figures -------------------------------------------------

    /**
     * @return the number of tasks currently waiting in the queue
     */
    int getQueueSize();

    /**
     * @return the number of worker threads currently counted as active
     */
    int getActiveCount();
}

RunnableQueue

/**
 * Queue of pending tasks shared between task submitters and worker threads.
 */
public interface RunnableQueue {

    /**
     * @return the number of tasks currently held by the queue
     */
    int size();

    /**
     * Adds a task to the queue.
     *
     * @param runnable the task to enqueue
     */
    void offer(Runnable runnable);

    /**
     * Retrieves the next task from the queue.
     *
     * @return the next task
     * @throws InterruptedException if the calling thread is interrupted while
     *                              obtaining a task
     */
    Runnable take() throws InterruptedException;
}

ThreadFactory

/**
 * Factory that wraps a {@link Runnable} in a new {@link Thread}, letting the
 * pool's owner decide how worker threads are named and grouped.
 */
public interface ThreadFactory {

    /**
     * Builds a thread that will execute the given runnable.
     *
     * @param r the code the new thread should run
     * @return the created thread
     */
    Thread createThread(Runnable r);
}

DennyPolicy

public interface DennyPolicy {void reject(Runnable r, ThreadPool pool);class DiscardPolicy implements DennyPolicy {@Overridepublic void reject(Runnable r, ThreadPool pool) {// do nothing}}class AbortPolicy implements DennyPolicy {@Overridepublic void reject(Runnable r, ThreadPool pool) {throw new RejectedExecutionException();}}class RunnerDenyPolicy implements DennyPolicy {@Overridepublic void reject(Runnable r, ThreadPool pool) {if (!pool.isShutdown()) {r.run();}}}}

RunnableDenyException

/**
 * Unchecked exception describing a task that was denied by the pool.
 */
public class RunnableDenyException extends RuntimeException {

    /**
     * Creates the exception with a description of the denial.
     *
     * @param message detail message, retrievable through {@link #getMessage()}
     */
    public RunnableDenyException(String message) {
        super(message);
    }
}

InternalTask

/**
 * Worker loop executed by each pool thread: it repeatedly takes a task from
 * the shared {@link RunnableQueue} and runs it until {@link #stop()} is
 * called or the thread is interrupted.
 */
public class InternalTask implements Runnable {

    private final RunnableQueue runnableQueue;

    // volatile so that stop() called from another thread is seen by the loop.
    private volatile boolean running = true;

    /**
     * @param runnableQueue the queue this worker pulls its tasks from
     */
    public InternalTask(RunnableQueue runnableQueue) {
        this.runnableQueue = runnableQueue;
    }

    /**
     * Takes and runs tasks until stopped or interrupted. A task that throws a
     * {@link RuntimeException} is reported to the thread's uncaught-exception
     * handler and the worker keeps serving the queue instead of dying.
     */
    @Override
    public void run() {
        final Thread worker = Thread.currentThread();
        while (running && !worker.isInterrupted()) {
            final Runnable task;
            try {
                task = runnableQueue.take();
            } catch (InterruptedException e) {
                running = false;
                // Restore the flag so code further up the stack can see the interrupt.
                worker.interrupt();
                break;
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // One failing task must not terminate the worker; report and continue.
                worker.getUncaughtExceptionHandler().uncaughtException(worker, e);
            }
        }
    }

    /**
     * Asks the loop to finish. The flag is checked between tasks, so a worker
     * that is currently waiting for or running a task finishes that step first.
     */
    public void stop() {
        running = false;
    }
}

LinkedRunnableQueue

public class LinkedRunnableQueue implements RunnableQueue {private final int limit;private final DennyPolicy dennyPolicy;private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunnableQueue(final int limit, final DennyPolicy dennyPolicy, ThreadPool threadPool) {this.limit = limit;this.dennyPolicy = dennyPolicy;this.threadPool = threadPool;}@Overridepublic void offer(Runnable runnable) {synchronized (runnableList) {if (runnableList.size() >= limit) {dennyPolicy.reject(runnable, threadPool);} else {runnableList.add(runnable);runnableList.notify();}}}@Overridepublic Runnable take() throws InterruptedException {synchronized (runnableList) {while (runnableList.isEmpty()) {try {runnableList.wait();} catch (InterruptedException e) {throw e;}}}return runnableList.removeFirst();}@Overridepublic int size() {synchronized (runnableList) {return runnableList.size();}}
}

BasicThreadPool

public class BasicThreadPool extends Thread implements ThreadPool {private final int initSize;private final int maxSize;private final int coreSize;private int activeCount;private final ThreadFactory threadFactory;private final RunnableQueue runnableQueue;private volatile boolean isShutdown = false;private final Queue<ThreadTask> threadQueue = new ArrayDeque<ThreadTask>();private final static DennyPolicy DEFAULT_DENNY_POLICY = new DennyPolicy.DiscardPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final long keepAliveTime;private final TimeUnit timeUnit;public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENNY_POLICY, 10, TimeUnit.SECONDS);}public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DennyPolicy dennyPolicy, long keepAliveTime, TimeUnit unit) {this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunnableQueue(queueSize, dennyPolicy, this);this.keepAliveTime = keepAliveTime;this.timeUnit = unit;this.init();}private void init() {start();for (int i = 0; i < initSize; i++) {newThread();}}private void newThread() {InternalTask internalTask = new InternalTask(runnableQueue);Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread, internalTask);threadQueue.offer(threadTask);this.activeCount++;thread.start();}private void removeThread() {ThreadTask threadTask = threadQueue.remove();threadTask.internalTask.stop();this.activeCount--;}@Overridepublic void run() {while (!isShutdown && !isInterrupted()) {try {timeUnit.sleep(keepAliveTime);} catch (InterruptedException e) {isShutdown = true;break;}synchronized (this) {if (isShutdown) {break;}if (runnableQueue.size() > 0 && activeCount < coreSize) {for (int i = initSize; i < 
coreSize; i++) {newThread();}continue;}if (runnableQueue.size() > 0 && activeCount < maxSize) {for (int i = coreSize; i < maxSize; i++) {newThread();}}if (runnableQueue.size() == 0 && activeCount > coreSize) {for (int i = coreSize; i < activeCount; i++) {removeThread();}}}}}private static class ThreadTask {Thread thread;InternalTask internalTask;public ThreadTask(Thread thread, InternalTask internalTask) {this.thread = thread;this.internalTask = internalTask;}}@Overridepublic void execute(Runnable runnable) {if (this.isShutdown) {throw new IllegalStateException("ThreadPool is closed");}this.runnableQueue.offer(runnable);}@Overridepublic void shutdown() {synchronized (this) {if (isShutdown) {return;}isShutdown = true;threadQueue.forEach(threadTask -> {threadTask.internalTask.stop();threadTask.thread.interrupt();});this.interrupt();}}@Overridepublic int getInitSize() {if (isShutdown) {throw new IllegalStateException("ThreadPool is closed");}return this.initSize;}@Overridepublic int getMaxSize() {if (isShutdown) {throw new IllegalStateException("ThreadPool is closed");}return this.maxSize;}@Overridepublic int getCoreSize() {if (isShutdown) {throw new IllegalStateException("ThreadPool is closed");}return this.coreSize;}@Overridepublic int getQueueSize() {if (isShutdown) {throw new IllegalStateException("ThreadPool is closed");}return runnableQueue.size();}@Overridepublic int getActiveCount() {synchronized (this) {return activeCount;}}@Overridepublic boolean isShutdown() {return this.isShutdown;}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);private static final ThreadGroup group = new ThreadGroup("MyThreadPool" + GROUP_COUNTER.getAndDecrement());private static final AtomicInteger COUNTER = new AtomicInteger(1);@Overridepublic Thread createThread(Runnable runnable) {return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement());}}}

ThreadPoolTest

/**
 * Demo driver: submits 20 ten-second tasks to a pool sized 2/4/6
 * (init/core/max) with a queue of 1000 and prints the pool's figures every
 * five seconds, forever.
 */
public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        final BasicThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);

        // Each task simply occupies a worker for ten seconds.
        final Runnable sleeper = () -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        int submitted = 0;
        while (submitted < 20) {
            threadPool.execute(sleeper);
            submitted++;
        }

        // Intentionally endless: the demo is stopped by killing the process.
        while (true) {
            report(threadPool);
            TimeUnit.SECONDS.sleep(5);
        }
    }

    /** Prints one snapshot of the pool's counters followed by a separator line. */
    private static void report(BasicThreadPool threadPool) {
        System.out.println("getActiveCount() = " + threadPool.getActiveCount());
        System.out.println("getQueueSize() = " + threadPool.getQueueSize());
        System.out.println("getCoreSize() = " + threadPool.getCoreSize());
        System.out.println("getMaxsize() = " + threadPool.getMaxSize());
        System.out.println("=============================================");
    }
}

运行结果

getActiveCount() = 2
getQueueSize() = 18
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 2
getQueueSize() = 18
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 4
getQueueSize() = 14
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 4
getQueueSize() = 14
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 6
getQueueSize() = 8
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 6
getQueueSize() = 8
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 6
getQueueSize() = 2
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 6
getQueueSize() = 2
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 5
getQueueSize() = 0
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 5
getQueueSize() = 0
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 4
getQueueSize() = 0
getCoreSize() = 4
getMaxsize() = 6
=============================================
getActiveCount() = 4
getQueueSize() = 0
getCoreSize() = 4
getMaxsize() = 6
=============================================

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词