package com.example.roomstudy.threadpool.threadpool3
import android.util.Log
import java.util.concurrent.*
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
/** deepseek
-
线程池管理工具类
-
支持任务调度、暂停、取消等操作
*/
class ThreadPoolManager private constructor(builder: Builder) {// region 配置参数
private val corePoolSize: Int
private val maxPoolSize: Int
private val keepAliveTime: Long
private val workQueue: BlockingQueue
private val threadFactory: ThreadFactory
private val exceptionHandler: ((Thread, Throwable) -> Unit)?
// endregion// region 核心组件
private val executor: ThreadPoolExecutor
val coroutineDispatcher: CoroutineContext
internal val taskMap = ConcurrentHashMap<String, Future<*>>()
// endregioninit {
corePoolSize = builder.corePoolSize
maxPoolSize = builder.maxPoolSize
keepAliveTime = builder.keepAliveTime
workQueue = builder.workQueue ?: LinkedBlockingQueue()
threadFactory = builder.threadFactory ?: DefaultThreadFactory(builder.exceptionHandler)
exceptionHandler = builder.exceptionHandlerexecutor = ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.MILLISECONDS,workQueue,threadFactory).apply {allowCoreThreadTimeOut(builder.allowCoreThreadTimeout)}coroutineDispatcher = executor.asCoroutineDispatcher()
}
// region 预设模板
companion object {
/**
* IO密集型任务配置(适合网络请求)
*/
fun newIOThreadPool(): ThreadPoolManager {
val cpuCount = Runtime.getRuntime().availableProcessors()
return Builder().apply {
corePoolSize = cpuCount * 2
maxPoolSize = cpuCount * 4
keepAliveTime = 30L
workQueue = LinkedBlockingQueue(128)
}.build()
}/*** CPU密集型任务配置(适合数据库操作)*/fun newCPUThreadPool(): ThreadPoolManager {val cpuCount = Runtime.getRuntime().availableProcessors()return Builder().apply {corePoolSize = cpuCount + 1maxPoolSize = cpuCount * 2keepAliveTime = 10LworkQueue = LinkedBlockingQueue(64)}.build()}
}
// endregion// region 任务管理
@Synchronized
fun submit(tag: String, task: Runnable): Boolean {
if (taskMap.containsKey(tag)) return false
taskMap[tag] = executor.submit(wrapTask(tag, task))
return true
}fun submit(tag: String, task: Callable): Future {
val future = executor.submit(wrapTask(tag, task))
taskMap[tag] = future
return future
}fun pause(tag: String) = taskMap[tag]?.cancel(true)
fun resume(tag: String, task: Runnable) = submit(tag, task)
fun cancel(tag: String) {
taskMap[tag]?.cancel(true)
taskMap.remove(tag)
}fun shutdown(immediate: Boolean = false) {
if (immediate) {
executor.shutdownNow()
} else {
executor.shutdown()
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow()
}
} catch (e: InterruptedException) {
executor.shutdownNow()
Thread.currentThread().interrupt()
}
}
taskMap.clear()
}
// endregion// region 内部实现
private fun wrapTask(tag: String, task: Runnable): Runnable = Runnable {
try {
task.run()
} finally {
taskMap.remove(tag)
}
}private fun wrapTask(tag: String, task: Callable): Callable = Callable {
try {
task.call()
} finally {
taskMap.remove(tag)
}
}class Builder {
var corePoolSize: Int = Runtime.getRuntime().availableProcessors()
var maxPoolSize: Int = corePoolSize * 2
var keepAliveTime: Long = 10L
var workQueue: BlockingQueue? = null
var threadFactory: ThreadFactory? = null
var allowCoreThreadTimeout: Boolean = false
var exceptionHandler: ((Thread, Throwable) -> Unit)? = nullfun build() = ThreadPoolManager(this)
}
private class DefaultThreadFactory(
private val exceptionHandler: ((Thread, Throwable) -> Unit)?
) : ThreadFactory {
private val group: ThreadGroup
private val threadNumber = AtomicInteger(1)
private val namePrefix: Stringinit {val s = System.getSecurityManager()group = s?.threadGroup ?: Thread.currentThread().threadGroupnamePrefix = "pool-${System.identityHashCode(this)}-thread-"}override fun newThread(r: Runnable): Thread {return Thread(group, r, namePrefix + threadNumber.getAndIncrement()).apply {isDaemon = falsepriority = Thread.NORM_PRIORITYuncaughtExceptionHandler = Thread.UncaughtExceptionHandler { t, e ->exceptionHandler?.invoke(t, e) ?: run {Log.e("ThreadPool", "Uncaught exception in thread ${t.name}", e)}}}}
}
// endregion
}
// region 协程扩展
suspend fun ThreadPoolManager.executeNetworkRequest(
tag: String,
block: suspend () -> Unit
) = coroutineScope {
val job = launch(coroutineDispatcher) {
try {
block()
} catch (e: Exception) {
Log.e(“ThreadPool”, “Network request failed”, e)
throw e
}
}
taskMap[tag] = job.asCancellableFuture()
try {
job.join()
} finally {
taskMap.remove(tag)
}
}
fun Job.asCancellableFuture(): Future = object : Future {
override fun get() = throw UnsupportedOperationException(“Use async API instead”)
override fun get(timeout: Long, unit: TimeUnit) = throw UnsupportedOperationException()
override fun cancel(mayInterrupt: Boolean): Boolean = also { cancel() }.isCancelled
override fun isDone(): Boolean = isCompleted
override fun isCancelled(): Boolean = isCancelled
}
suspend fun ThreadPoolManager.executeDbOperation(
tag: String,
block: suspend () -> Unit
) = withContext(coroutineDispatcher) {
try {
block()
} catch (e: Exception) {
Log.e(“ThreadPool”, “DB operation failed”, e)
throw e
}
}
// endregion