Observable
, but with a more Kotlin - native approach, Flow simplifies handling asynchronous data sequences. This blog post aims to provide an in - depth understanding of Kotlin Flow, including core concepts, typical usage scenarios, and best practices.A Flow
is an asynchronous stream of values. It can emit zero or more values over time and then complete normally or with an exception. Flows are cold, which means that until a terminal operator like collect
is called, no data is produced.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..3) {
// Simulate some asynchronous work
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() = runBlocking {
simpleFlow().collect { value ->
println(value)
}
}
In the above code, the flow
builder is used to create a new Flow
. The emit
function is used to send values to the flow, and the collect
function is a terminal operator that subscribes to the flow and processes each emitted value.
Kotlin Flow provides a rich set of operators for transforming, filtering, and combining flows. For example, the map
operator can be used to transform each value in the flow:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..3) {
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() = runBlocking {
simpleFlow()
.map { it * 2 }
.collect { value ->
println(value)
}
}
Flows respect the cancellation of coroutines. If the coroutine that is collecting the flow is cancelled, the flow stops producing values.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..Int.MAX_VALUE) {
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() {
val scope = CoroutineScope(Dispatchers.Default)
val job = scope.launch {
simpleFlow().collect { value ->
println(value)
if (value == 3) {
scope.cancel() // Cancel the coroutine
}
}
}
runBlocking {
job.join()
}
}
When fetching data from an API that can return multiple pages or updates over time, Kotlin Flow can be used to handle the stream of data.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET
interface ApiService {
@GET("data")
suspend fun getData(): List<String>
}
fun fetchDataFlow(): Flow<List<String>> = flow {
val retrofit = Retrofit.Builder()
.baseUrl("https://example.com/")
.addConverterFactory(GsonConverterFactory.create())
.build()
val apiService = retrofit.create(ApiService::class.java)
repeat(3) {
kotlinx.coroutines.delay(1000)
val data = apiService.getData()
emit(data)
}
}
Kotlin Flow can be used to update the UI in a reactive way. For example, when observing changes in a database or network state, the UI can be updated whenever new data is available.
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val uiUpdateFlow: Flow<String> = flow {
repeat(5) {
delay(1000)
emit("Update $it")
}
}
lifecycleScope.launch {
uiUpdateFlow.collect { update ->
// Update the UI here
println(update)
}
}
}
}
Use catch
operator to handle errors in the flow.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.runBlocking
fun errorProneFlow(): Flow<Int> = flow {
for (i in 1..3) {
if (i == 2) {
throw RuntimeException("Error at 2")
}
emit(i)
}
}
fun main() = runBlocking {
errorProneFlow()
.catch { e -> println("Caught error: ${e.message}") }
.collect { value ->
println(value)
}
}
When dealing with a flow that produces values faster than the collector can consume, use operators like buffer
, conflate
, or flowOn
to handle backpressure.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.runBlocking
fun highSpeedFlow(): Flow<Int> = flow {
for (i in 1..100) {
kotlinx.coroutines.delay(10)
emit(i)
}
}
fun main() = runBlocking {
highSpeedFlow()
.buffer()
.collect { value ->
kotlinx.coroutines.delay(100)
println(value)
}
}
Here is a more comprehensive example that combines multiple concepts:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.runBlocking
fun complexFlow(): Flow<Int> = flow {
for (i in 1..5) {
if (i == 3) {
throw RuntimeException("Error at 3")
}
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() = runBlocking {
complexFlow()
.map { it * 2 }
.catch { e -> println("Caught error: ${e.message}") }
.collect { value ->
println(value)
}
}
Kotlin Flow is a powerful tool for handling asynchronous streams of data in Kotlin. Its core concepts, such as cold flows, operators, and cancellation, provide a flexible and robust way to work with data. With typical usage scenarios like data fetching and UI updates, and best practices for error handling and backpressure, developers can effectively use Kotlin Flow in their projects.