欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 美景 > 基于Java线程池的多线程运行器工具类

基于Java线程池的多线程运行器工具类

2024/10/24 2:02:44 来源:https://blog.csdn.net/xhom_w/article/details/140995716  浏览:    关键词:基于Java线程池的多线程运行器工具类
1.多线程运行器
package com.visy.utils;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;/*** 多线程运行器* @author visy.wang* @date 2024/8/7 13:07*/
@Getter
public class MultiThreadRunner {/*** 线程池内线程命名前缀*/private String prefix;/*** 核心线程数* 核心线程会一直存活,即使没有任务需要执行*/private Integer corePoolSize;/*** 最大线程数* corePoolSize <= 当前线程数 < maxPoolSize,且任务队列已满时,线程池会创建新线程来处理任务* 当前线程数 = maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常(具体见拒绝策略RejectedExecutionHandler)*/private Integer maxPoolSize;/*** 任务队列容量(阻塞队列)* 当前线程数 = corePoolSize,新任务会放在队列中排队等待执行*/private Integer capacity;/*** 线程空闲时间* 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量 = corePoolSize*/private Long keepAliveTime;/*** 时间单位*/private TimeUnit timeUnit;/*** 任务拒绝处理器* 两种情况会拒绝处理任务:*   1.当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务*   2.当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。*     如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务,线程池会调用“任务拒绝处理器”来处理这个任务*/private RejectedExecutionHandler handler;/*** 线程池*/private ThreadPoolExecutor pool;public static MultiThreadRunner builder(){return new MultiThreadRunner();}public MultiThreadRunner prefix(String prefix){if(pool == null){this.prefix = prefix;}return this;}public MultiThreadRunner corePoolSize(Integer corePoolSize){if(pool == null){this.corePoolSize = corePoolSize;}return this;}public MultiThreadRunner maxPoolSize(Integer maxPoolSize){if(pool == null){this.maxPoolSize = maxPoolSize;}return this;}public MultiThreadRunner capacity(Integer capacity){if(pool == null){this.capacity = capacity;}return this;}public MultiThreadRunner keepAliveTime(Long keepAliveTime){if(pool == null){this.keepAliveTime = keepAliveTime;}return this;}public MultiThreadRunner timeUnit(TimeUnit timeUnit){if(pool == null){this.timeUnit = timeUnit;}return this;}public MultiThreadRunner handler(RejectedExecutionHandler handler){if(pool == null){this.handler = handler;}return this;}/*** 创建运行器(创建内部持有的线程池)* @return 运行器*/public synchronized MultiThreadRunner build(){if(pool != null){//线程池只创建一次return this;}pool = new ThreadPoolExecutor(corePoolSize==null ? corePoolSize=getSystemCores()*2 : corePoolSize, //核心线程数,默认CPU核心数的2倍maxPoolSize==null ? maxPoolSize=corePoolSize+1 : maxPoolSize, //最大线程数,默认核心线程数+1keepAliveTime==null ? keepAliveTime=0L : keepAliveTime, //线程空闲时间,默认0timeUnit==null ? timeUnit=TimeUnit.MILLISECONDS : timeUnit, //时间单位,默认毫秒new LinkedBlockingQueue<>(capacity==null ? capacity=100 : capacity), //阻塞队列长度,默认100prefix==null ? Executors.defaultThreadFactory() : new ThreadFactoryBuilder().setNameFormat(prefix+"-%d").build(),handler==null ? handler=new ThreadPoolExecutor.AbortPolicy() : handler //任务拒绝处理器,默认AbortPolicy);return this;}/*** 获取系统CPU核心数*/public int getSystemCores(){return Runtime.getRuntime().availableProcessors();}/*** 线程池的线程数量* 如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减*/public int getPoolSize(){return pool==null ? 0 : pool.getPoolSize();}/*** 获取活动的线程数* 即正在执行任务的线程数*/public int getActiveCount(){return pool==null ? 0 : pool.getActiveCount();}/*** 关闭线程池*/public void shutdown(){if(pool != null){pool.shutdown();}}/*** 执行单个异步任务(无返回)* @param executor 执行器* @return Future*/public Future<Void> runAsync(Runnable executor){if(pool == null){return CompletableFuture.runAsync(executor);}return CompletableFuture.runAsync(executor, pool);}/*** 执行多个异步任务(无返回)* @param list 任务列表* @param executor 执行器* @return Future列表* @param <U> 输入类型*/public <U> List<Future<Void>> runAsync(List<U> list, Consumer<U> executor){return list.stream().map(item -> {return runAsync(() -> executor.accept(item));}).collect(Collectors.toList());}/*** 执行单个异步任务(有返回)* @param executor 执行器* @return 持有输出结果的Future* @param <R> 输出类型*/public <R> Future<R> supplyAsync(Supplier<R> executor){if(pool == null){return CompletableFuture.supplyAsync(executor);}return CompletableFuture.supplyAsync(executor, pool);}/*** 执行多个异步任务(有返回)* @param list 任务列表* @param executor 执行器* @return 持有输出结果的Future列表* @param <U> 输入类型* @param <R> 输出类型*/public <U,R> List<Future<R>> supplyAsync(List<U> list, Function<U,R> executor){return list.stream().map(item -> {return supplyAsync(() -> executor.apply(item));}).collect(Collectors.toList());}/*** 等待单个异步任务执行完毕(无返回)* @param executor 执行器*/public void awaitRunAsync(Runnable executor)throws ExecutionException, InterruptedException {runAsync(executor).get();}/*** 等待单个异步任务执行完毕(无返回),可设置超时时间* @param executor 执行器* @param timeout 超时时间* @param timeUnit 时间单位*/public void awaitRunAsync(Runnable executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {runAsync(executor).get(timeout, timeUnit);}/*** 等待多个异步任务执行完毕(无返回)* @param list 任务列表* @param executor 执行器* @param <U> 输入类型*/public <U> void awaitRunAsync(List<U> list, Consumer<U> executor)throws ExecutionException, InterruptedException {for (Future<Void> future : runAsync(list, executor)) {future.get();}}/*** 等待多个异步任务执行完毕(无返回),可设置超时时间* @param list 任务列表* @param executor 执行器* @param timeout 超时时间* @param timeUnit 时间单位* @param <U> 输入类型*/public <U> void awaitRunAsync(List<U> list, Consumer<U> executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {for (Future<Void> future : runAsync(list, executor)) {future.get(timeout, timeUnit);}}/*** 等待单个异步任务执行完毕(有返回)* @param executor 执行器* @return 执行结果* @param <R> 输出类型*/public <R> R awaitSupplyAsync(Supplier<R> executor)throws ExecutionException, InterruptedException {return supplyAsync(executor).get();}/*** 等待单个异步任务执行完毕(有返回),可设置超时时间* @param executor 执行器* @param timeout 超时时间* @param timeUnit 时间单位* @return 执行结果* @param <R> 输出类型*/public <R> R awaitSupplyAsync(Supplier<R> executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {return supplyAsync(executor).get(timeout, timeUnit);}/*** 等待多个异步任务执行完毕(有返回)* @param list 任务列表* @param executor 执行器* @return 执行结果列表* @param <U> 输入类型* @param <R> 输出类型*/public <U,R> List<R> awaitSupplyAsync(List<U> list, Function<U,R> executor)throws ExecutionException, InterruptedException {List<R> resultList = new ArrayList<>();for (Future<R> future : supplyAsync(list, executor)) {resultList.add(future.get());}return resultList;}/*** 等待多个异步任务执行完毕(有返回),可设置超时时间* @param list 任务列表* @param executor 执行器* @param timeout 超时时间* @param timeUnit 时间单位* @return 执行结果列表* @param <U> 输入类型* @param <R> 输出类型*/public <U,R> List<R> awaitSupplyAsync(List<U> list, Function<U,R> executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {List<R> resultList = new ArrayList<>();for (Future<R> future : supplyAsync(list, executor)) {resultList.add(future.get(timeout, timeUnit));}return resultList;}
}
2.测试
package com.visy.utils;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;/*** @author visy.wang* @date 2024/8/7 14:26*/
@Slf4j
public class MultiThreadRunnerTest {//初始化多线程运行器//必须调用build()才能完成内部线程池的创建private static final MultiThreadRunner multiThreadRunner = MultiThreadRunner.builder().prefix("test").corePoolSize(5).maxPoolSize(8).capacity(10).build(); public static void main(String[] args) throws Exception {List<Integer> list = new ArrayList<>();for(int i=0; i<20; i++){list.add(i);}System.out.println("执行单个异步任务(无返回)");multiThreadRunner.runAsync(() -> {System.out.println(66);}).get();System.out.println("等待单个异步任务执行完毕(无返回)");multiThreadRunner.awaitRunAsync(() -> {System.out.println(77);});System.out.println("执行单个异步任务(有返回)");System.out.println(multiThreadRunner.supplyAsync(() -> 88).get());System.out.println("等待单个异步任务执行完毕(有返回)");System.out.println(multiThreadRunner.awaitSupplyAsync(() -> 99));System.out.println("执行多个异步任务(无返回)");for (Future<Void> future : multiThreadRunner.runAsync(list, i -> System.out.print((i + 1) + ";"))) {future.get();}System.out.println("\n等待多个异步任务执行完毕(无返回)");multiThreadRunner.awaitRunAsync(list, i -> System.out.print((i+2)+";"));System.out.println("\n执行多个异步任务(有返回)");List<Future<Integer>> futures = multiThreadRunner.supplyAsync(list, i -> i+3);for (Future<Integer> future : futures) {System.out.print(future.get()+";");}System.out.println("\n等待多个异步任务执行完毕(有返回)");multiThreadRunner.awaitSupplyAsync(list, i -> i+4).forEach(i -> {System.out.print(i+";");});//关闭任务执行器(注意:关闭后不能再执行任何任务)multiThreadRunner.shutdown();System.out.println("\n关闭后执行任务");multiThreadRunner.awaitRunAsync(() -> {System.out.println(999);});}
}
3.打印结果
执行单个异步任务(无返回)
66
等待单个异步任务执行完毕(无返回)
77
执行单个异步任务(有返回)
88
等待单个异步任务执行完毕(有返回)
99
执行多个异步任务(无返回)
3;1;5;6;2;8;9;11;7;4;15;12;14;13;10;19;18;17;16;20;
等待多个异步任务执行完毕(无返回)
2;3;6;8;5;4;11;10;9;7;16;15;13;12;20;21;19;14;18;17;
执行多个异步任务(有返回)
3;4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;
等待多个异步任务执行完毕(有返回)
4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;23;
关闭后执行任务
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@22f71333 rejected from java.util.concurrent.ThreadPoolExecutor@13969fbe[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 84]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)at com.server.utils.MultiThreadRunner.runAsync(MultiThreadRunner.java:173)at com.server.utils.MultiThreadRunner.awaitRunAsync(MultiThreadRunner.java:222)at com.server.utils.MultiThreadRunnerTest.main(MultiThreadRunnerTest.java:69)Process finished with exit code 1

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com