Kotlin Flow 进阶:背压策略与共享流
用 Kotlin Flow 替换 LiveData 之后,很多人止步于 flow { } + collect { } 的基本用法。但当生产者发射速度超过消费者处理速度时,问题就来了——背压(Backpressure)。当你想让多个订阅者共享同一个数据源时,冷流的热化——共享流(SharedFlow / StateFlow)就是必修课。
本文是 Kotlin Flow 进阶篇,聚焦两个实战核心:背压策略与共享流机制。
一、冷流与热流:本质区别
在讨论背压和共享流之前,先搞清楚冷流和热流的核心差异:
| 特性 | 冷流(Cold Flow) | 热流(Hot Flow) |
|---|---|---|
| 数据生产时机 | 被 collect 时才生产 | 独立于订阅者持续生产 |
| 多个订阅者 | 各自独立生产数据 | 共享同一数据源 |
| 类比 | Netflix(点播) | 电视直播(广播) |
| 典型代表 | flow { } |
SharedFlow、StateFlow |
| 背压问题 | 有(生产者可能比消费者快) | 无(订阅者跟不上就丢弃或缓冲) |
// 冷流:每次 collect 都独立执行
val coldFlow = flow {
println("开始生产") // 每个 collector 都会触发
for (i in 1..3) {
emit(i)
}
}
// 两个 collector 独立执行
coldFlow.collect { println("A: $it") } // 输出:开始生产, A:1, A:2, A:3
coldFlow.collect { println("B: $it") } // 输出:开始生产, B:1, B:2, B:3
二、背压问题:生产者太快怎么办
Flow 是顺序执行的——生产者 emit 一个值后,必须等消费者 collect 完成才能 emit 下一个。如果生产者每秒 emit 100 个值,消费者每秒只能处理 10 个,积压就出现了。
// 模拟背压场景
fun fastProducer(): Flow<Int> = flow {
for (i in 1..100) {
emit(i) // 每 10ms 发射一个
delay(10)
}
}
// 默认行为:collect 多慢,整体就多慢
// collect 耗时 100ms → 整体耗时 100 * 100ms = 10 秒
fastProducer().collect { value ->
delay(100) // 模拟慢消费者
println("处理: $value")
}
默认的背压策略是 挂起等待——生产者等消费者处理完再发下一个。这是最安全的方式,但不是最高效的。
三、三种背压策略
1. buffer:并发缓冲
buffer() 让生产者和消费者在不同协程中运行,中间加一个缓冲区。生产者不用等消费者,先往缓冲区里塞。
fastProducer()
.buffer(capacity = 50) // 缓冲区容量 50
.collect { value ->
delay(100)
println("处理: $value")
}
执行流程:
无 buffer: produce → wait → consume → produce → wait → consume ...
有 buffer: produce → produce → produce ... (并发)
consume → consume → consume ... (并发)
时间对比:
无 buffer: ~10 秒
buffer(50): ~6 秒(生产者先跑满,消费者从缓冲区取)
buffer 容量选择
| 容量 | 行为 | 适用场景 |
|---|---|---|
0(默认) |
无缓冲,生产者等消费者 | 需要严格顺序 |
1 |
缓冲 1 个值 | 最小并发 |
64 / Channel.BUFFERED |
缓冲 64 个值(默认常量) | 一般场景 |
Channel.UNLIMITED |
无限缓冲 | ⚠️ 谨慎使用,可能 OOM |
Channel.CONFLATED |
只保留最新值 | 同 conflate() |
2. conflate:只保留最新值
conflate() 的策略是:消费者忙的时候,生产者继续发,但中间的值会被丢弃,只保留最新的。
fastProducer()
.conflate() // 跳过中间值
.collect { value ->
delay(100)
println("处理: $value")
}
// 输出示例:
// 处理: 1 (消费者处理 1 期间,2-10 被跳过)
// 处理: 10 (消费者处理 10 期间,11-20 被跳过)
// 处理: 20
// ...
// 总共只处理约 10 个值,而非 100 个
3. collectLatest:取消旧处理
collectLatest 和 conflate 类似,但它不只是跳过旧值——它会取消正在进行的旧 collect 块,让新的 collect 块立刻执行。
fastProducer()
.collectLatest { value -> // 新值到来时取消旧的处理
println("开始处理: $value")
delay(100) // 模拟耗时操作
println("完成处理: $value") // 很可能看不到这行
}
// 输出示例:
// 开始处理: 1
// 开始处理: 2 (1 被取消,"完成处理: 1" 不会输出)
// 开始处理: 3 (2 被取消)
// ...
// 开始处理: 100
// 完成处理: 100 (最后一个值没有被打断,能完成)
conflate vs collectLatest 对比
| 特性 | conflate | collectLatest |
|---|---|---|
| 处理旧值 | 跳过不处理 | 取消正在进行的处理 |
| 新值到达时 | 消费者继续处理当前值 | 中断当前处理,开始新处理 |
| 适用场景 | 处理必须完成,旧值可以跳过 | 处理可以中断,只关心最新结果 |
| 典型例子 | 日志打印、数据持久化 | 搜索建议、图片加载 |
三种策略选择决策树
生产者比消费者快?
├── 否 → 不需要处理,默认挂起等待即可
└── 是 → 消费者能否跳过中间值?
├── 能 → 中间值的处理必须完成吗?
│ ├── 是 → conflate()
│ └── 否 → collectLatest()
└── 不能 → buffer() (缓冲突发,但注意容量)
四、SharedFlow:冷流变热流
冷流最大的局限是:每个 collector 独立生产数据。如果你有 3 个 UI 组件同时监听同一个数据源,冷流会生产 3 份数据——浪费资源且无法共享状态。
SharedFlow 是热流,多个订阅者共享同一数据源。
基本用法
// 方式1:直接创建
val sharedFlow = MutableSharedFlow<String>()
// 发射数据
scope.launch {
sharedFlow.emit("Hello")
sharedFlow.emit("World")
}
// 多个订阅者共享同一数据
sharedFlow.collect { println("A: $it") } // A: Hello, A: World
sharedFlow.collect { println("B: $it") } // B: Hello, B: World
shareIn:将冷流转换为 SharedFlow
// 冷流 → 热流
val locationFlow = flow {
while (true) {
emit(getLocation()) // 模拟位置更新
delay(1000)
}
}
// 用 shareIn 转为热流,多个订阅者共享
val sharedLocation = locationFlow.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000), // 停止策略
replay = 1 // 新订阅者收到最近 1 个值
)
关键参数解析
1. started:何时开始共享
| 策略 | 行为 | 适用场景 |
|---|---|---|
Eagerly |
立即开始,永不停止 | 全局常驻数据(时钟、配置) |
Lazily |
第一个订阅者到来时开始,永不停止 | 懒初始化场景 |
WhileSubscribed(stopTimeout) |
有订阅者时运行,无订阅者时延迟停止 | ⚠️ Android 最常用 |
SharingStarted.WhileSubscribed(5000)。当 UI 退到后台时,5 秒后自动停止上游数据生产,回到前台时自动恢复。这比 Eagerly 省电省流量。stopTimeout 设为多少?5000ms 是官方推荐的默认值——既避免配置切换时频繁重启上游,又不至于浪费太多资源。
2. replay:重放缓存
// replay = 0:新订阅者只收到订阅后发射的值
val noReplay = flow.shareIn(scope, SharingStarted.Eagerly, replay = 0)
// replay = 1:新订阅者立即收到最近 1 个值(类似 LiveData)
val withReplay = flow.shareIn(scope, SharingStarted.Eagerly, replay = 1)
// replay = 3:新订阅者收到最近 3 个值
val moreReplay = flow.shareIn(scope, SharingStarted.Eagerly, replay = 3)
replay = 1 足够。
五、StateFlow:专为状态设计
StateFlow 是 SharedFlow 的特化版本,专为「状态持有」设计:
- 必须有初始值
replay = 1(固定,新订阅者总是拿到最新状态)- 值相同时不发射(去重)
- 通过
.value属性同步读取当前值
StateFlow vs SharedFlow 对比
| 特性 | StateFlow | SharedFlow |
|---|---|---|
| 初始值 | 必须提供 | 不需要 |
| replay | 固定为 1 | 可自定义 |
| 去重 | 自动(相同值不发射) | 不自动去重 | 同步读取 | .value |
不支持 |
| 典型场景 | UI 状态持有 | 事件广播 |
stateIn:将冷流转为 StateFlow
val uiState = repository.getDataFlow()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = UiState.Loading // 必须提供初始值
)
六、Android 实战:MVVM 中的 Flow 最佳实践
1. ViewModel 暴露状态
class MyViewModel(private val repository: Repository) : ViewModel() {
// ✅ 推荐:stateIn 暴露 UI 状态
val uiState: StateFlow<UiState> = repository.observeData()
.map { data -> UiState.Success(data) }
.onStart { emit(UiState.Loading) }
.catch { emit(UiState.Error(it.message)) }
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = UiState.Loading
)
// ❌ 反面教材:Eagerly 导致后台持续运行
val badState = repository.observeData()
.stateIn(viewModelScope, SharingStarted.Eagerly, UiState.Loading)
}
2. 一次性事件:SharedFlow 而非 StateFlow
导航事件、Snackbar、Toast 等一次性事件不应该用 StateFlow——因为 StateFlow 会去重,且配置切换时会重放,导致事件重复消费。
class MyViewModel : ViewModel() {
// ✅ 一次性事件用 SharedFlow(replay = 0)
private val _navigationEvent = MutableSharedFlow<NavDestination>()
val navigationEvent: SharedFlow<NavDestination> = _navigationEvent.asSharedFlow()
// ✅ Snackbar 事件
private val _snackbarEvent = MutableSharedFlow<String>()
val snackbarEvent: SharedFlow<String> = _snackbarEvent.asSharedFlow()
fun onButtonClick() {
viewModelScope.launch {
_navigationEvent.emit(NavDestination.Detail)
}
}
}
// Activity/Fragment 中收集
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.navigationEvent.collect { destination ->
navController.navigate(destination)
}
}
}
3. 搜索建议:collectLatest 典型场景
class SearchViewModel : ViewModel() {
private val _query = MutableStateFlow("")
val searchResults = _query
.debounce(300) // 防抖 300ms
.filter { it.isNotBlank() }
.distinctUntilChanged() // 相同查询不重复搜索
.flatMapLatest { query -> // 新查询取消旧搜索
repository.search(query)
.catch { emit(emptyList()) }
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
fun onQueryChanged(query: String) {
_query.value = query
}
}
flatMapLatest 在上游值变化时取消下游 Flow 的收集,相当于对 Flow 本身做 collectLatest。搜索场景推荐 flatMapLatest,因为它在 Flow 操作符层面取消,比在 collect 层面更高效。
4. 传感器数据:conflate 典型场景
class SensorViewModel : ViewModel() {
// 传感器数据每秒可能更新 100+ 次,UI 只需要最新值
val accelerometerData = sensorManager.observeAccelerometer()
.conflate() // 丢弃中间值,只处理最新
.map { it.toUiModel() }
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = AccelerometerUiModel()
)
}
5. 列表 + 详情:多订阅者共享
class ListViewModel(private val repository: Repository) : ViewModel() {
// 多个 Fragment 共享同一数据源
val items = repository.observeItems()
.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
replay = 1
)
}
// Fragment A: 列表页
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.items.collect { items ->
listAdapter.submitList(items)
}
}
}
// Fragment B: 详情页(同一 ViewModel)
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.items.collect { items ->
// 只更新当前详情项
val currentItem = items.find { it.id == currentId }
updateUI(currentItem)
}
}
}
七、repeatOnLifecycle:Android 生命周期感知
Flow 的 collect 是挂起函数,如果在 lifecycleScope 中直接调用,配置切换时可能重复收集或在不安全的生命周期状态收集。
// ❌ 错误:配置切换时重复收集
lifecycleScope.launch {
viewModel.uiState.collect { state ->
updateUI(state) // 旋转屏幕会再次触发
}
}
// ❌ 错误:在 onCreate 中收集,onStop 后仍在处理
lifecycleScope.launch {
viewModel.uiState.collect { ... }
}
// ✅ 正确:只在 STARTED 状态收集
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
updateUI(state)
}
}
}
collectAsStateWithLifecycle() 已经封装了这个逻辑:
@Composable
fun MyScreen(viewModel: MyViewModel) {
val uiState by viewModel.uiState.collectAsStateWithLifecycle()
// 自动在 STARTED 时收集、STOPPED 时停止
}
八、常见坑与最佳实践
坑 1:WhileSubscribed 的 stopTimeout 设为 0
// ❌ 配置切换时上游会立即停止再重启
SharingStarted.WhileSubscribed(0)
// ✅ 给 5 秒缓冲,避免配置切换时重启上游
SharingStarted.WhileSubscribed(5000)
坑 2:SharedFlow replay 太大
// ❌ 新订阅者收到 100 个历史值,内存和性能都有问题
MutableSharedFlow<Event>(replay = 100)
// ✅ 状态用 StateFlow(replay=1),事件用 SharedFlow(replay=0)
MutableSharedFlow<Event>(replay = 0)
坑 3:在 collect 中做耗时操作不加 buffer
// ❌ 耗时操作阻塞上游
flow { emit(1) }
.collect { value ->
Thread.sleep(1000) // 阻塞!
}
// ✅ 用 buffer 让生产者消费者并发
flow { emit(1) }
.buffer()
.collect { value ->
delay(1000) // 不阻塞上游
}
坑 4:混淆 StateFlow 和 SharedFlow 的用途
// ❌ 用 StateFlow 表示导航事件
val navigateEvent = MutableStateFlow<Screen?>(null)
// 问题:配置切换时 null → Screen 不会触发(去重),Screen → Screen 也不会触发
// ✅ 事件用 SharedFlow,状态用 StateFlow
val uiState = MutableStateFlow<UiState>(UiState.Loading) // 状态
val navigateEvent = MutableSharedFlow<Screen>() // 事件
九、速查表
| 场景 | 推荐方案 | 关键操作符 |
|---|---|---|
| ViewModel 暴露 UI 状态 | stateIn |
WhileSubscribed(5000) |
| 一次性事件(导航/Toast) | MutableSharedFlow(replay=0) |
asSharedFlow() |
| 搜索建议 | flatMapLatest + debounce |
distinctUntilChanged |
| 传感器/高频数据 | conflate() |
stateIn |
| 多 Fragment 共享数据 | shareIn(replay=1) |
WhileSubscribed |
| 生产者短暂突发 | buffer() |
capacity = BUFFERED |
| Compose 收集 | collectAsStateWithLifecycle() |
自动生命周期感知 |
| View 体系收集 | repeatOnLifecycle(STARTED) |
lifecycleScope.launch |
buffer 并发缓冲、conflate 保留最新、collectLatest 取消旧处理。共享流双雄——StateFlow 持有状态、SharedFlow 广播事件。Android 上 WhileSubscribed(5000) 是黄金搭档。