Kotlin Flow 进阶:背压策略与共享流

用 Kotlin Flow 替换 LiveData 之后,很多人止步于 flow { } + collect { } 的基本用法。但当生产者发射速度超过消费者处理速度时,问题就来了——背压(Backpressure)。当你想让多个订阅者共享同一个数据源时,冷流的热化——共享流(SharedFlow / StateFlow)就是必修课。

本文是 Kotlin Flow 进阶篇,聚焦两个实战核心:背压策略与共享流机制。

前置知识:本文假设你已经了解 Flow 基本概念(冷流、collect、操作符),如果还没有,建议先看官方文档 Flow Guide

一、冷流与热流:本质区别

在讨论背压和共享流之前,先搞清楚冷流和热流的核心差异:

特性 冷流(Cold Flow) 热流(Hot Flow)
数据生产时机 被 collect 时才生产 独立于订阅者持续生产
多个订阅者 各自独立生产数据 共享同一数据源
类比 Netflix(点播) 电视直播(广播)
典型代表 flow { } SharedFlowStateFlow
背压问题 有(生产者可能比消费者快) 无(订阅者跟不上就丢弃或缓冲)
// 冷流:每次 collect 都独立执行
val coldFlow = flow {
    println("开始生产")  // 每个 collector 都会触发
    for (i in 1..3) {
        emit(i)
    }
}

// 两个 collector 独立执行
coldFlow.collect { println("A: $it") }  // 输出:开始生产, A:1, A:2, A:3
coldFlow.collect { println("B: $it") }  // 输出:开始生产, B:1, B:2, B:3

二、背压问题:生产者太快怎么办

Flow 是顺序执行的——生产者 emit 一个值后,必须等消费者 collect 完成才能 emit 下一个。如果生产者每秒 emit 100 个值,消费者每秒只能处理 10 个,积压就出现了。

// 模拟背压场景
fun fastProducer(): Flow<Int> = flow {
    for (i in 1..100) {
        emit(i)                    // 每 10ms 发射一个
        delay(10)
    }
}

// 默认行为:collect 多慢,整体就多慢
// collect 耗时 100ms → 整体耗时 100 * 100ms = 10 秒
fastProducer().collect { value ->
    delay(100)                     // 模拟慢消费者
    println("处理: $value")
}

默认的背压策略是 挂起等待——生产者等消费者处理完再发下一个。这是最安全的方式,但不是最高效的。

三、三种背压策略

1. buffer:并发缓冲

buffer() 让生产者和消费者在不同协程中运行,中间加一个缓冲区。生产者不用等消费者,先往缓冲区里塞。

fastProducer()
    .buffer(capacity = 50)  // 缓冲区容量 50
    .collect { value ->
        delay(100)
        println("处理: $value")
    }

执行流程:

无 buffer:  produce → wait → consume → produce → wait → consume ...
有 buffer:  produce → produce → produce ... (并发)
            consume → consume → consume ... (并发)

时间对比:
  无 buffer: ~10 秒
  buffer(50): ~6 秒(生产者先跑满,消费者从缓冲区取)
注意:buffer 不是万能的。如果消费者持续比生产者慢,缓冲区终究会满,最终还是退化为挂起等待。buffer 适用于「短暂突发」的场景。

buffer 容量选择

容量 行为 适用场景
0(默认) 无缓冲,生产者等消费者 需要严格顺序
1 缓冲 1 个值 最小并发
64 / Channel.BUFFERED 缓冲 64 个值(默认常量) 一般场景
Channel.UNLIMITED 无限缓冲 ⚠️ 谨慎使用,可能 OOM
Channel.CONFLATED 只保留最新值 同 conflate()

2. conflate:只保留最新值

conflate() 的策略是:消费者忙的时候,生产者继续发,但中间的值会被丢弃,只保留最新的。

fastProducer()
    .conflate()              // 跳过中间值
    .collect { value ->
        delay(100)
        println("处理: $value")
    }

// 输出示例:
// 处理: 1    (消费者处理 1 期间,2-10 被跳过)
// 处理: 10   (消费者处理 10 期间,11-20 被跳过)
// 处理: 20
// ...
// 总共只处理约 10 个值,而非 100 个
最佳场景:UI 刷新、实时数据展示。用户只关心最新状态,中间过渡值不重要。比如:进度条更新、传感器数据、股价显示。

3. collectLatest:取消旧处理

collectLatestconflate 类似,但它不只是跳过旧值——它会取消正在进行的旧 collect 块,让新的 collect 块立刻执行。

fastProducer()
    .collectLatest { value ->      // 新值到来时取消旧的处理
        println("开始处理: $value")
        delay(100)                  // 模拟耗时操作
        println("完成处理: $value")  // 很可能看不到这行
    }

// 输出示例:
// 开始处理: 1
// 开始处理: 2   (1 被取消,"完成处理: 1" 不会输出)
// 开始处理: 3   (2 被取消)
// ...
// 开始处理: 100
// 完成处理: 100  (最后一个值没有被打断,能完成)

conflate vs collectLatest 对比

特性 conflate collectLatest
处理旧值 跳过不处理 取消正在进行的处理
新值到达时 消费者继续处理当前值 中断当前处理,开始新处理
适用场景 处理必须完成,旧值可以跳过 处理可以中断,只关心最新结果
典型例子 日志打印、数据持久化 搜索建议、图片加载

三种策略选择决策树

生产者比消费者快?
├── 否 → 不需要处理,默认挂起等待即可
└── 是 → 消费者能否跳过中间值?
    ├── 能 → 中间值的处理必须完成吗?
    │   ├── 是 → conflate()
    │   └── 否 → collectLatest()
    └── 不能 → buffer() (缓冲突发,但注意容量)

四、SharedFlow:冷流变热流

冷流最大的局限是:每个 collector 独立生产数据。如果你有 3 个 UI 组件同时监听同一个数据源,冷流会生产 3 份数据——浪费资源且无法共享状态。

SharedFlow 是热流,多个订阅者共享同一数据源。

基本用法

// 方式1:直接创建
val sharedFlow = MutableSharedFlow<String>()

// 发射数据
scope.launch {
    sharedFlow.emit("Hello")
    sharedFlow.emit("World")
}

// 多个订阅者共享同一数据
sharedFlow.collect { println("A: $it") }  // A: Hello, A: World
sharedFlow.collect { println("B: $it") }  // B: Hello, B: World

shareIn:将冷流转换为 SharedFlow

// 冷流 → 热流
val locationFlow = flow {
    while (true) {
        emit(getLocation())  // 模拟位置更新
        delay(1000)
    }
}

// 用 shareIn 转为热流,多个订阅者共享
val sharedLocation = locationFlow.shareIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5000),  // 停止策略
    replay = 1  // 新订阅者收到最近 1 个值
)

关键参数解析

1. started:何时开始共享

策略 行为 适用场景
Eagerly 立即开始,永不停止 全局常驻数据(时钟、配置)
Lazily 第一个订阅者到来时开始,永不停止 懒初始化场景
WhileSubscribed(stopTimeout) 有订阅者时运行,无订阅者时延迟停止 ⚠️ Android 最常用
Android 推荐:使用 SharingStarted.WhileSubscribed(5000)。当 UI 退到后台时,5 秒后自动停止上游数据生产,回到前台时自动恢复。这比 Eagerly 省电省流量。

stopTimeout 设为多少?5000ms 是官方推荐的默认值——既避免配置切换时频繁重启上游,又不至于浪费太多资源。

2. replay:重放缓存

// replay = 0:新订阅者只收到订阅后发射的值
val noReplay = flow.shareIn(scope, SharingStarted.Eagerly, replay = 0)

// replay = 1:新订阅者立即收到最近 1 个值(类似 LiveData)
val withReplay = flow.shareIn(scope, SharingStarted.Eagerly, replay = 1)

// replay = 3:新订阅者收到最近 3 个值
val moreReplay = flow.shareIn(scope, SharingStarted.Eagerly, replay = 3)
replay 的内存开销:每个 replay 的值都会缓存在内存中。replay 越大,内存占用越高。对于大对象(如 Bitmap、List),慎用高 replay 值。Android UI 场景通常 replay = 1 足够。

五、StateFlow:专为状态设计

StateFlowSharedFlow 的特化版本,专为「状态持有」设计:

  • 必须有初始值
  • replay = 1(固定,新订阅者总是拿到最新状态)
  • 值相同时不发射(去重)
  • 通过 .value 属性同步读取当前值

StateFlow vs SharedFlow 对比

特性 StateFlow SharedFlow
初始值 必须提供 不需要
replay 固定为 1 可自定义
去重 自动(相同值不发射) 不自动去重
同步读取 .value 不支持
典型场景 UI 状态持有 事件广播

stateIn:将冷流转为 StateFlow

val uiState = repository.getDataFlow()
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = UiState.Loading  // 必须提供初始值
    )

六、Android 实战:MVVM 中的 Flow 最佳实践

1. ViewModel 暴露状态

class MyViewModel(private val repository: Repository) : ViewModel() {

    // ✅ 推荐:stateIn 暴露 UI 状态
    val uiState: StateFlow<UiState> = repository.observeData()
        .map { data -> UiState.Success(data) }
        .onStart { emit(UiState.Loading) }
        .catch { emit(UiState.Error(it.message)) }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = UiState.Loading
        )

    // ❌ 反面教材:Eagerly 导致后台持续运行
    val badState = repository.observeData()
        .stateIn(viewModelScope, SharingStarted.Eagerly, UiState.Loading)
}

2. 一次性事件:SharedFlow 而非 StateFlow

导航事件、Snackbar、Toast 等一次性事件不应该用 StateFlow——因为 StateFlow 会去重,且配置切换时会重放,导致事件重复消费。

class MyViewModel : ViewModel() {

    // ✅ 一次性事件用 SharedFlow(replay = 0)
    private val _navigationEvent = MutableSharedFlow<NavDestination>()
    val navigationEvent: SharedFlow<NavDestination> = _navigationEvent.asSharedFlow()

    // ✅ Snackbar 事件
    private val _snackbarEvent = MutableSharedFlow<String>()
    val snackbarEvent: SharedFlow<String> = _snackbarEvent.asSharedFlow()

    fun onButtonClick() {
        viewModelScope.launch {
            _navigationEvent.emit(NavDestination.Detail)
        }
    }
}

// Activity/Fragment 中收集
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.navigationEvent.collect { destination ->
            navController.navigate(destination)
        }
    }
}
为什么不用 StateFlow 表示事件?因为配置切换(如旋转屏幕)时,StateFlow 会重放最新值,导致导航事件重复触发。SharedFlow replay=0 不会重放,是更安全的事件通道。

3. 搜索建议:collectLatest 典型场景

class SearchViewModel : ViewModel() {

    private val _query = MutableStateFlow("")

    val searchResults = _query
        .debounce(300)              // 防抖 300ms
        .filter { it.isNotBlank() }
        .distinctUntilChanged()     // 相同查询不重复搜索
        .flatMapLatest { query ->   // 新查询取消旧搜索
            repository.search(query)
                .catch { emit(emptyList()) }
        }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )

    fun onQueryChanged(query: String) {
        _query.value = query
    }
}
flatMapLatest vs collectLatestflatMapLatest 在上游值变化时取消下游 Flow 的收集,相当于对 Flow 本身做 collectLatest。搜索场景推荐 flatMapLatest,因为它在 Flow 操作符层面取消,比在 collect 层面更高效。

4. 传感器数据:conflate 典型场景

class SensorViewModel : ViewModel() {

    // 传感器数据每秒可能更新 100+ 次,UI 只需要最新值
    val accelerometerData = sensorManager.observeAccelerometer()
        .conflate()                  // 丢弃中间值,只处理最新
        .map { it.toUiModel() }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = AccelerometerUiModel()
        )
}

5. 列表 + 详情:多订阅者共享

class ListViewModel(private val repository: Repository) : ViewModel() {

    // 多个 Fragment 共享同一数据源
    val items = repository.observeItems()
        .shareIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            replay = 1
        )
}

// Fragment A: 列表页
viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.items.collect { items ->
            listAdapter.submitList(items)
        }
    }
}

// Fragment B: 详情页(同一 ViewModel)
viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.items.collect { items ->
            // 只更新当前详情项
            val currentItem = items.find { it.id == currentId }
            updateUI(currentItem)
        }
    }
}

七、repeatOnLifecycle:Android 生命周期感知

Flow 的 collect 是挂起函数,如果在 lifecycleScope 中直接调用,配置切换时可能重复收集或在不安全的生命周期状态收集。

// ❌ 错误:配置切换时重复收集
lifecycleScope.launch {
    viewModel.uiState.collect { state ->
        updateUI(state)  // 旋转屏幕会再次触发
    }
}

// ❌ 错误:在 onCreate 中收集,onStop 后仍在处理
lifecycleScope.launch {
    viewModel.uiState.collect { ... }
}

// ✅ 正确:只在 STARTED 状态收集
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.uiState.collect { state ->
            updateUI(state)
        }
    }
}
Jetpack Compose 中不需要 repeatOnLifecycle——collectAsStateWithLifecycle() 已经封装了这个逻辑:
@Composable
fun MyScreen(viewModel: MyViewModel) {
    val uiState by viewModel.uiState.collectAsStateWithLifecycle()
    // 自动在 STARTED 时收集、STOPPED 时停止
}

八、常见坑与最佳实践

坑 1:WhileSubscribed 的 stopTimeout 设为 0

// ❌ 配置切换时上游会立即停止再重启
SharingStarted.WhileSubscribed(0)

// ✅ 给 5 秒缓冲,避免配置切换时重启上游
SharingStarted.WhileSubscribed(5000)

坑 2:SharedFlow replay 太大

// ❌ 新订阅者收到 100 个历史值,内存和性能都有问题
MutableSharedFlow<Event>(replay = 100)

// ✅ 状态用 StateFlow(replay=1),事件用 SharedFlow(replay=0)
MutableSharedFlow<Event>(replay = 0)

坑 3:在 collect 中做耗时操作不加 buffer

// ❌ 耗时操作阻塞上游
flow { emit(1) }
    .collect { value ->
        Thread.sleep(1000)  // 阻塞!
    }

// ✅ 用 buffer 让生产者消费者并发
flow { emit(1) }
    .buffer()
    .collect { value ->
        delay(1000)  // 不阻塞上游
    }

坑 4:混淆 StateFlow 和 SharedFlow 的用途

// ❌ 用 StateFlow 表示导航事件
val navigateEvent = MutableStateFlow<Screen?>(null)
// 问题:配置切换时 null → Screen 不会触发(去重),Screen → Screen 也不会触发

// ✅ 事件用 SharedFlow,状态用 StateFlow
val uiState = MutableStateFlow<UiState>(UiState.Loading)    // 状态
val navigateEvent = MutableSharedFlow<Screen>()             // 事件

九、速查表

场景 推荐方案 关键操作符
ViewModel 暴露 UI 状态 stateIn WhileSubscribed(5000)
一次性事件(导航/Toast) MutableSharedFlow(replay=0) asSharedFlow()
搜索建议 flatMapLatest + debounce distinctUntilChanged
传感器/高频数据 conflate() stateIn
多 Fragment 共享数据 shareIn(replay=1) WhileSubscribed
生产者短暂突发 buffer() capacity = BUFFERED
Compose 收集 collectAsStateWithLifecycle() 自动生命周期感知
View 体系收集 repeatOnLifecycle(STARTED) lifecycleScope.launch
一句话总结:背压三剑客——buffer 并发缓冲、conflate 保留最新、collectLatest 取消旧处理。共享流双雄——StateFlow 持有状态、SharedFlow 广播事件。Android 上 WhileSubscribed(5000) 是黄金搭档。