平时看博客或者学知识,学到的东西比较零散,没有独立的知识模块概念,而且学了之后很容易忘。于是我建立了一个自己的笔记仓库 (一个我长期维护的笔记仓库,感兴趣的可以点个star~你的star是我写作的巨大大大大的动力),将平时学到的东西都归类然后放里面,需要的时候呢也方便复习。
在现代Android中,异步处理数据和响应式编程已经成为了不可或缺的一部分。app的代码越来越复杂,我们需要一种简单的、高效的方式来处理数据流和状态变化。Kotlin Flow是协程的一部分,提供了强大且灵活的API来应对这些挑战。
Kotlin Flow可以简化异步编程的复杂性,它可以与协程配合,并提供了丰富的操作符,使用它我们可以轻松实现数据流的转换、组合和过滤等。
接下来,我们将简单探讨Kotlin Flow的基础知识,包括概念、使用方式。
在开始学习之前,我们需要先简单了解一下Kotlin Flow的基本概念。
数据流是一种数据处理方式,数据被异步地、连续地传输和处理。就像水从高处经水管流到低处,在流的这个过程中可以对水进行一些处理,比如过滤、加糖、加果汁、加热等等,下游拿到的水是已经经过处理的水,直接拿去消费(喝掉)就行。
在Kotlin中,对数据流的建模合适的类型就是Flow。从概念上讲,可以异步计算的数据流称之为Flow。Flow是Kotlin协程的一部分,提供了对数据流的产生、变换、组合和消费的强大支持。通过Flow,开发者可以轻松地处理异步数据流,编写高效、响应式的app。
Flow可以连续地发出多个值,而不像传统的函数,调一次只返回一个值。还有点区别在于,Flow是使用挂起函数以异步的方式进行消费和生产。
Kotlin Flow并不是唯一的数据流,但它是协程的一部分,所以和协程配合得很好。比如大家熟悉的RxJava也是数据流的一种建模。
数据流包含3个重要概念:
LiveData是Android架构组件的一部分,它能保存数据、能感知生命周期、利用观察者模式在可用的生命周期范围内将最新的数据通知给观察者。它的设计初衷是为了简化开发,上手相对比较容易。然而,LiveData在处理复杂的数据流时存在一些限制,比如只能在主线程操作数据、操作符不够强大、而且不支持切换线程。正如前面所说,它是为了简化开发才被造出来的,搞的太复杂反而违背了设计初衷。
Kotlin的协程是Kotlin语言引入的一种并发编程工具,一套方便的API,它允许开发者以同步的方式写出异步的代码,简化异步逻辑的代码编写。协程通过提供结构化的并发模式,使得编写异步代码变得更加直观和易于理解。协程可以恢复和暂停,比如可以挂起直到某些条件满足才继续执行。这些特性使得协程可以非常方便地处理复杂的异步逻辑。
虽然说Kotlin协程已经提供了强大的异步编程能力,但Kotlin Flow提供了额外的抽象层次,专门用于处理异步数据流的,它是作为协程的补充,使得开发者可以更方便地构建复杂的异步数据处理逻辑。
另外,LiveData是想一次处理一个数据,而Flow是连续的数据流。LiveData的数据处理是在主线程,而Flow可以切到各种线程去处理,还有异常处理、各种操作符、与协程紧密配合等。看起来Flow只是对LiveData的补充,而不是替代,当需要处理复杂的数据流时,可能用Flow更加适合,弥补了一些LiveData的局限性。
在Flow之前呢,有RxJava可以非常方便地处理数据流,但是呢,RxJava上手难度比较大,而且不能与Kotlin协程进行很好的配合,Flow就相对更容易上手,而且与Kotlin配合紧密,方便操作,具体更详细的对比后面会提到。
来个LiveData和Flow的对比:
| LiveData | Flow |
|---|---|
| 支持Java和Kotlin,使用简单 | 仅支持在Kotlin中使用,Java中使用起来比较困难 |
| 不需要协程环境来执行 | 需要协程环境来执行 |
| 主要在主线程上运行 | 在协程上运行,不阻塞主线程 |
| 转换运算符在主线程上执行 | 运算符是挂起函数,可以很方便地在不同线程上执行 |
| 默认情况下,能感知生命周期 | 默认情况下,不能感知生命周期 |
如果是Java项目就需要注意了,Livedata能和Java配合,Kotlin Flow想要和Java配合就难咯。
可能很多人都用过RxJava,前几年,在纯Java的Android项目中,RxJava对于响应式编程非常友好。但是上手难度还是比较大的,要学很久才知道怎么用,以及怎么才能用好。而后面,大家开始用Kotlin,然后用Kotlin协程,又有了Kotlin Flow,上手难度比RxJava轻松了不少。那么,相比RxJava,Kotlin Flow有哪些优势呢?
说了这么一大堆抽象的东西,下面我们来看看具体怎么使用。
先举一个简单例子:
fun main(): Unit = runBlocking { // 创建3个Flow,生产数据 val firstFlow = flowOf(1, 2) val secondFlow = flow { emit(3) emit(4) } val thirdFlow = listOf(5, 6).asFlow() // 挨个收集,消费者 firstFlow.collect { println(it) } secondFlow.collect { println(it) } thirdFlow.collect { println(it) } } 从这段代码中我们可以发现,Flow 的创建方式多样,如使用flowOf、flow、asFlow等。上面的例子中,每个 Flow 都通过 collect 终止操作来收集其发射的值,并对每个值执行相应的操作,而collect是需要在协程环境中执行的。
正如我前面所说的,除了生产者和消费者之外,中间还有一个可选的加工者,人如其名,是对数据进行转换加工等操作的,下面我们来简单看一下。
val firstFlow = flowOf(1, 2) // 将数据做 +2 处理 firstFlow.map { it + 2 }.collect { println(it) } 利用map操作符对数据进行了 +2 处理,这样最后输出就是3,4了。这里的map和我们平时使用的集合操作符map是一个含义,用法也是一样的,所以用到Flow上会看起来非常自然,没有陌生感。除了map以外,还有其他的操作符。
flowOf(1, 2, 3).map { it * 2 } flowOf(1, 2, 3).filter { it % 2 == 0 } flowOf(1, 2, 3).transform { value -> emit(value * 2) emit(value * 3) } flowOf(1, 2, 3, 4).take(2) flowOf(1, 2).zip(flowOf("A", "B")) { a, b -> "$a -> $b" } val flow1 = flow { emit(1) delay(100) emit(2) delay(100) emit(3) } val flow2 = flow { emit("A") delay(500) emit("B") emit("C") } val combinedFlow = flow1.combine(flow2) { a, b -> "$a$b" } combinedFlow.collect { println(it) } // 输出 // 1A // 2A // 3A // 3B // 3C flowOf(1, 2, 3).collect { println(it) } val list = flowOf(1, 2, 3).toList() val first = flowOf(1, 2, 3).first() flow { for (i in 1..3) { println("flow ${currentCoroutineContext()}") emit(i) } }.flowOn(Dispatchers.Default) .map { println("map ${currentCoroutineContext()}") it.toString() } .flowOn(Dispatchers.IO) .collect { withContext(Dispatchers.IO) { println("collect withContext ${currentCoroutineContext()}") } println("collect ${currentCoroutineContext()}") println(it) } /* 输出: flow [ProducerCoroutine{Active}@3b6f6746, Dispatchers.Default] flow [ProducerCoroutine{Active}@3b6f6746, Dispatchers.Default] flow [ProducerCoroutine{Active}@3b6f6746, Dispatchers.Default] map [ScopeCoroutine{Active}@4b60f5ce, Dispatchers.IO] map [ScopeCoroutine{Active}@4b60f5ce, Dispatchers.IO] map [ScopeCoroutine{Active}@4b60f5ce, Dispatchers.IO] collect withContext [DispatchedCoroutine{Active}@8945c64, Dispatchers.IO] collect [ScopeCoroutine{Active}@6fdb1f78, BlockingEventLoop@51016012] 1 collect withContext [DispatchedCoroutine{Active}@437a60dc, Dispatchers.IO] collect [ScopeCoroutine{Active}@6fdb1f78, BlockingEventLoop@51016012] 2 collect withContext [DispatchedCoroutine{Active}@7b238e10, Dispatchers.IO] collect [ScopeCoroutine{Active}@6fdb1f78, BlockingEventLoop@51016012] 3 */ // 先看一下有缓冲区的情况 flowOf("A","B","C","D","E") .onEach { println("Woman matchmaker emits: $it") } .buffer() .collect { println("Girl appointment with: $it") delay(1000) } //输出 Woman matchmaker emits: A Woman matchmaker emits: B Woman matchmaker emits: C Woman matchmaker emits: D Woman matchmaker emits: E Girl appointment with: A Girl appointment with: B Girl appointment with: C Girl appointment with: D Girl appointment with: E // 无缓冲区的情况 flowOf("A","B","C","D","E") .onEach { println("Woman matchmaker emits: $it") } .collect { println("Girl appointment with: $it") delay(1000) } // 输出 Woman matchmaker emits: A Girl appointment with: A Woman matchmaker emits: B Girl appointment with: B Woman matchmaker emits: C Girl appointment with: C Woman matchmaker emits: D Girl appointment with: D Woman matchmaker emits: E Girl appointment with: E flowOf(1, 2, 3).conflate() flow { emit(1) throw RuntimeException("RuntimeException") }.catch { e -> emit(-1) }.collect { println(it) } flow { emit(1) throw RuntimeException("RuntimeException") }.retry(3).collect { println(it) } // 输出 1 1 1 1 Exception in thread "main" java.lang.RuntimeException: RuntimeException 主要分为冷流和热流两种:
冷流(如Flow):
热流(如StateFlow、SharedFlow):
StateFlow 和 SharedFlow 是热流,它们提供了不同的功能和使用场景。生产数据不依赖消费者消费,热流与消费者是一对多的关系,当有多个消费者时,它们之间的数据都是同一份。MutableSharedFlow、MutableStateFlow是它们的可读可写的版本。
StateFlow 与 LiveData 有点类似,这里可以对照着学习,比如相同的地方有:
不同的地方:
SharedFlow和StateFlow一样,SharedFlow 也有两个版本:SharedFlow 与 MutableSharedFlow。那么它与StateFlow哪里不一样呢?
它们的使用场景大概可以这么区分:一个是状态(StateFlow),一个是事件(SharedFlow)。
它们两个是Kotlin Flow里面的扩展函数,用于将冷流转换为热流。
stateIn 将一个冷流转换为 StateFlow。它会保留最新的值,并且任何新的订阅者都会立即收到当前状态。怎么使用?
// ViewModel val stateInFlow = flow { emit(1) delay(300L) emit(2) delay(300L) emit(3) }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), null) // Activity lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.stateInFlow.collect { log("stateInFlow data $it") } } } // 输出 stateInFlow data null stateInFlow data 1 stateInFlow data 2 stateInFlow data 3 首先我们注意到stateIn需要传入3个参数,意思如下:
@param scope the coroutine scope in which sharing is started. @param started the strategy that controls when sharing is started and stopped. @param initialValue the initial value of the state flow. This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy with the `replayExpirationMillis` parameter. 官方建议:对于那些一次性的操作来说,你可以使用Lazily、Eagerly,但是对于需要观察其他的Flow的情况来说,更推荐用WhileSubscribed。WhileSubscribed在最后一个订阅者停止订阅之后还能继续保持stopTimeoutMillis时间的活跃,之后才停止,这有个好处,比如用户将app切换到后台,此时,上游的流就没必要继续产生并发射数据了,app都没在前台了,发射数据有点浪费资源了。但是,当app只是从竖屏切换到横屏状态时(lifecycle.repeatOnLifecycle那里我们传入的是STARTED,所以会被取消,重新STARTED时会重新collect),这种就没必要取消上游的生产者生成数据了,所以有个stopTimeoutMillis的时间值在那里。官方表示,合适的时间是5000毫秒。
shareIn 将一个冷流转换为 SharedFlow。它可以配置缓冲区大小和重播值的数量,并且可以在多个订阅者之间共享数据。共享同一个流的数据,而不是重新执行流的计算。
shareIn和stateIn参数差不多,但是没有初始值,多了一个replay参数,这个参数什么意思呢?假设:之前有订阅者,并且已经有3个流数据了,replay=1,这时再来一个订阅者,那么就会发射最新的那个值给这个新的订阅者,而不会发射给这个新的订阅者早先的第一个和第二个数据。
举个例子:
val shareInFlow = flow { emit(1) delay(300L) emit(2) delay(300L) emit(3) }.shareIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), 1) fun testShareIn() { viewModelScope.launch { launch { shareInFlow.collect { log("订阅者1 shareInFlow data $it") } } delay(1000L) launch { shareInFlow.collect { log("订阅者2 shareInFlow data $it") } } } } // 输出: 订阅者1 shareInFlow data 1 订阅者1 shareInFlow data 2 订阅者1 shareInFlow data 3 订阅者2 shareInFlow data 3 类似Kotlin协程中的suspendCancellableCoroutine,将callback转换为协程的风格。那么回调callback拿到的数据怎么转换为Flow发射出去呢?
答案是使用callbackFlow。callbackFlow 是一种特殊的 Flow 构建器,它允许你从回调中发射数据。举个栗子:
fun locationFlow(locationManager: LocationManager): Flow = callbackFlow { val listener = object : LocationListener { override fun onLocationChanged(location: Location) { trySend(location) // Emit the location update to the flow } } locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER, 0L, 0f, listener) awaitClose { locationManager.removeUpdates(listener) // Unregister listener on cancellation } } fun main() { runBlocking { locationFlow(locationManager) .collect { location -> println("Received location: ${location.latitude}, ${location.longitude}") } } } 默认情况下,Flow是不具备生命周期感知能力的。但我们可以使用下面的方法让其具备生命周期感知的能力。
V Lifecycle.repeatOnLifecycle(state) 官方推荐:使用repeatOnLifecycle在界面层收集Flow。调用repeatOnLifecycle的协程将不会继续执行后面的代码了,当它恢复的时候,已经是ui DESTROY的时候了,所以不要在repeatOnLifecycle的后面继续repeatOnLifecycle。官方推荐在repeatOnLifecycle里面launch多次,开启多个协程,然后在里面collect,相互不影响。Flow.flowWithLifecycle(lifecycle, state) 如果只有一个Flow数据需要收集,那么官方推荐使用flowWithLifecycle。// Update the uiState lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { viewModel.uiState .onEach { uiState = it } .collect() } } 那么,你可能会问了?我都在lifecycleScope里面进行collect了,为啥还需要考虑生命周期的问题,不是自带感知吗? 答案是:这种方式会在ui非DESTROY时一直可以collect,即使app在后台,即onStop的状态时,也会进行收集,然后对ui进行更新,除非你确实有这个需求,那么不然就有点浪费资源了。
那么,在lifecycleScope.launchWhenStarted里面收集Flow数据应该没问题了吧?还是有一点问题,在ui层达到onStop状态之后,但并未destroy时,比如按home键,app此时在后台活着,这个时候Flow的管道还继续存在,且Flow的生产方还可以继续生产并emit。
当ViewModel向外部暴露一个冷流时,这个冷流是向网络请求数据,那么每次这个冷流被collect时都会进行一次网络请求。比如配置变更时,再次collect,那么就会再请求一次网络,这明显不太合适。
val result: Flow> = flow { emit(repository.fetchItem()) } 这个时候就需要一个可以临时储存数据的一个东西,假设我们叫它储物箱,它的作用是将上游生产的数据暂时存起来,不管下游collect多少次,都是从这个储物箱中获取最新的那个数据。其实,上面我们已经讲过这么一个东西了,它就是StateFlow。普通的Flow可以通过stateIn转换为StateFlow。官方建议在ViewModel中向外暴露StateFlow,或者用asLiveData转为LiveData。
看下实际项目中,怎么使用Flow。
请求网络在App开发中非常常见,我这里简单写了一个demo,用Retrofit获取数据:
interface WanAndroidService { @GET("wxarticle/chapters/json") suspend fun listRepos(): WxList? } class KotlinFlowViewModel : ViewModel() { val retrofit = Retrofit.Builder() .baseUrl(WANANDROID_BASE_URL) .addConverterFactory(GsonConverterFactory.create()) .build() val api = retrofit.create(WanAndroidService::class.java) fun getWxData(): Flow = flow { val response = api.listRepos() emit(response) }.catch { log("出错了 $it") emit(null) }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), null) } // Activity lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.getWxData().collect { newData -> Log.d("xfhy666", "getWxData ${newData?.data?.getOrNull(0)?.name}") tv_data.text = newData?.data?.getOrNull(0)?.name ?: "没获取到数据" } } } 我在获取网络数据之后使用了stateIn,将上游的Flow转换为StateFlow,将数据暂存到StateFlow中,然后在Activity中进行collect收集数据,进行ui展示。使用方式和LiveData类似。
就像协程能结合Room一起使用一样,Flow也能和Room一起配合使用。首先来看一下dao的定义:
@Dao interface UserDao { @Insert suspend fun insertUserBySuspend(user: User) @Query("SELECT * FROM user") fun getAllBySuspendFlow(): Flow> }
注意看getAllBySuspendFlow的返回值,是一个Flow。然后再来看使用方式:
// ViewModel /** * 插入数据到room */ fun insertUserData() { val user = User(name = "${random.nextInt()} 罗辑", age = random.nextInt()) viewModelScope.launch { userDao.insertUserBySuspend(user) } } /** * collect之后,实时接收数据库中所有的user */ val userDataList: Flow?> = userDao .getAllBySuspendFlow() .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), null) // Activity lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { launch { flowViewModel.userDataList.collect { dataList -> log("数据库中的数据总个数为 : ${dataList?.size}") } } } }
一旦我调用insertUserData方法插入数据到数据库中的时候,我的flowViewModel.userDataList.collect就会收到数据,相当于可以一直观察着数据的变化。是不是让你想起了点什么?没错,之前LiveData也是类似的:
@Dao interface UserDao { @Query("SELECT * FROM user WHERE id = :userId") fun getUserById(userId: Int): LiveData @Query("SELECT * FROM user") fun getAllUsers(): LiveData> }
如果你在之前的项目在使用LiveData,那么你刚开始尝试使用Flow时,可能会遇到之前用LiveData解决问题的场景,现在用Flow怎么实现的问题。下面,我将举几个例子,简单说明一下怎么从LiveData转换到Flow。
在ViewModel中请求数据,然后用LiveData暴露出去,在UI层观察。下面的示例中,我会分为两部分,上半部分是LiveData的用法,下半部分是Flow的用法。
// ViewModel private val _livedata1 = MutableLiveData() val livedata1 = _livedata1 fun fetchData1() { viewModelScope.launch(Dispatchers.IO) { val result = api.listRepos() _livedata1.postValue(result?.toString()) } } val flow1 = flow { val result = api.listRepos() emit(result.toString()) }.flowOn(Dispatchers.IO) .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), null) 那么在Activity中的收集代码如下:
flowViewModel.fetchData1() flowViewModel.livedata1.observe(this) { log("livedata1 数据 $it") } lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.flow1.collect { log("flow1 数据 $it") } } } 如上,一个简单的使用场景,使用起来差别不大。
在ViewModel中一个LiveData的数据依赖于另一个LiveData,并且需要用SwitchMap转换一下数据。下面的示例中,我会分为两部分,上半部分是LiveData的用法,下半部分是Flow的用法。
private val _liveDataA = MutableLiveData() val liveDataB: LiveData = _liveDataA.switchMap { value -> MutableLiveData("hh $value") } fun fetchData2() { _liveDataA.value = "param1" } private val flowA = MutableStateFlow("") val flowB: Flow = flowA.flatMapLatest { value -> flow { emit("hh $value") } }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), "") fun fetchFlowA() { flowA.value = "param1" } 那么在Activity中的收集代码如下:
flowViewModel.fetchData2() flowViewModel.liveDataB.observe(this) { log("liveDataB 数据 $it") } flowViewModel.fetchFlowA() lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.flowB.collect { log("flowB 数据 $it") } } } 如上,在LiveData中使用SwitchMap,在Flow中可以使用flatMapLatest转一下。
MediatorLiveData的数据,来源于观察另一个或多个LiveData others的数据,在观察到others数据变化时,根据业务需要得出新的值。这样可以用于合并多个LiveData的值、选择某个LiveData最新的值、获取多个LiveData的最新的值。
// ViewModel private val _liveData31 = MutableLiveData() private val _liveData32 = MutableLiveData() val mediatorLiveData: LiveData = MediatorLiveData().apply { addSource(_liveData31) { value -> value?.let { postValue("liveData31: $it LiveData32: ${_liveData32.value}") } } addSource(_liveData32) { value -> value?.let { postValue("LiveData32: $it liveData31: ${_liveData31.value}") } } } fun fetchData3() { _liveData31.value = "哈哈" _liveData32.value = 6 } private val flow31 = MutableStateFlow("") private val flow32 = MutableStateFlow(0) val combinedFlow = flow31.combine(flow32) { valueA, valueB -> "flow31: $valueA, flow32: $valueB" }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), "") fun fetchData3ByFlow() { flow31.value = "哈哈" flow32.value = 6 } 那么在Activity中的收集代码如下:
flowViewModel.fetchData3() flowViewModel.mediatorLiveData.observe(this) { log("mediatorLiveData 数据 $it") } flowViewModel.fetchData3ByFlow() lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.combinedFlow.collect { log("combinedFlow 数据 $it") } } } 要观察多个LiveData的值可以使用MediatorLiveData,而在Flow中,可以通过使用combine关键词来观察并合并多个 Flow 的最新值。
Transformations.map经常用于观察另一个LiveData的值,观察到变化时,对其map操作进行一些转换,然后生成一个新的LiveData。
// ViewModel private val liveData4: LiveData = MutableLiveData() val mappedLiveData: LiveData = Transformations.map(liveData4) { value -> "Mapped livadata value: $value" } private val flow4: Flow = flowOf(1, 2, 3) val mappedFlow: Flow = flow4.map { value -> "Mapped flow value: $value" }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), "") 那么在Activity中的收集代码如下:
flowViewModel.mappedLiveData.observe(this) { log("mappedLiveData data : $it") } lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.mappedFlow.collect { log("flow4 data $it") } } } 这种情况的话,LiveData和Flow几乎是一模一样的操作,都是通过map操作符来完成观察另外一个数据,并转换成新的数据。
Transformations.switchMap主要是将一个LiveData的值,转换为另一个LiveData。来看看Flow中是怎么实现:
// ViewModel private val _liveData5 = MutableLiveData() val switchMappedLiveData: LiveData = Transformations.switchMap(_liveData5) { value -> liveData { val result = "livedata data $value" emit(result) } } fun fetchData5() { _liveData5.value = "param1" } private val flow5 = MutableStateFlow("") val switchMappedFlow: Flow = flow5.flatMapLatest { value -> flow { val result = "flow data $value" emit(result) }.flowOn(Dispatchers.IO) } fun fetchFlow5() { flow5.value = "param1" } 那么在Activity中的收集代码如下:
flowViewModel.switchMappedLiveData.observe(this) { log("switchMappedLiveData data : $it") } flowViewModel.fetchData5() lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.switchMappedFlow.collect { log("switchMappedFlow data : $it") } } } flowViewModel.fetchFlow5() 在Flow中可以使用flatMapLatest来代替Transformations中的switchMap。
flow { emit(1) emit(null) emit("3") }.collect { println(it) } //输出: 1 null 3 data class Person(val name: String) val person = Person("一") flowOf(person, person, person, person).collect { println(it) } // 输出: Person(name=一) Person(name=一) Person(name=一) Person(name=一) val flow1 = flowOf(1, 2, 3, 4, 5, 6) val flow2 = flowOf("a", "b", "c") flow1.zip(flow2) { value1, value2 -> "新的数据: `$value1 $`value2" }.collect { println(it) } // 输出: //新的数据: 1 a //新的数据: 2 b //新的数据: 3 c 如果你尝试连续发射相同的值,StateFlow 会忽略后续的发射尝试,因为状态没有变化。
data class State(val data: Int, val timestamp: Long = System.currentTimeMillis()) val stateFlow = MutableStateFlow(State(1)) suspend fun updateData(value: Int) { stateFlow.emit(State(value)) } async { updateData(1) delay(100) updateData(1) } stateFlow.collect { println(it) } // 输出 State(data=1, timestamp=1719280640886) State(data=1, timestamp=1719280640944) State(data=1, timestamp=1719280641052) val sharedFlow = MutableSharedFlow() async { sharedFlow.emit(1) sharedFlow.emit(1) sharedFlow.emit(1) sharedFlow.emit(1) sharedFlow.emit(1) } sharedFlow.collect { println(it) } // 输出 1 1 1 1 1 大家先看一下下面的代码有没有问题:
fun getWxData(): Flow = flow { val response = api.listRepos() emit(response) } fun getListData(): Flow = flowOf(1, 2, 3) lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { flowViewModel.getWxData().collect { newData -> Log.d("xfhy666", "getWxData ${newData?.data?.getOrNull(0)?.name}") tv_data.text = newData?.data?.getOrNull(0)?.name ?: "没获取到数据" } flowViewModel.getListData().collect { data -> Log.d("xfhy666", "getListData 获取到的数据 $data") } } } getWxData和getListData都是返回的Flow,那么我在repeatOnLifecycle中连续两次collect,这样有问题吗?先看一下结果:
2024-07-02 07:24:40.520 8652-8652/com.xfhy.allinone D/xfhy666: getWxData 鸿洋 2024-07-02 07:24:40.524 8652-8652/com.xfhy.allinone D/xfhy666: getListData 获取到的数据 1 2024-07-02 07:24:40.524 8652-8652/com.xfhy.allinone D/xfhy666: getListData 获取到的数据 2 2024-07-02 07:24:40.524 8652-8652/com.xfhy.allinone D/xfhy666: getListData 获取到的数据 3 看起来是没问题的,我现在做一下改变,将getWxData的Flow转换为热流StateFlow
fun getWxData(): Flow = flow { val response = api.listRepos() emit(response) }.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), null) 其他地方都没动,现在我们看下结果:
2024-07-02 07:30:37.678 9472-9472/com.xfhy.allinone D/xfhy666: getWxData null 2024-07-02 07:30:43.120 9472-9472/com.xfhy.allinone D/xfhy666: getWxData 鸿洋 可以看到,getListData的Flow数据没有被消费,第一个collect一直阻塞在那里了,后面的就执行不到了。我们需要简单改一下:
lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { launch { flowViewModel.getWxData().collect { newData -> Log.d("xfhy666", "getWxData ${newData?.data?.getOrNull(0)?.name}") tv_data.text = newData?.data?.getOrNull(0)?.name ?: "没获取到数据" } } launch { flowViewModel.getListData().collect { data -> Log.d("xfhy666", "getListData 获取到的数据 $data") } } } } 好了,这样就能collect到数据了,并且互不影响,现在的输出数据为:
2024-07-02 07:33:51.358 9676-9676/com.xfhy.allinone D/xfhy666: getWxData null 2024-07-02 07:33:51.361 9676-9676/com.xfhy.allinone D/xfhy666: getListData 获取到的数据 1 2024-07-02 07:33:51.361 9676-9676/com.xfhy.allinone D/xfhy666: getListData 获取到的数据 2 2024-07-02 07:33:51.361 9676-9676/com.xfhy.allinone D/xfhy666: getListData 获取到的数据 3 2024-07-02 07:33:52.721 9676-9676/com.xfhy.allinone D/xfhy666: getWxData 鸿洋 我先写一段会抛出异常的代码:
fun getThrowFlow(): Flow = flow { emit(1) emit(2) throw RuntimeException("异常") } lifecycleScope.launch { lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { launch { flowViewModel.getThrowFlow().collect { Log.d("xfhy666", "getThrowFlow 获取到的数据 $it") } } } } 然后在Activity中collect,看下结果:
2024-07-02 07:39:19.803 10024-10024/com.xfhy.allinone D/xfhy666: getThrowFlow 获取到的数据 1 2024-07-02 07:39:19.803 10024-10024/com.xfhy.allinone D/xfhy666: getThrowFlow 获取到的数据 2 --------- beginning of crash 2024-07-02 07:39:19.836 10024-10024/com.xfhy.allinone E/AndroidRuntime: FATAL EXCEPTION: main Process: com.xfhy.allinone, PID: 10024 java.lang.RuntimeException: 异常 at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowViewModel$getThrowFlow$1.invokeSuspend(KotlinFlowViewModel.kt:47) // 这一行的ViewModel中我抛出异常的地方 at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowViewModel$getThrowFlow$1.invoke(Unknown Source:8) at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowViewModel$getThrowFlow$1.invoke(Unknown Source:4) at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61) at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:230) // 到了collect这里 at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowActivity$initView$1$1$1$1.invokeSuspend(KotlinFlowActivity.kt:55) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69) at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:376) at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30) at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25) at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110) at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126) at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56) at kotlinx.coroutines.BuildersKt.launch(Unknown Source:1) at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47) at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source:1) at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowActivity.initView$lambda-0(KotlinFlowActivity.kt:39) // lifecycleScope.launch at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowActivity.$r8$lambda$l0YhxSN3b9XO48WjJgeEDhKBz1g(Unknown Source:0) at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowActivity$$ExternalSyntheticLambda0.onClick(Unknown Source:2) at android.view.View.performClick(View.java:7448) at android.view.View.performClickInternal(View.java:7425) at android.view.View.access$3600(View.java:810) at android.view.View$PerformClick.run(View.java:28305) at android.os.Handler.handleCallback(Handler.java:938) at android.os.Handler.dispatchMessage(Handler.java:99) // dispatchMessage at android.os.Looper.loop(Looper.java:223) at android.app.ActivityThread.main(ActivityThread.java:7656) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:592) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:947) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@b417ad2, Dispatchers.Main.immediate] 一直将异常抛出到了ZygoteInit.main方法内,这将导致app崩溃退出。
使用 flow 构建器,生产者无法 emit 来自不同 CoroutineContext 的值。因此,不要通过创建新协程或使用 withContext 代码块在不同的 CoroutineContext 中调用 emit 。在这些情况下,可以使用其他流程构建器,例如 callbackFlow 。
下面请看错误示范:
fun errorUseFlow1(): Flow = flow { emit(1) // 下面这种是错误的用法 withContext(Dispatchers.IO) { emit(2) } } 当我开始collect时,发现崩溃了,日志如下:
java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [StandaloneCoroutine{Active}@f3c5f9, Dispatchers.Main.immediate], but emission happened in [DispatchedCoroutine{Active}@5b3593e, Dispatchers.IO]. Please refer to 'flow' documentation or use 'flowOn' instead at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:85) at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:106) at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:83) at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66) at com.xfhy.allinone.kotlin.coroutine.flow.KotlinFlowViewModel$errorUseFlow1$1$1.invokeSuspend(KotlinFlowViewModel.kt:54) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42) at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:749) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@47e4abb, Dispatchers.Main.immediate] 大概意思是违反了Flow的不可变性,Flow是在StandaloneCoroutine环境中收集的,但是却在DispatchedCoroutine环境发射,让你用flowOn代替。正确写法是用flowOn来切换协程环境。
LiveData仍然是Java项目、Android初学者、简单场景下的最佳选择。
对于除上面以外的其他情况,官方是建议使用Kotlin Flow。但是学习Kotlin Flow需要一些学习时间,但它是Kotlin语言的一部分,谷歌很挺这个东西。我去简单看了下NowInAndroid这个项目(该项目是谷歌官方的一个Android demo实战项目,功能齐全、完全使用Kotlin和Compose。遵循 Android 设计和开发最佳实践,旨在为开发人员提供有用的参考),我发现里面已经完全没有在使用LiveData了,全是Flow和各种Hilt依赖注入。理论上LiveData能做的事,Kotlin Flow也能做;LiveData不能做或者做起来比较困难的事,Kotlin Flow也能做。
我们可以打开LiveData官网,即使是2024年,谷歌也没有将它标记为过时,它仍然是非常棒的选择。现在和将来很长一段时间理论上都不会被标记为过时,毕竟还有那么多Java项目,而且主要是用起来也非常简单方便。从谷歌2022年的一个采访(Architecture: Live Q&A - MAD Skills)里面可以看出,谷歌意思是你想用LiveData就继续用,当然,更推荐你用Kotlin Flow。
至于要不要迁移,我的理解是,老代码就不动它。新代码,喜欢就可以用Flow,不喜欢就还是可以继续用LiveData。
好了,文章比较长,咱们再来回忆一下,主要介绍了Kotlin Flow的相关知识,包括基本概念、基本使用、实际应用以及一些需要注意的问题。Kotlin Flow是Kotlin协程的一部分,用于处理异步数据流,它相比LiveData和RxJava具有诸多优势,如更自然的协程支持、简单的语法、内存安全、更好的错误处理等。在基本使用方面,介绍了Flow的创建、消费、操作符、类型以及如何将回调转换为Flow、让Flow具备生命周期感知能力和处理配置变更问题等。在实际应用中,展示了Flow在请求网络、与Room结合使用以及替代LiveData解决问题等场景的用法。接着还有一些细节方面的讨论,如StateFlow无法连续emit的解决办法、多次连续的collect可能导致的问题、Flow中异常的处理、使用flow构建器的注意事项以及Flow和LiveData的选择和迁移问题。