当前位置: 首页 > ai >正文

Kotlin知识体系(七) : Flow线程控制、状态管理及异常处理指南

1. 线程切换与调度

1.1 使用 flowOn 操作符

flowOn 用于改变上游操作的执行线程,而不会改变其下游操作符的线程。
例如,启动网络请求或数据库查询等耗时任务前切换到 Dispatchers.IO

val dataFlow = flow {emit(fetchDataFromNetwork()) // 在 IO 线程执行
}.map { data ->processData(data) // 在 IO 线程执行
}.flowOn(Dispatchers.IO) // 上游操作切换到 IO 线程
.onEach{// 运行在 Default 线程generateImage(it)
}.flowOn(Dispatchers.Default) // 上游操作切换到 Default 线程
.filter { result ->// 恢复默认上下文(调用者线程)result.isValid 
}
.collect { // 在调用者线程,比如MainupdateUI(it) 
}

下游collect 的线程默认由调用它的协程上下文决定。


1.2 指定协程上下文收集数据

collect 所在的协程决定了下游操作的线程。例如在 ViewModel 中:

viewModelScope.launch {dataFlow.collect { processedData ->updateUI(processedData) // 在主线程执行(viewModelScope默认使用 Dispatchers.Main)}
}

若要在收集时切换线程(如处理耗时操作):

viewModelScope.launch {dataFlow.collect { data ->withContext(Dispatchers.Default) { // 切换到计算线程处理复杂操作processHeavyData(data)}updateUI() // 回到主线程更新UI}
}

通过合理结合 flowOn 和协程调度器,可以灵活控制 Flow 在不同线程间的切换,确保流畅的异步数据流处理。

2. 状态管理 : StateFlow和SharedFlow

2.1 StateFlow

StateFlow 适用于状态管理 (如 UI 状态),需要初始值,只保留最新值。

特性 :

  • StateFlow 是一个状态持有者 ,始终维护一个当前值(value)。
  • 必须在构造时提供初始值,确保任何时候都有状态可读。
  • 新订阅者会立即收到当前值 。
  • 通过 .value = … 更新状态,是线程安全的操作。
// 在ViewModel中声明
private val _uiState = MutableStateFlow<LoginState>(LoginState.Idle)
val uiState: StateFlow<LoginState> = _uiState.asStateFlow()// 更新状态
fun login(username: String, password: String) {viewModelScope.launch {_uiState.value = LoginState.Loadingtry {val user = repository.login(username, password)_uiState.value = LoginState.Success(user)} catch (e: Exception) {_uiState.value = LoginState.Error(e.message)}}
}// 通过stateIn转换冷流为StateFlow
val userFlow: StateFlow<User> = repository.getUserUpdates().stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000), // 5秒无订阅停止上游initialValue = User.EMPTY)

2.2 SharedFlow

SharedFlow 适用于事件广播的场景(如通知、导航请求),可配置重播策略和缓冲区。

特性 :

  • SharedFlow 是一个事件或数据发射器 ,不强制维护状态。
  • 可以在任意时刻开始发射值,没有默认值。
  • 通过 replay = N 控制新订阅者接收最近 N 个历史值。
  • 不能像 StateFlow 一样通过 .value 获取当前值。
// 配置参数说明
val eventFlow = MutableSharedFlow<Event>(replay = 1,           // 新订阅者收到的历史事件数量extraBufferCapacity = 10 // 溢出策略前的额外缓冲区
)// 发射一次性事件
fun triggerEvent(event: Event) {viewModelScope.launch {eventFlow.emit(event) // 挂起函数,缓冲区满时会挂起}
}// 转换为SharedFlow的冷流示例
val networkErrorFlow = repository.getNetworkErrorObservable().shareIn(scope = viewModelScope,started = SharingStarted.Lazily, // 首个订阅者出现后启动replay = 0)
2.2.1 replay 参数

作用

  • 控制新订阅者(collector)能接收的“历史值”数量 。
  • 新加入的收集器会立即收到最近 replay 个已发射的值(即回放机制)。

默认值

  • 默认值为 0,表示不回放任何历史值 。新订阅者只能从订阅时刻开始接收后续发射的值。

典型用途

  • 需要新订阅者获取最近状态更新的场景(如网络请求结果回放)。
val sharedFlow = MutableSharedFlow<Int>(replay = 2 // 新订阅者会收到最近两次发射的值
)viewModelScope.launch {sharedFlow.emit(1)sharedFlow.emit(2)sharedFlow.emit(3)
}viewModelScope.launch {sharedFlow.collect { value ->println("新订阅者收到: $value")}
}
// 输出:
// 新订阅者收到: 2
// 新订阅者收到: 3
2.2.2 onBufferOverflow 参数

作用

  • 定义当缓冲区满时,如何处理新发射的值 (即背压策略)。
  • 当发射速度大于消费速度时,缓冲区可能被填满,此时需要决定如何处理新的值。

默认值

  • 默认为 BufferOverflow.SUSPEND,即挂起发射操作 ,直到有空间可用。

可选值

行为适用场景
SUSPEND挂起发射操作,等待缓冲区释放空间默认策略,适合低频事件
DROP_OLDEST丢弃最旧的值,保留最新的值高速事件流(如传感器数据)
DROP_LATEST丢弃新发射的值,保留旧值不允许丢失重要事件
2.2.3 extraBufferCapacity 参数

作用

  • 控制缓冲区额外容量 ,用于存储尚未被消费的值。
  • 实际缓冲区总容量 = replay + extraBufferCapacity。
  • 如果缓冲区不足,会触发 onBufferOverflow 策略。

默认值

  • 默认值为 0,即无额外缓冲区 。仅靠 replay 提供的容量。

典型用途

  • 增加缓冲区容量以应对突发流量(如高并发异步任务)。
  • 避免因短暂的消费延迟导致发射操作阻塞或值丢失。
val sharedFlow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 3, // 总缓冲区容量 = 1(replay) + 3(extra) = 4onBufferOverflow = BufferOverflow.DROP_OLDEST
)viewModelScope.launch {repeat(10) {sharedFlow.emit(it)}
}viewModelScope.launch {sharedFlow.collect { value ->delay(100L) // 消费速度较慢println("收到: $value")}
}
// 收集器将依次收到大部分值,但可能在某些情况下丢弃最早的值

3. 生命周期管理

3.1 repeatOnLifecycle 的机制与必要性

在 Android 开发中,Flow 的收集需要严格遵循生命周期规则,以避免 后台资源浪费 和 界面更新异常。传统方式(如 launchWhenStarted)的缺陷在于:

// ❌ 危险写法:流在后台仍保持活跃
lifecycleScope.launchWhenStarted {viewModel.dataFlow.collect { data ->updateUI(data) // 当应用进入后台时可能触发崩溃}
}

repeatOnLifecycle 的解决方案:

// ✅ 安全写法:自动启停收集
lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {viewModel.dataFlow.collect { data ->updateUI(data) // 仅在界面可见时执行}}
}

执行过程分解:

  1. 当生命周期进入 STARTED 状态时,启动流收集
  2. 当生命周期进入 STOP 状态时,取消收集协程

LiveData的生命周期是onStart的时候立即推送最新数据,onStop的时候自动暂停数据更新。
协程默认是从onCreate到onDestory,但是通过repeatOnLifecycle(Lifecycle.State.STARTED)可以达到和LiveData一样的生命周期特性。

3.1.1 如果不使用repeatOnLifecycle,会遇到的问题

当 UI 组件进入非活跃状态(如 onStop)时,协程​​不会自动暂停​​,而是继续运行,导致以下问题:

  • ​​不必要的资源消耗​​
    即使界面不可见,数据流仍会持续处理(如网络请求、数据库查询等),浪费 CPU、内存和电量。
  • ​​防止空指针异常​​:
    当组件处于非活跃状态时,其内部的一些视图或者数据可能已被销毁。若协程在此时仍在运行并尝试访问这些资源,就会引发空指针异常。repeatOnLifecycle 能确保协程在组件活跃时运行,防止此类异常的发生。
3.1.2 同时手机多个流

当需要同时收集多个流时,可通过结构化并发提高效率。
此时使用同一个repeatOnLifecycle,即可同时管理这些流的生命周期。

lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {// 并行收集多个流launch { flowA.collect { updateA(it) } }launch { flowB.collect { updateB(it) } }launch { flowC.collect { updateC(it) } }}
}

优势:
• 单次生命周期状态检查,避免重复调用 repeatOnLifecycle

• 所有流的收集协程统一管理,同时启动/停止

repeatOnLifecycle(STARTED)是在onStart的时候开始收集,onPause的时候结束收集。而默认的lifecycleScope,是在onCreate的时候就开始收集(如果代码写在onCreate里的话),在onDestory里结束收集。


3.2 与 Jetpack Compose 的集成

在 Compose 中通过 collectAsStateWithLifecycle 实现自动生命周期感知:

@Composable
fun UserProfileScreen(viewModel: UserViewModel) {val uiState by viewModel.uiState.collectAsStateWithLifecycle()when (uiState) {is Loading -> ShowProgress()is Success -> DisplayUser((uiState as Success).data)is Error -> ShowErrorAlert()}
}

底层实现原理,就是调用的repeatOnLifecycle(Lifecycle.State.STARTED)

@Composable
fun <T> Flow<T>.collectAsStateWithLifecycle(initial: T,lifecycle: Lifecycle = LocalLifecycleOwner.current.lifecycle,minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): State<T> {val state = remember(this, lifecycle) { mutableStateOf(initial) }DisposableEffect(this, lifecycle) {val job = lifecycle.coroutineScope.launch {repeatOnLifecycle(minActiveState) {this@collectAsStateWithLifecycle.collect {state.value = it}}}onDispose { job.cancel() }}return state
}

3.3 误区 : 忽略冷流的重复触发

// 每次界面重建都会重新触发网络请求
repeatOnLifecycle(Lifecycle.State.STARTED) {repository.fetchDataFlow().collect { ... }
}

优化策略:

// 将冷流转换为 StateFlow 共享数据
val cachedFlow = repository.fetchDataFlow().stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000),initialValue = null)// 界面层安全收集
repeatOnLifecycle(Lifecycle.State.STARTED) {cachedFlow.collect { ... }
}

通过 repeatOnLifecycle 的合理使用,开发者可以实现:

  1. 资源节约:避免后台不必要的计算和网络请求
  2. 状态安全:防止在无效生命周期更新界面导致的崩溃
  3. 数据一致性:确保界面重建后自动同步最新数据

4. 异常处理机制

4.1 catch操作符的精准控制

catch 是Flow处理上游异常的专用操作符,但需注意其作用域限制:仅能捕获其上游的异常,无法处理下游操作符或收集端的异常。

基础用法:触发异常后,替换为默认值发送

userProfileFlow.map { it.toUserModel() } // 上游操作.catch { e -> // 捕获map及之前的异常emit(UserModel.ANONYMOUS) // 发射备用数据logError(e) }.collect { model -> // 可能抛出异常(无法被catch捕获)require(model.id != null) }

高级模式:条件重抛异常

networkFlow.catch { cause ->if (cause is IOException) {emit(CachedData) // 网络异常使用缓存} else {throw cause // 其他异常继续向上传递}}.collect { ... }

4.2 try/catch块的补充作用

collect 端使用传统的 try/catch 作为最后防线,处理未被捕获的异常。

防御式收集示例:

// ✅ 正确:在collect内部处理
flow { ... }.collect { try {process(it)} catch (e: Exception) {handleLocalError(e)}
}

常见陷阱:

// ❌ 错误:try包裹整个Flow链,无法中断流
try {flow { ... }.map { ... }.collect { ... }
} catch (e: Exception) {// 不会触发!Flow异常在协程中传播,不会抛出到此处
}

4.3 智能重试机制

4.3.1 retry 的作用
  • 在流发射过程中遇到异常时,自动重新启动整个流(即重新执行 flow { … }),最多重试指定次数。
  • 如果重试次数用尽仍然失败,则抛出异常。

使用方式

val flow = flow {emit(1)throw IOException("Network error")
}flow.retry(3) // 最多重试 3 次.collect { value ->Log.d("Flow", "Received: $value")}

特点

  • 无条件重试:只要出现异常(无论什么类型),都会触发重试。
  • 固定重试次数:必须提前指定最大重试次数。
  • 无法自定义重试条件:不支持根据异常类型、时间间隔、尝试次数等动态决策。

适用场景

  • 简单的失败重试逻辑,如网络请求失败后最多重试 N 次。
  • 不需要区分异常类型或进行复杂判断的场景。
4.3.2 retryWhen 的作用
  • 提供一个 lambda 函数,用于根据异常类型和当前尝试次数,决定是否继续重试
  • 可以对不同的异常进行差异化处理,甚至可以引入延迟、指数退避等策略。

使用方式

val flow = flow {emit(1)throw IOException("Network error")
}flow.retryWhen { cause, attempt ->if (attempt < 3 && cause is IOException) {delay(1000L * attempt) // 指数退避true // 返回 true 表示继续重试} else {false // 返回 false 表示终止重试}
}
.collect { value ->Log.d("Flow", "Received: $value")
}

特点

  • 高度可定制:通过 lambda 参数 (cause: Throwable, attempt: Long) -> Boolean 控制重试逻辑。
  • 支持延迟重试:可以在每次重试前加入 delay() 实现退避策略。
  • 支持异常过滤:可以只对某些异常类型(如 IOException)进行重试。
  • 支持动态决策:可以根据尝试次数、异常类型、环境状态等做出重试决策。

4.4 onCompletion : 当Flow完成收集

onCompletion 是一个操作符 ,用于在 Flow 完成收集时执行某些逻辑 ,通过 cause 参数区分状态。
它可以用来执行资源释放、状态清理、界面更新等收尾操作。

异常日志记录示例:

dataSyncFlow.onCompletion { cause ->when {cause == null -> log("同步成功")cause is CancellationException -> log("同步被取消")else -> log("同步失败: ${cause.stackTrace}")}}.catch { ... }.collect { ... }

资源释放示例:

fileDownloadFlow.onCompletion { cause ->// 无论成功或异常,都关闭文件句柄closeFileHandle() }.collect { data -> writeToFile(data) }
http://www.xdnf.cn/news/5182.html

相关文章:

  • 每日脚本学习5.10 - XOR脚本
  • SSH终端登录与网络共享
  • AI与机器人学:从SLAM到导航的未来
  • HTTP/3展望、我应该迁移到HTTP/2吗
  • 【Linux】线程的同步与互斥
  • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
  • 互联网大厂Java面试实录:Spring Boot与微服务架构在电商场景中的应用解析
  • MIT XV6 - 1.4 Lab: Xv6 and Unix utilities - find
  • vllm笔记
  • Linux510 ssh服务 ssh连接
  • 数学证明 | 逻辑的力量
  • 每天五分钟机器学习:拉格朗日对偶函数
  • 2025年渗透测试面试题总结-渗透测试红队面试三(题目+回答)
  • Pandas:数据处理与分析
  • 操作系统实验习题解析 上篇
  • UniRepLknet助力YOLOv8:高效特征提取与目标检测性能优化
  • 什么是静态住宅IP?为什么静态住宅IP能提高注册通过率?
  • 【部署】win10的wsl环境下调试dify的api后端服务
  • PyTorch API 2 - 混合精度、微分、cpu、cuda、可视化
  • torch.nn 下的常用深度学习函数
  • uniapp-商城-48-后台 分类数据添加修改弹窗bug
  • Kubernetes 使用 containerd 实现 GPU 支持及 GPU Operator 部署指南
  • Eclipse 插件开发 6 右键菜单
  • 从 JMS 到 ActiveMQ:API 设计与扩展机制分析(三)
  • 单脉冲前视成像多目标分辨算法——论文阅读
  • stm32之IIC
  • 基于STM32的居家环境监测报警Proteus仿真+程序设计+设计报告+讲解视频
  • 利用多AI协作实现AI编辑器高效开发:创新架构与实践基本构想
  • DeepSeek 实现趣味心理测试应用开发教程
  • JAVA自动装箱拆箱