Hook up room-paging-guava to room-compiler

Test: ./gradlew :room:room-compiler:test
Test: ./gradlew :room:integration-tests:room-testapp-kotlin:cC
Bug: 203666906
Change-Id: Iaa2e7b542b4398f2e364c0b68896c046589e9a09
diff --git a/room/integration-tests/kotlintestapp/build.gradle b/room/integration-tests/kotlintestapp/build.gradle
index 00e68bc..eedd3de 100644
--- a/room/integration-tests/kotlintestapp/build.gradle
+++ b/room/integration-tests/kotlintestapp/build.gradle
@@ -100,13 +100,16 @@
     androidTestImplementation(libs.kotlinTest)
     androidTestImplementation(project(":room:room-guava"))
     androidTestImplementation(project(":room:room-testing"))
+    androidTestImplementation(project(":room:room-paging-guava"))
+    // we need latest guava android because room-paging-guava's guava-android gets override by
+    // its kotlinx-coroutines-guava dependency's guava-jre version
+    androidTestImplementation(libs.guavaAndroid)
     androidTestImplementation(project(":room:room-rxjava2"))
     androidTestImplementation(project(":room:room-rxjava3"))
     androidTestImplementation(project(":room:room-ktx"))
     androidTestImplementation(project(":internal-testutils-common"))
     androidTestImplementation("androidx.arch.core:core-testing:2.0.1")
     androidTestImplementation("androidx.paging:paging-runtime:3.1.1")
-    androidTestImplementation(libs.guavaAndroid)
     androidTestImplementation(libs.rxjava2)
     testImplementation(libs.mockitoCore)
 }
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/ListenableFuturePagingSourceTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/ListenableFuturePagingSourceTest.kt
new file mode 100644
index 0000000..7eec936
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/ListenableFuturePagingSourceTest.kt
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.integration.kotlintestapp.test
+
+import androidx.paging.ListenableFuturePagingSource
+import androidx.paging.Pager
+import androidx.paging.PagingState
+import androidx.room.Room
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.ItemStore
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingDb
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingEntity
+import androidx.test.core.app.ApplicationProvider
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import androidx.test.filters.SmallTest
+import androidx.testutils.FilteringExecutor
+import com.google.common.truth.Truth.assertThat
+import com.google.common.util.concurrent.ListenableFuture
+import java.util.concurrent.Executors
+import kotlin.test.assertFailsWith
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.flow.collectLatest
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+
+@RunWith(AndroidJUnit4::class)
+@SmallTest
+class ListenableFuturePagingSourceTest {
+
+    private lateinit var coroutineScope: CoroutineScope
+    private lateinit var db: PagingDb
+    private lateinit var itemStore: ItemStore
+
+    // Multiple threads are necessary to prevent deadlock, since Room will acquire a thread to
+    // dispatch on, when using the query / transaction dispatchers.
+    private val queryExecutor = FilteringExecutor(Executors.newFixedThreadPool(2))
+    private val mainThreadQueries = mutableListOf<Pair<String, String>>()
+    private val pagingSources = mutableListOf<ListenableFuturePagingSourceImpl>()
+
+    @Before
+    fun init() {
+        coroutineScope = CoroutineScope(Dispatchers.Main)
+        itemStore = ItemStore(coroutineScope)
+
+        val mainThread: Thread = runBlocking(Dispatchers.Main) {
+            Thread.currentThread()
+        }
+        db = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(),
+            PagingDb::class.java
+        ).setQueryCallback(
+            { sqlQuery, _ ->
+                if (Thread.currentThread() === mainThread) {
+                    mainThreadQueries.add(
+                        sqlQuery to Throwable().stackTraceToString()
+                    )
+                }
+            },
+            {
+                // instantly execute the log callback so that we can check the thread.
+                it.run()
+            }
+        ).setQueryExecutor(queryExecutor)
+            .build()
+    }
+
+    @After
+    fun tearDown() {
+        // Check no mainThread queries happened.
+        assertThat(mainThreadQueries).isEmpty()
+        coroutineScope.cancel()
+    }
+
+    @Test
+    fun refresh_canceledCoroutine_cancelsFuture() {
+        val items = createItems(startId = 0, count = 90)
+        db.dao.insert(items)
+
+        // filter right away to block initial load
+        queryExecutor.filterFunction = { runnable ->
+            // filtering out the transform async function called inside loadFuture
+            // filtering as String b/c `AbstractTransformFuture` is a package-private class
+            !runnable.javaClass.enclosingClass.toString()
+                .contains("AbstractTransformFuture")
+        }
+
+        runTest {
+            val expectError = assertFailsWith<AssertionError> {
+                itemStore.awaitInitialLoad(timeOutDuration = 2)
+            }
+            assertThat(expectError.message).isEqualTo("didn't complete in expected time")
+
+            val futures = pagingSources[0].futures
+            assertThat(futures.size).isEqualTo(1)
+            assertThat(futures[0].isDone).isFalse() // initial load future is pending
+
+            // now cancel collection which should also cancel the future
+            coroutineScope.cancel()
+
+            // just making sure no new futures are created, and ensuring that the pending future
+            // is now cancelled
+            assertThat(futures.size).isEqualTo(1)
+            assertThat(futures[0].isCancelled).isTrue()
+            assertThat(futures[0].isDone).isTrue()
+        }
+    }
+
+    @Test
+    fun append_canceledCoroutine_cancelsFuture() {
+        val items = createItems(startId = 0, count = 90)
+        db.dao.insert(items)
+
+        runTest {
+            itemStore.awaitInitialLoad()
+
+            val futures = pagingSources[0].futures
+            assertThat(futures.size).isEqualTo(1)
+            assertThat(futures[0].isDone).isTrue() // initial load future is complete
+
+            queryExecutor.filterFunction = { runnable ->
+                // filtering out the transform async function called inside loadFuture
+                // filtering as String b/c `AbstractTransformFuture` is a package-private class
+                !runnable.javaClass.enclosingClass.toString()
+                    .contains("AbstractTransformFuture")
+            }
+
+            // now access more items that should trigger loading more
+            withContext(Dispatchers.Main) {
+                itemStore.get(10)
+            }
+
+            // await should fail because we have blocked the paging source' async function,
+            // which calls nonInitialLoad in this case, from executing
+            val expectError = assertFailsWith<AssertionError> {
+                assertThat(itemStore.awaitItem(index = 10, timeOutDuration = 2))
+                    .isEqualTo(items[10])
+            }
+            assertThat(expectError.message).isEqualTo("didn't complete in expected time")
+            queryExecutor.awaitDeferredSizeAtLeast(1)
+
+            // even though the load runnable was blocked, a new future should have been returned
+            assertThat(futures.size).isEqualTo(2)
+            // ensure future is pending
+            assertThat(futures[1].isDone).isFalse()
+
+            // now cancel collection which should also cancel the future
+            coroutineScope.cancel()
+
+            // just making sure no new futures are created, and ensuring that the pending future
+            // is now cancelled
+            assertThat(futures.size).isEqualTo(2)
+            assertThat(futures[1].isCancelled).isTrue()
+            assertThat(futures[1].isDone).isTrue()
+        }
+    }
+
+    @Test
+    fun prepend_canceledCoroutine_cancelsFuture() {
+        val items = createItems(startId = 0, count = 90)
+        db.dao.insert(items)
+
+        runTest {
+            itemStore.awaitInitialLoad()
+
+            val futures = pagingSources[0].futures
+            assertThat(futures.size).isEqualTo(1)
+            assertThat(futures[0].isDone).isTrue() // initial load future is complete
+
+            queryExecutor.filterFunction = { runnable ->
+                // filtering out the transform async function called inside loadFuture
+                // filtering as String b/c `AbstractTransformFuture` is a package-private class
+                !runnable.javaClass.enclosingClass.toString()
+                    .contains("AbstractTransformFuture")
+            }
+
+            // now access more items that should trigger loading more
+            withContext(Dispatchers.Main) {
+                itemStore.get(40)
+            }
+
+            // await should fail because we have blocked the paging source' async function,
+            // which calls nonInitialLoad in this case, from executing
+            val expectError = assertFailsWith<AssertionError> {
+                assertThat(itemStore.awaitItem(index = 40, timeOutDuration = 2))
+                    .isEqualTo(items[40])
+            }
+            assertThat(expectError.message).isEqualTo("didn't complete in expected time")
+            queryExecutor.awaitDeferredSizeAtLeast(1)
+
+            // even though the load runnable was blocked, a new future should have been returned
+            assertThat(futures.size).isEqualTo(2)
+            // ensure future is pending
+            assertThat(futures[1].isDone).isFalse()
+
+            // now cancel collection which should also cancel the future
+            coroutineScope.cancel()
+
+            // just making sure no new futures are created, and ensuring that the pending future
+            // is now cancelled
+            assertThat(futures.size).isEqualTo(2)
+            assertThat(futures[1].isCancelled).isTrue()
+            assertThat(futures[1].isDone).isTrue()
+        }
+    }
+
+    private fun runTest(
+        pager: Pager<Int, PagingEntity> =
+            Pager(config = CONFIG) {
+                val baseSource = db.dao.loadItemsListenableFuture()
+                // to get access to the futures returned from loadFuture. Also to
+                // mimic real use case of wrapping the source returned from Room.
+                ListenableFuturePagingSourceImpl(baseSource).also { pagingSources.add(it) }
+            },
+        block: suspend () -> Unit
+    ) {
+        val collection = coroutineScope.launch(Dispatchers.Main) {
+            pager.flow.collectLatest {
+                itemStore.collectFrom(it)
+            }
+        }
+        runBlocking {
+            try {
+                block()
+            } finally {
+                collection.cancelAndJoin()
+            }
+        }
+    }
+}
+
+private class ListenableFuturePagingSourceImpl(
+    private val baseSource: ListenableFuturePagingSource<Int, PagingEntity>
+) : ListenableFuturePagingSource<Int, PagingEntity>() {
+
+    val futures = mutableListOf<ListenableFuture<LoadResult<Int, PagingEntity>>>()
+
+    override fun getRefreshKey(state: PagingState<Int, PagingEntity>): Int? {
+        return baseSource.getRefreshKey(state)
+    }
+
+    override fun loadFuture(params: LoadParams<Int>):
+        ListenableFuture<LoadResult<Int, PagingEntity>> {
+            return baseSource.loadFuture(params).also { futures.add(it) }
+    }
+}
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt
index f80e077..df7a5ec 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt
@@ -527,7 +527,10 @@
          */
         @Parameterized.Parameters(name = "pagingSourceFactory={0}")
         @JvmStatic
-        fun parameters() = listOf(PagingEntityDao::loadItems)
+        fun parameters() = listOf(
+            PagingEntityDao::loadItems,
+            PagingEntityDao::loadItemsListenableFuture
+        )
     }
 }
 
@@ -730,7 +733,10 @@
          */
         @Parameterized.Parameters(name = "pagingSourceFactory={0}")
         @JvmStatic
-        fun parameters() = listOf(PagingEntityDao::loadItemsRaw)
+        fun parameters() = listOf(
+            PagingEntityDao::loadItemsRaw,
+            PagingEntityDao::loadItemsRawListenableFuture
+        )
     }
 }
 
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt
index 0cacc49..c232edb 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt
@@ -102,10 +102,10 @@
     }
 
     @OptIn(ExperimentalCoroutinesApi::class)
-    suspend fun awaitItem(index: Int): PagingEntity = withTestTimeout {
-        generation.mapLatest {
-            asyncDiffer.peek(index)
-        }.filterNotNull().first()
+    suspend fun awaitItem(index: Int, timeOutDuration: Long = 3): PagingEntity =
+        withTestTimeout(timeOutDuration) {
+            generation.mapLatest { asyncDiffer.peek(index) }
+        .filterNotNull().first()
     }
 
     suspend fun collectFrom(data: PagingData<PagingEntity>) {
@@ -128,8 +128,8 @@
         }
     }
 
-    suspend fun awaitInitialLoad(): ItemSnapshotList<PagingEntity> =
-        withTestTimeout {
+    suspend fun awaitInitialLoad(timeOutDuration: Long = 3): ItemSnapshotList<PagingEntity> =
+        withTestTimeout(timeOutDuration) {
             withContext(Dispatchers.Main) {
                 generation.filter { it.initialLoadCompleted }.first()
                 asyncDiffer.snapshot()
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt
index 658d150..46d3fd1 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt
@@ -16,6 +16,7 @@
 
 package androidx.room.androidx.room.integration.kotlintestapp.testutil
 
+import androidx.paging.ListenableFuturePagingSource
 import androidx.paging.PagingSource
 import androidx.room.Dao
 import androidx.room.Insert
@@ -39,4 +40,11 @@
 
     @RawQuery(observedEntities = [PagingEntity::class])
     fun loadItemsRaw(query: SupportSQLiteQuery): PagingSource<Int, PagingEntity>
+
+    @Query("SELECT * FROM PagingEntity ORDER BY id ASC")
+    fun loadItemsListenableFuture(): ListenableFuturePagingSource<Int, PagingEntity>
+
+    @RawQuery(observedEntities = [PagingEntity::class])
+    fun loadItemsRawListenableFuture(query: SupportSQLiteQuery):
+        ListenableFuturePagingSource<Int, PagingEntity>
 }
diff --git a/room/room-compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt b/room/room-compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
index 1d21c1a..33c58fa 100644
--- a/room/room-compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
+++ b/room/room-compiler/src/main/kotlin/androidx/room/ext/javapoet_ext.kt
@@ -110,6 +110,8 @@
         ClassName.get(PAGING_PACKAGE, "DataSource", "Factory")
     val PAGING_SOURCE: ClassName =
         ClassName.get(PAGING_PACKAGE, "PagingSource")
+    val LISTENABLE_FUTURE_PAGING_SOURCE: ClassName =
+        ClassName.get(PAGING_PACKAGE, "ListenableFuturePagingSource")
 }
 
 object LifecyclesTypeNames {
@@ -198,6 +200,14 @@
         ClassName.get("$ROOM_PACKAGE.paging", "LimitOffsetPagingSource")
 }
 
+object RoomPagingGuavaTypeNames {
+    val LIMIT_OFFSET_LISTENABLE_FUTURE_PAGING_SOURCE: ClassName =
+        ClassName.get(
+            "$ROOM_PACKAGE.paging.guava",
+            "LimitOffsetListenableFuturePagingSource"
+        )
+}
+
 object RoomCoroutinesTypeNames {
     val COROUTINES_ROOM = ClassName.get(ROOM_PACKAGE, "CoroutinesRoom")
 }
diff --git a/room/room-compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt b/room/room-compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
index c48afce..b8344d5 100644
--- a/room/room-compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
+++ b/room/room-compiler/src/main/kotlin/androidx/room/processor/ProcessorErrors.kt
@@ -619,6 +619,10 @@
     val MISSING_ROOM_PAGING_ARTIFACT = "To use PagingSource, you must add `room-paging`" +
         " artifact from Room as a dependency. androidx.room:room-paging:<version>"
 
+    val MISSING_ROOM_PAGING_GUAVA_ARTIFACT = "To use ListenableFuturePagingSource, you must " +
+        "add `room-paging-guava` artifact from Room as a dependency. " +
+        "androidx.room:room-paging-guava:<version>"
+
     val MISSING_ROOM_COROUTINE_ARTIFACT = "To use Coroutine features, you must add `ktx`" +
         " artifact from Room as a dependency. androidx.room:room-ktx:<version>"
 
diff --git a/room/room-compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt b/room/room-compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt
index bd8e0f7..8f16715 100644
--- a/room/room-compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt
+++ b/room/room-compiler/src/main/kotlin/androidx/room/solver/TypeAdapterStore.kt
@@ -44,6 +44,7 @@
 import androidx.room.solver.binderprovider.DataSourceQueryResultBinderProvider
 import androidx.room.solver.binderprovider.GuavaListenableFutureQueryResultBinderProvider
 import androidx.room.solver.binderprovider.InstantQueryResultBinderProvider
+import androidx.room.solver.binderprovider.ListenableFuturePagingSourceQueryResultBinderProvider
 import androidx.room.solver.binderprovider.LiveDataQueryResultBinderProvider
 import androidx.room.solver.binderprovider.PagingSourceQueryResultBinderProvider
 import androidx.room.solver.binderprovider.RxCallableQueryResultBinderProvider
@@ -200,6 +201,7 @@
             addAll(RxCallableQueryResultBinderProvider.getAll(context))
             add(DataSourceQueryResultBinderProvider(context))
             add(DataSourceFactoryQueryResultBinderProvider(context))
+            add(ListenableFuturePagingSourceQueryResultBinderProvider(context))
             add(PagingSourceQueryResultBinderProvider(context))
             add(CoroutineFlowResultBinderProvider(context))
             add(InstantQueryResultBinderProvider(context))
diff --git a/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/ListenableFuturePagingSourceQueryResultBinderProvider.kt b/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/ListenableFuturePagingSourceQueryResultBinderProvider.kt
new file mode 100644
index 0000000..8b78461
--- /dev/null
+++ b/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/ListenableFuturePagingSourceQueryResultBinderProvider.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.solver.binderprovider
+
+import androidx.room.ext.PagingTypeNames
+import androidx.room.ext.RoomPagingGuavaTypeNames
+import androidx.room.processor.Context
+import androidx.room.processor.ProcessorErrors.MISSING_ROOM_PAGING_GUAVA_ARTIFACT
+import androidx.room.solver.QueryResultBinderProvider
+
+fun ListenableFuturePagingSourceQueryResultBinderProvider(
+    context: Context
+): QueryResultBinderProvider {
+    val limitOffsetListenableFuturePagingSource =
+        RoomPagingGuavaTypeNames.LIMIT_OFFSET_LISTENABLE_FUTURE_PAGING_SOURCE
+
+    return MultiTypedPagingSourceQueryResultBinderProvider(
+        context = context,
+        roomPagingClassName = limitOffsetListenableFuturePagingSource,
+        pagingSourceTypeName = PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE
+    ).requireArtifact(
+        context = context,
+        requiredType = limitOffsetListenableFuturePagingSource,
+        missingArtifactErrorMsg = MISSING_ROOM_PAGING_GUAVA_ARTIFACT
+    )
+}
diff --git a/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/MultiTypedPagingSourceQueryResultBinderProvider.kt b/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/MultiTypedPagingSourceQueryResultBinderProvider.kt
new file mode 100644
index 0000000..eb300a8
--- /dev/null
+++ b/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/MultiTypedPagingSourceQueryResultBinderProvider.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.solver.binderprovider
+
+import androidx.room.compiler.processing.XRawType
+import androidx.room.compiler.processing.XType
+import androidx.room.parser.ParsedQuery
+import androidx.room.processor.Context
+import androidx.room.processor.ProcessorErrors
+import androidx.room.solver.QueryResultBinderProvider
+import androidx.room.solver.TypeAdapterExtras
+import androidx.room.solver.query.result.ListQueryResultAdapter
+import androidx.room.solver.query.result.MultiTypedPagingSourceQueryResultBinder
+import androidx.room.solver.query.result.QueryResultBinder
+import com.squareup.javapoet.ClassName
+import com.squareup.javapoet.TypeName
+
+class MultiTypedPagingSourceQueryResultBinderProvider(
+    private val context: Context,
+    private val roomPagingClassName: ClassName,
+    pagingSourceTypeName: TypeName,
+) : QueryResultBinderProvider {
+
+    private val pagingSourceType: XRawType? by lazy {
+        context.processingEnv.findType(pagingSourceTypeName)?.rawType
+    }
+
+    override fun provide(
+        declared: XType,
+        query: ParsedQuery,
+        extras: TypeAdapterExtras
+    ): QueryResultBinder {
+        if (query.tables.isEmpty()) {
+            context.logger.e(ProcessorErrors.OBSERVABLE_QUERY_NOTHING_TO_OBSERVE)
+        }
+        val typeArg = declared.typeArguments.last()
+        val listAdapter = context.typeAdapterStore.findRowAdapter(typeArg, query)?.let {
+            ListQueryResultAdapter(typeArg, it)
+        }
+        val tableNames = (
+            (listAdapter?.accessedTableNames() ?: emptyList()) +
+                query.tables.map { it.name }
+            ).toSet()
+
+        return MultiTypedPagingSourceQueryResultBinder(
+            listAdapter = listAdapter,
+            tableNames = tableNames,
+            className = roomPagingClassName
+        )
+    }
+
+    override fun matches(declared: XType): Boolean {
+        val collectionTypeRaw = context.COMMON_TYPES.READONLY_COLLECTION.rawType
+
+        if (pagingSourceType == null) {
+            return false
+        }
+
+        if (declared.typeArguments.isEmpty()) {
+            return false
+        }
+
+        if (!pagingSourceType!!.isAssignableFrom(declared)) {
+            return false
+        }
+
+        if (declared.typeArguments.first().typeName != TypeName.INT.box()) {
+            context.logger.e(ProcessorErrors.PAGING_SPECIFY_PAGING_SOURCE_TYPE)
+        }
+
+        if (collectionTypeRaw.isAssignableFrom(declared.typeArguments.last().rawType)) {
+            context.logger.e(ProcessorErrors.PAGING_SPECIFY_PAGING_SOURCE_VALUE_TYPE)
+        }
+
+        return true
+    }
+}
\ No newline at end of file
diff --git a/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/PagingSourceQueryResultBinderProvider.kt b/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/PagingSourceQueryResultBinderProvider.kt
index 32add64..dfb9ee6 100644
--- a/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/PagingSourceQueryResultBinderProvider.kt
+++ b/room/room-compiler/src/main/kotlin/androidx/room/solver/binderprovider/PagingSourceQueryResultBinderProvider.kt
@@ -16,83 +16,23 @@
 
 package androidx.room.solver.binderprovider
 
-import androidx.room.compiler.processing.XRawType
-import androidx.room.compiler.processing.XType
 import androidx.room.ext.PagingTypeNames
 import androidx.room.ext.RoomPagingTypeNames
-import androidx.room.parser.ParsedQuery
 import androidx.room.processor.Context
 import androidx.room.processor.ProcessorErrors
 import androidx.room.solver.QueryResultBinderProvider
-import androidx.room.solver.TypeAdapterExtras
-import androidx.room.solver.query.result.MultiTypedPagingSourceQueryResultBinder
-import androidx.room.solver.query.result.ListQueryResultAdapter
-import androidx.room.solver.query.result.QueryResultBinder
-import com.squareup.javapoet.TypeName
 
 @Suppress("FunctionName")
-fun PagingSourceQueryResultBinderProvider(context: Context): QueryResultBinderProvider =
-    PagingSourceQueryResultBinderProviderImpl(
-        context = context
+fun PagingSourceQueryResultBinderProvider(context: Context): QueryResultBinderProvider {
+    val limitOffsetPagingSource = RoomPagingTypeNames.LIMIT_OFFSET_PAGING_SOURCE
+
+    return MultiTypedPagingSourceQueryResultBinderProvider(
+        context = context,
+        roomPagingClassName = limitOffsetPagingSource,
+        pagingSourceTypeName = PagingTypeNames.PAGING_SOURCE
     ).requireArtifact(
         context = context,
-        requiredType = RoomPagingTypeNames.LIMIT_OFFSET_PAGING_SOURCE,
+        requiredType = limitOffsetPagingSource,
         missingArtifactErrorMsg = ProcessorErrors.MISSING_ROOM_PAGING_ARTIFACT
     )
-
-private class PagingSourceQueryResultBinderProviderImpl(
-    val context: Context
-) : QueryResultBinderProvider {
-    private val pagingSourceType: XRawType? by lazy {
-        context.processingEnv.findType(PagingTypeNames.PAGING_SOURCE)?.rawType
-    }
-
-    override fun provide(
-        declared: XType,
-        query: ParsedQuery,
-        extras: TypeAdapterExtras
-    ): QueryResultBinder {
-        if (query.tables.isEmpty()) {
-            context.logger.e(ProcessorErrors.OBSERVABLE_QUERY_NOTHING_TO_OBSERVE)
-        }
-        val typeArg = declared.typeArguments.last()
-        val listAdapter = context.typeAdapterStore.findRowAdapter(typeArg, query)?.let {
-            ListQueryResultAdapter(typeArg, it)
-        }
-        val tableNames = (
-            (listAdapter?.accessedTableNames() ?: emptyList()) +
-                query.tables.map { it.name }
-            ).toSet()
-        return MultiTypedPagingSourceQueryResultBinder(
-            listAdapter = listAdapter,
-            tableNames = tableNames,
-            className = RoomPagingTypeNames.LIMIT_OFFSET_PAGING_SOURCE
-        )
-    }
-
-    override fun matches(declared: XType): Boolean {
-        val collectionTypeRaw = context.COMMON_TYPES.READONLY_COLLECTION.rawType
-
-        if (pagingSourceType == null) {
-            return false
-        }
-
-        if (declared.typeArguments.isEmpty()) {
-            return false
-        }
-
-        if (!pagingSourceType!!.isAssignableFrom(declared)) {
-            return false
-        }
-
-        if (declared.typeArguments.first().typeName != TypeName.INT.box()) {
-            context.logger.e(ProcessorErrors.PAGING_SPECIFY_PAGING_SOURCE_TYPE)
-        }
-
-        if (collectionTypeRaw.isAssignableFrom(declared.typeArguments.last().rawType)) {
-            context.logger.e(ProcessorErrors.PAGING_SPECIFY_PAGING_SOURCE_VALUE_TYPE)
-        }
-
-        return true
-    }
 }
diff --git a/room/room-compiler/src/test/data/common/input/LimitOffsetListenableFuturePagingSource.java b/room/room-compiler/src/test/data/common/input/LimitOffsetListenableFuturePagingSource.java
new file mode 100644
index 0000000..bf4934e
--- /dev/null
+++ b/room/room-compiler/src/test/data/common/input/LimitOffsetListenableFuturePagingSource.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.paging.guava;
+
+public abstract class LimitOffsetListenableFuturePagingSource<T>
+        extends androidx.paging.ListenableFuturePagingSource<Integer, T> {
+
+}
diff --git a/room/room-compiler/src/test/data/common/input/ListenableFuturePagingSource.java b/room/room-compiler/src/test/data/common/input/ListenableFuturePagingSource.java
new file mode 100644
index 0000000..7343d49
--- /dev/null
+++ b/room/room-compiler/src/test/data/common/input/ListenableFuturePagingSource.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.paging;
+
+public abstract class ListenableFuturePagingSource<Integer, T>
+        extends androidx.paging.PagingSource<Integer, T> {
+
+}
diff --git a/room/room-compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt b/room/room-compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt
index 3208dfa..6096d8b 100644
--- a/room/room-compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt
+++ b/room/room-compiler/src/test/kotlin/androidx/room/solver/TypeAdapterStoreTest.kt
@@ -46,6 +46,7 @@
 import androidx.room.processor.ProcessorErrors
 import androidx.room.solver.binderprovider.DataSourceFactoryQueryResultBinderProvider
 import androidx.room.solver.binderprovider.DataSourceQueryResultBinderProvider
+import androidx.room.solver.binderprovider.ListenableFuturePagingSourceQueryResultBinderProvider
 import androidx.room.solver.binderprovider.LiveDataQueryResultBinderProvider
 import androidx.room.solver.binderprovider.PagingSourceQueryResultBinderProvider
 import androidx.room.solver.binderprovider.RxQueryResultBinderProvider
@@ -523,6 +524,29 @@
     }
 
     @Test
+    fun testMissingRoomPagingGuava() {
+        runProcessorTest(
+            sources = listOf(COMMON.LISTENABLE_FUTURE_PAGING_SOURCE)
+        ) { invocation ->
+            val listenableFuturePagingSourceElement = invocation.processingEnv
+                .requireTypeElement(PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE)
+            val intType = invocation.processingEnv.requireType(Integer::class)
+            val listenableFuturePagingSourceIntIntType = invocation.processingEnv
+                .getDeclaredType(listenableFuturePagingSourceElement, intType, intType)
+
+            assertThat(listenableFuturePagingSourceElement, notNullValue())
+            assertThat(
+                ListenableFuturePagingSourceQueryResultBinderProvider(invocation.context)
+                    .matches(listenableFuturePagingSourceIntIntType),
+                `is`(true)
+            )
+            invocation.assertCompilationResult {
+                hasError(ProcessorErrors.MISSING_ROOM_PAGING_GUAVA_ARTIFACT)
+            }
+        }
+    }
+
+    @Test
     fun testFindPublisher() {
         listOf(
             COMMON.RX2_FLOWABLE to COMMON.RX2_ROOM,
@@ -862,6 +886,52 @@
     }
 
     @Test
+    fun findListenableFuturePagingSourceJavaCollectionValue() {
+        runProcessorTest(
+            sources = listOf(COMMON.LISTENABLE_FUTURE_PAGING_SOURCE)
+        ) { invocation ->
+            val listenableFuturePagingSourceElement = invocation.processingEnv
+                .requireTypeElement(PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE)
+            val intType = invocation.processingEnv.requireType(Integer::class)
+            val collectionType = invocation.processingEnv.requireType("java.util.Collection")
+            val listenableFuturePagingSourceIntCollectionType = invocation.processingEnv
+                .getDeclaredType(listenableFuturePagingSourceElement, intType, collectionType)
+
+            assertThat(listenableFuturePagingSourceIntCollectionType).isNotNull()
+            assertThat(
+                ListenableFuturePagingSourceQueryResultBinderProvider(invocation.context)
+                    .matches(listenableFuturePagingSourceIntCollectionType)
+            ).isTrue()
+            invocation.assertCompilationResult {
+                hasError(ProcessorErrors.PAGING_SPECIFY_PAGING_SOURCE_VALUE_TYPE)
+            }
+        }
+    }
+
+    @Test
+    fun findListenableFutureKotlinCollectionValue() {
+        runProcessorTest(
+            sources = listOf(COMMON.LISTENABLE_FUTURE_PAGING_SOURCE)
+        ) { invocation ->
+            val listenableFuturePagingSourceElement = invocation.processingEnv
+                .requireTypeElement(PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE)
+            val intType = invocation.processingEnv.requireType(Integer::class)
+            val kotlinCollectionType = invocation.processingEnv.requireType(Collection::class)
+            val listenableFuturePagingSourceIntCollectionType = invocation.processingEnv
+                .getDeclaredType(listenableFuturePagingSourceElement, intType, kotlinCollectionType)
+
+            assertThat(listenableFuturePagingSourceIntCollectionType).isNotNull()
+            assertThat(
+                ListenableFuturePagingSourceQueryResultBinderProvider(invocation.context)
+                    .matches(listenableFuturePagingSourceIntCollectionType)
+            ).isTrue()
+            invocation.assertCompilationResult {
+                hasError(ProcessorErrors.PAGING_SPECIFY_PAGING_SOURCE_VALUE_TYPE)
+            }
+        }
+    }
+
+    @Test
     fun testPagingSourceBinder() {
         val inputSource =
             Source.java(
@@ -882,6 +952,7 @@
                 COMMON.USER,
                 COMMON.PAGING_SOURCE,
                 COMMON.LIMIT_OFFSET_PAGING_SOURCE,
+                COMMON.LISTENABLE_FUTURE_PAGING_SOURCE,
             ),
         ) { invocation: XTestInvocation ->
             val dao = invocation.roundEnv
@@ -906,6 +977,63 @@
                 .filterIsInstance<ReadQueryMethod>().first().returnType.rawType
             // make sure returned type is the original PagingSource
             assertThat(returnedXRawType).isEqualTo(pagingSourceXRawType)
+
+            val listenableFuturePagingSourceXRawType: XRawType? = invocation.context.processingEnv
+                .findType(PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE)?.rawType
+            assertThat(listenableFuturePagingSourceXRawType!!.isAssignableFrom(returnedXRawType))
+                .isFalse()
+        }
+    }
+
+    @Test
+    fun testListenableFuturePagingSourceBinder() {
+        val inputSource =
+            Source.java(
+                qName = "foo.bar.MyDao",
+                code =
+                """
+                ${DaoProcessorTest.DAO_PREFIX}
+
+                @Dao abstract class MyDao {
+                    @Query("SELECT uid FROM User")
+                    abstract androidx.paging.ListenableFuturePagingSource<Integer, User> getAllIds();
+                }
+                    """.trimIndent()
+            )
+        runProcessorTest(
+            sources = listOf(
+                inputSource,
+                COMMON.USER,
+                COMMON.LISTENABLE_FUTURE_PAGING_SOURCE,
+                COMMON.LIMIT_OFFSET_LISTENABLE_FUTURE_PAGING_SOURCE,
+            ),
+        ) { invocation: XTestInvocation ->
+            val dao = invocation.roundEnv
+                .getElementsAnnotatedWith(
+                    Dao::class.qualifiedName!!
+                ).first()
+            check(dao.isTypeElement())
+            val dbType = invocation.context.processingEnv
+                .requireType(RoomTypeNames.ROOM_DB)
+            val parser = DaoProcessor(
+                invocation.context,
+                dao, dbType, null,
+            )
+            val parsedDao = parser.process()
+            val binder = parsedDao.queryMethods.filterIsInstance<ReadQueryMethod>()
+                .first().queryResultBinder
+
+            // assert that room correctly binds to ListenableFuturePagingSource instead of
+            // its supertype PagingSource. ListenableFuturePagingSourceBinderProvider
+            // must be added into list of binder providers in TypeAdapterStore before
+            // generic PagingSource.
+            assertThat(binder is MultiTypedPagingSourceQueryResultBinder).isTrue()
+            val listenableFuturePagingSourceXRawType: XRawType? = invocation.context.processingEnv
+                .findType(PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE)?.rawType
+            val returnedXRawType = parsedDao.queryMethods
+                .filterIsInstance<ReadQueryMethod>().first().returnType.rawType
+            // make sure the actual returned type from Provider is ListenableFuturePagingSource
+            assertThat(returnedXRawType).isEqualTo(listenableFuturePagingSourceXRawType)
         }
     }
 
diff --git a/room/room-compiler/src/test/kotlin/androidx/room/testing/test_util.kt b/room/room-compiler/src/test/kotlin/androidx/room/testing/test_util.kt
index aede69a..0b76f25 100644
--- a/room/room-compiler/src/test/kotlin/androidx/room/testing/test_util.kt
+++ b/room/room-compiler/src/test/kotlin/androidx/room/testing/test_util.kt
@@ -29,6 +29,7 @@
 import androidx.room.ext.ReactiveStreamsTypeNames
 import androidx.room.ext.RoomCoroutinesTypeNames
 import androidx.room.ext.RoomGuavaTypeNames
+import androidx.room.ext.RoomPagingGuavaTypeNames
 import androidx.room.ext.RoomPagingTypeNames
 import androidx.room.ext.RoomRxJava2TypeNames
 import androidx.room.ext.RoomRxJava3TypeNames
@@ -232,6 +233,20 @@
         )
     }
 
+    val LISTENABLE_FUTURE_PAGING_SOURCE by lazy {
+        loadJavaCode(
+            "common/input/ListenableFuturePagingSource.java",
+            PagingTypeNames.LISTENABLE_FUTURE_PAGING_SOURCE.toString()
+        )
+    }
+
+    val LIMIT_OFFSET_LISTENABLE_FUTURE_PAGING_SOURCE by lazy {
+        loadJavaCode(
+            "common/input/LimitOffsetListenableFuturePagingSource.java",
+            RoomPagingGuavaTypeNames.LIMIT_OFFSET_LISTENABLE_FUTURE_PAGING_SOURCE.toString()
+        )
+    }
+
     val COROUTINES_ROOM by lazy {
         loadJavaCode(
             "common/input/CoroutinesRoom.java",
diff --git a/room/room-paging-guava/build.gradle b/room/room-paging-guava/build.gradle
index 2e69703..4a073c9 100644
--- a/room/room-paging-guava/build.gradle
+++ b/room/room-paging-guava/build.gradle
@@ -39,7 +39,7 @@
     api(libs.kotlinStdlib)
     implementation(project(":room:room-paging"))
     implementation(project(":room:room-guava"))
-    implementation(projectOrArtifact(":paging:paging-guava"))
+    api("androidx.paging:paging-guava:3.1.1")
 
     androidTestImplementation(libs.truth)
     androidTestImplementation(libs.testExtJunitKtx)
diff --git a/testutils/testutils-common/src/main/java/androidx/testutils/FilteringExecutor.kt b/testutils/testutils-common/src/main/java/androidx/testutils/FilteringExecutor.kt
index 3c8f192..e981829 100644
--- a/testutils/testutils-common/src/main/java/androidx/testutils/FilteringExecutor.kt
+++ b/testutils/testutils-common/src/main/java/androidx/testutils/FilteringExecutor.kt
@@ -87,10 +87,10 @@
     }
 }
 
-suspend fun <T> withTestTimeout(block: suspend () -> T): T {
+suspend fun <T> withTestTimeout(duration: Long = 3, block: suspend () -> T): T {
     try {
         return withTimeout(
-            timeMillis = TimeUnit.SECONDS.toMillis(3)
+            timeMillis = TimeUnit.SECONDS.toMillis(duration)
         ) {
             block()
         }