The Reactive Streams specification is a standard for asynchronous stream processing with non - blocking backpressure. It defines four interfaces: Publisher
, Subscriber
, Subscription
, and Processor
.
Publisher
: Represents a source of data that can emit a sequence of elements to one or more subscribers.Subscriber
: Represents a consumer of data that receives elements from a publisher.Subscription
: Represents a contract between a publisher and a subscriber, allowing the subscriber to request a specific number of elements from the publisher and cancel the subscription.Processor
: Represents both a publisher and a subscriber, allowing it to transform the data stream.Kotlin provides the Flow
type, which is an implementation of reactive streams. A Flow
is a cold stream, meaning that the data is not produced until a collector subscribes to it. Here is a simple example of creating and collecting a Flow
:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
// Create a Flow that emits integers from 1 to 5
fun myFlow(): Flow<Int> = flow {
for (i in 1..5) {
// Simulate some asynchronous work
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() = runBlocking {
myFlow().collect { value ->
println(value)
}
}
In this example, the flow
builder is used to create a Flow
. The emit
function is used to send elements downstream. The collect
function is used to subscribe to the Flow
and consume the elements.
Kotlin reactive streams provide a rich set of operators to transform, filter, and combine Flow
objects. Here are some commonly used operators:
map
: Transforms each element in the Flow
using a given function.import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
fun myFlow(): Flow<Int> = flow {
for (i in 1..5) {
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() = runBlocking {
myFlow()
.map { it * 2 }
.collect { value ->
println(value)
}
}
filter
: Filters the elements in the Flow
based on a given predicate.import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.runBlocking
fun myFlow(): Flow<Int> = flow {
for (i in 1..5) {
kotlinx.coroutines.delay(100)
emit(i)
}
}
fun main() = runBlocking {
myFlow()
.filter { it % 2 == 0 }
.collect { value ->
println(value)
}
}
When fetching data from multiple sources asynchronously, Kotlin reactive streams can be used to handle the data in a non - blocking way. For example, fetching data from multiple REST APIs:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay
// Simulate an API call
fun fetchDataFromApi1(): Flow<String> = flow {
delay(200)
emit("Data from API 1")
}
fun fetchDataFromApi2(): Flow<String> = flow {
delay(300)
emit("Data from API 2")
}
fun main() = runBlocking {
val combinedFlow = merge(fetchDataFromApi1(), fetchDataFromApi2())
combinedFlow.collect { data ->
println(data)
}
}
In event - driven systems, reactive streams can be used to handle events asynchronously. For example, handling user interface events in an Android application:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay
// Simulate UI events
fun uiEvents(): Flow<String> = flow {
for (i in 1..3) {
delay(200)
emit("Event $i")
}
}
fun main() = runBlocking {
uiEvents().collect { event ->
println("Handling event: $event")
}
}
Kotlin reactive streams can be used to perform parallel processing on a data stream. The flowOn
operator can be used to specify the coroutine context in which the Flow
should be executed.
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay
fun parallelFlow(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking {
parallelFlow().collect { value ->
println("Processed value: $value")
}
}
Error handling in Kotlin reactive streams is important to ensure the stability of the application. The catch
operator can be used to handle exceptions in the Flow
:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.runBlocking
fun errorProneFlow(): Flow<Int> = flow {
for (i in 1..3) {
if (i == 2) {
throw RuntimeException("Something went wrong")
}
emit(i)
}
}
fun main() = runBlocking {
errorProneFlow()
.catch { e ->
println("Caught exception: ${e.message}")
}
.collect { value ->
println(value)
}
}
When working with resources such as database connections or file handles, it is important to ensure that the resources are properly managed. The onCompletion
operator can be used to perform cleanup operations when the Flow
is completed:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking
fun resourceFlow(): Flow<Int> = flow {
println("Opening resource")
for (i in 1..3) {
emit(i)
}
}.onCompletion {
println("Closing resource")
}
fun main() = runBlocking {
resourceFlow().collect { value ->
println(value)
}
}
Testing Kotlin reactive streams can be done using the TestCoroutineScope
and the TestCollector
provided by the Kotlin coroutines testing library. Here is an example of testing a Flow
:
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
@ExperimentalCoroutinesApi
class FlowTest {
fun myFlow(): Flow<Int> = flow {
for (i in 1..3) {
emit(i)
}
}
@Test
fun testMyFlow() = TestCoroutineScope().runBlockingTest {
val result = mutableListOf<Int>()
myFlow().collect { value ->
result.add(value)
}
assertEquals(listOf(1, 2, 3), result)
}
}
Kotlin reactive streams provide a powerful and flexible way to handle asynchronous and event - driven operations. With its support for backpressure, rich set of operators, and seamless integration with Kotlin coroutines, it is a great choice for building modern, responsive applications. By understanding the core concepts, typical usage scenarios, and best practices, intermediate - to - advanced software engineers can effectively apply Kotlin reactive streams in their projects.