Merge "Add RxJava 3 support to Room" into androidx-master-dev
diff --git a/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt b/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
index 65197d9..94fd25a 100644
--- a/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
+++ b/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
@@ -115,6 +115,8 @@
     prebuilts(LibraryGroups.REMOTECALLBACK, "1.0.0-alpha02")
     // TODO: Remove this once b/157899389 is resolved
     ignore(LibraryGroups.ROOM.group, "room-compiler")
+    // TODO: Remove during release phase of rxjava3 artifact
+    ignore(LibraryGroups.ROOM.group, "room-rxjava3")
     prebuilts(LibraryGroups.ROOM, "2.3.0-alpha01")
     prebuilts(LibraryGroups.SAVEDSTATE, "1.1.0-alpha01")
     // TODO: Remove this ignore once androidx.security:security-biometric:1.0.0-alpha01 is released
diff --git a/jetifier/jetifier/core/src/main/resources/default.config b/jetifier/jetifier/core/src/main/resources/default.config
index 9b14a9d..3669e48 100644
--- a/jetifier/jetifier/core/src/main/resources/default.config
+++ b/jetifier/jetifier/core/src/main/resources/default.config
@@ -1266,6 +1266,10 @@
             "to" : "androidx/room/rxjava2"
         },
         {
+            "from" : "android/arch/persistence/room/rxjava3",
+            "to" : "androidx/room/rxjava3"
+        },
+        {
             "from" : "android/arch/persistence/room/guava",
             "to" : "androidx/room/guava"
         },
diff --git a/jetifier/jetifier/core/src/main/resources/default.generated.config b/jetifier/jetifier/core/src/main/resources/default.generated.config
index 4887f7c..031a53c 100644
--- a/jetifier/jetifier/core/src/main/resources/default.generated.config
+++ b/jetifier/jetifier/core/src/main/resources/default.generated.config
@@ -1217,6 +1217,10 @@
       "to": "androidx/room/rxjava2"
     },
     {
+      "from": "android/arch/persistence/room/rxjava3",
+      "to": "androidx/room/rxjava3"
+    },
+    {
       "from": "android/arch/persistence/room/guava",
       "to": "androidx/room/guava"
     },
diff --git a/jetifier/jetifier/migration.config b/jetifier/jetifier/migration.config
index 0e7bb80..527d14e 100644
--- a/jetifier/jetifier/migration.config
+++ b/jetifier/jetifier/migration.config
@@ -1247,6 +1247,10 @@
       "to": "androidx/room/rxjava2"
     },
     {
+      "from": "android/arch/persistence/room/rxjava3",
+      "to": "androidx/room/rxjava3"
+    },
+    {
       "from": "android/arch/persistence/room/guava",
       "to": "androidx/room/guava"
     },
@@ -2827,6 +2831,18 @@
     {
       "from": {
         "groupId": "android.arch.persistence.room",
+        "artifactId": "rxjava3",
+        "version": "{oldRoomVersion}"
+      },
+      "to": {
+        "groupId": "androidx.room",
+        "artifactId": "room-rxjava3",
+        "version": "{newRoomVersion}"
+      }
+    },
+    {
+      "from": {
+        "groupId": "android.arch.persistence.room",
         "artifactId": "testing",
         "version": "{oldRoomVersion}"
       },
diff --git a/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt b/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
index e01dc6d..6e24603 100644
--- a/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
@@ -146,6 +146,14 @@
     val COMPLETABLE = ClassName.get("io.reactivex", "Completable")
 }
 
+object RxJava3TypeNames { // ClassNames of RxJava 3 types in io.reactivex.rxjava3.core
+    val FLOWABLE = ClassName.get("io.reactivex.rxjava3.core", "Flowable")
+    val OBSERVABLE = ClassName.get("io.reactivex.rxjava3.core", "Observable")
+    val MAYBE = ClassName.get("io.reactivex.rxjava3.core", "Maybe")
+    val SINGLE = ClassName.get("io.reactivex.rxjava3.core", "Single")
+    val COMPLETABLE = ClassName.get("io.reactivex.rxjava3.core", "Completable")
+}
+
 object ReactiveStreamsTypeNames {
     val PUBLISHER = ClassName.get("org.reactivestreams", "Publisher")
 }
@@ -161,6 +169,14 @@
     val RX_EMPTY_RESULT_SET_EXCEPTION = ClassName.get(ROOM_PACKAGE, "EmptyResultSetException")
 }
 
+object RoomRxJava3TypeNames { // names from Room's own RxJava 3 package
+    val RX_ROOM = ClassName.get("$ROOM_PACKAGE.rxjava3", "RxRoom")
+    val RX_ROOM_CREATE_FLOWABLE = "createFlowable" // factory method name (plain String)
+    val RX_ROOM_CREATE_OBSERVABLE = "createObservable" // factory method name (plain String)
+    val RX_EMPTY_RESULT_SET_EXCEPTION =
+        ClassName.get("$ROOM_PACKAGE.rxjava3", "EmptyResultSetException")
+}
+
 object RoomCoroutinesTypeNames {
     val COROUTINES_ROOM = ClassName.get(ROOM_PACKAGE, "CoroutinesRoom")
 }
diff --git a/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt b/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
index d78b24f..26e89bd 100644
--- a/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
@@ -30,7 +30,6 @@
 import androidx.room.vo.CustomTypeConverter
 import androidx.room.vo.Field
 import com.squareup.javapoet.TypeName
-import java.lang.StringBuilder
 import javax.lang.model.element.ElementKind
 
 object ProcessorErrors {
@@ -567,6 +566,9 @@
     val MISSING_ROOM_RXJAVA2_ARTIFACT = "To use RxJava2 features, you must add `rxjava2`" +
             " artifact from Room as a dependency. androidx.room:room-rxjava2:<version>"
 
+    val MISSING_ROOM_RXJAVA3_ARTIFACT = "To use RxJava3 features, you must add `rxjava3`" +
+            " artifact from Room as a dependency. androidx.room:room-rxjava3:<version>"
+
     val MISSING_ROOM_COROUTINE_ARTIFACT = "To use Coroutine features, you must add `ktx`" +
             " artifact from Room as a dependency. androidx.room:room-ktx:<version>"
 
diff --git a/room/compiler/src/main/kotlin/androidx/room/processor/TransactionMethodProcessor.kt b/room/compiler/src/main/kotlin/androidx/room/processor/TransactionMethodProcessor.kt
index 7467f5b..8f987b9 100644
--- a/room/compiler/src/main/kotlin/androidx/room/processor/TransactionMethodProcessor.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/processor/TransactionMethodProcessor.kt
@@ -19,6 +19,7 @@
 import androidx.room.ext.GuavaUtilConcurrentTypeNames
 import androidx.room.ext.LifecyclesTypeNames
 import androidx.room.ext.RxJava2TypeNames
+import androidx.room.ext.RxJava3TypeNames
 import androidx.room.ext.findKotlinDefaultImpl
 import androidx.room.ext.hasAnyOf
 import androidx.room.vo.TransactionMethod
@@ -85,6 +86,11 @@
             RxJava2TypeNames.MAYBE,
             RxJava2TypeNames.SINGLE,
             RxJava2TypeNames.COMPLETABLE,
+            RxJava3TypeNames.FLOWABLE,
+            RxJava3TypeNames.OBSERVABLE,
+            RxJava3TypeNames.MAYBE,
+            RxJava3TypeNames.SINGLE,
+            RxJava3TypeNames.COMPLETABLE,
             GuavaUtilConcurrentTypeNames.LISTENABLE_FUTURE)
     }
 }
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/RxTypes.kt b/room/compiler/src/main/kotlin/androidx/room/solver/RxTypes.kt
index fd9c3b2..9bc4c56 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/RxTypes.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/RxTypes.kt
@@ -17,7 +17,9 @@
 package androidx.room.solver
 
 import androidx.room.ext.RoomRxJava2TypeNames
+import androidx.room.ext.RoomRxJava3TypeNames
 import androidx.room.ext.RxJava2TypeNames
+import androidx.room.ext.RxJava3TypeNames
 import androidx.room.processor.ProcessorErrors
 import com.squareup.javapoet.ClassName
 
@@ -27,6 +29,7 @@
     val factoryMethodName: String? = null,
     val canBeNull: Boolean = false
 ) {
+    // RxJava2 types
     RX2_FLOWABLE(
         version = RxVersion.TWO,
         className = RxJava2TypeNames.FLOWABLE,
@@ -44,9 +47,28 @@
         canBeNull = true),
     RX2_COMPLETABLE(
         version = RxVersion.TWO,
-        className = RxJava2TypeNames.COMPLETABLE);
+        className = RxJava2TypeNames.COMPLETABLE),
+    // RxJava3 types
+    RX3_FLOWABLE(
+        version = RxVersion.THREE,
+        className = RxJava3TypeNames.FLOWABLE,
+        factoryMethodName = RoomRxJava3TypeNames.RX_ROOM_CREATE_FLOWABLE),
+    RX3_OBSERVABLE(
+        version = RxVersion.THREE,
+        className = RxJava3TypeNames.OBSERVABLE,
+        factoryMethodName = RoomRxJava3TypeNames.RX_ROOM_CREATE_OBSERVABLE),
+    RX3_SINGLE(
+        version = RxVersion.THREE,
+        className = RxJava3TypeNames.SINGLE),
+    RX3_MAYBE(
+        version = RxVersion.THREE,
+        className = RxJava3TypeNames.MAYBE,
+        canBeNull = true),
+    RX3_COMPLETABLE(
+        version = RxVersion.THREE,
+        className = RxJava3TypeNames.COMPLETABLE);
 
-    fun isSingle() = this == RX2_SINGLE
+    fun isSingle() = this == RX2_SINGLE || this == RX3_SINGLE
 }
 
 internal enum class RxVersion(
@@ -58,4 +80,8 @@
         rxRoomClassName = RoomRxJava2TypeNames.RX_ROOM,
         emptyResultExceptionClassName = RoomRxJava2TypeNames.RX_EMPTY_RESULT_SET_EXCEPTION,
         missingArtifactMessage = ProcessorErrors.MISSING_ROOM_RXJAVA2_ARTIFACT),
+    THREE(
+        rxRoomClassName = RoomRxJava3TypeNames.RX_ROOM,
+        emptyResultExceptionClassName = RoomRxJava3TypeNames.RX_EMPTY_RESULT_SET_EXCEPTION,
+        missingArtifactMessage = ProcessorErrors.MISSING_ROOM_RXJAVA3_ARTIFACT);
 }
\ No newline at end of file
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxCallableQueryResultBinderProvider.kt b/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxCallableQueryResultBinderProvider.kt
index 9cc4717..5808a3f 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxCallableQueryResultBinderProvider.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxCallableQueryResultBinderProvider.kt
@@ -55,7 +55,9 @@
     companion object {
         fun getAll(context: Context) = listOf(
             RxType.RX2_SINGLE,
-            RxType.RX2_MAYBE
+            RxType.RX2_MAYBE,
+            RxType.RX3_SINGLE,
+            RxType.RX3_MAYBE
         ).map { RxCallableQueryResultBinderProvider(context, it) }
     }
 }
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxQueryResultBinderProvider.kt b/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxQueryResultBinderProvider.kt
index 7861ec7..a96b7d8 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxQueryResultBinderProvider.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/RxQueryResultBinderProvider.kt
@@ -71,7 +71,9 @@
     companion object {
         fun getAll(context: Context) = listOf(
             RxType.RX2_FLOWABLE,
-            RxType.RX2_OBSERVABLE
+            RxType.RX2_OBSERVABLE,
+            RxType.RX3_FLOWABLE,
+            RxType.RX3_OBSERVABLE
         ).map { RxQueryResultBinderProvider(context, it) }
     }
 }
\ No newline at end of file
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/prepared/binderprovider/RxPreparedQueryResultBinderProvider.kt b/room/compiler/src/main/kotlin/androidx/room/solver/prepared/binderprovider/RxPreparedQueryResultBinderProvider.kt
index 57cef99..c1ade4a 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/prepared/binderprovider/RxPreparedQueryResultBinderProvider.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/prepared/binderprovider/RxPreparedQueryResultBinderProvider.kt
@@ -17,7 +17,6 @@
 package androidx.room.solver.prepared.binderprovider
 
 import androidx.room.ext.L
-import androidx.room.ext.RxJava2TypeNames
 import androidx.room.ext.T
 import androidx.room.ext.typeName
 import androidx.room.parser.ParsedQuery
@@ -65,7 +64,10 @@
         fun getAll(context: Context) = listOf(
             RxPreparedQueryResultBinderProvider(context, RxType.RX2_SINGLE),
             RxPreparedQueryResultBinderProvider(context, RxType.RX2_MAYBE),
-            RxCompletablePreparedQueryResultBinderProvider(context, RxType.RX2_COMPLETABLE)
+            RxCompletablePreparedQueryResultBinderProvider(context, RxType.RX2_COMPLETABLE),
+            RxPreparedQueryResultBinderProvider(context, RxType.RX3_SINGLE),
+            RxPreparedQueryResultBinderProvider(context, RxType.RX3_MAYBE),
+            RxCompletablePreparedQueryResultBinderProvider(context, RxType.RX3_COMPLETABLE)
         )
     }
 }
@@ -77,7 +79,7 @@
 
     private val completableType: TypeMirror? by lazy {
         context.processingEnv.elementUtils
-            .getTypeElement(RxJava2TypeNames.COMPLETABLE.toString())?.asType()
+            .getTypeElement(rxType.className.toString())?.asType()
     }
 
     override fun matches(declared: DeclaredType): Boolean {
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableDeleteOrUpdateMethodBinderProvider.kt b/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableDeleteOrUpdateMethodBinderProvider.kt
index 67bb1f8..0a9518c 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableDeleteOrUpdateMethodBinderProvider.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableDeleteOrUpdateMethodBinderProvider.kt
@@ -17,7 +17,6 @@
 package androidx.room.solver.shortcut.binderprovider
 
 import androidx.room.ext.L
-import androidx.room.ext.RxJava2TypeNames
 import androidx.room.ext.T
 import androidx.room.ext.typeName
 import androidx.room.processor.Context
@@ -61,7 +60,10 @@
         fun getAll(context: Context) = listOf(
             RxCallableDeleteOrUpdateMethodBinderProvider(context, RxType.RX2_SINGLE),
             RxCallableDeleteOrUpdateMethodBinderProvider(context, RxType.RX2_MAYBE),
-            RxCompletableDeleteOrUpdateMethodBinderProvider(context, RxType.RX2_COMPLETABLE)
+            RxCompletableDeleteOrUpdateMethodBinderProvider(context, RxType.RX2_COMPLETABLE),
+            RxCallableDeleteOrUpdateMethodBinderProvider(context, RxType.RX3_SINGLE),
+            RxCallableDeleteOrUpdateMethodBinderProvider(context, RxType.RX3_MAYBE),
+            RxCompletableDeleteOrUpdateMethodBinderProvider(context, RxType.RX3_COMPLETABLE)
         )
     }
 }
@@ -73,7 +75,7 @@
 
     private val completableTypeMirror: TypeMirror? by lazy {
         context.processingEnv.elementUtils
-                .getTypeElement(RxJava2TypeNames.COMPLETABLE.toString())?.asType()
+                .getTypeElement(rxType.className.toString())?.asType()
     }
 
     /**
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableInsertMethodBinderProvider.kt b/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableInsertMethodBinderProvider.kt
index 14c2ead..923a748 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableInsertMethodBinderProvider.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/shortcut/binderprovider/RxCallableInsertMethodBinderProvider.kt
@@ -64,7 +64,10 @@
         fun getAll(context: Context) = listOf(
             RxCallableInsertMethodBinderProvider(context, RxType.RX2_SINGLE),
             RxCallableInsertMethodBinderProvider(context, RxType.RX2_MAYBE),
-            RxCompletableInsertMethodBinderProvider(context, RxType.RX2_COMPLETABLE)
+            RxCompletableInsertMethodBinderProvider(context, RxType.RX2_COMPLETABLE),
+            RxCallableInsertMethodBinderProvider(context, RxType.RX3_SINGLE),
+            RxCallableInsertMethodBinderProvider(context, RxType.RX3_MAYBE),
+            RxCompletableInsertMethodBinderProvider(context, RxType.RX3_COMPLETABLE)
         )
     }
 }
diff --git a/room/compiler/src/test/data/common/input/Rx3Room.java b/room/compiler/src/test/data/common/input/Rx3Room.java
new file mode 100644
index 0000000..5609338
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/Rx3Room.java
@@ -0,0 +1,5 @@
+// mock rx3 helper
+package androidx.room.rxjava3;
+
+class RxRoom { // intentionally empty; presumably only has to exist so the type resolves
+}
diff --git a/room/compiler/src/test/data/common/input/rxjava3/Completable.java b/room/compiler/src/test/data/common/input/rxjava3/Completable.java
new file mode 100644
index 0000000..bce2a20
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/rxjava3/Completable.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.core;
+
+import java.util.concurrent.Callable;
+
+public abstract class Completable { // test-input stub of the RxJava 3 type
+    public static Completable fromCallable(Callable callable) {return null;} // stub: always null
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/rxjava3/Flowable.java b/room/compiler/src/test/data/common/input/rxjava3/Flowable.java
new file mode 100644
index 0000000..091f2ad
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/rxjava3/Flowable.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.core;
+
+import org.reactivestreams.Publisher;
+
+public abstract class Flowable<T> implements Publisher<T> { // test-input stub, no members
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/rxjava3/Maybe.java b/room/compiler/src/test/data/common/input/rxjava3/Maybe.java
new file mode 100644
index 0000000..c11fdc2
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/rxjava3/Maybe.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.core;
+
+import java.util.concurrent.Callable;
+
+public abstract class Maybe<T> { // test-input stub of the RxJava 3 type
+    public static <T> Maybe<T> fromCallable(Callable callable) {return null;} // stub: always null
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/rxjava3/Observable.java b/room/compiler/src/test/data/common/input/rxjava3/Observable.java
new file mode 100644
index 0000000..2e8696a
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/rxjava3/Observable.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.core;
+
+public abstract class Observable<T> { // test-input stub, no members
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/rxjava3/Single.java b/room/compiler/src/test/data/common/input/rxjava3/Single.java
new file mode 100644
index 0000000..9ddcee9
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/rxjava3/Single.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.core;
+
+import java.util.concurrent.Callable;
+
+public abstract class Single<T> { // test-input stub of the RxJava 3 type
+    public static <T> Single<T> fromCallable(Callable callable) {return null;} // stub: always null
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/kotlin/androidx/room/processor/InsertionMethodProcessorTest.kt b/room/compiler/src/test/kotlin/androidx/room/processor/InsertionMethodProcessorTest.kt
index c230f81..3c76e8d 100644
--- a/room/compiler/src/test/kotlin/androidx/room/processor/InsertionMethodProcessorTest.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/processor/InsertionMethodProcessorTest.kt
@@ -22,6 +22,7 @@
 import androidx.room.OnConflictStrategy
 import androidx.room.ext.CommonTypeNames
 import androidx.room.ext.RxJava2TypeNames
+import androidx.room.ext.RxJava3TypeNames
 import androidx.room.ext.typeName
 import androidx.room.solver.shortcut.result.InsertMethodAdapter
 import androidx.room.testing.TestInvocation
@@ -413,7 +414,17 @@
                 Pair("${RxJava2TypeNames.MAYBE}<Long>",
                         InsertMethodAdapter.InsertionType.INSERT_SINGLE_ID),
                 Pair("${RxJava2TypeNames.MAYBE}<List<Long>>",
-                        InsertMethodAdapter.InsertionType.INSERT_ID_LIST)
+                        InsertMethodAdapter.InsertionType.INSERT_ID_LIST),
+                Pair(RxJava3TypeNames.COMPLETABLE,
+                    InsertMethodAdapter.InsertionType.INSERT_VOID_OBJECT),
+                Pair("${RxJava3TypeNames.SINGLE}<Long>",
+                    InsertMethodAdapter.InsertionType.INSERT_SINGLE_ID),
+                Pair("${RxJava3TypeNames.SINGLE}<List<Long>>",
+                    InsertMethodAdapter.InsertionType.INSERT_ID_LIST),
+                Pair("${RxJava3TypeNames.MAYBE}<Long>",
+                    InsertMethodAdapter.InsertionType.INSERT_SINGLE_ID),
+                Pair("${RxJava3TypeNames.MAYBE}<List<Long>>",
+                    InsertMethodAdapter.InsertionType.INSERT_ID_LIST)
         ).forEach { pair ->
             val dots = if (pair.second in setOf(
                             InsertMethodAdapter.InsertionType.INSERT_ID_LIST,
@@ -740,8 +751,9 @@
         return assertAbout(JavaSourcesSubjectFactory.javaSources())
                 .that(listOf(JavaFileObjects.forSourceString("foo.bar.MyClass",
                         DAO_PREFIX + input.joinToString("\n") + DAO_SUFFIX),
-                        COMMON.USER, COMMON.BOOK, COMMON.NOT_AN_ENTITY,
-                        COMMON.COMPLETABLE, COMMON.MAYBE, COMMON.SINGLE) + additionalJFOs
+                        COMMON.USER, COMMON.BOOK, COMMON.NOT_AN_ENTITY, COMMON.RX2_COMPLETABLE,
+                        COMMON.RX2_MAYBE, COMMON.RX2_SINGLE, COMMON.RX3_COMPLETABLE,
+                        COMMON.RX3_MAYBE, COMMON.RX3_SINGLE) + additionalJFOs
                 )
                 .processedWith(TestProcessor.builder()
                         .forAnnotations(Insert::class, Dao::class)
diff --git a/room/compiler/src/test/kotlin/androidx/room/processor/ShortcutMethodProcessorTest.kt b/room/compiler/src/test/kotlin/androidx/room/processor/ShortcutMethodProcessorTest.kt
index f82a516..2a18a51 100644
--- a/room/compiler/src/test/kotlin/androidx/room/processor/ShortcutMethodProcessorTest.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/processor/ShortcutMethodProcessorTest.kt
@@ -21,6 +21,7 @@
 import androidx.room.ext.CommonTypeNames
 import androidx.room.ext.GuavaUtilConcurrentTypeNames
 import androidx.room.ext.RxJava2TypeNames
+import androidx.room.ext.RxJava3TypeNames
 import androidx.room.ext.typeName
 import androidx.room.testing.TestInvocation
 import androidx.room.testing.TestProcessor
@@ -141,6 +142,9 @@
                 "${RxJava2TypeNames.SINGLE}<Integer>",
                 "${RxJava2TypeNames.MAYBE}<Integer>",
                 RxJava2TypeNames.COMPLETABLE,
+                "${RxJava3TypeNames.SINGLE}<Integer>",
+                "${RxJava3TypeNames.MAYBE}<Integer>",
+                RxJava3TypeNames.COMPLETABLE,
                 "${GuavaUtilConcurrentTypeNames.LISTENABLE_FUTURE}<Integer>"
         ).forEach { type ->
             singleShortcutMethod(
@@ -244,6 +248,9 @@
                 "${RxJava2TypeNames.SINGLE}<Integer>",
                 "${RxJava2TypeNames.MAYBE}<Integer>",
                 RxJava2TypeNames.COMPLETABLE,
+                "${RxJava3TypeNames.SINGLE}<Integer>",
+                "${RxJava3TypeNames.MAYBE}<Integer>",
+                RxJava3TypeNames.COMPLETABLE,
                 "${GuavaUtilConcurrentTypeNames.LISTENABLE_FUTURE}<Integer>"
         ).forEach { type ->
             singleShortcutMethod(
@@ -453,8 +460,10 @@
         return Truth.assertAbout(JavaSourcesSubjectFactory.javaSources())
                 .that(listOf(JavaFileObjects.forSourceString("foo.bar.MyClass",
                         DAO_PREFIX + input.joinToString("\n") + DAO_SUFFIX
-                ), COMMON.USER, COMMON.BOOK, COMMON.NOT_AN_ENTITY, COMMON.COMPLETABLE, COMMON.MAYBE,
-                    COMMON.SINGLE, COMMON.LISTENABLE_FUTURE, COMMON.GUAVA_ROOM) + additionalJFOs)
+                ), COMMON.USER, COMMON.BOOK, COMMON.NOT_AN_ENTITY, COMMON.RX2_COMPLETABLE,
+                    COMMON.RX2_MAYBE, COMMON.RX2_SINGLE, COMMON.RX3_COMPLETABLE,
+                    COMMON.RX3_MAYBE, COMMON.RX3_SINGLE, COMMON.LISTENABLE_FUTURE,
+                    COMMON.GUAVA_ROOM) + additionalJFOs)
                 .processedWith(TestProcessor.builder()
                         .forAnnotations(annotation, Dao::class)
                         .nextRunHandler { invocation ->
diff --git a/room/compiler/src/test/kotlin/androidx/room/processor/TransactionMethodProcessorTest.kt b/room/compiler/src/test/kotlin/androidx/room/processor/TransactionMethodProcessorTest.kt
index c42a7cc..d3f1046 100644
--- a/room/compiler/src/test/kotlin/androidx/room/processor/TransactionMethodProcessorTest.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/processor/TransactionMethodProcessorTest.kt
@@ -43,7 +43,6 @@
                 import androidx.room.*;
                 import java.util.*;
                 import androidx.lifecycle.*;
-                import io.reactivex.*;
                 import com.google.common.util.concurrent.*;
                 @Dao
                 abstract class MyClass {
@@ -101,11 +100,11 @@
     }
 
     @Test
-    fun deferredReturnType_flowable() {
+    fun deferredReturnType_rx2_flowable() {
         singleTransactionMethod(
             """
                 @Transaction
-                public Flowable<String> doInTransaction(int param) { return null; }
+                public io.reactivex.Flowable<String> doInTransaction(int param) { return null; }
                 """) { transaction, _ ->
             assertThat(transaction.name, `is`("doInTransaction"))
         }.failsToCompile()
@@ -117,11 +116,29 @@
     }
 
     @Test
-    fun deferredReturnType_completable() {
+    fun deferredReturnType_rx3_flowable() {
         singleTransactionMethod(
             """
                 @Transaction
-                public Completable doInTransaction(int param) { return null; }
+                public io.reactivex.rxjava3.core.Flowable<String> doInTransaction(int param) { 
+                    return null; 
+                }
+                """) { transaction, _ ->
+            assertThat(transaction.name, `is`("doInTransaction"))
+        }.failsToCompile()
+            .withErrorContaining(
+                ProcessorErrors.transactionMethodAsync(
+                    "io.reactivex.rxjava3.core.Flowable"
+                )
+            )
+    }
+
+    @Test
+    fun deferredReturnType_rx2_completable() {
+        singleTransactionMethod(
+            """
+                @Transaction
+                public io.reactivex.Completable doInTransaction(int param) { return null; }
                 """) { transaction, _ ->
             assertThat(transaction.name, `is`("doInTransaction"))
         }.failsToCompile()
@@ -133,11 +150,29 @@
     }
 
     @Test
-    fun deferredReturnType_single() {
+    fun deferredReturnType_rx3_completable() {
         singleTransactionMethod(
             """
                 @Transaction
-                public Single<String> doInTransaction(int param) { return null; }
+                public io.reactivex.rxjava3.core.Completable doInTransaction(int param) { 
+                    return null;
+                }
+                """) { transaction, _ ->
+            assertThat(transaction.name, `is`("doInTransaction"))
+        }.failsToCompile()
+            .withErrorContaining(
+                ProcessorErrors.transactionMethodAsync(
+                    "io.reactivex.rxjava3.core.Completable"
+                )
+            )
+    }
+
+    @Test
+    fun deferredReturnType_rx2_single() {
+        singleTransactionMethod(
+            """
+                @Transaction
+                public io.reactivex.Single<String> doInTransaction(int param) { return null; }
                 """) { transaction, _ ->
             assertThat(transaction.name, `is`("doInTransaction"))
         }.failsToCompile()
@@ -149,6 +184,24 @@
     }
 
     @Test
+    fun deferredReturnType_rx3_single() { // @Transaction returning an RxJava 3 Single must fail
+        singleTransactionMethod(
+            """
+                @Transaction
+                public io.reactivex.rxjava3.core.Single<String> doInTransaction(int param) {
+                    return null;
+                }
+                """) { transaction, _ ->
+            assertThat(transaction.name, `is`("doInTransaction"))
+        }.failsToCompile()
+            .withErrorContaining(
+                ProcessorErrors.transactionMethodAsync(
+                    "io.reactivex.rxjava3.core.Single"
+                )
+            )
+    }
+
+    @Test
     fun deferredReturnType_listenableFuture() {
         singleTransactionMethod(
             """
@@ -175,8 +228,9 @@
                 .that(listOf(JavaFileObjects.forSourceString("foo.bar.MyClass",
                         TransactionMethodProcessorTest.DAO_PREFIX + input.joinToString("\n") +
                                 TransactionMethodProcessorTest.DAO_SUFFIX
-                ), COMMON.LIVE_DATA, COMMON.FLOWABLE, COMMON.PUBLISHER, COMMON.COMPLETABLE,
-                    COMMON.SINGLE, COMMON.LISTENABLE_FUTURE))
+                ), COMMON.LIVE_DATA, COMMON.RX2_FLOWABLE, COMMON.PUBLISHER, COMMON.RX2_COMPLETABLE,
+                    COMMON.RX2_SINGLE, COMMON.RX3_FLOWABLE, COMMON.RX3_COMPLETABLE,
+                    COMMON.RX3_SINGLE, COMMON.LISTENABLE_FUTURE))
                 .processedWith(TestProcessor.builder()
                         .forAnnotations(Transaction::class, Dao::class)
                         .nextRunHandler { invocation ->
diff --git a/room/compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt b/room/compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt
index 6c79762..d7145df 100644
--- a/room/compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt
@@ -27,6 +27,7 @@
 import androidx.room.ext.ReactiveStreamsTypeNames
 import androidx.room.ext.RoomTypeNames.STRING_UTIL
 import androidx.room.ext.RxJava2TypeNames
+import androidx.room.ext.RxJava3TypeNames
 import androidx.room.ext.T
 import androidx.room.ext.typeName
 import androidx.room.parser.SQLTypeAffinity
@@ -232,8 +233,8 @@
     }
 
     @Test
-    fun testMissingRxRoom() {
-        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.FLOWABLE)) { invocation ->
+    fun testMissingRx2Room() {
+        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.RX2_FLOWABLE)) { invocation ->
             val publisherElement = invocation.processingEnv.elementUtils
                     .getTypeElement(ReactiveStreamsTypeNames.PUBLISHER.toString())
             assertThat(publisherElement, notNullValue())
@@ -245,87 +246,130 @@
     }
 
     @Test
-    fun testFindPublisher() {
-        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.FLOWABLE, COMMON.RX2_ROOM)) {
-            invocation ->
-            val publisher = invocation.processingEnv.elementUtils
-                    .getTypeElement(ReactiveStreamsTypeNames.PUBLISHER.toString())
-            assertThat(publisher, notNullValue())
+    fun testMissingRx3Room() {
+        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.RX3_FLOWABLE)) { invocation ->
+            val publisherElement = invocation.processingEnv.elementUtils
+                .getTypeElement(ReactiveStreamsTypeNames.PUBLISHER.toString())
+            assertThat(publisherElement, notNullValue())
             assertThat(
                 RxQueryResultBinderProvider.getAll(invocation.context).any {
-                    it.matches(MoreTypes.asDeclared(publisher.asType()))
+                    it.matches(MoreTypes.asDeclared(publisherElement.asType()))
                 }, `is`(true))
-        }.compilesWithoutError()
+        }.failsToCompile().withErrorContaining(ProcessorErrors.MISSING_ROOM_RXJAVA3_ARTIFACT)
+    }
+
+    @Test
+    fun testFindPublisher() {
+        // Same lookup for each RxJava version: (Flowable source, Rx Room source).
+        val rxVersions = listOf(
+            COMMON.RX2_FLOWABLE to COMMON.RX2_ROOM,
+            COMMON.RX3_FLOWABLE to COMMON.RX3_ROOM
+        )
+        for ((flowableSrc, roomSrc) in rxVersions) {
+            simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, flowableSrc, roomSrc)) { invocation ->
+                val publisherElement = invocation.processingEnv.elementUtils
+                    .getTypeElement(ReactiveStreamsTypeNames.PUBLISHER.toString())
+                assertThat(publisherElement, notNullValue())
+                val binders = RxQueryResultBinderProvider.getAll(invocation.context)
+                val publisherType = MoreTypes.asDeclared(publisherElement.asType())
+                assertThat(binders.any { it.matches(publisherType) }, `is`(true))
+            }.compilesWithoutError()
+        }
     }
 
     @Test
     fun testFindFlowable() {
-        simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, COMMON.FLOWABLE, COMMON.RX2_ROOM)) {
-            invocation ->
-            val flowable = invocation.processingEnv.elementUtils
-                    .getTypeElement(RxJava2TypeNames.FLOWABLE.toString())
-            assertThat(flowable, notNullValue())
-            assertThat(
-                RxQueryResultBinderProvider.getAll(invocation.context).any {
-                    it.matches(MoreTypes.asDeclared(flowable.asType()))
-                }, `is`(true))
-        }.compilesWithoutError()
+        // Each entry: Rx Flowable source, matching Rx Room source, Flowable class name.
+        val rxVersions = listOf(
+            Triple(COMMON.RX2_FLOWABLE, COMMON.RX2_ROOM, RxJava2TypeNames.FLOWABLE),
+            Triple(COMMON.RX3_FLOWABLE, COMMON.RX3_ROOM, RxJava3TypeNames.FLOWABLE)
+        )
+        for ((flowableSrc, roomSrc, flowableName) in rxVersions) {
+            simpleRun(jfos = *arrayOf(COMMON.PUBLISHER, flowableSrc, roomSrc)) { invocation ->
+                val flowableElement = invocation.processingEnv.elementUtils
+                        .getTypeElement(flowableName.toString())
+                assertThat(flowableElement, notNullValue())
+                val binders = RxQueryResultBinderProvider.getAll(invocation.context)
+                val flowableType = MoreTypes.asDeclared(flowableElement.asType())
+                assertThat(binders.any { it.matches(flowableType) }, `is`(true))
+            }.compilesWithoutError()
+        }
     }
 
     @Test
     fun testFindObservable() {
-        simpleRun(jfos = *arrayOf(COMMON.OBSERVABLE, COMMON.RX2_ROOM)) {
-            invocation ->
-            val observable = invocation.processingEnv.elementUtils
-                    .getTypeElement(RxJava2TypeNames.OBSERVABLE.toString())
-            assertThat(observable, notNullValue())
-            assertThat(
-                RxQueryResultBinderProvider.getAll(invocation.context).any {
-                    it.matches(MoreTypes.asDeclared(observable.asType()))
-                }, `is`(true))
-        }.compilesWithoutError()
+        // Each entry: Rx Observable source, matching Rx Room source, Observable class name.
+        val rxVersions = listOf(
+            Triple(COMMON.RX2_OBSERVABLE, COMMON.RX2_ROOM, RxJava2TypeNames.OBSERVABLE),
+            Triple(COMMON.RX3_OBSERVABLE, COMMON.RX3_ROOM, RxJava3TypeNames.OBSERVABLE)
+        )
+        for ((observableSrc, roomSrc, observableName) in rxVersions) {
+            simpleRun(jfos = *arrayOf(observableSrc, roomSrc)) { invocation ->
+                val observableElement = invocation.processingEnv.elementUtils
+                        .getTypeElement(observableName.toString())
+                assertThat(observableElement, notNullValue())
+                val binders = RxQueryResultBinderProvider.getAll(invocation.context)
+                val observableType = MoreTypes.asDeclared(observableElement.asType())
+                assertThat(binders.any { it.matches(observableType) }, `is`(true))
+            }.compilesWithoutError()
+        }
     }
 
     @Test
     fun testFindInsertSingle() {
-        simpleRun(jfos = *arrayOf(COMMON.SINGLE)) {
-            invocation ->
-            val single = invocation.processingEnv.elementUtils
-                    .getTypeElement(RxJava2TypeNames.SINGLE.toString())
-            assertThat(single, notNullValue())
-            assertThat(
-                RxCallableInsertMethodBinderProvider.getAll(invocation.context).any {
-                    it.matches(MoreTypes.asDeclared(single.asType()))
-                }, `is`(true))
-        }.compilesWithoutError()
+        // Only the Rx type source is compiled here, so pair it directly with its class name.
+        listOf(
+            COMMON.RX2_SINGLE to RxJava2TypeNames.SINGLE,
+            COMMON.RX3_SINGLE to RxJava3TypeNames.SINGLE
+        ).forEach { (rxTypeSrc, rxTypeClassName) ->
+            simpleRun(jfos = *arrayOf(rxTypeSrc)) { invocation ->
+                val single = invocation.processingEnv.elementUtils
+                        .getTypeElement(rxTypeClassName.toString())
+                assertThat(single, notNullValue())
+                assertThat(
+                    RxCallableInsertMethodBinderProvider.getAll(invocation.context).any {
+                        it.matches(MoreTypes.asDeclared(single.asType()))
+                    }, `is`(true))
+            }.compilesWithoutError()
+        }
     }
 
     @Test
     fun testFindInsertMaybe() {
-        simpleRun(jfos = *arrayOf(COMMON.MAYBE)) {
-            invocation ->
-            val maybe = invocation.processingEnv.elementUtils
-                    .getTypeElement(RxJava2TypeNames.MAYBE.toString())
-            assertThat(maybe, notNullValue())
-            assertThat(
-                RxCallableInsertMethodBinderProvider.getAll(invocation.context).any {
-                    it.matches(MoreTypes.asDeclared(maybe.asType()))
-                }, `is`(true))
-        }.compilesWithoutError()
+        // Only the Rx type source is compiled here, so pair it directly with its class name.
+        listOf(
+            COMMON.RX2_MAYBE to RxJava2TypeNames.MAYBE,
+            COMMON.RX3_MAYBE to RxJava3TypeNames.MAYBE
+        ).forEach { (rxTypeSrc, rxTypeClassName) ->
+            simpleRun(jfos = *arrayOf(rxTypeSrc)) { invocation ->
+                val maybe = invocation.processingEnv.elementUtils
+                        .getTypeElement(rxTypeClassName.toString())
+                assertThat(maybe, notNullValue())
+                assertThat(
+                    RxCallableInsertMethodBinderProvider.getAll(invocation.context).any {
+                        it.matches(MoreTypes.asDeclared(maybe.asType()))
+                    }, `is`(true))
+            }.compilesWithoutError()
+        }
     }
 
     @Test
     fun testFindInsertCompletable() {
-        simpleRun(jfos = *arrayOf(COMMON.COMPLETABLE)) {
-            invocation ->
-            val completable = invocation.processingEnv.elementUtils
-                    .getTypeElement(RxJava2TypeNames.COMPLETABLE.toString())
-            assertThat(completable, notNullValue())
-            assertThat(
-                RxCallableInsertMethodBinderProvider.getAll(invocation.context).any {
-                    it.matches(MoreTypes.asDeclared(completable.asType()))
-                }, `is`(true))
-        }.compilesWithoutError()
+        // Only the Rx type source is compiled here, so pair it directly with its class name.
+        listOf(
+            COMMON.RX2_COMPLETABLE to RxJava2TypeNames.COMPLETABLE,
+            COMMON.RX3_COMPLETABLE to RxJava3TypeNames.COMPLETABLE
+        ).forEach { (rxTypeSrc, rxTypeClassName) ->
+            simpleRun(jfos = *arrayOf(rxTypeSrc)) { invocation ->
+                val completable = invocation.processingEnv.elementUtils
+                        .getTypeElement(rxTypeClassName.toString())
+                assertThat(completable, notNullValue())
+                assertThat(
+                    RxCallableInsertMethodBinderProvider.getAll(invocation.context).any {
+                        it.matches(MoreTypes.asDeclared(completable.asType()))
+                    }, `is`(true))
+            }.compilesWithoutError()
+        }
     }
 
     @Test
@@ -342,7 +386,7 @@
 
     @Test
     fun testFindDeleteOrUpdateSingle() {
-        simpleRun(jfos = *arrayOf(COMMON.SINGLE)) {
+        simpleRun(jfos = *arrayOf(COMMON.RX2_SINGLE)) {
             invocation ->
             val single = invocation.processingEnv.elementUtils
                     .getTypeElement(RxJava2TypeNames.SINGLE.toString())
@@ -356,7 +400,7 @@
 
     @Test
     fun testFindDeleteOrUpdateMaybe() {
-        simpleRun(jfos = *arrayOf(COMMON.MAYBE)) {
+        simpleRun(jfos = *arrayOf(COMMON.RX2_MAYBE)) {
             invocation ->
             val maybe = invocation.processingEnv.elementUtils
                     .getTypeElement(RxJava2TypeNames.MAYBE.toString())
@@ -370,7 +414,7 @@
 
     @Test
     fun testFindDeleteOrUpdateCompletable() {
-        simpleRun(jfos = *arrayOf(COMMON.COMPLETABLE)) {
+        simpleRun(jfos = *arrayOf(COMMON.RX2_COMPLETABLE)) {
             invocation ->
             val completable = invocation.processingEnv.elementUtils
                     .getTypeElement(RxJava2TypeNames.COMPLETABLE.toString())
diff --git a/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt b/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt
index 45c259d..b50ec13 100644
--- a/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt
@@ -23,7 +23,9 @@
 import androidx.room.ext.ReactiveStreamsTypeNames
 import androidx.room.ext.RoomGuavaTypeNames
 import androidx.room.ext.RoomRxJava2TypeNames
+import androidx.room.ext.RoomRxJava3TypeNames
 import androidx.room.ext.RxJava2TypeNames
+import androidx.room.ext.RxJava3TypeNames
 import androidx.room.processor.DatabaseViewProcessor
 import androidx.room.processor.TableEntityProcessor
 import androidx.room.solver.CodeGenScope
@@ -100,23 +102,23 @@
         loadJavaCode("common/input/reactivestreams/Publisher.java",
                 ReactiveStreamsTypeNames.PUBLISHER.toString())
     }
-    val FLOWABLE by lazy {
+    val RX2_FLOWABLE by lazy {
         loadJavaCode("common/input/rxjava2/Flowable.java",
                 RxJava2TypeNames.FLOWABLE.toString())
     }
-    val OBSERVABLE by lazy {
+    val RX2_OBSERVABLE by lazy {
         loadJavaCode("common/input/rxjava2/Observable.java",
                 RxJava2TypeNames.OBSERVABLE.toString())
     }
-    val SINGLE by lazy {
+    val RX2_SINGLE by lazy {
         loadJavaCode("common/input/rxjava2/Single.java",
                 RxJava2TypeNames.SINGLE.toString())
     }
-    val MAYBE by lazy {
+    val RX2_MAYBE by lazy {
         loadJavaCode("common/input/rxjava2/Maybe.java",
                 RxJava2TypeNames.MAYBE.toString())
     }
-    val COMPLETABLE by lazy {
+    val RX2_COMPLETABLE by lazy {
         loadJavaCode("common/input/rxjava2/Completable.java",
                 RxJava2TypeNames.COMPLETABLE.toString())
     }
@@ -125,6 +127,31 @@
         loadJavaCode("common/input/Rx2Room.java", RoomRxJava2TypeNames.RX_ROOM.toString())
     }
 
+    val RX3_FLOWABLE by lazy {
+        loadJavaCode("common/input/rxjava3/Flowable.java",
+            RxJava3TypeNames.FLOWABLE.toString())
+    }
+    val RX3_OBSERVABLE by lazy {
+        loadJavaCode("common/input/rxjava3/Observable.java",
+            RxJava3TypeNames.OBSERVABLE.toString())
+    }
+    val RX3_SINGLE by lazy {
+        loadJavaCode("common/input/rxjava3/Single.java",
+            RxJava3TypeNames.SINGLE.toString())
+    }
+    val RX3_MAYBE by lazy {
+        loadJavaCode("common/input/rxjava3/Maybe.java",
+            RxJava3TypeNames.MAYBE.toString())
+    }
+    val RX3_COMPLETABLE by lazy {
+        loadJavaCode("common/input/rxjava3/Completable.java",
+            RxJava3TypeNames.COMPLETABLE.toString())
+    }
+
+    val RX3_ROOM by lazy {
+        loadJavaCode("common/input/Rx3Room.java", RoomRxJava3TypeNames.RX_ROOM.toString())
+    }
+
     val DATA_SOURCE_FACTORY by lazy {
         loadJavaCode("common/input/DataSource.java", "androidx.paging.DataSource")
     }
diff --git a/room/compiler/src/test/kotlin/androidx/room/writer/DaoWriterTest.kt b/room/compiler/src/test/kotlin/androidx/room/writer/DaoWriterTest.kt
index 4eb97e5..7d247b5 100644
--- a/room/compiler/src/test/kotlin/androidx/room/writer/DaoWriterTest.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/writer/DaoWriterTest.kt
@@ -75,10 +75,10 @@
     private fun singleDao(vararg jfo: JavaFileObject): CompileTester {
         return Truth.assertAbout(JavaSourcesSubjectFactory.javaSources())
                 .that(jfo.toList() + COMMON.USER + COMMON.MULTI_PKEY_ENTITY + COMMON.BOOK +
-                        COMMON.LIVE_DATA + COMMON.COMPUTABLE_LIVE_DATA + COMMON.SINGLE +
-                        COMMON.MAYBE + COMMON.COMPLETABLE + COMMON.USER_SUMMARY + COMMON.RX2_ROOM +
-                        COMMON.PARENT + COMMON.CHILD1 + COMMON.CHILD2 + COMMON.INFO +
-                        COMMON.LISTENABLE_FUTURE + COMMON.GUAVA_ROOM)
+                        COMMON.LIVE_DATA + COMMON.COMPUTABLE_LIVE_DATA + COMMON.RX2_SINGLE +
+                        COMMON.RX2_MAYBE + COMMON.RX2_COMPLETABLE + COMMON.USER_SUMMARY +
+                        COMMON.RX2_ROOM + COMMON.PARENT + COMMON.CHILD1 + COMMON.CHILD2 +
+                        COMMON.INFO + COMMON.LISTENABLE_FUTURE + COMMON.GUAVA_ROOM)
                 .processedWith(TestProcessor.builder()
                         .forAnnotations(androidx.room.Dao::class)
                         .nextRunHandler { invocation ->
diff --git a/room/integration-tests/testapp/build.gradle b/room/integration-tests/testapp/build.gradle
index a664d56..cfe99c8 100644
--- a/room/integration-tests/testapp/build.gradle
+++ b/room/integration-tests/testapp/build.gradle
@@ -92,6 +92,7 @@
 
     androidTestImplementation(project(":room:room-testing"))
     androidTestImplementation(project(":room:room-rxjava2"))
+    androidTestImplementation(project(":room:room-rxjava3"))
     androidTestImplementation(project(":room:room-guava"))
     androidTestImplementation("androidx.arch.core:core-testing:2.0.1")
     androidTestImplementation(projectOrArtifact(":paging:paging-runtime"))
@@ -103,6 +104,7 @@
     androidTestImplementation(FINDBUGS)
     androidTestImplementation(GUAVA_ANDROID)
     androidTestImplementation(RX_JAVA)
+    androidTestImplementation(RX_JAVA3)
     androidTestImplementation(ANDROIDX_TEST_EXT_JUNIT)
     androidTestImplementation(ANDROIDX_TEST_CORE)
     androidTestImplementation(ANDROIDX_TEST_RUNNER)
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/MailDao.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/MailDao.java
index b6b2799..b073dce 100644
--- a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/MailDao.java
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/MailDao.java
@@ -24,8 +24,6 @@
 
 import java.util.List;
 
-import io.reactivex.Flowable;
-
 @Dao
 public interface MailDao {
 
@@ -48,7 +46,10 @@
     List<String> getMailBodySnippets(String searchQuery);
 
     @Query("SELECT rowId, * FROM mail")
-    Flowable<List<Mail>> getFlowableMail();
+    io.reactivex.Flowable<List<Mail>> rx2_getFlowableMail();
+
+    @Query("SELECT rowId, * FROM mail")
+    io.reactivex.rxjava3.core.Flowable<List<Mail>> rx3_getFlowableMail();
 
     @Query("SELECT rowId, * FROM mail")
     LiveData<List<Mail>> getLiveDataMail();
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserDao.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserDao.java
index 040db8a..92a810a 100644
--- a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserDao.java
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserDao.java
@@ -52,8 +52,6 @@
 import java.util.concurrent.Callable;
 
 import io.reactivex.Completable;
-import io.reactivex.Flowable;
-import io.reactivex.Maybe;
 import io.reactivex.Observable;
 import io.reactivex.Single;
 
@@ -184,25 +182,46 @@
     public abstract Cursor findUsersAsCursor(int... ids);
 
     @Query("select * from user where mId = :id")
-    public abstract Flowable<User> flowableUserById(int id);
+    public abstract io.reactivex.Flowable<User> rx2_flowableUserById(int id);
 
     @Query("select * from user where mId = :id")
-    public abstract Observable<User> observableUserById(int id);
+    public abstract io.reactivex.rxjava3.core.Flowable<User> rx3_flowableUserById(int id);
 
     @Query("select * from user where mId = :id")
-    public abstract Maybe<User> maybeUserById(int id);
+    public abstract io.reactivex.Observable<User> rx2_observableUserById(int id);
+
+    @Query("select * from user where mId = :id")
+    public abstract io.reactivex.rxjava3.core.Observable<User> rx3_observableUserById(int id);
+
+    @Query("select * from user where mId = :id")
+    public abstract io.reactivex.Maybe<User> rx2_maybeUserById(int id);
+
+    @Query("select * from user where mId = :id")
+    public abstract io.reactivex.rxjava3.core.Maybe<User> rx3_maybeUserById(int id);
 
     @Query("select * from user where mId IN (:ids)")
-    public abstract Maybe<List<User>> maybeUsersByIds(int... ids);
-
-    @Query("select * from user where mId = :id")
-    public abstract Single<User> singleUserById(int id);
+    public abstract io.reactivex.Maybe<List<User>> rx2_maybeUsersByIds(int... ids);
 
     @Query("select * from user where mId IN (:ids)")
-    public abstract Single<List<User>> singleUsersByIds(int... ids);
+    public abstract io.reactivex.rxjava3.core.Maybe<List<User>> rx3_maybeUsersByIds(int... ids);
+
+    @Query("select * from user where mId = :id")
+    public abstract io.reactivex.Single<User> rx2_singleUserById(int id);
+
+    @Query("select * from user where mId = :id")
+    public abstract io.reactivex.rxjava3.core.Single<User> rx3_singleUserById(int id);
+
+    @Query("select * from user where mId IN (:ids)")
+    public abstract io.reactivex.Single<List<User>> rx2_singleUsersByIds(int... ids);
+
+    @Query("select * from user where mId IN (:ids)")
+    public abstract io.reactivex.rxjava3.core.Single<List<User>> rx3_singleUsersByIds(int... ids);
 
     @Query("select COUNT(*) from user")
-    public abstract Flowable<Integer> flowableCountUsers();
+    public abstract io.reactivex.Flowable<Integer> rx2_flowableCountUsers();
+
+    @Query("select COUNT(*) from user")
+    public abstract io.reactivex.rxjava3.core.Flowable<Integer> rx3_flowableCountUsers();
 
     @Query("select COUNT(*) from user")
     public abstract Publisher<Integer> publisherCountUsers();
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserPetDao.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserPetDao.java
index 14c3cc1..da2996da 100644
--- a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserPetDao.java
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/dao/UserPetDao.java
@@ -40,9 +40,6 @@
 
 import java.util.List;
 
-import io.reactivex.Flowable;
-import io.reactivex.Observable;
-
 @Dao
 public interface UserPetDao {
     @Query("SELECT * FROM User u, Pet p WHERE u.mId = p.mUserId")
@@ -101,11 +98,19 @@
 
     @Transaction
     @Query("SELECT * FROM User u where u.mId = :userId")
-    Flowable<UserAndAllPets> flowableUserWithPets(int userId);
+    io.reactivex.Flowable<UserAndAllPets> rx2_flowableUserWithPets(int userId);
 
     @Transaction
     @Query("SELECT * FROM User u where u.mId = :userId")
-    Observable<UserAndAllPets> observableUserWithPets(int userId);
+    io.reactivex.rxjava3.core.Flowable<UserAndAllPets> rx3_flowableUserWithPets(int userId);
+
+    @Transaction
+    @Query("SELECT * FROM User u where u.mId = :userId")
+    io.reactivex.Observable<UserAndAllPets> rx2_observableUserWithPets(int userId);
+
+    @Transaction
+    @Query("SELECT * FROM User u where u.mId = :userId")
+    io.reactivex.rxjava3.core.Observable<UserAndAllPets> rx3_observableUserWithPets(int userId);
 
     @Transaction
     @Query("SELECT * FROM User u where u.mId = :uid")
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/MainThreadCheckTest.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/MainThreadCheckTest.java
index 3ef690f..a2331d5 100644
--- a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/MainThreadCheckTest.java
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/MainThreadCheckTest.java
@@ -55,11 +55,23 @@
     }
 
     @Test
-    public void testFlowableOnMainThread() {
+    public void testRx2FlowableOnMainThread() {
         final Throwable error = test(false, new Function<TestDatabase, Void>() {
             @Override
             public Void apply(TestDatabase db) {
-                db.getUserDao().flowableUserById(3);
+                db.getUserDao().rx2_flowableUserById(3);
+                return null;
+            }
+        });
+        assertThat(error, nullValue());
+    }
+
+    @Test
+    public void testRx3FlowableOnMainThread() {
+        final Throwable error = test(false, new Function<TestDatabase, Void>() {
+            @Override
+            public Void apply(TestDatabase db) {
+                db.getUserDao().rx3_flowableUserById(3);
                 return null;
             }
         });
@@ -79,11 +91,23 @@
     }
 
     @Test
-    public void testObservableOnMainThread() {
+    public void testRx2ObservableOnMainThread() {
         final Throwable error = test(false, new Function<TestDatabase, Void>() {
             @Override
             public Void apply(TestDatabase db) {
-                db.getUserDao().observableUserById(3);
+                db.getUserDao().rx2_observableUserById(3);
+                return null;
+            }
+        });
+        assertThat(error, nullValue());
+    }
+
+    @Test
+    public void testRx3ObservableOnMainThread() {
+        final Throwable error = test(false, new Function<TestDatabase, Void>() {
+            @Override
+            public Void apply(TestDatabase db) {
+                db.getUserDao().rx3_observableUserById(3);
                 return null;
             }
         });
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2Test.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2Test.java
index 88abfbc..7e63be9 100644
--- a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2Test.java
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2Test.java
@@ -107,7 +107,7 @@
     @Test
     public void maybeUser_Empty() throws InterruptedException {
         TestObserver<User> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.maybeUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_maybeUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -120,7 +120,7 @@
         User user = TestUtil.createUser(3);
         mUserDao.insert(user);
         TestObserver<User> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.maybeUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_maybeUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -132,7 +132,7 @@
     @Test
     public void maybeUsers_EmptyList() throws InterruptedException {
         TestObserver<List<User>> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.maybeUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_maybeUsersByIds(3, 5, 7).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -145,7 +145,7 @@
         User[] users = TestUtil.createUsersArray(3, 5);
         mUserDao.insertAll(users);
         TestObserver<List<User>> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.maybeUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_maybeUsersByIds(3, 5, 7).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -159,7 +159,7 @@
         User[] users = TestUtil.createUsersArray(1, 2);
         mUserDao.insertAll(users);
         TestObserver<User> testObserver1 = new TestObserver<>();
-        Maybe<User> maybe1 = mUserDao.maybeUserById(1);
+        Maybe<User> maybe1 = mUserDao.rx2_maybeUserById(1);
         Disposable disposable1 = maybe1.observeOn(mTestScheduler)
                 .subscribeWith(testObserver1);
         drain();
@@ -168,7 +168,7 @@
         testObserver1.assertValue(users[0]);
 
         TestObserver<User> testObserver2 = new TestObserver<>();
-        Maybe<User> maybe2 = mUserDao.maybeUserById(2);
+        Maybe<User> maybe2 = mUserDao.rx2_maybeUserById(2);
         Disposable disposable2 = maybe2.observeOn(mTestScheduler)
                 .subscribeWith(testObserver2);
         drain();
@@ -193,7 +193,7 @@
     @Test
     public void singleUser_Empty() throws InterruptedException {
         TestObserver<User> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.singleUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_singleUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         // figure out which error we should dispatch
@@ -207,7 +207,7 @@
         User user = TestUtil.createUser(3);
         mUserDao.insert(user);
         TestObserver<User> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.singleUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_singleUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -219,7 +219,7 @@
     @Test
     public void singleUsers_EmptyList() throws InterruptedException {
         TestObserver<List<User>> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.singleUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_singleUsersByIds(3, 5, 7).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -232,7 +232,7 @@
         User[] users = TestUtil.createUsersArray(3, 5);
         mUserDao.insertAll(users);
         TestObserver<List<User>> testObserver = new TestObserver<>();
-        Disposable disposable = mUserDao.singleUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_singleUsersByIds(3, 5, 7).observeOn(mTestScheduler)
                 .subscribeWith(testObserver);
         drain();
         testObserver.assertComplete();
@@ -246,7 +246,7 @@
         User[] users = TestUtil.createUsersArray(1, 2);
         mUserDao.insertAll(users);
         TestObserver<User> testObserver1 = new TestObserver<>();
-        Single<User> userSingle1 = mUserDao.singleUserById(1);
+        Single<User> userSingle1 = mUserDao.rx2_singleUserById(1);
         Disposable disposable1 = userSingle1.observeOn(mTestScheduler)
                 .subscribeWith(testObserver1);
         drain();
@@ -256,7 +256,7 @@
 
         // how get single for 2
         TestObserver<User> testObserver2 = new TestObserver<>();
-        Single<User> userSingle2 = mUserDao.singleUserById(2);
+        Single<User> userSingle2 = mUserDao.rx2_singleUserById(2);
         Disposable disposable2 = userSingle2.observeOn(mTestScheduler)
                 .subscribeWith(testObserver2);
         drain();
@@ -280,7 +280,7 @@
         mUserDao.insert(user);
         drain();
         TestSubscriber<User> consumer = new TestSubscriber<>();
-        Disposable disposable = mUserDao.flowableUserById(3).subscribeWith(consumer);
+        Disposable disposable = mUserDao.rx2_flowableUserById(3).subscribeWith(consumer);
         drain();
         consumer.assertValue(user);
         disposable.dispose();
@@ -292,7 +292,7 @@
         mUserDao.insert(user);
         drain();
         TestObserver<User> consumer = new TestObserver<>();
-        Disposable disposable = mUserDao.observableUserById(3).subscribeWith(consumer);
+        Disposable disposable = mUserDao.rx2_observableUserById(3).subscribeWith(consumer);
         drain();
         consumer.assertValue(user);
         disposable.dispose();
@@ -304,7 +304,7 @@
         mUserDao.insert(user);
         drain();
         TestSubscriber<User> consumer = new TestSubscriber<>();
-        Disposable disposable = mUserDao.flowableUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_flowableUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(consumer);
         drain();
         assertThat(consumer.values().get(0), is(user));
@@ -326,7 +326,7 @@
         mUserDao.insert(user);
         drain();
         TestObserver<User> consumer = new TestObserver<>();
-        Disposable disposable = mUserDao.observableUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_observableUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(consumer);
         drain();
         assertThat(consumer.values().get(0), is(user));
@@ -346,7 +346,7 @@
     @MediumTest
     public void observeEmpty_Flowable() throws InterruptedException {
         TestSubscriber<User> consumer = new TestSubscriber<>();
-        Disposable disposable = mUserDao.flowableUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_flowableUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(consumer);
         drain();
         consumer.assertNoValues();
@@ -365,7 +365,7 @@
     @MediumTest
     public void observeEmpty_Observable() throws InterruptedException {
         TestObserver<User> consumer = new TestObserver<>();
-        Disposable disposable = mUserDao.observableUserById(3).observeOn(mTestScheduler)
+        Disposable disposable = mUserDao.rx2_observableUserById(3).observeOn(mTestScheduler)
                 .subscribeWith(consumer);
         drain();
         consumer.assertNoValues();
@@ -387,13 +387,13 @@
         drain();
 
         TestSubscriber<User> consumer1 = new TestSubscriber<>();
-        Flowable<User> flowable1 = mUserDao.flowableUserById(1);
+        Flowable<User> flowable1 = mUserDao.rx2_flowableUserById(1);
         Disposable disposable1 = flowable1.subscribeWith(consumer1);
         drain();
         consumer1.assertValue(users[0]);
 
         TestSubscriber<User> consumer2 = new TestSubscriber<>();
-        Disposable disposable2 = mUserDao.flowableUserById(2).subscribeWith(consumer2);
+        Disposable disposable2 = mUserDao.rx2_flowableUserById(2).subscribeWith(consumer2);
         drain();
         consumer2.assertValue(users[1]);
 
@@ -414,13 +414,13 @@
         drain();
 
         TestObserver<User> consumer1 = new TestObserver<>();
-        Observable<User> flowable1 = mUserDao.observableUserById(1);
+        Observable<User> flowable1 = mUserDao.rx2_observableUserById(1);
         Disposable disposable1 = flowable1.subscribeWith(consumer1);
         drain();
         consumer1.assertValue(users[0]);
 
         TestObserver<User> consumer2 = new TestObserver<>();
-        Disposable disposable2 = mUserDao.observableUserById(2).subscribeWith(consumer2);
+        Disposable disposable2 = mUserDao.rx2_observableUserById(2).subscribeWith(consumer2);
         drain();
         consumer2.assertValue(users[1]);
 
@@ -437,7 +437,7 @@
     @Test
     public void countUsers_Flowable() throws InterruptedException {
         TestSubscriber<Integer> consumer = new TestSubscriber<>();
-        mUserDao.flowableCountUsers()
+        mUserDao.rx2_flowableCountUsers()
                 .observeOn(mTestScheduler)
                 .subscribe(consumer);
         drain();
@@ -474,7 +474,7 @@
     public void withRelation_Flowable() throws InterruptedException {
         final TestSubscriber<UserAndAllPets> subscriber = new TestSubscriber<>();
 
-        mUserPetDao.flowableUserWithPets(3).subscribe(subscriber);
+        mUserPetDao.rx2_flowableUserWithPets(3).subscribe(subscriber);
         drain();
         subscriber.assertSubscribed();
 
@@ -507,7 +507,7 @@
     public void withRelation_Observable() throws InterruptedException {
         final TestObserver<UserAndAllPets> subscriber = new TestObserver<>();
 
-        mUserPetDao.observableUserWithPets(3).subscribe(subscriber);
+        mUserPetDao.rx2_observableUserWithPets(3).subscribe(subscriber);
         drain();
         subscriber.assertSubscribed();
 
@@ -541,7 +541,7 @@
     public void updateInTransaction_Flowable() throws InterruptedException {
         // When subscribing to the emissions of the user
         final TestSubscriber<User> userTestSubscriber = mUserDao
-                .flowableUserById(3)
+                .rx2_flowableUserById(3)
                 .observeOn(mTestScheduler)
                 .test();
         drain();
@@ -565,7 +565,7 @@
     public void updateInTransaction_Observable() throws InterruptedException {
         // When subscribing to the emissions of the user
         final TestObserver<User> userTestSubscriber = mUserDao
-                .observableUserById(3)
+                .rx2_observableUserById(3)
                 .observeOn(mTestScheduler)
                 .test();
         drain();
@@ -593,7 +593,7 @@
         final MailDao mailDao = db.getMailDao();
         final TestSubscriber<List<Mail>> subscriber = new TestSubscriber<>();
 
-        mailDao.getFlowableMail().subscribe(subscriber);
+        mailDao.rx2_getFlowableMail().subscribe(subscriber);
         drain();
         subscriber.assertSubscribed();
         subscriber.assertValue(Collections.emptyList());
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2WithInstantTaskExecutorTest.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2WithInstantTaskExecutorTest.java
index 34422834..ce5bd50 100644
--- a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2WithInstantTaskExecutorTest.java
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava2WithInstantTaskExecutorTest.java
@@ -54,7 +54,7 @@
     @Test
     public void testFlowableInTransaction() {
         // When subscribing to the emissions of the user
-        TestSubscriber<User> subscriber = mDatabase.getUserDao().flowableUserById(3).test();
+        TestSubscriber<User> subscriber = mDatabase.getUserDao().rx2_flowableUserById(3).test();
         subscriber.assertValueCount(0);
 
         // When inserting a new user in the data source
diff --git a/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava3Test.java b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava3Test.java
new file mode 100644
index 0000000..b3ef41e
--- /dev/null
+++ b/room/integration-tests/testapp/src/androidTest/java/androidx/room/integration/testapp/test/RxJava3Test.java
@@ -0,0 +1,692 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.integration.testapp.test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import android.content.Context;
+import android.os.Build;
+
+import androidx.arch.core.executor.ArchTaskExecutor;
+import androidx.arch.core.executor.TaskExecutor;
+import androidx.room.Room;
+import androidx.room.integration.testapp.FtsTestDatabase;
+import androidx.room.integration.testapp.dao.MailDao;
+import androidx.room.integration.testapp.vo.Mail;
+import androidx.room.integration.testapp.vo.Pet;
+import androidx.room.integration.testapp.vo.User;
+import androidx.room.integration.testapp.vo.UserAndAllPets;
+import androidx.room.rxjava3.EmptyResultSetException;
+import androidx.room.rxjava3.RxRoom;
+import androidx.test.core.app.ApplicationProvider;
+import androidx.test.ext.junit.runners.AndroidJUnit4;
+import androidx.test.filters.MediumTest;
+import androidx.test.filters.SdkSuppress;
+import androidx.test.filters.SmallTest;
+
+import com.google.common.collect.Lists;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.UndeliverableException;
+import io.reactivex.rxjava3.functions.Predicate;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+import io.reactivex.rxjava3.schedulers.TestScheduler;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+
+
+@SmallTest
+@RunWith(AndroidJUnit4.class)
+public class RxJava3Test extends TestDatabaseTest {
+
+    private TestScheduler mTestScheduler;
+
+    @Before
+    public void setupSchedulers() {
+        mTestScheduler = new TestScheduler();
+        mTestScheduler.start();
+        ArchTaskExecutor.getInstance().setDelegate(new TaskExecutor() {
+            @Override
+            public void executeOnDiskIO(@NotNull Runnable runnable) {
+                mTestScheduler.scheduleDirect(runnable);
+            }
+
+            @Override
+            public void postToMainThread(@NotNull Runnable runnable) {
+                Assert.fail("no main thread in this test");
+            }
+
+            @Override
+            public boolean isMainThread() {
+                return false;
+            }
+        });
+    }
+
+    @After
+    public void clearSchedulers() {
+        mTestScheduler.shutdown();
+        ArchTaskExecutor.getInstance().setDelegate(null);
+    }
+
+    private void drain() throws InterruptedException {
+        mTestScheduler.triggerActions();
+    }
+
+    @Test
+    public void maybeUser_Empty() throws InterruptedException {
+        TestObserver<User> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_maybeUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        testObserver.assertNoValues();
+        disposable.dispose();
+    }
+
+    @Test
+    public void maybeUser_WithData() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        TestObserver<User> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_maybeUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        testObserver.assertValue(user);
+
+        disposable.dispose();
+    }
+
+    @Test
+    public void maybeUsers_EmptyList() throws InterruptedException {
+        TestObserver<List<User>> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_maybeUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        testObserver.assertValue(Collections.<User>emptyList());
+        disposable.dispose();
+    }
+
+    @Test
+    public void maybeUsers_WithValue() throws InterruptedException {
+        User[] users = TestUtil.createUsersArray(3, 5);
+        mUserDao.insertAll(users);
+        TestObserver<List<User>> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_maybeUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        // since this is a clean db, it is ok to rely on the order for the test.
+        testObserver.assertValue(Arrays.asList(users));
+        disposable.dispose();
+    }
+
+    @Test
+    public void maybeUsers_keepMaybeReference() throws InterruptedException {
+        User[] users = TestUtil.createUsersArray(1, 2);
+        mUserDao.insertAll(users);
+        TestObserver<User> testObserver1 = new TestObserver<>();
+        Maybe<User> maybe1 = mUserDao.rx3_maybeUserById(1);
+        Disposable disposable1 = maybe1.observeOn(mTestScheduler)
+                .subscribeWith(testObserver1);
+        drain();
+        testObserver1.assertComplete();
+        // maybe1 was queried by id 1, so it is expected to emit users[0].
+        testObserver1.assertValue(users[0]);
+
+        TestObserver<User> testObserver2 = new TestObserver<>();
+        Maybe<User> maybe2 = mUserDao.rx3_maybeUserById(2);
+        Disposable disposable2 = maybe2.observeOn(mTestScheduler)
+                .subscribeWith(testObserver2);
+        drain();
+        testObserver2.assertComplete();
+        // maybe2 was queried by id 2, so it is expected to emit users[1].
+        testObserver2.assertValue(users[1]);
+
+        TestObserver<User> testObserver3 = new TestObserver<>();
+
+        Disposable disposable3 = maybe1.observeOn(mTestScheduler)
+                .subscribeWith(testObserver3);
+        drain();
+        testObserver3.assertComplete();
+        // re-subscribing to the retained maybe1 must emit the same user again.
+        testObserver3.assertValue(users[0]);
+
+        disposable1.dispose();
+        disposable2.dispose();
+        disposable3.dispose();
+    }
+
+    @Test
+    public void singleUser_Empty() throws InterruptedException {
+        TestObserver<User> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_singleUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        // an empty result must be dispatched as an EmptyResultSetException
+        testObserver.assertError(EmptyResultSetException.class);
+        testObserver.assertNoValues();
+        disposable.dispose();
+    }
+
+    @Test
+    public void singleUser_WithData() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        TestObserver<User> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_singleUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        testObserver.assertValue(user);
+
+        disposable.dispose();
+    }
+
+    @Test
+    public void singleUsers_EmptyList() throws InterruptedException {
+        TestObserver<List<User>> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_singleUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        testObserver.assertValue(Collections.<User>emptyList());
+        disposable.dispose();
+    }
+
+    @Test
+    public void singleUsers_WithValue() throws InterruptedException {
+        User[] users = TestUtil.createUsersArray(3, 5);
+        mUserDao.insertAll(users);
+        TestObserver<List<User>> testObserver = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_singleUsersByIds(3, 5, 7).observeOn(mTestScheduler)
+                .subscribeWith(testObserver);
+        drain();
+        testObserver.assertComplete();
+        // since this is a clean db, it is ok to rely on the order for the test.
+        testObserver.assertValue(Arrays.asList(users));
+        disposable.dispose();
+    }
+
+    @Test
+    public void singleUser_keepSingleReference() throws InterruptedException {
+        User[] users = TestUtil.createUsersArray(1, 2);
+        mUserDao.insertAll(users);
+        TestObserver<User> testObserver1 = new TestObserver<>();
+        Single<User> userSingle1 = mUserDao.rx3_singleUserById(1);
+        Disposable disposable1 = userSingle1.observeOn(mTestScheduler)
+                .subscribeWith(testObserver1);
+        drain();
+        testObserver1.assertComplete();
+        testObserver1.assertValue(users[0]);
+        disposable1.dispose();
+
+        // now get single for 2
+        TestObserver<User> testObserver2 = new TestObserver<>();
+        Single<User> userSingle2 = mUserDao.rx3_singleUserById(2);
+        Disposable disposable2 = userSingle2.observeOn(mTestScheduler)
+                .subscribeWith(testObserver2);
+        drain();
+        testObserver2.assertComplete();
+        testObserver2.assertValue(users[1]);
+        disposable2.dispose();
+
+        // now re-use the first single
+        TestObserver<User> testObserver3 = new TestObserver<>();
+        Disposable disposable3 = userSingle1.observeOn(mTestScheduler)
+                .subscribeWith(testObserver3);
+        drain();
+        testObserver3.assertComplete();
+        testObserver3.assertValue(users[0]);
+        disposable3.dispose();
+    }
+
+    @Test
+    public void observeOnce_Flowable() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        TestSubscriber<User> consumer = new TestSubscriber<>();
+        mUserDao.rx3_flowableUserById(3).subscribe(consumer);
+        drain();
+        consumer.assertValue(user);
+        consumer.cancel();
+    }
+
+    @Test
+    public void observeOnce_Observable() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        TestObserver<User> consumer = new TestObserver<>();
+        mUserDao.rx3_observableUserById(3).subscribeWith(consumer);
+        drain();
+        consumer.assertValue(user);
+        consumer.dispose();
+    }
+
+    @Test
+    public void observeChangeAndDispose_Flowable() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        TestSubscriber<User> consumer = new TestSubscriber<>();
+        mUserDao.rx3_flowableUserById(3).observeOn(mTestScheduler).subscribe(consumer);
+        drain();
+        assertThat(consumer.values().get(0), is(user));
+        user.setName("rxy");
+        mUserDao.insertOrReplace(user);
+        drain();
+        User next = consumer.values().get(1);
+        assertThat(next, is(user));
+        consumer.cancel();
+        user.setName("foo");
+        mUserDao.insertOrReplace(user);
+        drain();
+        consumer.assertValueCount(2);
+    }
+
+    @Test
+    public void observeChangeAndDispose_Observable() throws InterruptedException {
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        TestObserver<User> consumer = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_observableUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(consumer);
+        drain();
+        assertThat(consumer.values().get(0), is(user));
+        user.setName("rxy");
+        mUserDao.insertOrReplace(user);
+        drain();
+        User next = consumer.values().get(1);
+        assertThat(next, is(user));
+        disposable.dispose();
+        user.setName("foo");
+        mUserDao.insertOrReplace(user);
+        drain();
+        consumer.assertValueCount(2);
+    }
+
+    @Test
+    @MediumTest
+    public void observeEmpty_Flowable() throws InterruptedException {
+        TestSubscriber<User> consumer = new TestSubscriber<>();
+        mUserDao.rx3_flowableUserById(3).observeOn(mTestScheduler).subscribe(consumer);
+        drain();
+        consumer.assertNoValues();
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        assertThat(consumer.values().get(0), is(user));
+        consumer.cancel();
+        user.setAge(88);
+        mUserDao.insertOrReplace(user);
+        drain();
+        consumer.assertValueCount(1);
+    }
+
+    @Test
+    @MediumTest
+    public void observeEmpty_Observable() throws InterruptedException {
+        TestObserver<User> consumer = new TestObserver<>();
+        Disposable disposable = mUserDao.rx3_observableUserById(3).observeOn(mTestScheduler)
+                .subscribeWith(consumer);
+        drain();
+        consumer.assertNoValues();
+        User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        assertThat(consumer.values().get(0), is(user));
+        disposable.dispose();
+        user.setAge(88);
+        mUserDao.insertOrReplace(user);
+        drain();
+        consumer.assertValueCount(1);
+    }
+
+    @Test
+    public void keepReference_Flowable() throws InterruptedException {
+        User[] users = TestUtil.createUsersArray(1, 2);
+        mUserDao.insertAll(users);
+        drain();
+
+        TestSubscriber<User> consumer1 = new TestSubscriber<>();
+        Flowable<User> flowable1 = mUserDao.rx3_flowableUserById(1);
+        flowable1.subscribe(consumer1);
+        drain();
+        consumer1.assertValue(users[0]);
+
+        TestSubscriber<User> consumer2 = new TestSubscriber<>();
+        mUserDao.rx3_flowableUserById(2).subscribe(consumer2);
+        drain();
+        consumer2.assertValue(users[1]);
+
+        TestSubscriber<User> consumer3 = new TestSubscriber<>();
+        flowable1.subscribe(consumer3);
+        drain();
+        consumer3.assertValue(users[0]);
+
+        consumer1.cancel();
+        consumer2.cancel();
+        consumer3.cancel();
+    }
+
+    @Test
+    public void keepReference_Observable() throws InterruptedException {
+        User[] users = TestUtil.createUsersArray(1, 2);
+        mUserDao.insertAll(users);
+        drain();
+
+        TestObserver<User> consumer1 = new TestObserver<>();
+        Observable<User> flowable1 = mUserDao.rx3_observableUserById(1);
+        Disposable disposable1 = flowable1.subscribeWith(consumer1);
+        drain();
+        consumer1.assertValue(users[0]);
+
+        TestObserver<User> consumer2 = new TestObserver<>();
+        Disposable disposable2 = mUserDao.rx3_observableUserById(2).subscribeWith(consumer2);
+        drain();
+        consumer2.assertValue(users[1]);
+
+        TestObserver<User> consumer3 = new TestObserver<>();
+        Disposable disposable3 = flowable1.subscribeWith(consumer3);
+        drain();
+        consumer3.assertValue(users[0]);
+
+        disposable1.dispose();
+        disposable2.dispose();
+        disposable3.dispose();
+    }
+
+    @Test
+    public void countUsers_Flowable() throws InterruptedException {
+        TestSubscriber<Integer> consumer = new TestSubscriber<>();
+        mUserDao.rx3_flowableCountUsers()
+                .observeOn(mTestScheduler)
+                .subscribe(consumer);
+        drain();
+        assertThat(consumer.values().get(0), is(0));
+        mUserDao.insertAll(TestUtil.createUsersArray(1, 3, 4, 6));
+        drain();
+        assertThat(consumer.values().get(1), is(4));
+        mUserDao.deleteByUids(3, 7);
+        drain();
+        assertThat(consumer.values().get(2), is(3));
+        mUserDao.deleteByUids(101);
+        drain();
+        consumer.assertValueCount(3);
+    }
+
+    @Test
+    @MediumTest
+    public void countUsers_Publisher() throws InterruptedException {
+        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
+        mUserDao.publisherCountUsers().subscribe(subscriber);
+        drain();
+        subscriber.request(2);
+        drain();
+        subscriber.assertValue(0);
+        mUserDao.insert(TestUtil.createUser(2));
+        drain();
+        subscriber.assertValues(0, 1);
+        subscriber.cancel();
+        subscriber.assertNoErrors();
+    }
+
+    @Test
+    public void withRelation_Flowable() throws InterruptedException {
+        final TestSubscriber<UserAndAllPets> subscriber = new TestSubscriber<>();
+
+        mUserPetDao.rx3_flowableUserWithPets(3).subscribe(subscriber);
+
+        drain();
+        subscriber.assertNoValues();
+
+        final User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        subscriber.assertValue(new Predicate<UserAndAllPets>() {
+            @Override
+            public boolean test(UserAndAllPets userAndAllPets) throws Exception {
+                return userAndAllPets.user.equals(user);
+            }
+        });
+        subscriber.assertValueCount(1);
+        final Pet[] pets = TestUtil.createPetsForUser(3, 1, 2);
+        mPetDao.insertAll(pets);
+        drain();
+        subscriber.assertValueAt(1, new Predicate<UserAndAllPets>() {
+            @Override
+            public boolean test(UserAndAllPets userAndAllPets) throws Exception {
+                return userAndAllPets.user.equals(user)
+                        && userAndAllPets.pets.equals(Arrays.asList(pets));
+            }
+        });
+    }
+
+    @Test
+    public void withRelation_Observable() throws InterruptedException {
+        final TestObserver<UserAndAllPets> subscriber = new TestObserver<>();
+
+        mUserPetDao.rx3_observableUserWithPets(3).subscribe(subscriber);
+
+        drain();
+        subscriber.assertNoValues();
+
+        final User user = TestUtil.createUser(3);
+        mUserDao.insert(user);
+        drain();
+        subscriber.assertValue(new Predicate<UserAndAllPets>() {
+            @Override
+            public boolean test(UserAndAllPets userAndAllPets) throws Exception {
+                return userAndAllPets.user.equals(user);
+            }
+        });
+        subscriber.assertValueCount(1);
+        final Pet[] pets = TestUtil.createPetsForUser(3, 1, 2);
+        mPetDao.insertAll(pets);
+        drain();
+        subscriber.assertValueAt(1, new Predicate<UserAndAllPets>() {
+            @Override
+            public boolean test(UserAndAllPets userAndAllPets) throws Exception {
+                return userAndAllPets.user.equals(user)
+                        && userAndAllPets.pets.equals(Arrays.asList(pets));
+            }
+        });
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void updateInTransaction_Flowable() throws InterruptedException {
+        // When subscribing to the emissions of the user
+        final TestSubscriber<User> userTestSubscriber = mUserDao
+                .rx3_flowableUserById(3)
+                .observeOn(mTestScheduler)
+                .test();
+        drain();
+        userTestSubscriber.assertValueCount(0);
+
+        // When inserting a new user in the data source
+        mDatabase.beginTransaction();
+        try {
+            mUserDao.insert(TestUtil.createUser(3));
+            mDatabase.setTransactionSuccessful();
+
+        } finally {
+            mDatabase.endTransaction();
+        }
+        drain();
+        userTestSubscriber.assertValueCount(1);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void updateInTransaction_Observable() throws InterruptedException {
+        // When subscribing to the emissions of the user
+        final TestObserver<User> userTestSubscriber = mUserDao
+                .rx3_observableUserById(3)
+                .observeOn(mTestScheduler)
+                .test();
+        drain();
+        userTestSubscriber.assertValueCount(0);
+
+        // When inserting a new user in the data source
+        mDatabase.beginTransaction();
+        try {
+            mUserDao.insert(TestUtil.createUser(3));
+            mDatabase.setTransactionSuccessful();
+
+        } finally {
+            mDatabase.endTransaction();
+        }
+        drain();
+        userTestSubscriber.assertValueCount(1);
+    }
+
+    @Test
+    @SdkSuppress(minSdkVersion = Build.VERSION_CODES.JELLY_BEAN)
+    public void withFtsTable_Flowable() throws InterruptedException {
+        final Context context = ApplicationProvider.getApplicationContext();
+        final FtsTestDatabase db = Room.inMemoryDatabaseBuilder(context, FtsTestDatabase.class)
+                .build();
+        final MailDao mailDao = db.getMailDao();
+        final TestSubscriber<List<Mail>> subscriber = new TestSubscriber<>();
+
+        mailDao.rx3_getFlowableMail().subscribe(subscriber);
+        drain();
+        subscriber.assertValue(Collections.emptyList());
+
+        Mail mail0 = TestUtil.createMail(1, "subject0", "body0");
+        mailDao.insert(mail0);
+        drain();
+        subscriber.assertValueAt(1, new Predicate<List<Mail>>() {
+            @Override
+            public boolean test(List<Mail> mailList) throws Exception {
+                return mailList.equals(Lists.newArrayList(mail0));
+            }
+        });
+
+        Mail mail1 = TestUtil.createMail(2, "subject1", "body1");
+        mailDao.insert(mail1);
+        drain();
+        subscriber.assertValueAt(2, new Predicate<List<Mail>>() {
+            @Override
+            public boolean test(List<Mail> mailList) throws Exception {
+                return mailList.equals(Lists.newArrayList(mail0, mail1));
+            }
+        });
+    }
+
+    @Test
+    public void singleFromCallable_emptyResult_disposed() throws InterruptedException {
+        CountDownLatch queryLatch = new CountDownLatch(1);
+        CountDownLatch bgThreadLatch = new CountDownLatch(1);
+        TestObserver<Boolean> testObserver = new TestObserver<>();
+        Disposable disposable = Single.fromCallable(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                bgThreadLatch.countDown();
+                queryLatch.await();
+                throw new EmptyResultSetException("Empty result");
+            }
+        }).subscribeOn(mTestScheduler).subscribeWith(testObserver);
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    RxJavaPlugins.setErrorHandler(e -> {
+                        assertThat(e, instanceOf(UndeliverableException.class));
+                        RxJavaPlugins.setErrorHandler(null);
+                    });
+                    drain();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        t.start();
+
+        bgThreadLatch.await();
+        testObserver.assertNotComplete();
+        disposable.dispose();
+        queryLatch.countDown();
+        t.join();
+
+        testObserver.assertNoValues();
+        testObserver.assertNotComplete();
+    }
+
+    @Test
+    public void createSingle_emptyResult_disposed() throws InterruptedException {
+        CountDownLatch queryLatch = new CountDownLatch(1);
+        CountDownLatch bgThreadLatch = new CountDownLatch(1);
+        TestObserver<Boolean> testObserver = new TestObserver<>();
+        Disposable disposable = RxRoom.createSingle(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                bgThreadLatch.countDown();
+                queryLatch.await();
+                throw new EmptyResultSetException("Empty result");
+            }
+        }).subscribeOn(mTestScheduler).subscribeWith(testObserver);
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    drain();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        t.start();
+
+        bgThreadLatch.await();
+        testObserver.assertNotComplete();
+        disposable.dispose();
+        queryLatch.countDown();
+        t.join();
+
+        testObserver.assertNoValues();
+        testObserver.assertNotComplete();
+    }
+}
diff --git a/room/rxjava3/api/2.3.0-alpha02.txt b/room/rxjava3/api/2.3.0-alpha02.txt
new file mode 100644
index 0000000..50707ed
--- /dev/null
+++ b/room/rxjava3/api/2.3.0-alpha02.txt
@@ -0,0 +1,15 @@
+// Signature format: 3.0
+package androidx.room.rxjava3 {
+
+  public final class EmptyResultSetException extends java.lang.RuntimeException {
+    ctor public EmptyResultSetException(String);
+  }
+
+  public final class RxRoom {
+    method public static io.reactivex.rxjava3.core.Flowable<java.lang.Object!> createFlowable(androidx.room.RoomDatabase, java.lang.String!...);
+    method public static io.reactivex.rxjava3.core.Observable<java.lang.Object!> createObservable(androidx.room.RoomDatabase, java.lang.String!...);
+    field public static final Object NOTHING;
+  }
+
+}
+
diff --git a/room/rxjava3/api/current.txt b/room/rxjava3/api/current.txt
new file mode 100644
index 0000000..50707ed
--- /dev/null
+++ b/room/rxjava3/api/current.txt
@@ -0,0 +1,15 @@
+// Signature format: 3.0
+package androidx.room.rxjava3 {
+
+  public final class EmptyResultSetException extends java.lang.RuntimeException {
+    ctor public EmptyResultSetException(String);
+  }
+
+  public final class RxRoom {
+    method public static io.reactivex.rxjava3.core.Flowable<java.lang.Object!> createFlowable(androidx.room.RoomDatabase, java.lang.String!...);
+    method public static io.reactivex.rxjava3.core.Observable<java.lang.Object!> createObservable(androidx.room.RoomDatabase, java.lang.String!...);
+    field public static final Object NOTHING;
+  }
+
+}
+
diff --git a/room/rxjava3/api/public_plus_experimental_2.3.0-alpha02.txt b/room/rxjava3/api/public_plus_experimental_2.3.0-alpha02.txt
new file mode 100644
index 0000000..50707ed
--- /dev/null
+++ b/room/rxjava3/api/public_plus_experimental_2.3.0-alpha02.txt
@@ -0,0 +1,15 @@
+// Signature format: 3.0
+package androidx.room.rxjava3 {
+
+  public final class EmptyResultSetException extends java.lang.RuntimeException {
+    ctor public EmptyResultSetException(String);
+  }
+
+  public final class RxRoom {
+    method public static io.reactivex.rxjava3.core.Flowable<java.lang.Object!> createFlowable(androidx.room.RoomDatabase, java.lang.String!...);
+    method public static io.reactivex.rxjava3.core.Observable<java.lang.Object!> createObservable(androidx.room.RoomDatabase, java.lang.String!...);
+    field public static final Object NOTHING;
+  }
+
+}
+
diff --git a/room/rxjava3/api/public_plus_experimental_current.txt b/room/rxjava3/api/public_plus_experimental_current.txt
new file mode 100644
index 0000000..50707ed
--- /dev/null
+++ b/room/rxjava3/api/public_plus_experimental_current.txt
@@ -0,0 +1,15 @@
+// Signature format: 3.0
+package androidx.room.rxjava3 {
+
+  public final class EmptyResultSetException extends java.lang.RuntimeException {
+    ctor public EmptyResultSetException(String);
+  }
+
+  public final class RxRoom {
+    method public static io.reactivex.rxjava3.core.Flowable<java.lang.Object!> createFlowable(androidx.room.RoomDatabase, java.lang.String!...);
+    method public static io.reactivex.rxjava3.core.Observable<java.lang.Object!> createObservable(androidx.room.RoomDatabase, java.lang.String!...);
+    field public static final Object NOTHING;
+  }
+
+}
+
diff --git a/room/rxjava3/api/res-2.3.0-alpha02.txt b/room/rxjava3/api/res-2.3.0-alpha02.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/room/rxjava3/api/res-2.3.0-alpha02.txt
diff --git a/room/rxjava3/api/res-current.txt b/room/rxjava3/api/res-current.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/room/rxjava3/api/res-current.txt
diff --git a/room/rxjava3/api/restricted_2.3.0-alpha02.txt b/room/rxjava3/api/restricted_2.3.0-alpha02.txt
new file mode 100644
index 0000000..0178e4b
--- /dev/null
+++ b/room/rxjava3/api/restricted_2.3.0-alpha02.txt
@@ -0,0 +1,18 @@
+// Signature format: 3.0
+package androidx.room.rxjava3 {
+
+  public final class EmptyResultSetException extends java.lang.RuntimeException {
+    ctor public EmptyResultSetException(String);
+  }
+
+  public final class RxRoom {
+    method public static io.reactivex.rxjava3.core.Flowable<java.lang.Object!> createFlowable(androidx.room.RoomDatabase, java.lang.String!...);
+    method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> io.reactivex.rxjava3.core.Flowable<T!> createFlowable(androidx.room.RoomDatabase, boolean, String![], java.util.concurrent.Callable<T!>);
+    method public static io.reactivex.rxjava3.core.Observable<java.lang.Object!> createObservable(androidx.room.RoomDatabase, java.lang.String!...);
+    method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> io.reactivex.rxjava3.core.Observable<T!> createObservable(androidx.room.RoomDatabase, boolean, String![], java.util.concurrent.Callable<T!>);
+    method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> io.reactivex.rxjava3.core.Single<T!> createSingle(java.util.concurrent.Callable<T!>);
+    field public static final Object NOTHING;
+  }
+
+}
+
diff --git a/room/rxjava3/api/restricted_current.txt b/room/rxjava3/api/restricted_current.txt
new file mode 100644
index 0000000..0178e4b
--- /dev/null
+++ b/room/rxjava3/api/restricted_current.txt
@@ -0,0 +1,18 @@
+// Signature format: 3.0
+package androidx.room.rxjava3 {
+
+  public final class EmptyResultSetException extends java.lang.RuntimeException {
+    ctor public EmptyResultSetException(String);
+  }
+
+  public final class RxRoom {
+    method public static io.reactivex.rxjava3.core.Flowable<java.lang.Object!> createFlowable(androidx.room.RoomDatabase, java.lang.String!...);
+    method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> io.reactivex.rxjava3.core.Flowable<T!> createFlowable(androidx.room.RoomDatabase, boolean, String![], java.util.concurrent.Callable<T!>);
+    method public static io.reactivex.rxjava3.core.Observable<java.lang.Object!> createObservable(androidx.room.RoomDatabase, java.lang.String!...);
+    method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> io.reactivex.rxjava3.core.Observable<T!> createObservable(androidx.room.RoomDatabase, boolean, String![], java.util.concurrent.Callable<T!>);
+    method @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> io.reactivex.rxjava3.core.Single<T!> createSingle(java.util.concurrent.Callable<T!>);
+    field public static final Object NOTHING;
+  }
+
+}
+
diff --git a/room/rxjava3/build.gradle b/room/rxjava3/build.gradle
new file mode 100644
index 0000000..ac3ef8d
--- /dev/null
+++ b/room/rxjava3/build.gradle
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.LibraryVersions
+import androidx.build.AndroidXExtension
+import androidx.build.Publish
+
+plugins {
+    id("AndroidXPlugin")
+    id("com.android.library")
+}
+
+dependencies {
+    api(project(":room:room-common"))
+    api(project(":room:room-runtime"))
+    implementation("androidx.arch.core:core-runtime:2.0.1")
+    api(RX_JAVA3)
+
+    testImplementation(JUNIT)
+    testImplementation(MOCKITO_CORE)
+    testImplementation("androidx.arch.core:core-testing:2.0.1")
+    testImplementation("androidx.lifecycle:lifecycle-livedata:2.0.0") // for mocking invalidation tracker
+}
+
+androidx {
+    name = "Android Room RXJava3"
+    publish = Publish.SNAPSHOT_AND_RELEASE
+    mavenGroup = LibraryGroups.ROOM
+    inceptionYear = "2020"
+    description = "Android Room RXJava3"
+    url = AndroidXExtension.ARCHITECTURE_URL
+}
\ No newline at end of file
diff --git a/room/rxjava3/src/main/AndroidManifest.xml b/room/rxjava3/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..fd2038f
--- /dev/null
+++ b/room/rxjava3/src/main/AndroidManifest.xml
@@ -0,0 +1,18 @@
+<!--
+  ~ Copyright (C) 2020 The Android Open Source Project
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<manifest package="androidx.room.rxjava3">
+</manifest>
diff --git a/room/rxjava3/src/main/java/androidx/room/rxjava3/EmptyResultSetException.java b/room/rxjava3/src/main/java/androidx/room/rxjava3/EmptyResultSetException.java
new file mode 100644
index 0000000..d902587
--- /dev/null
+++ b/room/rxjava3/src/main/java/androidx/room/rxjava3/EmptyResultSetException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.rxjava3;
+
+import androidx.annotation.NonNull;
+
+/**
+ * Thrown by Room when the query in a Single&lt;T&gt; DAO method needs to return a result but the
+ * returned result from the database is empty.
+ * <p>
+ * Since a Single&lt;T&gt; must either emit a single non-null value or an error, this exception is
+ * thrown instead of emitting a null value when the query resulted empty. If the Single&lt;T&gt;
+ * contains a type argument of a collection (e.g. Single&lt;List&lt;Song&gt;&gt;) then this
+ * exception is not thrown and an empty collection is emitted instead.
+ */
+public final class EmptyResultSetException extends RuntimeException {
+    /**
+     * Creates an EmptyResultSetException carrying the given detail message.
+     * @param message The SQL query which didn't return any results.
+     */
+    public EmptyResultSetException(@NonNull String message) {
+        super(message);
+    }
+}
diff --git a/room/rxjava3/src/main/java/androidx/room/rxjava3/RxRoom.java b/room/rxjava3/src/main/java/androidx/room/rxjava3/RxRoom.java
new file mode 100644
index 0000000..230dd83
--- /dev/null
+++ b/room/rxjava3/src/main/java/androidx/room/rxjava3/RxRoom.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.rxjava3;
+
+import androidx.annotation.NonNull;
+import androidx.annotation.RestrictTo;
+import androidx.room.InvalidationTracker;
+import androidx.room.RoomDatabase;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.MaybeSource;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.functions.Function;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+/**
+ * Helper class to add RxJava3 support to Room.
+ */
+public final class RxRoom {
+    /**
+     * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
+     */
+    @NonNull
+    public static final Object NOTHING = new Object();
+
+    /**
+     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
+     * observed tables is updated.
+     * <p>
+     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
+     * that it re-runs when database is modified.
+     * <p>
+     * Since database invalidation is batched, multiple changes in the database may result in just
+     * 1 emission.
+     *
+     * @param database   The database instance
+     * @param tableNames The list of table names that should be observed
+     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
+     * is modified (also once when the invalidation tracker connection is established).
+     */
+    @NonNull
+    public static Flowable<Object> createFlowable(@NonNull final RoomDatabase database,
+            @NonNull final String... tableNames) {
+        return Flowable.create(downstream -> {
+            final InvalidationTracker.Observer tableObserver =
+                    new InvalidationTracker.Observer(tableNames) {
+                @Override
+                public void onInvalidated(@NonNull Set<String> invalidatedTables) {
+                    if (!downstream.isCancelled()) {
+                        downstream.onNext(NOTHING);
+                    }
+                }
+            };
+            if (!downstream.isCancelled()) {
+                // Register first, then tie unregistration to cancellation of the stream.
+                database.getInvalidationTracker().addObserver(tableObserver);
+                downstream.setDisposable(Disposable.fromAction(() ->
+                        database.getInvalidationTracker().removeObserver(tableObserver)));
+            }
+            // Initial emission so subscribers run once without waiting for an invalidation.
+            if (!downstream.isCancelled()) {
+                downstream.onNext(NOTHING);
+            }
+        }, BackpressureStrategy.LATEST);
+    }
+
+    /**
+     * Helper method used by generated code to bind a Callable such that it will be run in
+     * our disk io thread and will automatically block null values since RxJava3 does not like null.
+     *
+     * @hide
+     */
+    @NonNull
+    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
+    public static <T> Flowable<T> createFlowable(@NonNull final RoomDatabase database,
+            final boolean inTransaction, @NonNull final String[] tableNames,
+            @NonNull final Callable<T> callable) {
+        final Scheduler dbScheduler = Schedulers.from(getExecutor(database, inTransaction));
+        final Maybe<T> query = Maybe.fromCallable(callable);
+        final Flowable<Object> invalidations = createFlowable(database, tableNames)
+                .subscribeOn(dbScheduler)
+                .unsubscribeOn(dbScheduler)
+                .observeOn(dbScheduler);
+        return invalidations.flatMapMaybe((Function<Object, MaybeSource<T>>) ignored -> query);
+    }
+
+    /**
+     * Creates an {@link Observable} that emits at least once and also re-emits whenever one of the
+     * observed tables is updated.
+     * <p>
+     * You can easily chain a database operation to downstream of this {@link Observable} to ensure
+     * that it re-runs when database is modified.
+     * <p>
+     * Since database invalidation is batched, multiple changes in the database may result in just
+     * 1 emission.
+     *
+     * @param database   The database instance
+     * @param tableNames The list of table names that should be observed
+     * @return A {@link Observable} which emits {@link #NOTHING} when one of the observed tables
+     * is modified (also once when the invalidation tracker connection is established).
+     */
+    @NonNull
+    public static Observable<Object> createObservable(@NonNull final RoomDatabase database,
+            @NonNull final String... tableNames) {
+        return Observable.create(downstream -> {
+            final InvalidationTracker.Observer tableObserver =
+                    new InvalidationTracker.Observer(tableNames) {
+                @Override
+                public void onInvalidated(@NonNull Set<String> invalidatedTables) {
+                    downstream.onNext(NOTHING);
+                }
+            };
+            database.getInvalidationTracker().addObserver(tableObserver);
+            // Disposing the stream unregisters the observer from the tracker again.
+            downstream.setDisposable(Disposable.fromAction(() ->
+                    database.getInvalidationTracker().removeObserver(tableObserver)));
+            // Initial emission so subscribers run once without waiting for an invalidation.
+            downstream.onNext(NOTHING);
+        });
+    }
+
+    /**
+     * Helper method used by generated code to bind a Callable such that it will be run in
+     * our disk io thread and will automatically block null values since RxJava3 does not like null.
+     *
+     * @hide
+     */
+    @NonNull
+    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
+    public static <T> Observable<T> createObservable(@NonNull final RoomDatabase database,
+            final boolean inTransaction, @NonNull final String[] tableNames,
+            @NonNull final Callable<T> callable) {
+        final Scheduler dbScheduler = Schedulers.from(getExecutor(database, inTransaction));
+        final Maybe<T> query = Maybe.fromCallable(callable);
+        final Observable<Object> invalidations = createObservable(database, tableNames)
+                .subscribeOn(dbScheduler)
+                .unsubscribeOn(dbScheduler)
+                .observeOn(dbScheduler);
+        return invalidations.flatMapMaybe(ignored -> query);
+    }
+
+    /**
+     * Helper method used by generated code to create a Single from a Callable that will ignore
+     * the EmptyResultSetException if the stream is already disposed.
+     *
+     * @hide
+     */
+    @NonNull
+    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
+    public static <T> Single<T> createSingle(@NonNull final Callable<T> callable) {
+        return Single.create(downstream -> {
+            try {
+                downstream.onSuccess(callable.call());
+            } catch (EmptyResultSetException emptyResult) {
+                downstream.tryOnError(emptyResult); // not delivered once disposed
+            }
+        });
+    }
+
+    private static Executor getExecutor(@NonNull RoomDatabase database, boolean inTransaction) {
+        // Only queries that run in a transaction use the transaction executor.
+        final Executor chosen = inTransaction
+                ? database.getTransactionExecutor()
+                : database.getQueryExecutor();
+        return chosen;
+    }
+
+    private RxRoom() {
+    }
+}
diff --git a/room/rxjava3/src/test/java/androidx/room/rxjava3/RxRoomTest.java b/room/rxjava3/src/test/java/androidx/room/rxjava3/RxRoomTest.java
new file mode 100644
index 0000000..1598297
--- /dev/null
+++ b/room/rxjava3/src/test/java/androidx/room/rxjava3/RxRoomTest.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.rxjava3;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import androidx.arch.core.executor.ArchTaskExecutor;
+import androidx.arch.core.executor.testing.CountingTaskExecutorRule;
+import androidx.room.InvalidationTracker;
+import androidx.room.RoomDatabase;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.functions.Consumer;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+@RunWith(JUnit4.class)
+public class RxRoomTest {
+    @Rule
+    public CountingTaskExecutorRule mExecutor = new CountingTaskExecutorRule();
+
+    private RoomDatabase mDatabase;
+    private InvalidationTracker mInvalidationTracker;
+    private List<InvalidationTracker.Observer> mAddedObservers = new ArrayList<>();
+
+    @Before
+    public void init() {
+        mInvalidationTracker = mock(InvalidationTracker.class);
+        mDatabase = mock(RoomDatabase.class);
+        when(mDatabase.getQueryExecutor()).thenReturn(ArchTaskExecutor.getIOThreadExecutor());
+        when(mDatabase.getInvalidationTracker()).thenReturn(mInvalidationTracker);
+        doAnswer(call -> {
+            mAddedObservers.add((InvalidationTracker.Observer) call.getArguments()[0]);
+            return null;
+        }).when(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
+    }
+
+    @Test
+    public void basicAddRemove_Flowable() {
+        Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, "a", "b");
+        // Nothing is registered until somebody subscribes.
+        verify(mInvalidationTracker, never()).addObserver(any(InvalidationTracker.Observer.class));
+        Disposable firstSubscription = flowable.subscribe();
+        verify(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
+        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
+        // Disposing must unregister exactly the observer that was added.
+        InvalidationTracker.Observer firstObserver = mAddedObservers.get(0);
+        firstSubscription.dispose();
+        verify(mInvalidationTracker).removeObserver(firstObserver);
+        // A second subscription must register a brand new observer.
+        Disposable secondSubscription = flowable.subscribe();
+        verify(mInvalidationTracker, times(2))
+                .addObserver(any(InvalidationTracker.Observer.class));
+        assertThat(mAddedObservers.size(), CoreMatchers.is(2));
+        InvalidationTracker.Observer secondObserver = mAddedObservers.get(1);
+        assertThat(secondObserver, CoreMatchers.not(CoreMatchers.sameInstance(firstObserver)));
+        secondSubscription.dispose();
+        verify(mInvalidationTracker).removeObserver(secondObserver);
+    }
+
+    @Test
+    public void basicAddRemove_Observable() {
+        Observable<Object> observable = RxRoom.createObservable(mDatabase, "a", "b");
+        // Nothing is registered until somebody subscribes.
+        verify(mInvalidationTracker, never()).addObserver(any(InvalidationTracker.Observer.class));
+        Disposable firstSubscription = observable.subscribe();
+        verify(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
+        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
+        // Disposing must unregister exactly the observer that was added.
+        InvalidationTracker.Observer firstObserver = mAddedObservers.get(0);
+        firstSubscription.dispose();
+        verify(mInvalidationTracker).removeObserver(firstObserver);
+        // A second subscription must register a brand new observer.
+        Disposable secondSubscription = observable.subscribe();
+        verify(mInvalidationTracker, times(2))
+                .addObserver(any(InvalidationTracker.Observer.class));
+        assertThat(mAddedObservers.size(), CoreMatchers.is(2));
+        InvalidationTracker.Observer secondObserver = mAddedObservers.get(1);
+        assertThat(secondObserver, CoreMatchers.not(CoreMatchers.sameInstance(firstObserver)));
+        secondSubscription.dispose();
+        verify(mInvalidationTracker).removeObserver(secondObserver);
+    }
+
+    @Test
+    public void basicNotify_Flowable() {
+        String[] tables = {"a", "b"};
+        Set<String> invalidated = new HashSet<>(Arrays.asList(tables));
+        CountingConsumer counter = new CountingConsumer();
+        Disposable subscription = RxRoom.createFlowable(mDatabase, tables).subscribe(counter);
+        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
+        InvalidationTracker.Observer trackerObserver = mAddedObservers.get(0);
+        // One emission on subscribe, then one more per invalidation until disposed.
+        assertThat(counter.mCount, CoreMatchers.is(1));
+        trackerObserver.onInvalidated(invalidated);
+        assertThat(counter.mCount, CoreMatchers.is(2));
+        trackerObserver.onInvalidated(invalidated);
+        assertThat(counter.mCount, CoreMatchers.is(3));
+        subscription.dispose();
+        trackerObserver.onInvalidated(invalidated);
+        assertThat(counter.mCount, CoreMatchers.is(3));
+    }
+
+    @Test
+    public void basicNotify_Observable() {
+        String[] tables = {"a", "b"};
+        Set<String> invalidated = new HashSet<>(Arrays.asList(tables));
+        CountingConsumer counter = new CountingConsumer();
+        Disposable subscription = RxRoom.createObservable(mDatabase, tables).subscribe(counter);
+        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
+        InvalidationTracker.Observer trackerObserver = mAddedObservers.get(0);
+        // One emission on subscribe, then one more per invalidation until disposed.
+        assertThat(counter.mCount, CoreMatchers.is(1));
+        trackerObserver.onInvalidated(invalidated);
+        assertThat(counter.mCount, CoreMatchers.is(2));
+        trackerObserver.onInvalidated(invalidated);
+        assertThat(counter.mCount, CoreMatchers.is(3));
+        subscription.dispose();
+        trackerObserver.onInvalidated(invalidated);
+        assertThat(counter.mCount, CoreMatchers.is(3));
+    }
+
+    @Test
+    public void internalCallable_Flowable() throws Exception {
+        final AtomicReference<String> queryResult = new AtomicReference<>(null);
+        String[] tables = {"a", "b"};
+        Set<String> invalidated = new HashSet<>(Arrays.asList(tables));
+        final CountingConsumer counter = new CountingConsumer();
+        final Flowable<String> flowable = RxRoom.createFlowable(
+                mDatabase, false, tables, queryResult::get);
+        final Disposable subscription = flowable.subscribe(counter);
+        drain();
+        InvalidationTracker.Observer trackerObserver = mAddedObservers.get(0);
+        // the callable returned null, so nothing has been emitted yet
+        assertThat(counter.mCount, CoreMatchers.is(0));
+        queryResult.set("bla");
+        trackerObserver.onInvalidated(invalidated);
+        drain();
+        // a non-null result is delivered
+        assertThat(counter.mCount, CoreMatchers.is(1));
+        trackerObserver.onInvalidated(invalidated);
+        drain();
+        // and delivered again on the next invalidation
+        assertThat(counter.mCount, CoreMatchers.is(2));
+        queryResult.set(null);
+        trackerObserver.onInvalidated(invalidated);
+        drain();
+        // a null result is skipped, so the count stays the same
+        assertThat(counter.mCount, CoreMatchers.is(2));
+        subscription.dispose();
+    }
+
+    @Test
+    public void internalCallable_Observable() throws Exception {
+        final AtomicReference<String> queryResult = new AtomicReference<>(null);
+        String[] tables = {"a", "b"};
+        Set<String> invalidated = new HashSet<>(Arrays.asList(tables));
+        final CountingConsumer counter = new CountingConsumer();
+        final Observable<String> observable = RxRoom.createObservable(
+                mDatabase, false, tables, queryResult::get);
+        final Disposable subscription = observable.subscribe(counter);
+        drain();
+        InvalidationTracker.Observer trackerObserver = mAddedObservers.get(0);
+        // the callable returned null, so nothing has been emitted yet
+        assertThat(counter.mCount, CoreMatchers.is(0));
+        queryResult.set("bla");
+        trackerObserver.onInvalidated(invalidated);
+        drain();
+        // a non-null result is delivered
+        assertThat(counter.mCount, CoreMatchers.is(1));
+        trackerObserver.onInvalidated(invalidated);
+        drain();
+        // and delivered again on the next invalidation
+        assertThat(counter.mCount, CoreMatchers.is(2));
+        queryResult.set(null);
+        trackerObserver.onInvalidated(invalidated);
+        drain();
+        // a null result is skipped, so the count stays the same
+        assertThat(counter.mCount, CoreMatchers.is(2));
+        subscription.dispose();
+    }
+
+    @Test
+    public void exception_Flowable() throws Exception {
+        TestSubscriber<String> failing = new TestSubscriber<>();
+        final Flowable<String> flowable = RxRoom.createFlowable(
+                mDatabase,
+                false,
+                new String[]{"a"},
+                () -> {
+                    throw new Exception("i want exception");
+                });
+        flowable.subscribe(failing);
+        drain();
+        failing.assertError(
+                error -> Objects.equals(error.getMessage(), "i want exception"));
+    }
+
+    @Test
+    public void exception_Observable() throws Exception {
+        TestObserver<String> failing = new TestObserver<>();
+        final Observable<String> observable = RxRoom.createObservable(
+                mDatabase,
+                false,
+                new String[]{"a"},
+                () -> {
+                    throw new Exception("i want exception");
+                });
+        observable.subscribe(failing);
+        drain();
+        failing.assertError(
+                error -> Objects.equals(error.getMessage(), "i want exception"));
+    }
+
+    private void drain() throws Exception {
+        mExecutor.drainTasks(10, TimeUnit.SECONDS);
+    }
+
+    private static class CountingConsumer implements Consumer<Object> {
+        int mCount = 0; // number of items received so far
+
+        @Override
+        public void accept(@NonNull Object ignored) {
+            mCount += 1;
+        }
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index a51cfa4..cbac5d6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -264,6 +264,7 @@
 includeProject(":room:room-migration", "room/migration")
 includeProject(":room:room-runtime", "room/runtime")
 includeProject(":room:room-rxjava2", "room/rxjava2")
+includeProject(":room:room-rxjava3", "room/rxjava3")
 includeProject(":room:room-testing", "room/testing")
 includeProject(":remotecallback:remotecallback-processor", "remotecallback/processor")
 includeProject(":remotecallback:remotecallback", "remotecallback/remotecallback")