当前位置: 首页 > java >正文

Kotlin 协程异步任务工具类:高效处理异步操作与超时控制

下面是完整的 Kotlin 协程异步任务工具类代码,包含详细注释和使用示例。

import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext/*** 异步任务回调接口* 提供任务执行的各种状态回调** @param T 任务返回结果的类型*/
interface TaskCallback<T> {/*** 任务执行成功回调* @param result 任务执行结果*/fun onSuccess(result: T)/*** 任务执行失败回调* @param error 异常信息*/fun onFailure(error: Throwable)/*** 任务执行超时回调*/fun onTimeout()/*** 任务被取消回调*/fun onCancelled()
}/*** Kotlin 协程异步任务工具类* 提供强大的异步任务执行能力,支持超时控制、重试机制、并行执行等功能*/
class CoroutineTaskUtil private constructor() : CoroutineScope {// 使用 SupervisorJob 允许子协程独立失败而不影响其他协程private val job = SupervisorJob()// 协程上下文配置override val coroutineContext: CoroutineContextget() = Dispatchers.Default + job + CoroutineExceptionHandler { _, throwable ->println("未捕获的协程异常: ${throwable.message}")}companion object {@Volatileprivate var instance: CoroutineTaskUtil? = null/*** 获取工具类单例实例* @return CoroutineTaskUtil 实例*/fun getInstance(): CoroutineTaskUtil {return instance ?: synchronized(this) {instance ?: CoroutineTaskUtil().also { instance = it }}}}/*** 执行异步任务(支持超时控制)** @param task 要执行的挂起任务* @param timeoutMs 超时时间(毫秒),null 表示不设置超时* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeWithTimeout(task: suspend () -> T,timeoutMs: Long? = null,callback: TaskCallback<T>? = null): Job {return launch {try {val result = if (timeoutMs != null) {withTimeout(timeoutMs) { task() }} else {task()}callback?.onSuccess(result)} catch (e: TimeoutCancellationException) {callback?.onTimeout()} catch (e: CancellationException) {callback?.onCancelled()} catch (e: Exception) {callback?.onFailure(e)}}}/*** 执行异步任务(简化版,不带超时)** @param task 要执行的挂起任务* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> execute(task: suspend () -> T,callback: TaskCallback<T>? = null): Job {return executeWithTimeout(task, null, callback)}/*** 在IO线程执行任务,在主线程回调** @param task 要执行的挂起任务* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeOnIoWithMainCallback(task: suspend () -> T,callback: TaskCallback<T>? = null): Job {return CoroutineScope(Dispatchers.IO).launch {try {val result = task()withContext(Dispatchers.Main) {callback?.onSuccess(result)}} catch (e: Exception) {withContext(Dispatchers.Main) {when (e) {is CancellationException -> callback?.onCancelled()else -> callback?.onFailure(e)}}}}}/*** 执行多个并行任务,等待所有任务完成** @param tasks 要执行的多个任务* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeAll(vararg tasks: suspend () -> T,callback: TaskCallback<List<T>>? = null): Job {return launch {try {val deferredResults = tasks.map { async { it() } }val results = deferredResults.awaitAll()callback?.onSuccess(results)} catch (e: Exception) {callback?.onFailure(e)}}}/*** 执行带重试机制的任务** @param task 要执行的任务* @param retries 重试次数* @param delayMs 重试间隔(毫秒)* @param callback 回调接口* @return Job 对象,可用于取消任务*/fun <T> executeWithRetry(task: suspend () -> T,retries: Int = 3,delayMs: Long = 1000,callback: TaskCallback<T>? = null): Job {return launch {var currentRetry = 0var lastError: Throwable? = nullwhile (currentRetry <= retries) {try {val result = task()callback?.onSuccess(result)return@launch} catch (e: Exception) {lastError = ecurrentRetry++if (currentRetry > retries) breakdelay(delayMs)}}callback?.onFailure(lastError ?: Exception("在 $retries 次重试后仍失败"))}}/*** 取消所有正在执行的任务*/fun cancelAll() {job.cancel("手动取消所有任务")}/*** 关闭工具类,释放资源*/fun shutdown() {cancelAll()}
}/*** 简化使用的顶层扩展函数*//*** 为任意类型添加异步执行能力*/
suspend fun <T> T.executeAsync(timeoutMs: Long? = null,callback: TaskCallback<T>? = null
): Job {return CoroutineTaskUtil.getInstance().executeWithTimeout(task = { this },timeoutMs = timeoutMs,callback = callback)
}/*** 简化版的异步任务执行函数*/
suspend fun <T> executeTask(timeoutMs: Long? = null,task: suspend () -> T,onSuccess: (T) -> Unit = {},onFailure: (Throwable) -> Unit = {},onTimeout: () -> Unit = {},onCancelled: () -> Unit = {}
): Job {return CoroutineTaskUtil.getInstance().executeWithTimeout(task = task,timeoutMs = timeoutMs,callback = object : TaskCallback<T> {override fun onSuccess(result: T) = onSuccess(result)override fun onFailure(error: Throwable) = onFailure(error)override fun onTimeout() = onTimeout()override fun onCancelled() = onCancelled()})
}// ==================================================
// 使用示例
// ==================================================/*** 工具类使用示例*/
class CoroutineTaskExamples {/*** 示例1:基本用法*/fun exampleBasicUsage() {CoroutineTaskUtil.getInstance().execute(task = {delay(1000)"异步任务完成"},callback = object : TaskCallback<String> {override fun onSuccess(result: String) {println("任务成功: $result")}override fun onFailure(error: Throwable) {println("任务失败: ${error.message}")}override fun onTimeout() {println("任务超时")}override fun onCancelled() {println("任务被取消")}})}/*** 示例2:带超时的任务*/fun exampleWithTimeout() {CoroutineTaskUtil.getInstance().executeWithTimeout(task = {delay(5000)"长时间任务完成"},timeoutMs = 3000,callback = object : TaskCallback<String> {override fun onSuccess(result: String) {println("成功: $result")}override fun onFailure(error: Throwable) {println("失败: ${error.message}")}override fun onTimeout() {println("任务超时,已中断")}override fun onCancelled() {println("任务被取消")}})}/*** 示例3:带重试的任务*/fun exampleWithRetry() {var attemptCount = 0CoroutineTaskUtil.getInstance().executeWithRetry(task = {attemptCount++if (attemptCount < 3) {throw RuntimeException("第${attemptCount}次尝试失败")}"第${attemptCount}次尝试成功"},retries = 5,delayMs = 500,callback = object : TaskCallback<String> {override fun onSuccess(result: String) {println("重试成功: $result")}override fun onFailure(error: Throwable) {println("重试失败: ${error.message}")}override fun onTimeout() {}override fun onCancelled() {}})}/*** 示例4:并行执行多个任务*/fun exampleParallelTasks() {CoroutineTaskUtil.getInstance().executeAll({ delay(1000)"任务1结果" },{ delay(2000)"任务2结果" },{ delay(1500)"任务3结果" },callback = object : TaskCallback<List<String>> {override fun onSuccess(result: List<String>) {println("所有任务完成: $result")}override fun onFailure(error: Throwable) {println("任务执行失败: ${error.message}")}override fun onTimeout() {}override fun onCancelled() {}})}/*** 示例5:使用简化API*/suspend fun exampleSimplifiedAPI() {executeTask(timeoutMs = 5000,task = {delay(2000)"简化API示例结果"},onSuccess = { result ->println("简化API成功: $result")},onFailure = { error ->println("简化API失败: ${error.message}")})}
}/*** 实际应用场景示例*/
class PracticalExamples {/*** 场景1:网络请求处理*/fun fetchUserData(userId: String) {CoroutineTaskUtil.getInstance().executeWithTimeout(task = {// 模拟网络请求delay(2000)"用户${userId}的数据"},timeoutMs = 10000,callback = object : TaskCallback<String> {override fun onSuccess(result: String) {updateUserInterface(result)}override fun onFailure(error: Throwable) {showErrorMessage("数据加载失败")}override fun onTimeout() {showErrorMessage("请求超时,请重试")}override fun onCancelled() {showErrorMessage("请求被取消")}})}private fun updateUserInterface(data: String) {println("更新UI: $data")}private fun showErrorMessage(message: String) {println("错误信息: $message")}/*** 场景2:文件操作*/fun processLargeFile(filePath: String) {CoroutineTaskUtil.getInstance().executeOnIoWithMainCallback(task = {readAndProcessFile(filePath)},callback = object : TaskCallback<String> {override fun onSuccess(result: String) {showProcessingResult(result)}override fun onFailure(error: Throwable) {showErrorMessage("文件处理失败: ${error.message}")}override fun onTimeout() {}override fun onCancelled() {}})}private suspend fun readAndProcessFile(filePath: String): String {delay(3000)return "处理完成的文件内容: $filePath"}private fun showProcessingResult(result: String) {println("处理结果: $result")}
}// 测试代码
fun main() = runBlocking {val examples = CoroutineTaskExamples()println("=== 示例1:基本用法 ===")examples.exampleBasicUsage()delay(1500)println("\n=== 示例2:带超时的任务 ===")examples.exampleWithTimeout()delay(4000)println("\n=== 示例3:带重试的任务 ===")examples.exampleWithRetry()delay(3000)println("\n=== 示例4:并行执行多个任务 ===")examples.exampleParallelTasks()delay(3000)println("\n=== 示例5:使用简化API ===")examples.exampleSimplifiedAPI()delay(3000)println("\n=== 实际应用场景 ===")val practicalExamples = PracticalExamples()practicalExamples.fetchUserData("123")delay(3000)practicalExamples.processLargeFile("/path/to/file.txt")delay(4000)
}

功能特性

  1. 核心功能

· 异步任务执行:使用协程实现真正的异步操作
· 超时控制:支持设置任务执行超时时间
· 错误处理:完善的异常处理机制
· 线程调度:支持指定执行线程和回调线程

  1. 高级特性

· 重试机制:支持自动重试失败的任务
· 并行执行:支持多个任务并行执行
· 任务取消:提供任务取消功能
· 资源管理:支持资源清理和释放

  1. API 设计

· 简洁易用:提供简单的函数调用接口
· 类型安全:完整的泛型支持
· 扩展性强:支持自定义配置和扩展

使用建议

  1. 网络请求:使用 executeWithTimeout 处理网络请求,设置合适的超时时间
  2. 文件操作:使用 executeOnIoWithMainCallback 在IO线程执行文件操作,在主线程更新UI
  3. 批量处理:使用 executeAll 并行执行多个独立任务
  4. 不稳定操作:使用 executeWithRetry 处理可能失败的操作

这个工具类提供了生产级别的异步任务处理能力,代码格式规范,注释完整,适合直接用于项目开发。

http://www.xdnf.cn/news/19561.html

相关文章:

  • UE5 为啥原生的NotifyState写逻辑会有问题
  • 开源低代码平台(NocoBase)
  • 20250828的学习笔记
  • 9.1日IO作业
  • 2025年09月01日Github流行趋势
  • 99、23种设计模式之组合模式(8/23)
  • 09.《路由基础知识解析和实践》
  • 基于外部对照数据借用的临床试验统计分析方案设计与仿真研究
  • PitVis-2023挑战赛:内镜下垂体瘤手术视频中的手术流程识别|文献速递-深度学习人工智能医疗图像
  • 如何把指定阿里云文件夹下的所有文件移动到另一个文件夹下,移动文件时把文件名称(不包括文件后缀)进行md5编码
  • 从理论到实践,深入剖析数据库水平拆分的安全平滑落地
  • Spark自定义累加器实现高效WordCount
  • Spark和Spring整合处理离线数据
  • promptoMANIA-AI绘画提示词生成器
  • Electron使用WebAssembly实现CRC-16 CCITT校验
  • macOS中Homebrew安装PHP的详细步骤(五)
  • 深入了解Flink核心:Slot资源管理机制
  • PostgreSQL 索引大全
  • 深入理解Docker容器技术:原理与实践
  • 如何安装CUDA????
  • three.js+WebGL踩坑经验合集(10.1):镜像问题又一坑——THREE.InstancedMesh的正反面显示问题
  • 机器学习-时序预测2
  • 基于FPGA+DSP数据采集处理平台的搭建
  • 【Vue2 ✨】Vue2 入门之旅(四):生命周期钩子
  • Unity核心概念③:Inspector窗口可编辑变量
  • C++/QT day3(9.1)
  • 深度学习中常用的激活函数
  • 关系型数据库——GaussDB的简单学习
  • Spring Boot 和 Spring Cloud 的原理和区别
  • 对于牛客网—语言学习篇—编程初学者入门训练—复合类型:BC141 井字棋及BC142 扫雷题目的解析