Kotlin知识体系(七) : Flow线程控制、状态管理及异常处理指南
1. 线程切换与调度
1.1 使用 flowOn
操作符
flowOn
用于改变上游操作的执行线程,而不会改变其下游操作符的线程。
例如,启动网络请求或数据库查询等耗时任务前切换到 Dispatchers.IO
:
val dataFlow = flow {emit(fetchDataFromNetwork()) // 在 IO 线程执行
}.map { data ->processData(data) // 在 IO 线程执行
}.flowOn(Dispatchers.IO) // 上游操作切换到 IO 线程
.onEach{// 运行在 Default 线程generateImage(it)
}.flowOn(Dispatchers.Default) // 上游操作切换到 Default 线程
.filter { result ->// 恢复默认上下文(调用者线程)result.isValid
}
.collect { // 在调用者线程,比如MainupdateUI(it)
}
• 下游的 collect
的线程默认由调用它的协程上下文决定。
1.2 指定协程上下文收集数据
collect
所在的协程决定了下游操作的线程。例如在 ViewModel
中:
viewModelScope.launch {dataFlow.collect { processedData ->updateUI(processedData) // 在主线程执行(viewModelScope默认使用 Dispatchers.Main)}
}
若要在收集时切换线程(如处理耗时操作):
viewModelScope.launch {dataFlow.collect { data ->withContext(Dispatchers.Default) { // 切换到计算线程处理复杂操作processHeavyData(data)}updateUI() // 回到主线程更新UI}
}
通过合理结合 flowOn
和协程调度器,可以灵活控制 Flow
在不同线程间的切换,确保流畅的异步数据流处理。
2. 状态管理 : StateFlow和SharedFlow
2.1 StateFlow
StateFlow 适用于状态管理 (如 UI 状态),需要初始值,只保留最新值。
特性 :
- StateFlow 是一个状态持有者 ,始终维护一个当前值(value)。
- 必须在构造时提供初始值,确保任何时候都有状态可读。
- 新订阅者会立即收到当前值 。
- 通过 .value = … 更新状态,是线程安全的操作。
// 在ViewModel中声明
private val _uiState = MutableStateFlow<LoginState>(LoginState.Idle)
val uiState: StateFlow<LoginState> = _uiState.asStateFlow()// 更新状态
fun login(username: String, password: String) {viewModelScope.launch {_uiState.value = LoginState.Loadingtry {val user = repository.login(username, password)_uiState.value = LoginState.Success(user)} catch (e: Exception) {_uiState.value = LoginState.Error(e.message)}}
}// 通过stateIn转换冷流为StateFlow
val userFlow: StateFlow<User> = repository.getUserUpdates().stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000), // 5秒无订阅停止上游initialValue = User.EMPTY)
2.2 SharedFlow
SharedFlow 适用于事件广播的场景(如通知、导航请求),可配置重播策略和缓冲区。
特性 :
- SharedFlow 是一个事件或数据发射器 ,不强制维护状态。
- 可以在任意时刻开始发射值,没有默认值。
- 通过 replay = N 控制新订阅者接收最近 N 个历史值。
- 不能像 StateFlow 一样通过 .value 获取当前值。
// 配置参数说明
val eventFlow = MutableSharedFlow<Event>(replay = 1, // 新订阅者收到的历史事件数量extraBufferCapacity = 10 // 溢出策略前的额外缓冲区
)// 发射一次性事件
fun triggerEvent(event: Event) {viewModelScope.launch {eventFlow.emit(event) // 挂起函数,缓冲区满时会挂起}
}// 转换为SharedFlow的冷流示例
val networkErrorFlow = repository.getNetworkErrorObservable().shareIn(scope = viewModelScope,started = SharingStarted.Lazily, // 首个订阅者出现后启动replay = 0)
2.2.1 replay 参数
作用
- 控制新订阅者(collector)能接收的“历史值”数量 。
- 新加入的收集器会立即收到最近 replay 个已发射的值(即回放机制)。
默认值
- 默认值为 0,表示不回放任何历史值 。新订阅者只能从订阅时刻开始接收后续发射的值。
典型用途
- 需要新订阅者获取最近状态更新的场景(如网络请求结果回放)。
val sharedFlow = MutableSharedFlow<Int>(replay = 2 // 新订阅者会收到最近两次发射的值
)viewModelScope.launch {sharedFlow.emit(1)sharedFlow.emit(2)sharedFlow.emit(3)
}viewModelScope.launch {sharedFlow.collect { value ->println("新订阅者收到: $value")}
}
// 输出:
// 新订阅者收到: 2
// 新订阅者收到: 3
2.2.2 onBufferOverflow 参数
作用
- 定义当缓冲区满时,如何处理新发射的值 (即背压策略)。
- 当发射速度大于消费速度时,缓冲区可能被填满,此时需要决定如何处理新的值。
默认值
- 默认为 BufferOverflow.SUSPEND,即挂起发射操作 ,直到有空间可用。
可选值
值 | 行为 | 适用场景 |
---|---|---|
SUSPEND | 挂起发射操作,等待缓冲区释放空间 | 默认策略,适合低频事件 |
DROP_OLDEST | 丢弃最旧的值,保留最新的值 | 高速事件流(如传感器数据) |
DROP_LATEST | 丢弃新发射的值,保留旧值 | 不允许丢失重要事件 |
2.2.3 extraBufferCapacity 参数
作用
- 控制缓冲区额外容量 ,用于存储尚未被消费的值。
- 实际缓冲区总容量 = replay + extraBufferCapacity。
- 如果缓冲区不足,会触发 onBufferOverflow 策略。
默认值
- 默认值为 0,即无额外缓冲区 。仅靠 replay 提供的容量。
典型用途
- 增加缓冲区容量以应对突发流量(如高并发异步任务)。
- 避免因短暂的消费延迟导致发射操作阻塞或值丢失。
val sharedFlow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 3, // 总缓冲区容量 = 1(replay) + 3(extra) = 4onBufferOverflow = BufferOverflow.DROP_OLDEST
)viewModelScope.launch {repeat(10) {sharedFlow.emit(it)}
}viewModelScope.launch {sharedFlow.collect { value ->delay(100L) // 消费速度较慢println("收到: $value")}
}
// 收集器将依次收到大部分值,但可能在某些情况下丢弃最早的值
3. 生命周期管理
3.1 repeatOnLifecycle
的机制与必要性
在 Android 开发中,Flow 的收集需要严格遵循生命周期规则,以避免 后台资源浪费 和 界面更新异常。传统方式(如 launchWhenStarted
)的缺陷在于:
// ❌ 危险写法:流在后台仍保持活跃
lifecycleScope.launchWhenStarted {viewModel.dataFlow.collect { data ->updateUI(data) // 当应用进入后台时可能触发崩溃}
}
repeatOnLifecycle
的解决方案:
// ✅ 安全写法:自动启停收集
lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {viewModel.dataFlow.collect { data ->updateUI(data) // 仅在界面可见时执行}}
}
执行过程分解:
- 当生命周期进入
STARTED
状态时,启动流收集 - 当生命周期进入
STOP
状态时,取消收集协程
LiveData的生命周期是onStart的时候立即推送最新数据,onStop的时候自动暂停数据更新。
协程默认是从onCreate到onDestory,但是通过repeatOnLifecycle(Lifecycle.State.STARTED)可以达到和LiveData一样的生命周期特性。
3.1.1 如果不使用repeatOnLifecycle,会遇到的问题
当 UI 组件进入非活跃状态(如 onStop)时,协程不会自动暂停,而是继续运行,导致以下问题:
- 不必要的资源消耗:
即使界面不可见,数据流仍会持续处理(如网络请求、数据库查询等),浪费 CPU、内存和电量。 - 防止空指针异常:
当组件处于非活跃状态时,其内部的一些视图或者数据可能已被销毁。若协程在此时仍在运行并尝试访问这些资源,就会引发空指针异常。repeatOnLifecycle 能确保协程在组件活跃时运行,防止此类异常的发生。
3.1.2 同时手机多个流
当需要同时收集多个流时,可通过结构化并发提高效率。
此时使用同一个repeatOnLifecycle,即可同时管理这些流的生命周期。
lifecycleScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {// 并行收集多个流launch { flowA.collect { updateA(it) } }launch { flowB.collect { updateB(it) } }launch { flowC.collect { updateC(it) } }}
}
优势:
• 单次生命周期状态检查,避免重复调用 repeatOnLifecycle
• 所有流的收集协程统一管理,同时启动/停止
repeatOnLifecycle(STARTED)是在onStart的时候开始收集,onPause的时候结束收集。而默认的lifecycleScope,是在onCreate的时候就开始收集(如果代码写在onCreate里的话),在onDestory里结束收集。
3.2 与 Jetpack Compose 的集成
在 Compose 中通过 collectAsStateWithLifecycle
实现自动生命周期感知:
@Composable
fun UserProfileScreen(viewModel: UserViewModel) {val uiState by viewModel.uiState.collectAsStateWithLifecycle()when (uiState) {is Loading -> ShowProgress()is Success -> DisplayUser((uiState as Success).data)is Error -> ShowErrorAlert()}
}
底层实现原理,就是调用的repeatOnLifecycle(Lifecycle.State.STARTED)
:
@Composable
fun <T> Flow<T>.collectAsStateWithLifecycle(initial: T,lifecycle: Lifecycle = LocalLifecycleOwner.current.lifecycle,minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): State<T> {val state = remember(this, lifecycle) { mutableStateOf(initial) }DisposableEffect(this, lifecycle) {val job = lifecycle.coroutineScope.launch {repeatOnLifecycle(minActiveState) {this@collectAsStateWithLifecycle.collect {state.value = it}}}onDispose { job.cancel() }}return state
}
3.3 误区 : 忽略冷流的重复触发
// 每次界面重建都会重新触发网络请求
repeatOnLifecycle(Lifecycle.State.STARTED) {repository.fetchDataFlow().collect { ... }
}
优化策略:
// 将冷流转换为 StateFlow 共享数据
val cachedFlow = repository.fetchDataFlow().stateIn(scope = viewModelScope,started = SharingStarted.WhileSubscribed(5000),initialValue = null)// 界面层安全收集
repeatOnLifecycle(Lifecycle.State.STARTED) {cachedFlow.collect { ... }
}
通过 repeatOnLifecycle
的合理使用,开发者可以实现:
- 资源节约:避免后台不必要的计算和网络请求
- 状态安全:防止在无效生命周期更新界面导致的崩溃
- 数据一致性:确保界面重建后自动同步最新数据
4. 异常处理机制
4.1 catch
操作符的精准控制
catch
是Flow处理上游异常的专用操作符,但需注意其作用域限制:仅能捕获其上游的异常,无法处理下游操作符或收集端的异常。
基础用法:触发异常后,替换为默认值发送
userProfileFlow.map { it.toUserModel() } // 上游操作.catch { e -> // 捕获map及之前的异常emit(UserModel.ANONYMOUS) // 发射备用数据logError(e) }.collect { model -> // 可能抛出异常(无法被catch捕获)require(model.id != null) }
高级模式:条件重抛异常
networkFlow.catch { cause ->if (cause is IOException) {emit(CachedData) // 网络异常使用缓存} else {throw cause // 其他异常继续向上传递}}.collect { ... }
4.2 try/catch
块的补充作用
在 collect
端使用传统的 try/catch
作为最后防线,处理未被捕获的异常。
防御式收集示例:
// ✅ 正确:在collect内部处理
flow { ... }.collect { try {process(it)} catch (e: Exception) {handleLocalError(e)}
}
常见陷阱:
// ❌ 错误:try包裹整个Flow链,无法中断流
try {flow { ... }.map { ... }.collect { ... }
} catch (e: Exception) {// 不会触发!Flow异常在协程中传播,不会抛出到此处
}
4.3 智能重试机制
4.3.1 retry
的作用
- 在流发射过程中遇到异常时,自动重新启动整个流(即重新执行 flow { … }),最多重试指定次数。
- 如果重试次数用尽仍然失败,则抛出异常。
使用方式
val flow = flow {emit(1)throw IOException("Network error")
}flow.retry(3) // 最多重试 3 次.collect { value ->Log.d("Flow", "Received: $value")}
特点
- 无条件重试:只要出现异常(无论什么类型),都会触发重试。
- 固定重试次数:必须提前指定最大重试次数。
- 无法自定义重试条件:不支持根据异常类型、时间间隔、尝试次数等动态决策。
适用场景
- 简单的失败重试逻辑,如网络请求失败后最多重试 N 次。
- 不需要区分异常类型或进行复杂判断的场景。
4.3.2 retryWhen
的作用
- 提供一个 lambda 函数,用于根据异常类型和当前尝试次数,决定是否继续重试。
- 可以对不同的异常进行差异化处理,甚至可以引入延迟、指数退避等策略。
使用方式
val flow = flow {emit(1)throw IOException("Network error")
}flow.retryWhen { cause, attempt ->if (attempt < 3 && cause is IOException) {delay(1000L * attempt) // 指数退避true // 返回 true 表示继续重试} else {false // 返回 false 表示终止重试}
}
.collect { value ->Log.d("Flow", "Received: $value")
}
特点
- 高度可定制:通过 lambda 参数
(cause: Throwable, attempt: Long) -> Boolean
控制重试逻辑。 - 支持延迟重试:可以在每次重试前加入
delay()
实现退避策略。 - 支持异常过滤:可以只对某些异常类型(如
IOException
)进行重试。 - 支持动态决策:可以根据尝试次数、异常类型、环境状态等做出重试决策。
4.4 onCompletion
: 当Flow完成收集
onCompletion 是一个操作符 ,用于在 Flow 完成收集时执行某些逻辑 ,通过 cause
参数区分状态。
它可以用来执行资源释放、状态清理、界面更新等收尾操作。
异常日志记录示例:
dataSyncFlow.onCompletion { cause ->when {cause == null -> log("同步成功")cause is CancellationException -> log("同步被取消")else -> log("同步失败: ${cause.stackTrace}")}}.catch { ... }.collect { ... }
资源释放示例:
fileDownloadFlow.onCompletion { cause ->// 无论成功或异常,都关闭文件句柄closeFileHandle() }.collect { data -> writeToFile(data) }