欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > 如何简单手搓一个自定义线程池-保姆级教程

如何简单手搓一个自定义线程池-保姆级教程

2024/10/25 18:26:26 来源:https://blog.csdn.net/qq_62775328/article/details/140915248  浏览:    关键词:如何简单手搓一个自定义线程池-保姆级教程

1. 引入必要的库

确保你已经引入了Java的并发库,通常这是标准的JDK库,不需要额外添加依赖。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

2. 定义线程工厂

线程工厂是创建新线程的工厂类,便于定制线程的属性(例如线程名称、优先级等)。

class CustomThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public CustomThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName(namePrefix + "-thread-" + threadNumber.getAndIncrement());t.setDaemon(false);t.setPriority(Thread.NORM_PRIORITY);return t;}
}

3. 创建自定义线程池

使用 ThreadPoolExecutor 创建自定义的线程池,配置核心线程数、最大线程数、存活时间等参数。

public class CustomThreadPool {private static final int CORE_POOL_SIZE = 10;private static final int MAX_POOL_SIZE = 20;private static final long KEEP_ALIVE_TIME = 1L;private static final TimeUnit TIME_UNIT = TimeUnit.MINUTES;private static final int QUEUE_CAPACITY = 100;public static ExecutorService createCustomThreadPool() {return new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TIME_UNIT,new ArrayBlockingQueue<>(QUEUE_CAPACITY),new CustomThreadFactory("CustomPool"),new ThreadPoolExecutor.AbortPolicy());}public static void main(String[] args) {ExecutorService executorService = createCustomThreadPool();// 使用 CompletableFuture 和自定义线程池CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("Task executed in custom thread pool: " + Thread.currentThread().getName());}, executorService);// 等待所有任务完成future.join();// 关闭线程池executorService.shutdown();}
}

4. 使用 CompletableFuture 与自定义线程池

通过 CompletableFuture 提交任务到自定义线程池执行。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("Task executed in custom thread pool: " + Thread.currentThread().getName());
}, executorService);

5. 管理和监控线程池(可选

为了确保线程池的稳定运行,通常需要监控线程池的状态和执行情况,可以通过JMX或者自定义监控工具实现。

public static void monitorThreadPool(ThreadPoolExecutor threadPool) {
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {System.out.println(String.format("[Monitor] Pool Size: %d, Active Threads: %d, Completed Tasks: %d, Total Tasks: %d",threadPool.getPoolSize(),threadPool.getActiveCount(),threadPool.getCompletedTaskCount(),threadPool.getTaskCount()));
}, 0, 1, TimeUnit.SECONDS);
}

相关问题:

线程id为什么要使用AtomicInteger?

在自定义线程工厂中使用 AtomicInteger 作为线程 ID 的计数器,是为了确保线程 ID 的生成是线程安全的。在并发环境中,多线程同时访问共享变量时,如果不加以保护,可能会导致数据竞争和不一致性。具体来说,以下是使用 AtomicInteger 的原因和优势:

线程安全性

AtomicIntegerjava.util.concurrent.atomic 包中的一个类,提供了原子操作,确保在并发环境下对整数变量的操作是线程安全的。它使用底层的硬件原语(如CAS操作)来实现原子性,避免了传统锁机制的开销。

避免竞争条件

竞争条件(Race Condition)是指多个线程竞争访问和修改共享资源,导致数据不一致的现象。在这个案例中,线程工厂为每个新创建的线程分配一个唯一的ID。如果使用普通的整数作为计数器,多个线程同时访问和修改这个计数器,可能会导致分配重复ID或遗漏某些ID。AtomicInteger 确保了每次自增操作都是独立、原子的,从而避免了这种问题。

性能优化

相对于使用 synchronized 关键字或者 Lock 对象,AtomicInteger 提供了更高效的方式来保证线程安全。由于 AtomicInteger 基于硬件原语实现,通常比传统锁机制性能更好,特别是在高并发环境下。

易用性

AtomicInteger 提供了一组易于使用的方法,如 incrementAndGet()decrementAndGet()addAndGet(int delta) 等,使得原子操作的编写和维护变得更加简单和直观。

线程队列的几种常见队列,及其使用场景?

1. 有界队列(Bounded Queue)

常见实现:ArrayBlockingQueue
特性
  • 容量限制:队列有固定的容量。
  • 先进先出:遵循FIFO原则。
优点
  • 控制资源使用:通过限制队列大小,可以控制任务提交速度,防止资源耗尽。
  • 预见性:提供更可预见的性能,避免内存溢出。
缺点
  • 可能阻塞:如果队列满了,提交新任务的线程会被阻塞,直到有空间可用。
使用场景
  • 资源受限系统:需要严格控制资源使用的系统。
  • 可预测负载:任务负载相对稳定且可预测的场景。

2. 无界队列(Unbounded Queue)

常见实现:LinkedBlockingQueue
特性
  • 无容量限制:队列可以无限增长。
  • 先进先出:遵循FIFO原则。
优点
  • 简单实现:没有容量限制,插入操作永远不会阻塞。
  • 高吞吐量:适用于高并发场景,可以最大化利用线程池资源。
缺点
  • 潜在资源耗尽:如果任务提交速度超过处理速度,可能导致内存耗尽。
  • 不可预见性:负载高峰期间,可能出现性能不可预见的情况。
使用场景
  • 高并发系统:适用于需要处理大量并发任务的系统。
  • 短期任务峰值:系统能够应对短期任务峰值负载。

3. 延迟队列(Delayed Queue)

常见实现:DelayQueue
特性
  • 定时任务:任务按照设定的延迟时间排序,只有在延迟时间到达后才会被执行。
  • 时间优先级:基于时间的优先级处理。
优点
  • 时间调度:适用于需要在指定时间后执行的任务。
  • 灵活性:支持定时和周期性任务调度。
缺点
  • 实现复杂度:实现和维护比普通队列复杂。
  • 内存开销:每个任务需要记录延迟时间,可能增加内存开销。
使用场景
  • 定时任务调度:如定时器、延迟消息处理等。
  • 周期性任务:需要在特定时间间隔执行的任务。

4. 同步队列(Synchronous Queue)

常见实现:SynchronousQueue
特性
  • 无存储空间:每个插入操作必须等待一个对应的删除操作,反之亦然。
  • 直接传递:任务直接从生产者传递到消费者。
优点
  • 低延迟:直接任务传递,适合低延迟场景。
  • 高吞吐量:避免了队列的存储开销。
缺点
  • 潜在阻塞:如果没有空闲线程来处理任务,提交新任务的线程会被阻塞。
使用场景
  • 实时系统:需要低延迟、高吞吐量的实时系统。
  • 生产者-消费者模式:适用于生产者和消费者速率相近的场景。

5. 优先级队列(Priority Queue)

常见实现:PriorityBlockingQueue
特性
  • 优先级排序:任务根据优先级排序,优先级高的任务优先执行。
  • 动态调整:允许动态调整任务优先级。
优点
  • 任务管理:支持按优先级处理任务,灵活管理任务执行顺序。
  • 适应性强:适应不同优先级任务的混合处理。
缺点
  • 实现复杂度:实现和维护比普通队列复杂。
  • 性能开销:插入和删除操作可能比FIFO队列耗时更长。
使用场景
  • 复杂任务调度:需要按优先级处理的复杂任务调度场景。
  • 资源优化:需要优化资源使用的场景,通过优先级调整任务执行顺序。

通过线程工厂管理线程池的方法中为什么createThreadPool方法返回的 不是ThreadPoolExecutor而是ExecutorService ?

在Java并发编程中,ExecutorService 是一个更高级的接口,它扩展了基本的 Executor 接口,提供了更丰富的线程池管理功能。ThreadPoolExecutorExecutorService 的一个具体实现,但通常建议返回 ExecutorService 而不是具体的 ThreadPoolExecutor

1. 接口与实现分离的原则

将方法的返回类型定义为 ExecutorService 而不是具体的 ThreadPoolExecutor,是遵循了面向接口编程的原则。

  • 灵活性:调用者可以依赖接口而不是具体实现,增加代码的灵活性和可扩展性。以后如果需要更换为其他类型的线程池(例如 ScheduledThreadPoolExecutor),只需改变实例化的具体实现,不需要修改依赖此方法的代码。
  • 封装性:隐藏具体实现细节,使得调用者无需关心底层实现,提高了代码的封装性和可维护性。

2. ExecutorService 介绍

ExecutorService 是Java并发包中的一个接口,提供了管理和控制线程池的更高层次的抽象。它扩展了 Executor 接口,增加了以下功能:

任务提交

ExecutorService 提供了多种提交任务的方法,支持一次性任务和批量任务的提交。

  • submit():可以提交一个 RunnableCallable 任务,并返回一个 Future 对象,允许任务执行完毕后获取结果或检查任务状态。
Future<?> future = executorService.submit(new RunnableTask());
Future<String> future = executorService.submit(new CallableTask());
任务关闭

ExecutorService 提供了有序关闭线程池的方法,确保所有已经提交的任务在关闭时被执行完毕。

  • shutdown():启动有序关闭,不再接受新任务,但会继续执行已经提交的任务。
executorService.shutdown();
  • shutdownNow():尝试停止所有正在执行的任务,并返回等待执行的任务列表。
List<Runnable> pendingTasks = executorService.shutdownNow();
任务管理

ExecutorService 提供了等待所有任务完成的方法,允许调用者阻塞直到所有任务完成。

  • awaitTermination():等待所有任务在指定时间内完成,如果超时则返回。
executorService.awaitTermination(1, TimeUnit.MINUTES);

拒绝策略

1. AbortPolicy

行为
  • 直接拒绝:当任务不能被提交时,直接抛出 RejectedExecutionException
优点
  • 简单明了:立即通知调用者任务提交失败,便于错误处理。
缺点
  • 不处理任务:未提供任何任务处理或缓解机制,可能导致任务丢失。
使用场景
  • 任务丢失可接受:系统能容忍部分任务丢失,或需要立即发现任务提交失败的情况。
示例代码
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy()
);

2. CallerRunsPolicy

行为
  • 调用者执行:当任务不能被提交时,由提交任务的线程(调用者线程)来执行任务。
优点
  • 缓解压力:减轻线程池负担,避免任务完全丢失。
  • 均衡负载:调用者线程参与执行,可能降低任务提交速度,从而减轻线程池压力。
缺点
  • 延迟增加:调用者线程被占用执行任务,可能导致调用者线程延迟或阻塞。
使用场景
  • 任务重要性高:任务不能被丢弃,但可以接受延迟执行的场景。
示例代码
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.CallerRunsPolicy()
);

3. DiscardPolicy

行为
  • 直接丢弃:当任务不能被提交时,直接丢弃任务,不做任何处理。
优点
  • 简单高效:避免了额外的错误处理逻辑和开销。
缺点
  • 任务丢失:任务被直接丢弃,可能导致重要任务未执行。
使用场景
  • 任务非关键:系统能够容忍任务丢失,且任务对系统的影响较小。
示例代码
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.DiscardPolicy()
);

4. DiscardOldestPolicy

行为
  • 丢弃最旧任务:当任务不能被提交时,丢弃队列中最旧的任务,然后重新尝试提交当前任务。
优点
  • 保持新任务:确保新提交的任务有机会被执行,避免任务队列中的任务因长时间等待而过时。
缺点
  • 任务顺序改变:可能导致任务执行顺序被打乱,最早提交的任务可能被丢弃。
使用场景
  • 任务时效性:新任务优先级更高,系统需要优先处理新提交任务的场景。
示例代码
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.DiscardOldestPolicy()
);

总结

每种拒绝策略都有其独特的行为和适用场景,选择合适的拒绝策略应根据系统的具体需求和任务的特性来决定:

  • AbortPolicy:适用于需要立即发现和处理任务提交失败的场景。
  • CallerRunsPolicy:适用于不能丢弃任务且可以接受任务延迟执行的场景。
  • DiscardPolicy:适用于任务非关键且可以容忍任务丢失的场景。
  • DiscardOldestPolicy:适用于任务时效性强,新任务优先级高的场景。

通过合理选择和配置拒绝策略,可以有效管理线程池的行为,提高系统的稳定性和响应性。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com