Kotlin Flow 快速入门

1. 引言:拥抱异步数据流的新方式

在现代 Android 开发中,处理网络请求、数据库访问、用户交互等异步操作是家常便饭。如何优雅、高效地管理这些随时间产生的数据流,一直是开发者关注的焦点。Kotlin 协程 (Coroutines) 为我们带来了强大的异步编程模型,而 Kotlin Flow 正是构建于协程之上的、用于处理冷数据流 (Cold Streams) 的解决方案。

如果你曾受困于回调地狱 (Callback Hell),觉得 LiveData 在某些场景下不够灵活,或者正在为你的 Kotlin 项目寻找 RxJava 的替代品,那么 Flow 将是你理想的选择。

本文将带你:

  • 理解 Flow 的核心概念。
  • 学习如何创建、转换和收集 Flow。
  • 掌握在 Android ViewModel 和 UI 中安全使用 Flow 的最佳实践。

学习前提: 本文假设你已具备 Kotlin 基础语法和 Kotlin 协程的基本知识。

2. 为什么选择 Kotlin Flow?

  • 基于协程: 与 Kotlin 协程深度集成,享受结构化并发带来的便利,简化异步代码管理和生命周期控制。
  • 冷流特性: Flow 默认是“冷”的,代码块只在被收集 (collect) 时执行,有效节省资源。
  • 操作符丰富: 提供大量类似 RxJava 的操作符 (map, filter, flatMapConcat, zip 等),方便地转换和组合数据。
  • 背压支持: 内建支持背压 (Backpressure),能自动处理数据生产和消费速率不匹配的问题。
  • 简洁的错误处理: 可使用标准 try-catch 或 Flow 提供的 catch 操作符优雅处理异常。
  • Jetpack 友好: 与 ViewModel、Lifecycle 等 Jetpack 组件无缝集成。

3. Flow 核心概念解析

可以把 Flow 想象成一个异步的数据序列,就像河流一样,数据项按顺序流动。

核心组成
  • 生产者 (Producer): 负责产生数据,通常在 flow { ... } 构建器内部使用 emit() 发射数据。
  • 中间操作符 (Intermediate Operators): 对数据流进行转换、过滤等操作(如 map, filter),返回一个新的 Flow。它们是惰性的。
  • 收集者 (Collector) / 终端操作符 (Terminal Operator): 触发 Flow 的执行并消费数据,最常用的是 collect。它是挂起函数。

3.1 创建 Flow (生产者)

最基础的创建方式是使用 flow { ... } 构建器:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay

// 定义一个 Flow,每秒发射一个数字 (0, 1, 2)
fun simpleNumberFlow(): Flow<Int> = flow {
    println("Flow started") // 只有 collect 时才会打印
    for (i in 0..2) {
        delay(1000) // 模拟耗时操作
        println("Emitting $i")
        emit(i) // 发射数据项
    }
}
Use code with caution.
Markdown
其他便捷构建器
flowOf(item1, item2, ...): 从固定值创建
listOf(1, 2, 3).asFlow(): 从集合或序列转换
3.2 消费 Flow (终端操作符)
使用终端操作符来启动 Flow 并接收数据collect 是最常用的
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.collect

fun main() = runBlocking { // 启动一个协程环境来运行 suspend 函数
    println("Calling flow...")
    val numberFlow = simpleNumberFlow()

    println("Calling collect...")
    // collect 是挂起函数,会等待 Flow 完成
    numberFlow.collect { value ->
        println("Collected $value")
    }
    println("Flow collection finished.")
}
Use code with caution.
Kotlin
输出:
Calling flow...
Calling collect...
Flow started
Emitting 0
Collected 0
Emitting 1
Collected 1
Emitting 2
Collected 2
Flow collection finished.
Use code with caution.
Text
其他终端操作符如 toList(), first(), reduce() 它们会收集整个 Flow 并返回一个单一值
3.3 转换 Flow (中间操作符)
中间操作符可以链式调用对数据流进行加工
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.filter

suspend fun processData(data: Int): String {
    delay(500) // 模拟处理耗时
    return "Processed data: $data"
}

fun main() = runBlocking {
    simpleNumberFlow() // 原始流: 0, 1, 2
        .filter { it % 2 == 0 } // 过滤奇数: 0, 2
        .map { data -> processData(data) } // 转换数据: "Processed data: 0", "Processed data: 2"
        .collect { result ->
            println(result)
        }
}
Use code with caution.
Kotlin
输出:
Flow started
Emitting 0
Processed data: 0
Emitting 1
Emitting 2
Processed data: 2
Use code with caution.
Text
常用中间操作符map, filter, transform, take, onEach, debounce, flatMapConcat, zip, combine 
4.  Android 中实战 Flow
通常在 Repository -> ViewModel -> UI 架构中使用 Flow
4.1 ViewModel 业务逻辑与状态管理
ViewModel 负责调用 Repository 获取数据通常返回 Flow),处理业务逻辑并将最终状态暴露给 UI推荐使用 StateFlow  SharedFlow  UI 暴露状态
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import android.util.Log

// 假设的 Repository 和数据类
data class UiState(val isLoading: Boolean = false, val data: String? = null, val error: String? = null)
class DataRepository {
    fun fetchData(): Flow<String> = flow {
        delay(2000) // 模拟网络请求
        // emit("Data fetched successfully!")
        throw RuntimeException("Network error!") // 模拟错误
    }
}

class MyViewModel(private val repository: DataRepository) : ViewModel() {

    // 使用 MutableStateFlow 管理 UI 状态
    private val _uiState = MutableStateFlow(UiState(isLoading = true)) // 初始状态为加载中
    val uiState: StateFlow<UiState> = _uiState.asStateFlow() // 暴露只读的 StateFlow 给 UI

    init {
        loadData()
    }

    fun loadData() {
        _uiState.value = UiState(isLoading = true) // 开始加载,更新状态

        viewModelScope.launch { // 在 ViewModel 的协程作用域中启动
            repository.fetchData()
                .map { data -> UiState(isLoading = false, data = data) } // 成功,更新状态
                .catch { e -> emit(UiState(isLoading = false, error = e.message ?: "Unknown error")) } // 捕获异常,更新状态
                .collect { state ->
                    _uiState.value = state // 将最终状态发射给 StateFlow
                    Log.d("MyViewModel", "State updated: $state")
                }
        }
    }
}
Use code with caution.
Kotlin
StateFlow vs SharedFlow
StateFlow: 持有最新状态值,新收集者会立即收到最新值。非常适合表示 UI 状态。只有一个值。 SharedFlow: 可以配置重播缓存 (replay),允许多个收集者接收数据流事件(一对多)。可以发射多个值。
4.2 UI (Activity/Fragment)安全地收集 Flow 关键在于生命周期感知避免在 UI 不可见时收集 Flow防止资源浪费和内存泄漏推荐使用 repeatOnLifecycle API import androidx.appcompat.app.AppCompatActivity import android.os.Bundle import android.widget.TextView import androidx.activity.viewModels // KTX 库,方便获取 ViewModel import androidx.lifecycle.Lifecycle import androidx.lifecycle.lifecycleScope import androidx.lifecycle.repeatOnLifecycle import kotlinx.coroutines.launch import com.example.yourapp.R // 假设你的 R 文件路径 class MainActivity : AppCompatActivity() { private val viewModel: MyViewModel by viewModels() // 获取 ViewModel 实例 private lateinit var textView: TextView override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // 假设布局中有个 TextView textView = findViewById(R.id.my_text_view) // 使用 lifecycleScope + repeatOnLifecycle 安全地收集 Flow lifecycleScope.launch { // 当生命周期至少为 STARTED 时,执行 collect 代码块 // 当生命周期进入 STOPPED 时,自动取消 collect // 当生命周期再次回到 STARTED 时,重新启动 collect repeatOnLifecycle(Lifecycle.State.STARTED) { viewModel.uiState.collect { state -> // 在这里根据 State 更新 UI if (state.isLoading) { textView.text = "Loading..." } else if (state.error != null) { textView.text = "Error: ${state.error}" } else { textView.text = state.data ?: "No data" } } } } // (可选) 添加一个按钮来触发重新加载 // val button = findViewById<Button>(R.id.reload_button) // button.setOnClickListener { viewModel.loadData() } } } Use code with caution. Kotlin
避免直接在 lifecycleScope.launch 中 collect
直接使用 lifecycleScope.launch { viewModel.flow.collect { … } } 会导致即使 UI 进入后台 (STOPPED),Flow 仍然在收集,浪费资源。repeatOnLifecycle 解决了这个问题。
4.3 错误处理 (catch 操作符) catch 操作符能捕获其上游之前的操作符和 flow 构建器的异常它本身也是一个中间操作符 fun main() = runBlocking { flow { emit(1) emit(2) throw RuntimeException("Error on emission!") // 上游异常 emit(3) // 这不会被发射 } .map { it * 2 } // 上游操作 .catch { e -> // 捕获上游异常 println("Caught error: ${e.message}") emit(-1) // 可以发射一个默认值或执行其他操作 } .collect { value -> // 下游消费 println("Collected value: $value") // 如果 collect 内部抛出异常,catch 是捕获不到的 } } Use code with caution. Kotlin 输出: Collected value: 2 Collected value: 4 Caught error: Error on emission! Collected value: -1 Use code with caution. Text 5. Flow vs LiveData vs RxJava 特性 Kotlin Flow LiveData RxJava (2/3) 基础 Kotlin 协程 Android Jetpack 独立库 (Java) 类型 冷流 (默认), 热流 (Shared/State) 热流 (生命周期感知) 冷流 (Observable), 热流 (Subject) 背压 内建支持 不支持 (主要用于 UI 状态) 支持 (多种策略) 操作符 丰富 有限 (主要靠 Transformations) 非常丰富 错误处理 try-catch, catch 操作符 通常在 Observer 中处理 onError 回调, 操作符 生命周期感知 需要手动处理 (e.g., repeatOnLifecycle) 内建 需要手动处理 (e.g., dispose) 平台 Kotlin Multiplatform Android Java (Android, Server ) 学习曲线 中等 (需懂协程) 选择建议: Android 项目/ Kotlin 项目: 优先考虑 Kotlin Flow尤其是 StateFlow 用于 UI 状态 简单 UI 状态更新: LiveData 仍然是一个简单有效的选择 已有大量 RxJava 代码的项目: 迁移成本较高可考虑继续使用 RxJava 或逐步迁移 6. 总结与展望 Kotlin Flow Android 开发带来了更现代更简洁更安全的异步数据流处理方案通过理解其核心概念冷流构建器操作符收集器)、掌握与 ViewModel UI 生命周期的结合方式 (viewModelScope, StateFlow, repeatOnLifecycle)以及熟悉其错误处理机制你将能更高效地构建响应式健壮的 Android 应用 这只是 Flow 的入门它还有更多高级特性如 SharedFlow缓冲 (buffer)并发 (flatMapMerge) 等待你去探索开始在你的项目中使用 Flow