Support Kotlin Coroutines Flow as query return types.

This CL adds QueryResultBinders and Providers Flow. Similar to Rx the
strategy is to generate code that calls into a runtime helper method to
setup Flow given a Callable that actually executes the query.

Channels are not directly supported and instead we show an error
suggesting to change the DAO function return type to Flow and then use
the neighboring functions to create a Channel. The reason is that we
can't reasonably produce a channel without a CoroutineScope and it is
a weird API to ask users to pass a CoroutineScope in their DAO method.

Suspend DAO function that returns Flow is an error.

Bug: 127328278
Bug: 130428884
Test: CoroutinesRoomTest, FlowQueryTest
Change-Id: If0fc6fdb741735f2e4b6b42015730bbf76e8dc18
diff --git a/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt b/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
index 2a3e0c8..81ee4b7 100644
--- a/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
+++ b/buildSrc/src/main/kotlin/androidx/build/dependencies/Dependencies.kt
@@ -68,6 +68,8 @@
     "org.jetbrains.kotlinx:kotlinx-coroutines-guava:$KOTLIN_COROUTINES_VERSION"
 const val KOTLIN_COROUTINES_TEST =
     "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLIN_COROUTINES_VERSION"
+const val KOTLIN_COROUTINES_PREVIEW =
+    "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC"
 
 const val LEAKCANARY_INSTRUMENTATION =
     "com.squareup.leakcanary:leakcanary-android-instrumentation:1.6.2"
diff --git a/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt b/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
index 70e30ff..6e7a114 100644
--- a/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
@@ -165,6 +165,10 @@
     val UNIT = ClassName.get("kotlin", "Unit")
     val CONTINUATION = ClassName.get("kotlin.coroutines", "Continuation")
     val COROUTINE_SCOPE = ClassName.get("kotlinx.coroutines", "CoroutineScope")
+    val CHANNEL = ClassName.get("kotlinx.coroutines.channels", "Channel")
+    val RECEIVE_CHANNEL = ClassName.get("kotlinx.coroutines.channels", "ReceiveChannel")
+    val SEND_CHANNEL = ClassName.get("kotlinx.coroutines.channels", "SendChannel")
+    val FLOW = ClassName.get("kotlinx.coroutines.flow", "Flow")
 }
 
 fun TypeName.defaultValue(): String {
diff --git a/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt b/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
index e313569..d331aa1 100644
--- a/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
@@ -21,6 +21,7 @@
 import androidx.room.Query
 import androidx.room.RawQuery
 import androidx.room.Update
+import androidx.room.ext.KotlinTypeNames
 import androidx.room.ext.RoomTypeNames
 import androidx.room.ext.SupportDbTypeNames
 import androidx.room.parser.QueryType
@@ -725,4 +726,8 @@
                 " (https://bugs.openjdk.java.net/browse/JDK-8007720)" +
                 " that prevents Room from being incremental." +
                 " Consider using JDK 11+ or the embedded JDK shipped with Android Studio 3.5+."
+
+    fun invalidChannelType(typeName: String) = "'$typeName' is not supported as a return type. " +
+            "Instead declare return type as ${KotlinTypeNames.FLOW} and use Flow transforming " +
+            "functions that converts the Flow into a Channel."
 }
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt b/room/compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt
index ddfdc4e..d5a8ce3 100644
--- a/room/compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt
@@ -28,6 +28,7 @@
 import androidx.room.processor.EntityProcessor
 import androidx.room.processor.FieldProcessor
 import androidx.room.processor.PojoProcessor
+import androidx.room.solver.binderprovider.CoroutineFlowResultBinderProvider
 import androidx.room.solver.binderprovider.CursorQueryResultBinderProvider
 import androidx.room.solver.binderprovider.DataSourceFactoryQueryResultBinderProvider
 import androidx.room.solver.binderprovider.DataSourceQueryResultBinderProvider
@@ -175,6 +176,7 @@
             RxSingleQueryResultBinderProvider(context),
             DataSourceQueryResultBinderProvider(context),
             DataSourceFactoryQueryResultBinderProvider(context),
+            CoroutineFlowResultBinderProvider(context),
             InstantQueryResultBinderProvider(context)
     )
 
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/CoroutineFlowResultBinderProvider.kt b/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/CoroutineFlowResultBinderProvider.kt
new file mode 100644
index 0000000..b419a57
--- /dev/null
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/binderprovider/CoroutineFlowResultBinderProvider.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.solver.binderprovider
+
+import androidx.room.ext.KotlinTypeNames
+import androidx.room.ext.RoomCoroutinesTypeNames
+import androidx.room.ext.typeName
+import androidx.room.parser.ParsedQuery
+import androidx.room.processor.Context
+import androidx.room.processor.ProcessorErrors
+import androidx.room.solver.QueryResultBinderProvider
+import androidx.room.solver.query.result.CoroutineFlowResultBinder
+import androidx.room.solver.query.result.QueryResultBinder
+import javax.lang.model.type.DeclaredType
+
+class CoroutineFlowResultBinderProvider(val context: Context) : QueryResultBinderProvider {
+
+    companion object {
+        val CHANNEL_TYPE_NAMES = listOf(
+            KotlinTypeNames.CHANNEL,
+            KotlinTypeNames.SEND_CHANNEL,
+            KotlinTypeNames.RECEIVE_CHANNEL
+        )
+    }
+
+    private val hasCoroutinesArtifact by lazy {
+        context.processingEnv.elementUtils
+            .getTypeElement(RoomCoroutinesTypeNames.COROUTINES_ROOM.toString()) != null
+    }
+
+    override fun provide(declared: DeclaredType, query: ParsedQuery): QueryResultBinder {
+        val typeArg = declared.typeArguments.first()
+        val adapter = context.typeAdapterStore.findQueryResultAdapter(typeArg, query)
+        val tableNames = ((adapter?.accessedTableNames() ?: emptyList()) +
+                query.tables.map { it.name }).toSet()
+        if (tableNames.isEmpty()) {
+            context.logger.e(ProcessorErrors.OBSERVABLE_QUERY_NOTHING_TO_OBSERVE)
+        }
+        return CoroutineFlowResultBinder(typeArg, tableNames, adapter)
+    }
+
+    override fun matches(declared: DeclaredType): Boolean {
+        if (declared.typeArguments.size != 1) {
+            return false
+        }
+        val typeName = context.processingEnv.typeUtils.erasure(declared).typeName()
+        if (typeName in CHANNEL_TYPE_NAMES) {
+            context.logger.e(ProcessorErrors.invalidChannelType(typeName.toString()))
+            return false
+        }
+        val match = typeName == KotlinTypeNames.FLOW
+        if (match && !hasCoroutinesArtifact) {
+            context.logger.e(ProcessorErrors.MISSING_ROOM_COROUTINE_ARTIFACT)
+        }
+        return match
+    }
+}
\ No newline at end of file
diff --git a/room/compiler/src/main/kotlin/androidx/room/solver/query/result/CoroutineFlowResultBinder.kt b/room/compiler/src/main/kotlin/androidx/room/solver/query/result/CoroutineFlowResultBinder.kt
new file mode 100644
index 0000000..bc2fb8e
--- /dev/null
+++ b/room/compiler/src/main/kotlin/androidx/room/solver/query/result/CoroutineFlowResultBinder.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.solver.query.result
+
+import androidx.room.ext.AndroidTypeNames
+import androidx.room.ext.CallableTypeSpecBuilder
+import androidx.room.ext.L
+import androidx.room.ext.N
+import androidx.room.ext.RoomCoroutinesTypeNames
+import androidx.room.ext.RoomTypeNames
+import androidx.room.ext.T
+import androidx.room.ext.arrayTypeName
+import androidx.room.ext.typeName
+import androidx.room.solver.CodeGenScope
+import com.squareup.javapoet.FieldSpec
+import com.squareup.javapoet.MethodSpec
+import javax.lang.model.type.TypeMirror
+
+/**
+ * Binds the result of a of a Kotlin Coroutine Flow<T>
+ */
+class CoroutineFlowResultBinder(
+    val typeArg: TypeMirror,
+    val tableNames: Set<String>,
+    adapter: QueryResultAdapter?
+) : QueryResultBinder(adapter) {
+
+    override fun convertAndReturn(
+        roomSQLiteQueryVar: String,
+        canReleaseQuery: Boolean,
+        dbField: FieldSpec,
+        inTransaction: Boolean,
+        scope: CodeGenScope
+    ) {
+        val callableImpl = CallableTypeSpecBuilder(typeArg.typeName()) {
+            createRunQueryAndReturnStatements(
+                builder = this,
+                roomSQLiteQueryVar = roomSQLiteQueryVar,
+                canReleaseQuery = canReleaseQuery,
+                dbField = dbField,
+                inTransaction = inTransaction,
+                scope = scope)
+        }.build()
+
+        scope.builder().apply {
+            val tableNamesList = tableNames.joinToString(",") { "\"$it\"" }
+            addStatement(
+                "return $T.createFlow($N, $L, new $T{$L}, $L)",
+                RoomCoroutinesTypeNames.COROUTINES_ROOM,
+                dbField,
+                if (inTransaction) "true" else "false",
+                String::class.arrayTypeName(),
+                tableNamesList,
+                callableImpl)
+        }
+    }
+
+    private fun createRunQueryAndReturnStatements(
+        builder: MethodSpec.Builder,
+        roomSQLiteQueryVar: String,
+        canReleaseQuery: Boolean,
+        dbField: FieldSpec,
+        inTransaction: Boolean,
+        scope: CodeGenScope
+    ) {
+        val transactionWrapper = if (inTransaction) {
+            builder.transactionWrapper(dbField)
+        } else {
+            null
+        }
+        val shouldCopyCursor = adapter?.shouldCopyCursor() == true
+        val outVar = scope.getTmpVar("_result")
+        val cursorVar = scope.getTmpVar("_cursor")
+        transactionWrapper?.beginTransactionWithControlFlow()
+        builder.apply {
+            addStatement("final $T $L = $T.query($N, $L, $L)",
+                AndroidTypeNames.CURSOR,
+                cursorVar,
+                RoomTypeNames.DB_UTIL,
+                dbField,
+                roomSQLiteQueryVar,
+                if (shouldCopyCursor) "true" else "false")
+            beginControlFlow("try").apply {
+                val adapterScope = scope.fork()
+                adapter?.convert(outVar, cursorVar, adapterScope)
+                addCode(adapterScope.builder().build())
+                transactionWrapper?.commitTransaction()
+                addStatement("return $L", outVar)
+            }
+            nextControlFlow("finally").apply {
+                addStatement("$L.close()", cursorVar)
+                if (canReleaseQuery) {
+                    addStatement("$L.release()", roomSQLiteQueryVar)
+                }
+            }
+            endControlFlow()
+        }
+        transactionWrapper?.endTransactionWithControlFlow()
+    }
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/coroutines/Channel.java b/room/compiler/src/test/data/common/input/coroutines/Channel.java
new file mode 100644
index 0000000..092782a
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/coroutines/Channel.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.channels;
+
+public interface Channel<T> extends SendChannel<T>, ReceiveChannel<T> {
+
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/coroutines/ReceiveChannel.java b/room/compiler/src/test/data/common/input/coroutines/ReceiveChannel.java
new file mode 100644
index 0000000..a822127
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/coroutines/ReceiveChannel.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.channels;
+
+public interface ReceiveChannel<T> {
+
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/data/common/input/coroutines/SendChannel.java b/room/compiler/src/test/data/common/input/coroutines/SendChannel.java
new file mode 100644
index 0000000..a8c4b11
--- /dev/null
+++ b/room/compiler/src/test/data/common/input/coroutines/SendChannel.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.channels;
+
+public interface SendChannel<T> {
+
+}
\ No newline at end of file
diff --git a/room/compiler/src/test/kotlin/androidx/room/processor/QueryMethodProcessorTest.kt b/room/compiler/src/test/kotlin/androidx/room/processor/QueryMethodProcessorTest.kt
index 47d039d..aee9801 100644
--- a/room/compiler/src/test/kotlin/androidx/room/processor/QueryMethodProcessorTest.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/processor/QueryMethodProcessorTest.kt
@@ -25,6 +25,7 @@
 import androidx.room.Relation
 import androidx.room.Transaction
 import androidx.room.ext.CommonTypeNames
+import androidx.room.ext.KotlinTypeNames
 import androidx.room.ext.LifecyclesTypeNames
 import androidx.room.ext.PagingTypeNames
 import androidx.room.ext.hasAnnotation
@@ -73,6 +74,7 @@
 import javax.lang.model.type.DeclaredType
 import javax.lang.model.type.TypeKind.INT
 import javax.lang.model.type.TypeMirror
+import javax.tools.JavaFileObject
 
 @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
 @RunWith(Parameterized::class)
@@ -560,6 +562,48 @@
     }
 
     @Test
+    fun testBadChannelReturnForQuery() {
+        singleQueryMethod<QueryMethod>(
+            """
+                @Query("select * from user")
+                abstract ${KotlinTypeNames.CHANNEL}<User> getUsersChannel();
+                """,
+            jfos = listOf(COMMON.CHANNEL)
+        ) { _, _ ->
+        }.failsToCompile()
+            .withErrorContaining(ProcessorErrors.invalidChannelType(
+                KotlinTypeNames.CHANNEL.toString()))
+    }
+
+    @Test
+    fun testBadSendChannelReturnForQuery() {
+        singleQueryMethod<QueryMethod>(
+            """
+                @Query("select * from user")
+                abstract ${KotlinTypeNames.SEND_CHANNEL}<User> getUsersChannel();
+                """,
+            jfos = listOf(COMMON.SEND_CHANNEL)
+        ) { _, _ ->
+        }.failsToCompile()
+            .withErrorContaining(ProcessorErrors.invalidChannelType(
+                KotlinTypeNames.SEND_CHANNEL.toString()))
+    }
+
+    @Test
+    fun testBadReceiveChannelReturnForQuery() {
+        singleQueryMethod<QueryMethod>(
+            """
+                @Query("select * from user")
+                abstract ${KotlinTypeNames.RECEIVE_CHANNEL}<User> getUsersChannel();
+                """,
+            jfos = listOf(COMMON.RECEIVE_CHANNEL)
+        ) { _, _ ->
+        }.failsToCompile()
+            .withErrorContaining(ProcessorErrors.invalidChannelType(
+                KotlinTypeNames.RECEIVE_CHANNEL.toString()))
+    }
+
+    @Test
     fun query_detectTransaction_select() {
         singleQueryMethod<ReadQueryMethod>(
                 """
@@ -857,6 +901,7 @@
 
     private fun <T : QueryMethod> singleQueryMethod(
         vararg input: String,
+        jfos: Iterable<JavaFileObject> = emptyList(),
         handler: (T, TestInvocation) -> Unit
     ): CompileTester {
         return assertAbout(JavaSourcesSubjectFactory.javaSources())
@@ -866,7 +911,7 @@
                         "foo.bar.MyClass",
                         DAO_PREFIX + input.joinToString("\n") + DAO_SUFFIX
                     ), COMMON.LIVE_DATA, COMMON.COMPUTABLE_LIVE_DATA, COMMON.USER, COMMON.BOOK
-                )
+                ) + jfos
             )
             .processedWith(TestProcessor.builder()
                 .forAnnotations(
diff --git a/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt b/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt
index 1bf74f5..8e2c368 100644
--- a/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt
+++ b/room/compiler/src/test/kotlin/androidx/room/testing/test_util.kt
@@ -17,6 +17,7 @@
 import androidx.room.DatabaseView
 import androidx.room.Entity
 import androidx.room.ext.GuavaUtilConcurrentTypeNames
+import androidx.room.ext.KotlinTypeNames
 import androidx.room.ext.LifecyclesTypeNames
 import androidx.room.ext.PagingTypeNames
 import androidx.room.ext.ReactiveStreamsTypeNames
@@ -143,6 +144,21 @@
         loadJavaCode("common/input/GuavaRoom.java",
             RoomGuavaTypeNames.GUAVA_ROOM.toString())
     }
+
+    val CHANNEL by lazy {
+        loadJavaCode("common/input/coroutines/Channel.java",
+            KotlinTypeNames.CHANNEL.toString())
+    }
+
+    val SEND_CHANNEL by lazy {
+        loadJavaCode("common/input/coroutines/SendChannel.java",
+            KotlinTypeNames.SEND_CHANNEL.toString())
+    }
+
+    val RECEIVE_CHANNEL by lazy {
+        loadJavaCode("common/input/coroutines/ReceiveChannel.java",
+            KotlinTypeNames.RECEIVE_CHANNEL.toString())
+    }
 }
 fun testCodeGenScope(): CodeGenScope {
     return CodeGenScope(Mockito.mock(ClassWriter::class.java))
diff --git a/room/integration-tests/kotlintestapp/build.gradle b/room/integration-tests/kotlintestapp/build.gradle
index 61bfc6d..d247001 100644
--- a/room/integration-tests/kotlintestapp/build.gradle
+++ b/room/integration-tests/kotlintestapp/build.gradle
@@ -47,7 +47,7 @@
     implementation(project(":arch:core-runtime"))
     implementation(project(":lifecycle:lifecycle-livedata"))
     implementation(KOTLIN_STDLIB)
-    implementation(KOTLIN_COROUTINES)
+    implementation(KOTLIN_COROUTINES_PREVIEW)
     kaptAndroidTest project(":room:room-compiler")
 
     androidTestImplementation(ANDROIDX_TEST_EXT_JUNIT)
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
index 290bdb1..6cadb2f 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
@@ -43,6 +43,7 @@
 import io.reactivex.Flowable
 import io.reactivex.Maybe
 import io.reactivex.Single
+import kotlinx.coroutines.flow.Flow
 import java.util.Date
 
 @Dao
@@ -376,4 +377,11 @@
             insertBookSuspend(book)
         }
     }
+
+    @Query("SELECT * FROM book")
+    fun getBooksFlow(): Flow<List<Book>>
+
+    @Transaction
+    @Query("SELECT * FROM book")
+    fun getBooksFlowInTransaction(): Flow<List<Book>>
 }
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/FlowQueryTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/FlowQueryTest.kt
new file mode 100644
index 0000000..0abc393
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/FlowQueryTest.kt
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.integration.kotlintestapp.test
+
+import androidx.room.integration.kotlintestapp.vo.Book
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import androidx.test.filters.MediumTest
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.buffer
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.produceIn
+import kotlinx.coroutines.flow.take
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.yield
+import org.junit.After
+import org.junit.Assert.fail
+import org.junit.Test
+import org.junit.runner.RunWith
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Phaser
+import java.util.concurrent.TimeUnit
+
+@MediumTest
+@RunWith(AndroidJUnit4::class)
+@FlowPreview
+@ExperimentalCoroutinesApi
+class FlowQueryTest : TestDatabaseTest() {
+
+    @After
+    fun teardown() {
+        // At the end of all tests, query executor should be idle.
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+        assertThat(countingTaskExecutorRule.isIdle).isTrue()
+    }
+
+    @Test
+    fun collectBooks_takeOne() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        booksDao.getBooksFlow().take(1).collect {
+            assertThat(it)
+                .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+        }
+    }
+
+    @Test
+    fun collectBooks_async() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val latch = CountDownLatch(1)
+        val job = async(Dispatchers.IO) {
+            booksDao.getBooksFlow().collect {
+                assertThat(it)
+                    .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+                latch.countDown()
+            }
+        }
+
+        latch.await()
+        job.cancelAndJoin()
+    }
+
+    @Test
+    fun receiveBooks_async_update() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val barrier = Phaser(2)
+        val results = mutableListOf<List<Book>>()
+        val job = async(Dispatchers.IO) {
+            booksDao.getBooksFlow().collect {
+                when (results.size) {
+                    0 -> {
+                        results.add(it)
+                        barrier.arrive()
+                    }
+                    1 -> {
+                        results.add(it)
+                        barrier.arrive()
+                    }
+                    else -> fail("Should have only collected 2 results.")
+                }
+            }
+        }
+
+        barrier.arriveAndAwaitAdvance()
+        booksDao.insertBookSuspend(TestUtil.BOOK_3)
+
+        barrier.arriveAndAwaitAdvance()
+        assertThat(results.size).isEqualTo(2)
+        assertThat(results[0])
+            .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+        assertThat(results[1])
+            .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2, TestUtil.BOOK_3))
+
+        job.cancelAndJoin()
+    }
+
+    @Test
+    fun receiveBooks() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val channel = booksDao.getBooksFlow().produceIn(this)
+        assertThat(channel.receive())
+            .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+        assertThat(channel.isEmpty).isTrue()
+
+        channel.cancel()
+    }
+
+    @Test
+    fun receiveBooks_update() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val channel = booksDao.getBooksFlow().produceIn(this)
+
+        assertThat(channel.receive())
+            .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+
+        booksDao.insertBookSuspend(TestUtil.BOOK_3)
+        drain() // drain async invalidate
+        yield()
+
+        assertThat(channel.receive())
+            .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2, TestUtil.BOOK_3))
+        assertThat(channel.isEmpty).isTrue()
+
+        channel.cancel()
+    }
+
+    @Test
+    fun receiveBooks_update_multipleChannels() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val channels = Array(4) {
+            booksDao.getBooksFlow().produceIn(this)
+        }
+
+        channels.forEach {
+            assertThat(it.receive())
+                .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+        }
+
+        booksDao.insertBookSuspend(TestUtil.BOOK_3)
+        drain() // drain async invalidate
+        yield()
+
+        channels.forEach {
+            assertThat(it.receive())
+                .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2, TestUtil.BOOK_3))
+            assertThat(it.isEmpty).isTrue()
+            it.cancel()
+        }
+    }
+
+    @Test
+    fun receiveBooks_update_multipleChannels_inTransaction() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val channels = Array(4) {
+            booksDao.getBooksFlowInTransaction().produceIn(this)
+        }
+
+        channels.forEach {
+            assertThat(it.receive())
+                .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+        }
+
+        booksDao.insertBookSuspend(TestUtil.BOOK_3)
+        drain() // drain async invalidate
+        yield()
+
+        channels.forEach {
+            assertThat(it.receive())
+                .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2, TestUtil.BOOK_3))
+            assertThat(it.isEmpty).isTrue()
+            it.cancel()
+        }
+    }
+
+    @Test
+    fun receiveBooks_latestUpdateOnly() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val channel = booksDao.getBooksFlow().buffer(Channel.CONFLATED).produceIn(this)
+
+        assertThat(channel.receive())
+            .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+
+        booksDao.insertBookSuspend(TestUtil.BOOK_3)
+        drain() // drain async invalidate
+        yield()
+        booksDao.deleteBookSuspend(TestUtil.BOOK_1)
+        drain() // drain async invalidate
+        yield()
+
+        assertThat(channel.receive())
+            .isEqualTo(listOf(TestUtil.BOOK_2, TestUtil.BOOK_3))
+        assertThat(channel.isEmpty).isTrue()
+
+        channel.cancel()
+    }
+
+    @Test
+    fun receiveBooks_async() = runBlocking {
+        booksDao.addAuthors(TestUtil.AUTHOR_1)
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.addBooks(TestUtil.BOOK_1, TestUtil.BOOK_2)
+
+        val latch = CountDownLatch(1)
+        val job = async(Dispatchers.IO) {
+            for (result in booksDao.getBooksFlow().produceIn(this)) {
+                assertThat(result)
+                    .isEqualTo(listOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
+                latch.countDown()
+            }
+        }
+
+        latch.await()
+        job.cancelAndJoin()
+    }
+}
\ No newline at end of file
diff --git a/room/ktx/api/restricted_2.2.0-alpha01.txt b/room/ktx/api/restricted_2.2.0-alpha01.txt
index dc4a42bf..20837edc 100644
--- a/room/ktx/api/restricted_2.2.0-alpha01.txt
+++ b/room/ktx/api/restricted_2.2.0-alpha01.txt
@@ -2,11 +2,13 @@
 package androidx.room {
 
   @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public final class CoroutinesRoom {
+    method public static <R> kotlinx.coroutines.flow.Flow<R> createFlow(androidx.room.RoomDatabase db, boolean inTransaction, String![] tableNames, java.util.concurrent.Callable<R> callable);
     method public static suspend <R> Object! execute(androidx.room.RoomDatabase p, boolean db, java.util.concurrent.Callable<R> inTransaction, kotlin.coroutines.Continuation<? super R> callable);
     field public static final androidx.room.CoroutinesRoom.Companion! Companion;
   }
 
   public static final class CoroutinesRoom.Companion {
+    method public <R> kotlinx.coroutines.flow.Flow<R> createFlow(androidx.room.RoomDatabase db, boolean inTransaction, String![] tableNames, java.util.concurrent.Callable<R> callable);
     method public suspend <R> Object! execute(androidx.room.RoomDatabase db, boolean inTransaction, java.util.concurrent.Callable<R> callable, kotlin.coroutines.Continuation<? super R> p);
   }
 
diff --git a/room/ktx/api/restricted_2.2.0-alpha02.txt b/room/ktx/api/restricted_2.2.0-alpha02.txt
index dc4a42bf..20837edc 100644
--- a/room/ktx/api/restricted_2.2.0-alpha02.txt
+++ b/room/ktx/api/restricted_2.2.0-alpha02.txt
@@ -2,11 +2,13 @@
 package androidx.room {
 
   @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public final class CoroutinesRoom {
+    method public static <R> kotlinx.coroutines.flow.Flow<R> createFlow(androidx.room.RoomDatabase db, boolean inTransaction, String![] tableNames, java.util.concurrent.Callable<R> callable);
     method public static suspend <R> Object! execute(androidx.room.RoomDatabase p, boolean db, java.util.concurrent.Callable<R> inTransaction, kotlin.coroutines.Continuation<? super R> callable);
     field public static final androidx.room.CoroutinesRoom.Companion! Companion;
   }
 
   public static final class CoroutinesRoom.Companion {
+    method public <R> kotlinx.coroutines.flow.Flow<R> createFlow(androidx.room.RoomDatabase db, boolean inTransaction, String![] tableNames, java.util.concurrent.Callable<R> callable);
     method public suspend <R> Object! execute(androidx.room.RoomDatabase db, boolean inTransaction, java.util.concurrent.Callable<R> callable, kotlin.coroutines.Continuation<? super R> p);
   }
 
diff --git a/room/ktx/api/restricted_current.txt b/room/ktx/api/restricted_current.txt
index dc4a42bf..20837edc 100644
--- a/room/ktx/api/restricted_current.txt
+++ b/room/ktx/api/restricted_current.txt
@@ -2,11 +2,13 @@
 package androidx.room {
 
   @RestrictTo(androidx.annotation.RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public final class CoroutinesRoom {
+    method public static <R> kotlinx.coroutines.flow.Flow<R> createFlow(androidx.room.RoomDatabase db, boolean inTransaction, String![] tableNames, java.util.concurrent.Callable<R> callable);
     method public static suspend <R> Object! execute(androidx.room.RoomDatabase p, boolean db, java.util.concurrent.Callable<R> inTransaction, kotlin.coroutines.Continuation<? super R> callable);
     field public static final androidx.room.CoroutinesRoom.Companion! Companion;
   }
 
   public static final class CoroutinesRoom.Companion {
+    method public <R> kotlinx.coroutines.flow.Flow<R> createFlow(androidx.room.RoomDatabase db, boolean inTransaction, String![] tableNames, java.util.concurrent.Callable<R> callable);
     method public suspend <R> Object! execute(androidx.room.RoomDatabase db, boolean inTransaction, java.util.concurrent.Callable<R> callable, kotlin.coroutines.Continuation<? super R> p);
   }
 
diff --git a/room/ktx/build.gradle b/room/ktx/build.gradle
index 3a13204..6615260 100644
--- a/room/ktx/build.gradle
+++ b/room/ktx/build.gradle
@@ -30,9 +30,11 @@
     api(project(":room:room-common"))
     api(project(":room:room-runtime"))
     api(KOTLIN_STDLIB)
-    api(KOTLIN_COROUTINES)
+    api(KOTLIN_COROUTINES_PREVIEW)
     testImplementation(JUNIT)
     testImplementation(MOCKITO_CORE)
+    testImplementation(TRUTH)
+    testImplementation(ARCH_LIFECYCLE_LIVEDATA_CORE)
 }
 
 androidx {
diff --git a/room/ktx/src/main/java/androidx/room/CoroutinesRoom.kt b/room/ktx/src/main/java/androidx/room/CoroutinesRoom.kt
index d38ed58..c7befed 100644
--- a/room/ktx/src/main/java/androidx/room/CoroutinesRoom.kt
+++ b/room/ktx/src/main/java/androidx/room/CoroutinesRoom.kt
@@ -19,6 +19,9 @@
 import androidx.annotation.RestrictTo
 import kotlinx.coroutines.CoroutineDispatcher
 import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flow
 import kotlinx.coroutines.withContext
 import java.util.concurrent.Callable
 import kotlin.coroutines.coroutineContext
@@ -51,6 +54,38 @@
                 callable.call()
             }
         }
+
+        @JvmStatic
+        fun <R> createFlow(
+            db: RoomDatabase,
+            inTransaction: Boolean,
+            tableNames: Array<String>,
+            callable: Callable<R>
+        ): Flow<@JvmSuppressWildcards R> = flow {
+            // Observer channel receives signals from the invalidation tracker to emit queries.
+            val observerChannel = Channel<Unit>(Channel.CONFLATED)
+            val observer = object : InvalidationTracker.Observer(tableNames) {
+                override fun onInvalidated(tables: MutableSet<String>) {
+                    observerChannel.offer(Unit)
+                }
+            }
+            observerChannel.offer(Unit) // Initial signal to perform first query.
+            val flowContext = coroutineContext
+            val queryContext = if (inTransaction) db.transactionDispatcher else db.queryDispatcher
+            withContext(queryContext) {
+                db.invalidationTracker.addObserver(observer)
+                try {
+                    // Iterate until cancelled, transforming observer signals to query results to
+                    // be emitted to the flow.
+                    for (signal in observerChannel) {
+                        val result = callable.call()
+                        withContext(flowContext) { emit(result) }
+                    }
+                } finally {
+                    db.invalidationTracker.removeObserver(observer)
+                }
+            }
+        }
     }
 }
 
@@ -71,5 +106,5 @@
  */
 internal val RoomDatabase.transactionDispatcher: CoroutineDispatcher
     get() = backingFieldMap.getOrPut("TransactionDispatcher") {
-        queryExecutor.asCoroutineDispatcher()
+        transactionExecutor.asCoroutineDispatcher()
     } as CoroutineDispatcher
diff --git a/room/ktx/src/test/java/androidx/room/CoroutinesRoomTest.kt b/room/ktx/src/test/java/androidx/room/CoroutinesRoomTest.kt
new file mode 100644
index 0000000..ad1841f
--- /dev/null
+++ b/room/ktx/src/test/java/androidx/room/CoroutinesRoomTest.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room
+
+import androidx.sqlite.db.SupportSQLiteOpenHelper
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.flow.single
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.yield
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.util.concurrent.Callable
+import kotlin.coroutines.ContinuationInterceptor
+
+@FlowPreview
+@RunWith(JUnit4::class)
+class CoroutinesRoomTest {
+
+    private val database = TestDatabase()
+    private val invalidationTracker = database.invalidationTracker as TestInvalidationTracker
+
+    @Test
+    fun testCreateFlow() = testRun {
+        var callableExecuted = false
+        val flow = CoroutinesRoom.createFlow(
+            db = database,
+            inTransaction = false,
+            tableNames = arrayOf("Pet"),
+            callable = Callable { callableExecuted = true }
+        )
+
+        assertThat(invalidationTracker.observers.isEmpty()).isTrue()
+        assertThat(callableExecuted).isFalse()
+
+        val job = async {
+            flow.single()
+        }
+        yield(); yield() // yield for async and flow
+
+        assertThat(invalidationTracker.observers.size).isEqualTo(1)
+        assertThat(callableExecuted).isTrue()
+
+        job.cancelAndJoin()
+        assertThat(invalidationTracker.observers.isEmpty()).isTrue()
+    }
+
+    // Use runBlocking dispatcher as query dispatchers, keeps the tests consistent.
+    private fun testRun(block: suspend CoroutineScope.() -> Unit) = runBlocking {
+        database.backingFieldMap["QueryDispatcher"] = coroutineContext[ContinuationInterceptor]
+        block.invoke(this)
+    }
+
+    private class TestDatabase : RoomDatabase() {
+        override fun createOpenHelper(config: DatabaseConfiguration?): SupportSQLiteOpenHelper {
+            throw UnsupportedOperationException("Shouldn't be called!")
+        }
+
+        override fun createInvalidationTracker(): InvalidationTracker {
+            return TestInvalidationTracker(this)
+        }
+
+        override fun clearAllTables() {
+            throw UnsupportedOperationException("Shouldn't be called!")
+        }
+    }
+
+    private class TestInvalidationTracker(db: RoomDatabase) : InvalidationTracker(db) {
+        val observers = mutableListOf<Observer>()
+
+        override fun addObserver(observer: Observer) {
+            observers.add(observer)
+        }
+
+        override fun removeObserver(observer: Observer) {
+            observers.remove(observer)
+        }
+    }
+}