RxJava to Kotlin Coroutines: The Ultimate Migration Guide
As a developer working in large legacy applications, I’ve had the pleasure (and sometimes pain) of leading the modernization of large codebases that relied heavily on RxJava for their reactive programming. This modernization effort revolved heavily around translating RxJava code into equivalent Coroutines and Flow. I learned a lot during the migrations, and I’m going to share everything I’ve learned in this article.
The focus of this article is to provide a comprehensive guide for developers to use when migrating legacy RxJava code to Kotlin Coroutines and Flow. My hope is to document the common use cases of RxJava and provide a thorough, step-by-step guide to translating RxJava code into the more modern and native Kotlin Coroutines and Flow.
Learning
The first and most important step to migrating RxJava to coroutines is quite obviously learning how to use Coroutines and Flow. To this end, I have some resources that I have found helpful in my learning of Coroutines and Flow.
Dependencies
In order to use Kotlin Coroutines and Flow in your applications, you will need several dependencies, outlined below with descriptions of what they provide so you can determine which you need. Of course, the versions may change by the time you read this, so use your best judgment there.
dependencies {
    // Keep all kotlinx-coroutines artifacts on the same version to avoid mismatched binaries
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0-RC" // Essential to use Coroutines
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.9.0-RC" // Essential to use Coroutines in Android
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.9.0-RC" // Utilities for interop with RxJava 2 and Coroutines
    implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.8.1" // Utilities for interop with LiveData and Coroutines
    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.9.0-RC" // Essential to test Coroutines (runTest, test dispatchers)
    testImplementation "app.cash.turbine:turbine:1.1.0" // Library to simplify testing of Flows
}
Special Discussion on Single and Coroutines
Before we get to the nitty gritty, we need to place special emphasis on something that I’ve seen many developers struggle with when adopting coroutines after using RxJava for years.
There will never be a situation where you convert an RxJava Single to a Flow. Every time you see RxJava’s Single type, it is always directly equivalent to a suspend function or plain coroutine.
This is by far the most common anti-pattern that I’ve seen developers struggle to understand.
If I ever catch any of you converting a Single to a Flow I will summon the Coroutines Gods down to smite thee immediately :)
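To make the equivalence concrete, here is a minimal sketch. The User class and the fetchUser functions are hypothetical names invented for this illustration; the only library call is await() from kotlinx-coroutines-rx2, which suspends until a Single produces its value.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.await

// Hypothetical model and lookup functions, for illustration only.
data class User(val id: Int, val name: String)

// RxJava style: exactly one value (or an error), delivered asynchronously.
fun fetchUserRx(id: Int): Single<User> =
    Single.fromCallable { User(id, "user-$id") }

// Coroutines style: the same contract is simply a suspend function.
// No Flow is involved, because there is exactly one result.
suspend fun fetchUser(id: Int): User = User(id, "user-$id")

// If the Single cannot be rewritten yet, await() bridges it into a suspend call.
suspend fun fetchUserBridged(id: Int): User = fetchUserRx(id).await()

// Both paths yield the same single value.
fun demoSingleEquivalence(): Boolean = runBlocking {
    fetchUser(1) == fetchUserBridged(1)
}
```

A Flow only enters the picture when a source can emit more than one value over time, which a Single never does.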
CoroutineScope == CompositeDisposable
For all practical purposes, you can imagine that CoroutineScope is directly equivalent to RxJava’s CompositeDisposable.
Both CompositeDisposable and CoroutineScope serve the exact same purpose. They both boil down to a container to track active resources and cancel them at the appropriate time. So for all migration purposes, it is useful to treat them as directly equivalent.
The CoroutineScope interface does provide some additional information via its CoroutineContext, but an explanation of that structure is likely better served by reading the learning material linked above.
TL;DR
RxJava’s CompositeDisposable.dispose() is directly equivalent to Coroutines’ CoroutineScope.cancel()
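The sketch below illustrates that analogy using only kotlinx-coroutines-core. The Poller class is a hypothetical stand-in for any object that used to own a CompositeDisposable; the comments name the RxJava call each line replaces.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical class that owns long-running work.
class Poller {
    // Plays the role of `val disposables = CompositeDisposable()`.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Plays the role of `disposables.add(observable.subscribe { ... })`.
    fun start(): Job = scope.launch {
        while (true) {
            delay(1_000) // delay() is a suspension point, so it honors cancellation
        }
    }

    // Plays the role of `disposables.dispose()`: cancels every child coroutine.
    fun stop() = scope.cancel()
}
```

As with a disposed CompositeDisposable, a cancelled scope cannot be reused. If you need the equivalent of CompositeDisposable.clear(), cancelling only the children via coroutineContext.cancelChildren() is one option.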
Interop with RxJava
Before we get into a migration strategy, we need to cover the equivalencies between all of the RxJava data types and how to convert back and forth between RxJava and Coroutines.
The next few sections should serve as a reference for you while doing your development to help remember which scenarios require which solutions.
There are 3 distinct scenarios in which you may find yourself during your migrations:
- Re-writing RxJava as Coroutines code.
- Bridging RxJava code to work within your Coroutines code.
- Bridging Coroutines code to work within your RxJava code.
Scenario 1: Re-writing RxJava as Coroutines
This is the most typical scenario, but perhaps the least simple. The table below provides a concise reference for how to convert each data type.
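As a rough illustration of the commonly used mapping (a sketch of usual practice, not a substitute for the reference table), the hypothetical SettingsStore below shows what each RxJava return type tends to become. All names are invented for the example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory store; each member notes the RxJava type it would replace.
class SettingsStore {
    private val values = mutableMapOf("theme" to "dark")

    // Was: fun theme(): Single<String>
    suspend fun theme(): String = values.getValue("theme")

    // Was: fun find(key: String): Maybe<String>
    suspend fun find(key: String): String? = values[key]

    // Was: fun save(key: String, value: String): Completable
    suspend fun save(key: String, value: String) {
        values[key] = value
    }

    // Was: fun keys(): Observable<String> (or Flowable<String>)
    fun keys(): Flow<String> = flow {
        for (key in values.keys.toList()) emit(key)
    }
}

// Exercises every member once and returns the emitted keys.
fun demoSettingsStore(): List<String> = runBlocking {
    val store = SettingsStore()
    check(store.theme() == "dark")
    check(store.find("missing") == null)
    store.save("lang", "en")
    store.keys().toList()
}
```

Only the multi-value types (Observable and Flowable) become a Flow; the single-shot types become plain suspend functions.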
Scenario 2: Integrating RxJava into Coroutines code
In this scenario, there may be some RxJava code that you don’t have access to, and as such, you cannot directly convert it to Coroutines. In this case, there are very simple operators that allow you to integrate RxJava directly into your Coroutines. The table below provides a concise reference for how to convert each data type.
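A minimal sketch of this direction, assuming RxJava 2 and the kotlinx-coroutines-rx2 artifact: LegacyApi is a hypothetical object standing in for code you cannot modify, while await(), awaitSingleOrNull() and asFlow() are extension functions provided by that artifact.

```kotlin
import io.reactivex.Completable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.asFlow
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.rx2.awaitSingleOrNull

// Hypothetical legacy API that still exposes RxJava 2 types.
object LegacyApi {
    fun count(): Single<Int> = Single.just(3)
    fun nickname(): Maybe<String> = Maybe.empty()
    fun ping(): Completable = Completable.complete()
    fun ticks(): Observable<Int> = Observable.just(1, 2, 3)
}

fun demoRxIntoCoroutines(): List<Int> = runBlocking {
    // Single -> suspending call that returns the value
    val count: Int = LegacyApi.count().await()
    // Maybe -> suspending call that returns null when the Maybe is empty
    val nickname: String? = LegacyApi.nickname().awaitSingleOrNull()
    // Completable -> suspending call that returns Unit
    LegacyApi.ping().await()
    // Observable -> Flow
    val ticks: List<Int> = LegacyApi.ticks().asFlow().toList()
    check(nickname == null)
    ticks.take(count)
}
```

Errors signalled by the RxJava source surface as ordinary exceptions at the suspending call, so a regular try/catch applies.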
Scenario 3: Integrating Coroutines into RxJava code
In this scenario, you may need to bridge some existing Coroutines code to work with some RxJava code that you just don’t have the bandwidth to migrate yet. In which case, there are very simple operators available to integrate Coroutines directly into your RxJava code. The table below provides a concise reference for how to convert each data type.
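A minimal sketch of the opposite direction, again assuming RxJava 2 and kotlinx-coroutines-rx2: NewRepository and LegacyAdapter are hypothetical classes, while rxSingle, rxCompletable and asObservable() come from that artifact.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.rx2.asObservable
import kotlinx.coroutines.rx2.rxCompletable
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical repository that has already been migrated to coroutines.
class NewRepository {
    var saved: String? = null
        private set

    suspend fun load(): String = "payload"
    suspend fun save(value: String) { saved = value }
    fun updates(): Flow<Int> = flowOf(1, 2, 3)
}

// Hypothetical adapter exposing RxJava 2 types to callers that are not migrated yet.
class LegacyAdapter(private val repo: NewRepository) {
    // suspend fun returning a value -> Single
    fun load(): Single<String> = rxSingle { repo.load() }

    // suspend fun returning Unit -> Completable
    fun save(value: String): Completable = rxCompletable { repo.save(value) }

    // Flow -> Observable
    fun updates(): Observable<Int> = repo.updates().asObservable()
}
```

Disposing the resulting RxJava subscription cancels the underlying coroutine, so existing CompositeDisposable handling keeps working for the unmigrated callers.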
Interop with LiveData
As you get to the end of each phase of your migration, you’ll likely be replacing LiveData with Flow. This step is not strictly mandatory in order to migrate to Coroutines and Flow, but it is nice to be able to rely on the same data structure throughout your app, from the network layer to the UI layer. It can be tedious forwarding events between LiveData and other streams, so having one data structure tends to simplify things.
Just as with RxJava, there are 3 distinct scenarios in which you may find yourself when converting LiveData to Flow:
- Replacing LiveData with Flow
- Convert a LiveData to a Flow
- Convert a Flow to a LiveData
Scenario 1: Replacing LiveData with Flow
In the ideal case, you own all the code and you can directly replace a LiveData with a Flow. In this case, you should almost always be converting all of your LiveData to StateFlow.
StateFlow has some important differences from LiveData to be aware of as you do your conversions:
- It requires an initial value. This allows StateFlow to ensure there is always something for consumers to consume.
- It has distinctUntilChanged inherently applied to all emissions, so setting a value equal to the current one emits nothing.
- StateFlow is not inherently lifecycle-aware, so your collection must be done in a lifecycle-aware way. Shown below is a very simple way to accomplish this:
viewModel.someDataFlow
    // flowWithLifecycle ensures emissions are only received while the lifecycle is at least in the specified state
    .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
    .onEach {
        // Process emission
    }
    .launchIn(lifecycleScope)
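The first two differences can be observed without any Android code. The sketch below uses only kotlinx-coroutines-core; the counter variable and the demo function are hypothetical, and the yield() calls exist solely to give the collector a turn on the single runBlocking thread.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun demoStateFlow(): List<Int> = runBlocking {
    val counter = MutableStateFlow(0)   // an initial value is mandatory
    val seen = mutableListOf<Int>()

    // A new collector immediately receives the current value.
    val collector = launch { counter.collect { seen += it } }
    yield()

    counter.value = 0                   // equal to the current value: nothing is emitted
    yield()
    counter.value = 1                   // a different value: emitted
    yield()
    counter.value = 1                   // duplicate again: nothing is emitted
    yield()

    collector.cancel()                  // StateFlow collection never completes on its own
    seen
}
```

In this sketch the collector should end up having seen only the initial value and the one distinct update.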
Scenario 2: Convert a LiveData to a Flow
Sometimes, you will find that you cannot change a particular LiveData because you don’t have access to it, or because the change would be too impactful for the effort you’re allotted. In that case, you have the .asFlow() operator, which will convert that particular LiveData directly to a Flow. You can then apply .stateIn(viewModelScope, started, initialValue) or .shareIn(viewModelScope, started) if you need a StateFlow or SharedFlow.
Scenario 3: Convert a Flow to a LiveData
Sometimes, you will find that you have some API that requires you to use a LiveData even though your code is all using Flow. In this case, you can just use the .asLiveData() operator to bridge that gap.
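Scenarios 2 and 3 might look roughly like the sketch below. ProfileViewModel, legacyName and scores are hypothetical names; asFlow() and asLiveData() come from lifecycle-livedata-ktx, and the sharing policy and empty-string initial value are arbitrary choices made for the example.

```kotlin
import androidx.lifecycle.LiveData
import androidx.lifecycle.ViewModel
import androidx.lifecycle.asFlow
import androidx.lifecycle.asLiveData
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.stateIn

// Hypothetical ViewModel; `legacyName` and `scores` stand in for sources you do not control.
class ProfileViewModel(
    legacyName: LiveData<String>,
    scores: Flow<Int>
) : ViewModel() {

    // Scenario 2: LiveData -> Flow -> StateFlow.
    // This stateIn overload needs a sharing policy and an initial value.
    val name: StateFlow<String> = legacyName
        .asFlow()
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), "")

    // Scenario 3: Flow -> LiveData for an API that insists on LiveData.
    val scoreLiveData: LiveData<Int> = scores.asLiveData()
}
```

SharingStarted.WhileSubscribed keeps the upstream LiveData observed only while something is collecting, which is one reasonable default; SharingStarted.Eagerly or Lazily may suit other cases better.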
Approaching your migration
Now that we’ve covered exactly how to convert each RxJava type to Coroutines and vice versa, as well as LiveData, we should have all the tools we need to start our migration. Anything beyond what was covered above will have been covered by your courses on Coroutines and Flow.
The most important thing to remember about the migration is that it is exceedingly easy to perform the migration in very small pieces. With large applications, it is simply not possible to migrate everything all at once. It may be a years-long process of phasing out RxJava support, so the fantastic interop support is what really makes the migration possible.
Given the extensive and simple interoperability between Coroutines/Flow and RxJava and LiveData, there should be few to no obstacles that truly prevent you from migrating your application piece-by-piece.
Start with the service layer
Starting your migration with the service layer simplifies many things. This allows us to begin our migration at the root of the reactive chain and work our way up to the UI layer in steps.
This guide is going to assume you are using Retrofit and OkHttp for your network layer, as that is by far the most common pattern in use in Android Architecture.
I recommend that you start small with the least impactful service to get your feet wet. Follow that service all the way up to the UI layer as a proof of concept to work out the kinks before you convert more and more impactful services. There is little benefit to migrating your entire data layer at once.
Converting Retrofit Interfaces to Coroutines
A typical Retrofit service interface implementation with RxJava will look something like this:
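import io.reactivex.Single
import retrofit2.http.POST
import retrofit2.http.Query

interface MyService {
    @POST("/my/service/url/foo/bar") // Retrofit's annotation is upper-case: @POST
    fun getSomeData(@Query("id") id: Int): Single<SomeData>
}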
Converting this to use Coroutines is very simple. The only things that need to be modified are:
- Add the suspend modifier
- Return the data directly instead of wrapping with Single
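import retrofit2.http.POST
import retrofit2.http.Query

interface MyService {
    @POST("/my/service/url/foo/bar")
    suspend fun getSomeData(@Query("id") id: Int): SomeData // suspend support requires Retrofit 2.6.0 or newer
}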
Fixing the broken service consumers (temporarily)
Now, obviously, you’ve just broken the contract for the consumers of your services. Your repositories are going to have compilation errors now. I recommend you stop your migration here and resolve those errors, and run your application to validate things are still working as expected.
Resolving the error is as simple as wrapping the call to the broken method with the rxSingle builder from kotlinx-coroutines-rx2.
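import io.reactivex.Single
import javax.inject.Inject
import kotlinx.coroutines.rx2.rxSingle

class MyRepository @Inject constructor(
    private val myService: MyService // must be a property to be usable inside getSomeData
) {
    fun getSomeData(id: Int): Single<SomeData> {
        // rxSingle runs the suspend call and exposes its result as a Single
        return rxSingle {
            myService.getSomeData(id)
        }
    }
}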
Once you’ve validated that your app still works with these changes, we can move on to the next steps.
Work your way up the data layer
Every application’s data layer is going to look different, and so I can’t give you the silver bullet step-by-step guide to converting your application’s data layer specifically.
But let’s imagine an application with a very simple data layer that consists of a Repository that holds 2 services with RxJava:
class MyRepository @Inject constructor(
    private val myFirstService: MyFirstService,
    private val mySecondService: MySecondService
) {
    fun getSomeData(): Single<Int> {
        return myFirstService
            .getSomeData()
            .flatMap { id ->
                mySecondService.getMoreData(id)
            }
    }
}
As you can see, we’re using the result of one service call to make a new service call.
Converting this to use Coroutines becomes as simple as the following:
class MyRepository @Inject constructor(
    private val myFirstService: MyFirstService,
    private val mySecondService: MySecondService
) {
    suspend fun getSomeData(): Int {
        val firstResponse = myFirstService.getSomeData()
        return mySecondService.getMoreData(firstResponse)
    }
}
As you can see, this is much easier to read and has fewer lines than its RxJava counterpart.
Stop!
Once you’ve made your way through the data layer and you’re comfortable with the changes you’ve made (and resolved compilation errors), I strongly recommend stopping here before moving into your business layer. Take some time and do some regression testing on your features to ensure there are no strange or subtle bugs happening.
Moving On to the Business Layer
For this tutorial, I will assume the next logical step of your migration would be your ViewModel. But, of course, your architecture may differ so use your best judgment on where to start in your business layer. For example, if you’re implementing Clean Architecture then the next step would likely be to convert some associated UseCase implementations.
viewModelScope
ViewModels have built-in support for coroutines, so you should not need to manually manage any CoroutineScope cancellation in your ViewModel. From within your ViewModel, you have access to a viewModelScope property. This property is a CoroutineScope instance managed by the androidx libraries “automagically”. The viewModelScope is cancelled when your ViewModel is cleared, which ensures cancellation of any associated resources (provided you abide by cooperative cancellation in your coroutines).
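Cooperative cancellation matters mostly for loops that never suspend. The sketch below uses only kotlinx-coroutines-core; crunch and the demo function are hypothetical, and the scope passed in stands in for viewModelScope.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical CPU-bound loop with no suspension points of its own.
// ensureActive() throws CancellationException once the surrounding scope
// is cancelled, so the loop stops promptly instead of running forever.
fun CoroutineScope.crunch(progress: AtomicLong): Job = launch(Dispatchers.Default) {
    while (true) {
        ensureActive()
        progress.incrementAndGet()
    }
}

fun demoCooperativeCancellation(): Boolean = runBlocking {
    val progress = AtomicLong()
    val job = crunch(progress)
    delay(50)
    job.cancelAndJoin()              // returns only because the loop cooperates
    val afterCancel = progress.get()
    delay(50)
    progress.get() == afterCancel    // no further work happens after cancellation
}
```

Without the ensureActive() call, cancelAndJoin() would never return here, which is the kind of leak that cancelling viewModelScope alone cannot prevent.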
ViewModel with RxJava
Suppose you have a pretty simple ViewModel that uses 2 repositories to provide some data to your view. It has one repository with an Observable stream of data, and each time it emits, we want to launch a service call from a different repository to get some related data.
Let’s take a look at what that would look like with RxJava:
class MyViewModel @Inject constructor(
    private val myFirstRepository: MyFirstRepository,
    private val mySecondRepository: MySecondRepository
) : ViewModel() {
    private val disposableContainer = CompositeDisposable()
    private val _state = MutableLiveData<UiState>(UiState.Loading)
    val state: LiveData<UiState> = _state

    init {
        disposableContainer.add(
            myFirstRepository.data
                .flatMapSingle {
                    mySecondRepository.makeSomeServiceCall(it)
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    _state.value = UiState.Success(it)
                }
        )
    }

    fun loadData(id: Int) {
        myFirstRepository.init(id)
    }

    override fun onCleared() {
        disposableContainer.dispose()
    }
}
Now let’s take a look at the relevant aspects of this to understand how things will change:
As you can see in the example above, we have to manually track the Disposable created by subscribing to the Observable from MyFirstRepository. We then need to make sure we override the onCleared() function so we can dispose of the CompositeDisposable. Also note that we are forced to specify the relevant schedulers (in reality, we could drop observeOn by using postValue instead of .value, but this is for demonstration purposes).
Same ViewModel with Coroutines
Now let’s take a look at how things change with Coroutines:
class MyViewModel @Inject constructor(
    private val myFirstRepository: MyFirstRepository,
    private val mySecondRepository: MySecondRepository
) : ViewModel() {
    private val _state = MutableStateFlow<UiState>(UiState.Loading)
    val state: StateFlow<UiState> = _state

    init {
        myFirstRepository.data
            .map {
                mySecondRepository.makeSomeServiceCall(it)
            }
            .onEach { data ->
                _state.update { UiState.Success(data) }
            }
            .launchIn(viewModelScope)
    }

    fun loadData(id: Int) {
        viewModelScope.launch {
            myFirstRepository.init(id)
        }
    }
}
At first glance, you can see how there is much less code to maintain here. That is one of the great parts of using Coroutines. It tends to reduce the amount of code you need to write to accomplish a given task.
Looking closer, let’s see what has changed:
- We no longer need a disposable container, since the viewModelScope is managed “automagically”. We don’t need to maintain our own mechanism to dispose of active resources.
- We also don’t need to remember to use addDisposable or any equivalent, since viewModelScope tracks resources for us and we’ve deleted disposableContainer.
- We no longer need to override onCleared() because viewModelScope is managed for us, as mentioned several times already.
- We do not need to manually provide any schedulers. Although you can, it is strongly recommended that you leave the switch to Dispatchers.IO within your data layer, not your business layer.
- We are processing emissions in our onEach method instead of a subscribe block. This ensures that any exceptions are caught by downstream catch operators, and it reduces the bracket-hell that can come with using collect.
- We’ve changed LiveData to StateFlow. There are some minor differences to be aware of, but for the majority of cases a direct replacement is perfectly advisable.
- We are using viewModelScope.launch to invoke myFirstRepository.init(id). This is because the init method is now a suspend function, whereas it was a Completable function before.
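Two of the points above, keeping the dispatcher switch in the data layer and letting a downstream catch see failures from onEach, can be sketched with kotlinx-coroutines-core alone. The functions loadLabel, ids and demoOnEachCatch are hypothetical, and the strings stand in for UiState values.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical data-layer call: the repository, not the ViewModel, picks the dispatcher.
suspend fun loadLabel(id: Int): String = withContext(Dispatchers.IO) {
    require(id < 3) { "unknown id $id" }
    "item-$id"
}

// Hypothetical upstream, standing in for myFirstRepository.data.
fun ids(): Flow<Int> = flow { for (id in 1..3) emit(id) }

fun demoOnEachCatch(): List<String> = runBlocking {
    val states = mutableListOf<String>()
    ids()
        // Plays the role of the old subscribe { } body.
        .onEach { id -> states += "Success(${loadLabel(id)})" }
        // Placed after onEach, so it also sees exceptions thrown inside onEach.
        .catch { e -> states += "Error(${e.message})" }
        // Plays the role of disposableContainer.add(...); here the scope is runBlocking.
        .launchIn(this)
        .join()
    states
}
```

One behavioral difference worth keeping in mind: map handles emissions one at a time, whereas RxJava’s flatMapSingle may run several inner calls concurrently. If that concurrency matters, flatMapMerge or mapLatest are possible alternatives.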
Updating your ViewModel to use Coroutines is largely an exercise in applying the RxJava Interop section outlined above, so be sure you are referring to that section frequently to find equivalencies.
Stop!
Just as we had done with the data layer, this is an important place to stop and do some regression testing to ensure your code still works the way you expect it to. It’s important to try to avoid biting off more than you can chew at one time!
Once you’ve resolved all compilation errors and you’re confident you haven’t broken your app, it’s time to move on to the final piece of the puzzle!
Finally, Migrating your View layer
Now that we’ve integrated coroutines into our data layer and our business layer, it’s time to start wrapping things up with the final piece of the puzzle: the View layer.
Implementations of the View layer tend to vary wildly, with many different technologies being used. However, there are 2 major approaches that differ substantially: XML Views and Jetpack Compose.
For this article, I am going to assume that you are using XML views. Since you’re still using RxJava, it’s probably a safe assumption most of your code is also using XML views.
Fragments and Activities
Fragments and Activities function largely the same in the context of Coroutines. Just as with ViewModels and viewModelScope, we are provided with a very convenient lifecycleScope instance that manages CoroutineScope cancellation for us!
lifecycleScope
You will often find yourself needing to observe emissions from a Flow with coroutines, and in order to do so you can use the lifecycleScope as your CoroutineScope, very similarly to the way you use viewModelScope.
Let’s consider the ViewModel discussed previously. I’ll simplify the boilerplate like ViewModel factory and view manipulation, etc. Below is how this would look in a Fragment:
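class MyFragment : Fragment() {
    lateinit var viewModel: MyViewModel

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        // When updating views from a Fragment, viewLifecycleOwner.lifecycle and
        // viewLifecycleOwner.lifecycleScope are usually the safer choice
        viewModel.state
            .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
            .onEach { state ->
                // `is` checks are required for subclasses such as UiState.Success(data)
                when (state) {
                    is UiState.Success -> handleSuccess(state)
                    is UiState.Loading -> handleLoading(state)
                    is UiState.Error -> handleError(state)
                }
            }
            .launchIn(lifecycleScope)
    }
}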
Let’s analyze the important aspects of this:
- Most importantly, flowWithLifecycle. This ensures that we only receive emissions from the underlying Flow while the Fragment is at least in the STARTED state. This gives us lifecycle-aware emissions, just as LiveData would give us.
- We are using lifecycleScope to manage the cancellation of our resources.
XML DataBinding
Data binding supports only the usage of StateFlow at the time of writing this article, so you cannot use SharedFlow or Flow. You can use StateFlow in exactly the same way you would have used LiveData.
Conclusion
Migrating away from RxJava to use Coroutines is a great way to modernize your application. And because Coroutines are much lighter-weight than Threads and use them efficiently, you may even encounter performance improvements. At the very least, your code will be much simpler and easier to maintain for years to come!
Thanks for reading, and I am always open to feedback or corrections, so if you found this article helpful, please leave a like and a comment to help me get some visibility! :)