1.多线程运行器
package com.visy.utils;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@Getter
public class MultiThreadRunner {private String prefix;private Integer corePoolSize;private Integer maxPoolSize;private Integer capacity;private Long keepAliveTime;private TimeUnit timeUnit;private RejectedExecutionHandler handler;private ThreadPoolExecutor pool;public static MultiThreadRunner builder(){return new MultiThreadRunner();}public MultiThreadRunner prefix(String prefix){if(pool == null){this.prefix = prefix;}return this;}public MultiThreadRunner corePoolSize(Integer corePoolSize){if(pool == null){this.corePoolSize = corePoolSize;}return this;}public MultiThreadRunner maxPoolSize(Integer maxPoolSize){if(pool == null){this.maxPoolSize = maxPoolSize;}return this;}public MultiThreadRunner capacity(Integer capacity){if(pool == null){this.capacity = capacity;}return this;}public MultiThreadRunner keepAliveTime(Long keepAliveTime){if(pool == null){this.keepAliveTime = keepAliveTime;}return this;}public MultiThreadRunner timeUnit(TimeUnit timeUnit){if(pool == null){this.timeUnit = timeUnit;}return this;}public MultiThreadRunner handler(RejectedExecutionHandler handler){if(pool == null){this.handler = handler;}return this;}public synchronized MultiThreadRunner build(){if(pool != null){return this;}pool = new ThreadPoolExecutor(corePoolSize==null ? corePoolSize=getSystemCores()*2 : corePoolSize, maxPoolSize==null ? maxPoolSize=corePoolSize+1 : maxPoolSize, keepAliveTime==null ? keepAliveTime=0L : keepAliveTime, timeUnit==null ? timeUnit=TimeUnit.MILLISECONDS : timeUnit, new LinkedBlockingQueue<>(capacity==null ? capacity=100 : capacity), prefix==null ? Executors.defaultThreadFactory() : new ThreadFactoryBuilder().setNameFormat(prefix+"-%d").build(),handler==null ? handler=new ThreadPoolExecutor.AbortPolicy() : handler );return this;}public int getSystemCores(){return Runtime.getRuntime().availableProcessors();}public int getPoolSize(){return pool==null ? 0 : pool.getPoolSize();}public int getActiveCount(){return pool==null ? 0 : pool.getActiveCount();}public void shutdown(){if(pool != null){pool.shutdown();}}public Future<Void> runAsync(Runnable executor){if(pool == null){return CompletableFuture.runAsync(executor);}return CompletableFuture.runAsync(executor, pool);}public <U> List<Future<Void>> runAsync(List<U> list, Consumer<U> executor){return list.stream().map(item -> {return runAsync(() -> executor.accept(item));}).collect(Collectors.toList());}public <R> Future<R> supplyAsync(Supplier<R> executor){if(pool == null){return CompletableFuture.supplyAsync(executor);}return CompletableFuture.supplyAsync(executor, pool);}public <U,R> List<Future<R>> supplyAsync(List<U> list, Function<U,R> executor){return list.stream().map(item -> {return supplyAsync(() -> executor.apply(item));}).collect(Collectors.toList());}public void awaitRunAsync(Runnable executor)throws ExecutionException, InterruptedException {runAsync(executor).get();}public void awaitRunAsync(Runnable executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {runAsync(executor).get(timeout, timeUnit);}public <U> void awaitRunAsync(List<U> list, Consumer<U> executor)throws ExecutionException, InterruptedException {for (Future<Void> future : runAsync(list, executor)) {future.get();}}public <U> void awaitRunAsync(List<U> list, Consumer<U> executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {for (Future<Void> future : runAsync(list, executor)) {future.get(timeout, timeUnit);}}public <R> R awaitSupplyAsync(Supplier<R> executor)throws ExecutionException, InterruptedException {return supplyAsync(executor).get();}public <R> R awaitSupplyAsync(Supplier<R> executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {return supplyAsync(executor).get(timeout, timeUnit);}public <U,R> List<R> awaitSupplyAsync(List<U> list, Function<U,R> executor)throws ExecutionException, InterruptedException {List<R> resultList = new ArrayList<>();for (Future<R> future : supplyAsync(list, executor)) {resultList.add(future.get());}return resultList;}public <U,R> List<R> awaitSupplyAsync(List<U> list, Function<U,R> executor, long timeout, TimeUnit timeUnit)throws ExecutionException, InterruptedException, TimeoutException {List<R> resultList = new ArrayList<>();for (Future<R> future : supplyAsync(list, executor)) {resultList.add(future.get(timeout, timeUnit));}return resultList;}
}
2.测试
package com.visy.utils;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
@Slf4j
public class MultiThreadRunnerTest {private static final MultiThreadRunner multiThreadRunner = MultiThreadRunner.builder().prefix("test").corePoolSize(5).maxPoolSize(8).capacity(10).build(); public static void main(String[] args) throws Exception {List<Integer> list = new ArrayList<>();for(int i=0; i<20; i++){list.add(i);}System.out.println("执行单个异步任务(无返回)");multiThreadRunner.runAsync(() -> {System.out.println(66);}).get();System.out.println("等待单个异步任务执行完毕(无返回)");multiThreadRunner.awaitRunAsync(() -> {System.out.println(77);});System.out.println("执行单个异步任务(有返回)");System.out.println(multiThreadRunner.supplyAsync(() -> 88).get());System.out.println("等待单个异步任务执行完毕(有返回)");System.out.println(multiThreadRunner.awaitSupplyAsync(() -> 99));System.out.println("执行多个异步任务(无返回)");for (Future<Void> future : multiThreadRunner.runAsync(list, i -> System.out.print((i + 1) + ";"))) {future.get();}System.out.println("\n等待多个异步任务执行完毕(无返回)");multiThreadRunner.awaitRunAsync(list, i -> System.out.print((i+2)+";"));System.out.println("\n执行多个异步任务(有返回)");List<Future<Integer>> futures = multiThreadRunner.supplyAsync(list, i -> i+3);for (Future<Integer> future : futures) {System.out.print(future.get()+";");}System.out.println("\n等待多个异步任务执行完毕(有返回)");multiThreadRunner.awaitSupplyAsync(list, i -> i+4).forEach(i -> {System.out.print(i+";");});multiThreadRunner.shutdown();System.out.println("\n关闭后执行任务");multiThreadRunner.awaitRunAsync(() -> {System.out.println(999);});}
}
3.打印结果
执行单个异步任务(无返回)
66
等待单个异步任务执行完毕(无返回)
77
执行单个异步任务(有返回)
88
等待单个异步任务执行完毕(有返回)
99
执行多个异步任务(无返回)
3;1;5;6;2;8;9;11;7;4;15;12;14;13;10;19;18;17;16;20;
等待多个异步任务执行完毕(无返回)
2;3;6;8;5;4;11;10;9;7;16;15;13;12;20;21;19;14;18;17;
执行多个异步任务(有返回)
3;4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;
等待多个异步任务执行完毕(有返回)
4;5;6;7;8;9;10;11;12;13;14;15;16;17;18;19;20;21;22;23;
关闭后执行任务
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@22f71333 rejected from java.util.concurrent.ThreadPoolExecutor@13969fbe[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 84]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)at com.server.utils.MultiThreadRunner.runAsync(MultiThreadRunner.java:173)at com.server.utils.MultiThreadRunner.awaitRunAsync(MultiThreadRunner.java:222)at com.server.utils.MultiThreadRunnerTest.main(MultiThreadRunnerTest.java:69)Process finished with exit code 1