当前位置: 首页 > news >正文

kotlin知识体系(六) : Flow核心概念与与操作符指南

1. Flow基础概念

在这里插入图片描述

1.1 冷流(Cold Stream)

冷流是Flow的默认形式,其核心特点如下:
按需触发:仅在消费者调用 collect 时开始发射数据,且每次收集都会重新执行流的逻辑(类似“单播”)。
独立性:同一流的多个消费者会各自触发独立的数据生产和发射流程。
适用场景:适用于一次性任务(如网络请求、数据库查询)或需要重复触发的场景。

代码示例

val coldFlow = flow { emit("冷流开始发射数据") delay(1000)emit("数据发射完成")
}// 消费者1收集数据
coldFlow.collect { println("消费者1: $it") }// 消费者2再次收集会重新触发流的执行
coldFlow.collect { println("消费者2: $it") }

1.2 热流(Hot Stream)

热流的生命周期不与消费者绑定,特点如下:
主动发射:数据由生产者主动发射,无论是否有消费者订阅(类似“广播”)。
共享性:多个消费者共享同一流实例,接收到的是同一组数据的最新值或事件。
代表类型StateFlow(用于状态管理)、SharedFlow(用于事件传递)。

适用场景
• 实时状态同步(如UI状态更新)。
• 全局事件通知(如用户登录状态变动)。

代码示例

// 热流示例(StateFlow)
val stateFlow = MutableStateFlow("初始值")// 生产者更新值(不依赖是否有消费者)
stateFlow.value = "新值" // 多个消费者共享同一最新值
stateFlow.collect { println("消费者A: $it") }
stateFlow.collect { println("消费者B: $it") }

1.3 数据流模型(Data Stream Model)

Flow采用生产者-消费者模型,通过异步非阻塞的机制和响应式编程思想,形成完整的处理链。

1.3.1 生产者-消费者分离

生产者(Producer)
负责生成数据流(如通过 emit 发送数据),可以是网络请求、数据库查询或传感器事件等。

fun fetchData(): Flow<String> = flow {val data = api.getData() // 模拟耗时操作emit(data) 
}

消费者(Consumer)
订阅数据并处理(如更新UI、存储结果):

fetchData().collect { data ->updateUI(data) // 处理数据
}
1.3.2 异步非阻塞

Flow通过在协程中运行,实现以下特点:
非阻塞主线程:耗时操作(如IO)通过 flowOn(Dispatchers.IO) 切换到子线程。
挂起与恢复:使用挂起函数(如 delay)可暂停流处理,无需回调嵌套。

1.3.3 响应式编程

实时响应:当数据变化时,自动推送新值(如 StateFlow 驱动UI刷新)。
链式处理:通过操作符(如 mapfilter)组合复杂逻辑,例如:

flow { emit(1..5) }.flatMapConcat { it.asFlow() } // 展开数据.filter { it % 2 == 0 }        // 过滤偶数.map { it * 2 }              // 转换数据.collect { println(it) }     // 输出:4, 8

1.4 关键总结

特性冷流热流
触发方式按需触发(被收集时才发射)主动发射
数据共享消费者独立触发,数据独立多消费者共享数据
适用场景接口请求、数据库查询状态共享(UI)、全局事件通知

2. Flow的构建方式

2.1 flow { ... }

flow 构建器是创建自定义Flow的核心方式,适用于需要手动控制数据发射逻辑的场景。其核心特点是通过 emit() 函数逐个发射数据,并通过挂起函数支持异步操作。

基础示例:模拟分页加载数据

fun loadPagesFlow(pageSize: Int): Flow<List<Item>> = flow {var page = 0while (true) {val items = fetchPage(page, pageSize) // 挂起函数获取分页数据emit(items)                           // 发射当前页数据if (items.size < pageSize) break      // 数据不足时终止流page++}
}// 使用方式
viewModelScope.launch {loadPagesFlow(20).collect { items ->updateRecyclerView(items)}
}

关键特性
冷流特性:每次收集都会重新执行 flow 块内的逻辑
线程限制:默认在调用者的协程上下文中运行,不可直接切换线程(需配合 flowOn
挂起支持:内部可使用 delay()withContext() 等挂起函数

适用场景
• 需要逐条生成数据的异步任务(如实时聊天消息接收)
• 复杂的数据生成逻辑(如分页加载、传感器数据融合)


2.2 flowOf()

快速创建包含固定数据集的冷流,适用于已知静态数据的场景。其行为类似集合操作,但支持响应式处理链。

示例:创建预定义颜色值的流

val colorFlow = flowOf("Red", "Green", "Blue")// 等价于:
val colorFlow = flow {emit("Red")emit("Green")emit("Blue")
}

特殊用法:空数据流

val emptyFlow = flowOf<String>() // 创建不发射任何值的流

性能特点
• 数据缓存在内存中,每次收集都会重新发射全部数据
• 适用于小规模固定数据集(≤1000条)


2.3 asFlow()

将现有集合或序列转换为冷流,实现集合数据与Flow操作符的无缝衔接。

集合转换示例

val listFlow = listOf(1, 2, 3).asFlow()// 等价于:
val listFlow = flow {emit(1)emit(2)emit(3)
}

序列适配示例

(1..10_000).asSequence()       // 创建懒序列.filter { it % 2 == 0 }    // 先进行过滤.asFlow()                  // 转换为Flow.map { it * 2 }            // 继续Flow操作.collect { print(it) }

对比 flowOf()

flowOf(1, 2, 3)listOf(1,2,3).asFlow()
数据存储方式独立参数存储依赖原始集合对象
大数据性能适合小数据集适合超大规模集合(惰性处理)
操作符兼容性需转换为Flow操作符链可衔接集合/序列预处理

3. Flow操作符详解


3.1 中间操作符

3.1.1 转换操作符

通过转换操作符,开发者可以对流中的每个元素进行实时修改或扩展,支持同步和异步处理

  1. map
    功能:将数据转换为其他类型
    场景:数据格式转换(如DTO → UI Model)
    注意事项:内部可调用挂起函数(如网络请求)

    userFlow.map { user -> UserProfile(user.id, user.name) 
    }
    
  2. filter
    功能:筛选符合条件的数据
    场景:排除无效数据(如空值、非法范围值)
    技巧:可 filterIsInstance<T>() ,直接实现 filter + map 操作符的效果

    intFlow.filter { num -> num % 2 == 0 }
    
  3. transform
    功能:复杂转换(可多次emit
    场景:一对多转换(如展开嵌套集合)
    典型用法:动态生成中间状态(如加载中→成功/失败)

    queryFlow.transform { query ->emit(Loading)emit(SearchResult(query))
    }
    

3.1.2 组合操作符

组合多个流的数据,实现复杂的数据混合逻辑。

  1. zip
    功能:合并两个流,按顺序配对
    触发条件:双方流在同一位置都有新元素时触发
    终止条件:任一流结束则终止
    场景:合并关联性强的数据(如用户ID + 用户详情)
    特点:结果流长度等于较短流的长度

    val userIds = flowOf(1, 2, 3)
    val userNames = flowOf("Alice", "Bob")
    userIds.zip(userNames) { id, name -> "$id: $name" 
    } // 输出:1: Alice, 2: Bob
    
  2. combine
    功能:动态合并流(任一更新触发)
    触发条件:任一流发射新值时触发
    终止条件:双方流都结束时终止
    场景:实时仪表盘(温度+湿度)、输入框联动
    特点:结果流长度 = 流A长度 + 流B长度 - 1

    val tempFlow = flowOf(25, 26)
    val humidityFlow = flowOf(60, 65)
    tempFlow.combine(humidityFlow) { t, h ->"$t℃/$h%"
    } // 输出:25/60 → 26/60 → 26/65
    

3.1.3 背压处理操作符

解决生产者-消费者速度不匹配的问题,提供多种流量控制策略。

  1. buffer
    功能:添加缓冲区,允许生产者和消费者异步运行
    参数capacity: Int(默认64)
    场景:生产者快于消费者时避免阻塞(如日志批量上传)
    注意:缓冲区过大会增加内存压力

    fastFlow.buffer(100).collect { ... }
    
  2. conflate
    功能:丢弃中间值,只保留最新值
    场景:UI刷新(如实时位置更新时,跳过中间帧)
    典型问题:可能丢失关键历史数据

    locationFlow.conflate().collect { updateMap(it) 
    }
    
  3. collectLatest
    功能:新元素到达时取消当前处理并重新开始
    场景:搜索建议(用户持续输入时取消未完成请求)
    要求:处理块必须是可取消的挂起函数

    inputFlow.collectLatest { searchApi(it) 
    }
    

3.1.4 生命周期操作符

监控流的执行过程,实现资源管理和状态跟踪。

  1. onStart
    触发时机:流开始收集时
    场景:显示加载动画、初始化资源
    注意:即使流没有元素也会触发

    flow.onStart { showLoading() 
    }
    
  2. onCompletion
    触发时机:流正常结束或取消时
    参数cause: Throwable?(null表示正常结束)
    场景:隐藏加载动画、释放资源

    flow.onCompletion { hideLoading() 
    }
    
  3. catch
    作用域:捕获上游的所有异常
    限制:无法捕获下游操作符和 collect 中的异常
    恢复策略:通过 emit() 发射备用值

    flow.catch { e ->emit(ErrorData(e))
    }
    

3.2 末端操作符

触发流的执行并处理最终结果。

  1. collect
    功能:启动流的收集,处理每个元素
    返回值Unit
    场景:自定义处理逻辑(如更新UI)

    flow.collect { data -> println(data) 
    }
    
  2. toList/toSet
    功能:将流转换为集合
    返回值List<T>/Set<T>
    场景:批量处理数据(如缓存全部结果)

    val list = flow.toList()
    
  3. launchIn
    功能:在指定协程作用域中启动流收集
    返回值Job
    场景:简化生命周期管理(如Android中配合lifecycleScope)

    flow.onEach { ... }.launchIn(viewModelScope)
    
  4. first()/single()
    first:取第一个元素后取消流
    single:确保流只发射一个元素(否则抛异常)
    场景:获取单次结果(如权限请求)

    val result = flow.first()
    

http://www.xdnf.cn/news/43687.html

相关文章:

  • opencv图像库编程
  • 软件开发过程中技术债的控制策略
  • iPhone 13P 换超容电池,一年实记的“电池循环次数-容量“柱状图
  • next.js 如何实现动态路由?
  • 【消息队列RocketMQ】一、RocketMQ入门核心概念与架构解析
  • Git拉分支技巧:从零开始创建并推送分支
  • 每天学一个 Linux 命令(28):ln
  • 产品经理学习过程
  • 深度剖析即梦 AI:开启创意无限的智能创作时代
  • springboot--web开发响应参数注解
  • Web前端:百度首页克隆 - 前端开发练习
  • 网络设备基础运维全攻略:华为/思科核心操作与巡检指南
  • 2.2 BackgroundWorker的使用介绍
  • Python实现对大批量Word文档进行批量自动化排版(15)
  • 数字系统与编码
  • 2020 年 7 月大学英语四级考试真题(组合卷)——解析版
  • 并发设计模式实战系列(4):线程池
  • RabbitMQ和Seata冲突吗?Seata与Spring中的事务管理冲突吗
  • Chromium 134 编译指南 Ubuntu篇:环境搭建与源码获取(一)
  • PyTorch基础笔记
  • python爬虫复习
  • 杨氏矩阵、字符串旋转、交换奇偶位,offsetof宏
  • Java发生OOM是否必然导致JVM退出
  • 30天开发操作系统 第26天 -- 为窗口移动提速
  • 如何将自己封装的组件发布到npm上:详细教程
  • 组装一台intel n95纯Linux Server服务器
  • UniFlash以串口方式烧录MSPM0G3507(无需仿真器)
  • 方案精读:数字政府智慧政务服务一网通办服务解决方案【附全文阅读】
  • 精通 Spring Cache + Redis:避坑指南与最佳实践
  • 鸿蒙ArkUI之布局实战,线性布局(Column,Row)、弹性布局(Flex)、层叠布局(Stack),详细用法