Kotlin Flow for Reactive Programming
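Flow is Kotlin’s coroutine-based API for asynchronous data streams, offering a reactive style similar to libraries built on Reactive Streams. A Flow emits values sequentially and is cold by default: the code in its builder runs only when a collector subscribes. Because it’s built on top of coroutines, emitting and collecting are suspending operations that follow structured concurrency and cancellation.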
Basic Flow
Creating a Flow
// Basic flow
val flow = flow {
    emit(1)
    emit(2)
    emit(3)
}
// Flow from collection
val listFlow = listOf(1, 2, 3).asFlow()
// Flow from range
val rangeFlow = (1..3).asFlow()
Collecting from Flow
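// Basic collection: collect is a suspending function, so call it from a coroutine
flow.collect { value ->
    println(value)
}
// Using launchIn: starts collection in the given scope and returns a Job without suspending
flow
    .onEach { value -> println(value) }
    .launchIn(scope)
// Using collectLatest: cancels the block that is still running when a new value arrives
flow.collectLatest { value ->
    // Handle latest value
}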
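One property worth seeing in action is that flows created with the flow { } builder are cold: nothing runs until a terminal operator such as collect or toList is called, and every collection runs the builder again. The following is a minimal sketch of that behaviour, assuming kotlinx-coroutines-core is on the classpath; builderRuns, countedFlow, and collectTwice are hypothetical names introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter, used only to observe how often the builder body runs.
var builderRuns = 0

// Hypothetical flow that increments the counter each time it is collected.
fun countedFlow(): Flow<Int> = flow {
    builderRuns++
    emit(1)
    emit(2)
    emit(3)
}

// Collects the same Flow instance twice and reports how often the builder ran.
fun collectTwice(): Int = runBlocking {
    builderRuns = 0
    val numbers = countedFlow() // Creating the flow runs nothing yet
    numbers.toList()            // First collection runs the builder
    numbers.toList()            // Second collection runs it again
    builderRuns
}
```

Calling collectTwice() returns 2, because each terminal operator triggers a fresh run of the builder. If the work should be shared between collectors instead, a hot flow such as StateFlow or SharedFlow is the usual choice.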
Flow Operators
Transformation Operators
// map
flow.map { it * 2 }
// filter
flow.filter { it > 0 }
// transform
flow.transform { value ->
    emit(value)
    emit(value * 2)
}
Combination Operators
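// zip: pairs values by position and completes when either flow completes
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf("a", "b", "c")
flow1.zip(flow2) { number, letter ->
    "$number$letter"
}
// combine: re-emits with the latest value of each flow whenever either one emits
flow1.combine(flow2) { number, letter ->
    "$number$letter"
}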
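The difference between zip and combine is easiest to see by collecting the results. The sketch below is an illustration under simple assumptions (kotlinx-coroutines-core available, small in-memory flows); the function names zipped, zippedUneven, and combined are hypothetical. Because combine reacts to whichever flow emits next, its intermediate values depend on timing, so only its final value is relied on here.

```kotlin
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs the first values, then the second values, and so on.
fun zipped(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .zip(flowOf("a", "b", "c")) { number, letter -> "$number$letter" }
        .toList()
}

// zip stops as soon as the shorter flow completes.
fun zippedUneven(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .zip(flowOf("a")) { number, letter -> "$number$letter" }
        .toList()
}

// combine emits whenever either side emits, using the latest value of each.
// The intermediate values depend on timing; the last one pairs both final values.
fun combined(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .combine(flowOf("a", "b", "c")) { number, letter -> "$number$letter" }
        .toList()
}
```

Here zipped() returns [1a, 2b, 3c] and zippedUneven() returns [1a], while the last element of combined() is 3c. As a rule of thumb, zip suits flows whose values belong together pairwise, and combine suits independent sources of state such as several form fields.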
Flow Context
FlowOn Operator
flow
    .map { /* CPU intensive work */ }
    .flowOn(Dispatchers.Default) // Applies only to the operators above this line
    .collect { /* UI work, runs in the collector's context */ }
Buffer
flow
    .buffer() // Lets the producer run ahead of a slow collector
    .collect { value ->
        // Process values
    }
Error Handling
Try-Catch
try {
    flow.collect { value ->
        // Process value
    }
} catch (e: Exception) {
    // Handle error
}
Catch Operator
flow
    .catch { e ->
        // Only exceptions from upstream (above this operator) arrive here
        emit(-1) // Emit fallback value
    }
    .collect { value ->
        // Process value
    }
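A detail that is easy to miss is that the catch operator only sees exceptions thrown upstream of it; failures in operators or collectors placed after it are not intercepted. The following sketch illustrates both cases, assuming kotlinx-coroutines-core is available; upstreamFailure and downstreamFailure are hypothetical helper names.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The exception is thrown above catch, so catch can emit a fallback value.
fun upstreamFailure(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        throw IllegalStateException("upstream failed")
    }
        .catch { emit(-1) }
        .toList()
}

// The exception is thrown in map, which sits below catch, so it escapes
// the operator and has to be handled by the caller.
fun downstreamFailure(): String = runBlocking {
    try {
        flow<Int> { emit(1) }
            .catch { emit(-1) }
            .map { value ->
                if (value == 1) throw IllegalStateException("downstream failed")
                value
            }
            .toList()
        "completed"
    } catch (e: IllegalStateException) {
        "escaped: ${e.message}"
    }
}
```

upstreamFailure() returns [1, -1], while downstreamFailure() returns "escaped: downstream failed". To cover the whole pipeline with catch, one option is to move the risky work into onEach or map above the catch call and finish with launchIn or an empty collect.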
StateFlow and SharedFlow
StateFlow
val stateFlow = MutableStateFlow(0)
// Update value
stateFlow.value = 1
// Collect values (a StateFlow never completes, so this call suspends until cancelled)
stateFlow.collect { value ->
    // Handle value
}
SharedFlow
val sharedFlow = MutableSharedFlow<Int>()
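// Emit value (emit is a suspending function; call it from a coroutine)
sharedFlow.emit(1)
// Collect values (only those emitted after collection starts, since replay defaults to 0)
sharedFlow.collect { value ->
    // Handle value
}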
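The practical difference between the two hot flows shows up when a collector arrives late. The sketch below is one way to observe it, assuming kotlinx-coroutines-core is available; the function names are hypothetical, and the 100 ms timeout is an arbitrary value chosen for the illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// A StateFlow always holds a current value, so a late collector sees the latest one.
fun stateFlowLateCollector(): Int = runBlocking {
    val state = MutableStateFlow(0)
    state.value = 1
    state.value = 2
    state.first()
}

// With the default replay of 0, values emitted before anyone collects are dropped.
fun sharedFlowLateCollector(): Int? = runBlocking {
    val events = MutableSharedFlow<Int>()
    events.emit(1) // No subscribers yet, so the value is not kept
    withTimeoutOrNull(100) { events.first() } // Nothing to receive: times out
}

// A replay cache of 1 keeps the most recent value for late collectors.
fun sharedFlowWithReplay(): Int? = runBlocking {
    val events = MutableSharedFlow<Int>(replay = 1)
    events.emit(1)
    events.emit(2)
    withTimeoutOrNull(100) { events.first() }
}
```

stateFlowLateCollector() returns 2, sharedFlowLateCollector() returns null, and sharedFlowWithReplay() returns 2. This is why StateFlow tends to fit UI state, while a SharedFlow without replay fits one-off events that should not be redelivered.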
Common Use Cases
Network Calls
fun fetchUsers(): Flow<List<User>> = flow {
    val users = api.getUsers()
    emit(users)
}.flowOn(Dispatchers.IO)
Database Operations
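// Note: this emits a single snapshot and then completes. To keep observing
// changes, the DAO itself would need to return a Flow (Room supports this).
fun observeUsers(): Flow<List<User>> = flow {
    val users = database.userDao().getAllUsers()
    emit(users)
}.flowOn(Dispatchers.IO)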
UI Events
val searchQuery = MutableStateFlow("")
searchQuery
    .debounce(300) // Wait for 300 ms of inactivity; debounce is marked @FlowPreview
    .distinctUntilChanged()
    .collect { query ->
        // Perform search
    }
Best Practices
- Use appropriate operators
// Good
flow
    .filter { it > 0 }
    .map { it * 2 }
    .collect { /* ... */ }
// Avoid
flow.collect { value ->
    if (value > 0) {
        val doubled = value * 2
        // Process doubled value
    }
}
- Handle errors properly
// Good
flow
    .catch { e ->
        // Handle error
    }
    .collect { /* ... */ }
// Avoid
try {
    flow.collect { /* ... */ }
} catch (e: Exception) {
    // Handle error
}
- Use appropriate scope
// Good
flow.launchIn(viewModelScope)
// Avoid
flow.launchIn(GlobalScope)
Advanced Features
Custom Operators
fun <T> Flow<T>.throttleFirst(periodMillis: Long): Flow<T> = flow {
    var lastEmissionTime = 0L
    collect { value ->
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastEmissionTime >= periodMillis) {
            emit(value)
            lastEmissionTime = currentTime
        }
    }
}
Testing Flow
@Test
fun testFlow() = runTest {
    val flow = flowOf(1, 2, 3)
    val result = mutableListOf<Int>()
    flow.toList(result)
    assertEquals(listOf(1, 2, 3), result)
}
Conclusion
Kotlin Flow helps you:
- Handle asynchronous data streams
- Implement reactive programming patterns
- Manage UI state effectively
- Handle complex data transformations
Remember:
- Use appropriate operators
- Handle errors properly
- Consider backpressure
- Test your flows
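On the backpressure point: by default a flow is sequential, so emit suspends until the collector has finished with the previous value, and a slow collector slows the producer down. Operators such as buffer and conflate relax that coupling. The sketch below illustrates the idea under simple assumptions (kotlinx-coroutines-core available, a 50 ms delay standing in for slow processing); producer, sequentialLog, and conflated are hypothetical names.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer that records each emission in the given log.
fun producer(log: MutableList<String>): Flow<Int> = flow {
    for (i in 1..3) {
        log += "emit $i"
        emit(i) // Without a buffer, this suspends until the collector is done
    }
}

// Default behaviour: producer and collector strictly take turns.
fun sequentialLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    producer(log).collect { value ->
        delay(50) // Simulate slow processing
        log += "collect $value"
    }
    log
}

// conflate lets the producer run ahead and keeps only the most recent value,
// so a slow collector may skip intermediate values but always sees the last one.
fun conflated(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    producer(mutableListOf()).conflate().collect { value ->
        delay(50) // Simulate slow processing
        received += value
    }
    received
}
```

sequentialLog() returns [emit 1, collect 1, emit 2, collect 2, emit 3, collect 3], which shows the producer waiting for the collector. With conflated(), which intermediate values are skipped depends on timing, but the final element is always 3. Use buffer when every value matters and conflate (or collectLatest) when only the newest value does.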
Stay tuned for our next post where we’ll explore Object-Oriented Programming in Kotlin!