欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > kotlin 线程池封装

kotlin 线程池封装

2025/3/20 3:14:46 来源:https://blog.csdn.net/qq_26296197/article/details/146345938  浏览:    关键词:kotlin 线程池封装

package com.example.roomstudy.threadpool.threadpool3

import android.util.Log
import java.util.concurrent.*
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

/**
 * Thread-pool management helper (the original header credits "deepseek").
 *
 * Wraps a [ThreadPoolExecutor] and tracks submitted work by string tag so a task can be
 * cancelled, "paused" (cancelled and dropped from the registry) or re-submitted later.
 * The same executor is exposed as [coroutineDispatcher] for coroutine callers.
 *
 * Instances are created through [Builder] or the preset factories in the companion object.
 */
class ThreadPoolManager private constructor(builder: Builder) {

    // region Configuration
    private val corePoolSize: Int = builder.corePoolSize
    private val maxPoolSize: Int = builder.maxPoolSize

    // Handed to the executor as MILLISECONDS (see the constructor call below).
    // NOTE(review): the presets use 30L / 10L, which is very short in milliseconds;
    // confirm whether seconds was intended before changing the unit.
    private val keepAliveTime: Long = builder.keepAliveTime
    private val workQueue: BlockingQueue<Runnable> = builder.workQueue ?: LinkedBlockingQueue()
    private val exceptionHandler: ((Thread, Throwable) -> Unit)? = builder.exceptionHandler
    private val threadFactory: ThreadFactory =
        builder.threadFactory ?: DefaultThreadFactory(builder.exceptionHandler)
    // endregion

    // region Core components
    private val executor: ThreadPoolExecutor = ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        keepAliveTime,
        TimeUnit.MILLISECONDS,
        workQueue,
        threadFactory
    ).apply { allowCoreThreadTimeOut(builder.allowCoreThreadTimeout) }

    /** Coroutine context that dispatches onto this pool's executor. */
    val coroutineDispatcher: CoroutineContext = executor.asCoroutineDispatcher()

    /** Registry of in-flight work keyed by tag; entries are dropped when the work finishes. */
    internal val taskMap = ConcurrentHashMap<String, Future<*>>()
    // endregion

    // region Task management
    /**
     * Schedules [task] under [tag].
     *
     * A failure thrown by [task] is passed to the configured exception handler, or logged
     * when none is configured, because the caller of this overload receives no Future.
     *
     * @return `false` when another task is already registered under [tag] (nothing is
     *         scheduled in that case), `true` otherwise.
     * @throws RejectedExecutionException if the executor refuses the task.
     */
    fun submit(tag: String, task: Runnable): Boolean {
        val tracked = TrackedTask<Unit>(tag, Callable<Unit> { task.run() }, true)
        // Register BEFORE handing the task to the executor: a fast task could otherwise
        // finish and clean up first, leaving a stale registry entry behind.
        if (taskMap.putIfAbsent(tag, tracked) != null) return false
        dispatch(tag, tracked)
        return true
    }

    /**
     * Schedules [task] under [tag] and returns its [Future].
     *
     * An entry already registered under [tag] is replaced in the registry; the earlier
     * task keeps running but can no longer be reached through [pause] or [cancel].
     *
     * @throws RejectedExecutionException if the executor refuses the task.
     */
    fun <T> submit(tag: String, task: Callable<T>): Future<T> {
        val tracked = TrackedTask(tag, task, false)
        taskMap[tag] = tracked
        dispatch(tag, tracked)
        return tracked
    }

    /**
     * Cancels the task registered under [tag] (interrupting it if it is running) and frees
     * the tag so that [resume] can register new work immediately.
     *
     * @return the result of `Future.cancel(true)`, or `null` when nothing is registered.
     */
    fun pause(tag: String): Boolean? = taskMap.remove(tag)?.cancel(true)

    /** Re-submits [task] under [tag]; same contract as the Runnable overload of [submit]. */
    fun resume(tag: String, task: Runnable): Boolean = submit(tag, task)

    /** Cancels the task registered under [tag], if any, and removes it from the registry. */
    fun cancel(tag: String) {
        taskMap.remove(tag)?.cancel(true)
    }

    /**
     * Shuts the executor down and clears the registry.
     *
     * @param immediate when `true`, calls `shutdownNow()` straight away; otherwise performs an
     *        orderly shutdown, blocks up to 30 seconds for termination and only then forces it.
     */
    fun shutdown(immediate: Boolean = false) {
        if (immediate) {
            executor.shutdownNow()
        } else {
            executor.shutdown()
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow()
                }
            } catch (e: InterruptedException) {
                executor.shutdownNow()
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt()
            }
        }
        taskMap.clear()
    }
    // endregion

    // region Internals
    /** Hands [tracked] to the executor, un-registering it again if the executor rejects it. */
    private fun dispatch(tag: String, tracked: TrackedTask<*>) {
        try {
            executor.execute(tracked)
        } catch (e: RejectedExecutionException) {
            taskMap.remove(tag, tracked)
            throw e
        }
    }

    /** Routes a task failure to the configured handler, or logs it when there is none. */
    private fun notifyFailure(thread: Thread, error: Throwable) {
        val handler = exceptionHandler
        if (handler != null) {
            handler(thread, error)
        } else {
            Log.e("ThreadPool", "Task failed in thread ${thread.name}", error)
        }
    }

    /**
     * FutureTask that un-registers itself from [taskMap] once it is done.
     *
     * `done()` is invoked for normal completion, failure and cancellation alike, so a task
     * cancelled while still queued also releases its tag.
     */
    private inner class TrackedTask<T>(
        private val tag: String,
        body: Callable<T>,
        private val reportFailure: Boolean
    ) : FutureTask<T>(body) {

        override fun done() {
            // Conditional remove: never evict a newer task that re-used the same tag.
            taskMap.remove(tag, this)
            if (!reportFailure || isCancelled) return
            try {
                get() // already complete here, so this does not block
            } catch (e: ExecutionException) {
                notifyFailure(Thread.currentThread(), e.cause ?: e)
            }
        }
    }

    /** Mutable configuration holder; call [build] to create the manager. */
    class Builder {
        var corePoolSize: Int = Runtime.getRuntime().availableProcessors()
        var maxPoolSize: Int = corePoolSize * 2
        var keepAliveTime: Long = 10L
        var workQueue: BlockingQueue<Runnable>? = null
        var threadFactory: ThreadFactory? = null
        var allowCoreThreadTimeout: Boolean = false
        var exceptionHandler: ((Thread, Throwable) -> Unit)? = null

        fun build() = ThreadPoolManager(this)
    }

    /**
     * Thread factory used when the builder supplies none: creates non-daemon,
     * normal-priority threads named `pool-<identity hash>-thread-<n>` and installs an
     * uncaught-exception handler that delegates to [exceptionHandler] or logs.
     */
    private class DefaultThreadFactory(
        private val exceptionHandler: ((Thread, Throwable) -> Unit)?
    ) : ThreadFactory {
        private val group: ThreadGroup
        private val threadNumber = AtomicInteger(1)
        private val namePrefix: String

        init {
            val securityManager = System.getSecurityManager()
            group = securityManager?.threadGroup ?: Thread.currentThread().threadGroup
            namePrefix = "pool-${System.identityHashCode(this)}-thread-"
        }

        override fun newThread(r: Runnable): Thread {
            val thread = Thread(group, r, namePrefix + threadNumber.getAndIncrement())
            thread.isDaemon = false
            thread.priority = Thread.NORM_PRIORITY
            thread.uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { t, e ->
                val handler = exceptionHandler
                if (handler != null) {
                    handler(t, e)
                } else {
                    Log.e("ThreadPool", "Uncaught exception in thread ${t.name}", e)
                }
            }
            return thread
        }
    }
    // endregion

    // region Preset templates
    companion object {
        /**
         * Preset for IO-bound work (the original comment suggests network requests):
         * core = 2 x CPUs, max = 4 x CPUs, bounded queue of 128.
         */
        fun newIOThreadPool(): ThreadPoolManager {
            val cpuCount = Runtime.getRuntime().availableProcessors()
            return Builder().apply {
                corePoolSize = cpuCount * 2
                maxPoolSize = cpuCount * 4
                keepAliveTime = 30L
                workQueue = LinkedBlockingQueue<Runnable>(128)
            }.build()
        }

        /**
         * Preset for CPU-bound work (the original comment suggests database operations):
         * core = CPUs + 1, max = 2 x CPUs, bounded queue of 64.
         */
        fun newCPUThreadPool(): ThreadPoolManager {
            val cpuCount = Runtime.getRuntime().availableProcessors()
            return Builder().apply {
                corePoolSize = cpuCount + 1
                maxPoolSize = cpuCount * 2
                keepAliveTime = 10L
                workQueue = LinkedBlockingQueue<Runnable>(64)
            }.build()
        }
    }
    // endregion
}

// region 协程扩展
/**
 * Launches [block] on this manager's `coroutineDispatcher`, registers the resulting job in
 * `taskMap` under [tag] for the duration of the call, and suspends until the job finishes.
 *
 * Non-cancellation exceptions thrown by [block] are logged and rethrown; cancellation is
 * rethrown without being logged as a failure.
 *
 * @param tag key under which the running job is registered; an existing entry is replaced.
 * @param block the suspending work to run.
 */
suspend fun ThreadPoolManager.executeNetworkRequest(
    tag: String,
    block: suspend () -> Unit
) = coroutineScope {
    val job = launch(coroutineDispatcher) {
        try {
            block()
        } catch (e: java.util.concurrent.CancellationException) {
            // Cancellation is normal control flow for coroutines, not a request failure.
            throw e
        } catch (e: Exception) {
            Log.e("ThreadPool", "Network request failed", e)
            throw e
        }
    }
    val handle = job.asCancellableFuture()
    taskMap[tag] = handle
    try {
        job.join()
    } finally {
        // Remove only our own entry; the tag may have been re-registered meanwhile.
        taskMap.remove(tag, handle)
    }
}

/**
 * Adapts this [Job] to a [Future] that supports cancellation and status queries only.
 *
 * Both `get` overloads throw [UnsupportedOperationException]. `cancel` always cancels the
 * job (its argument is ignored) and returns whether the job then reports itself cancelled.
 */
fun Job.asCancellableFuture(): Future<Unit> {
    // Capture the receiver explicitly: inside the anonymous object an unqualified
    // `isCancelled` can resolve to the Future's own accessor instead of the Job's.
    val job = this
    return object : Future<Unit> {
        override fun get(): Unit = throw UnsupportedOperationException("Use async API instead")

        override fun get(timeout: Long, unit: TimeUnit): Unit = throw UnsupportedOperationException()

        override fun cancel(mayInterrupt: Boolean): Boolean {
            job.cancel()
            return job.isCancelled
        }

        override fun isDone(): Boolean = job.isCompleted

        override fun isCancelled(): Boolean = job.isCancelled
    }
}

/**
 * Runs [block] on this manager's `coroutineDispatcher` via `withContext` and returns when
 * it completes.
 *
 * Non-cancellation exceptions thrown by [block] are logged and rethrown; cancellation is
 * rethrown without being logged as a failure.
 *
 * @param tag currently unused by this function: the operation is not registered in the
 *        manager's task registry, so it cannot be cancelled by tag.
 * @param block the suspending work to run.
 */
suspend fun ThreadPoolManager.executeDbOperation(
    tag: String,
    block: suspend () -> Unit
) = withContext(coroutineDispatcher) {
    try {
        block()
    } catch (e: java.util.concurrent.CancellationException) {
        // Cancellation is normal control flow for coroutines, not a DB failure.
        throw e
    } catch (e: Exception) {
        Log.e("ThreadPool", "DB operation failed", e)
        throw e
    }
}
// endregion

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词