Refactor room-paging integration test into parameterized test

Convert existing PagingSourceTest into two parameterized test classes that can
run tests against all variations of room paging implementations.

Enables other room-paging variations to leverage the same integration tests.

Test: ./gradlew :room:integration-tests:room-testapp-kotlin:cC
Change-Id: If5dc248c6f0079b6cb81bfb45b0e9f710d9746b6
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/PagingSourceTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt
similarity index 71%
rename from room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/PagingSourceTest.kt
rename to room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt
index 0457592..f80e077 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/PagingSourceTest.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/MultiTypedPagingSourceTest.kt
@@ -16,76 +16,57 @@
 
 package androidx.room.integration.kotlintestapp.test
 
-import androidx.paging.AsyncPagingDataDiffer
-import androidx.paging.ItemSnapshotList
-import androidx.paging.LoadState
 import androidx.paging.Pager
 import androidx.paging.PagingConfig
-import androidx.paging.PagingData
 import androidx.paging.PagingSource
-import androidx.recyclerview.widget.DiffUtil
-import androidx.recyclerview.widget.ListUpdateCallback
-import androidx.room.Dao
-import androidx.room.Database
-import androidx.room.Entity
-import androidx.room.Insert
 import androidx.room.InvalidationTracker
-import androidx.room.PrimaryKey
-import androidx.room.Query
-import androidx.room.RawQuery
 import androidx.room.Room
-import androidx.room.RoomDatabase
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.ItemStore
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingEntityDao
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingDb
+import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingEntity
 import androidx.room.awaitPendingRefresh
 import androidx.sqlite.db.SimpleSQLiteQuery
-import androidx.sqlite.db.SupportSQLiteQuery
 import androidx.test.core.app.ApplicationProvider
-import androidx.test.espresso.base.MainThread
-import androidx.test.ext.junit.runners.AndroidJUnit4
 import androidx.test.filters.MediumTest
+import androidx.test.filters.SmallTest
 import androidx.testutils.FilteringExecutor
-import androidx.testutils.withTestTimeout
 import com.google.common.truth.Truth.assertThat
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.cancelAndJoin
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.collectLatest
-import kotlinx.coroutines.flow.distinctUntilChangedBy
-import kotlinx.coroutines.flow.drop
-import kotlinx.coroutines.flow.filter
-import kotlinx.coroutines.flow.filterNotNull
-import kotlinx.coroutines.flow.first
-import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.mapLatest
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
-import kotlinx.coroutines.yield
-import org.junit.After
-import org.junit.Before
-import org.junit.Test
-import org.junit.runner.RunWith
 import java.util.concurrent.Executors
 import kotlin.test.assertFailsWith
 import kotlin.test.assertFalse
 import kotlin.test.assertTrue
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.flow.collectLatest
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 
 /**
  * This test intentionally uses real dispatchers to mimic the real use case.
+ *
+ * Runs paging source integration tests against different variants of Room Paging implementations
  */
-@OptIn(ExperimentalCoroutinesApi::class)
-@RunWith(AndroidJUnit4::class)
+@RunWith(Parameterized::class)
 @MediumTest
-class PagingSourceTest {
+class MultiTypedPagingSourceTest(
+    private val pagingSourceFactory: (PagingEntityDao) -> PagingSource<Int, PagingEntity>,
+) {
     private lateinit var coroutineScope: CoroutineScope
-    private lateinit var db: Paging3Db
+    private lateinit var db: PagingDb
     private lateinit var itemStore: ItemStore
 
     // Multiple threads are necessary to prevent deadlock, since Room will acquire a thread to
     // dispatch on, when using the query / transaction dispatchers.
-    private val queryExecutor = FilteringExecutor(delegate = Executors.newFixedThreadPool(2))
+    private val queryExecutor = FilteringExecutor(Executors.newFixedThreadPool(2))
     private val mainThreadQueries = mutableListOf<Pair<String, String>>()
     private val pagingSources = mutableListOf<PagingSource<Int, PagingEntity>>()
 
@@ -93,27 +74,7 @@
     fun init() {
         coroutineScope = CoroutineScope(Dispatchers.Main)
         itemStore = ItemStore(coroutineScope)
-
-        val mainThread: Thread = runBlocking(Dispatchers.Main) {
-            Thread.currentThread()
-        }
-        db = Room.inMemoryDatabaseBuilder(
-            ApplicationProvider.getApplicationContext(),
-            Paging3Db::class.java
-        ).setQueryCallback(
-            { sqlQuery, _ ->
-                if (Thread.currentThread() === mainThread) {
-                    mainThreadQueries.add(
-                        sqlQuery to Throwable().stackTraceToString()
-                    )
-                }
-            },
-            {
-                // instantly execute the log callback so that we can check the thread.
-                it.run()
-            }
-        ).setQueryExecutor(queryExecutor)
-            .build()
+        db = buildAndReturnDb(queryExecutor, mainThreadQueries)
     }
 
     @After
@@ -165,43 +126,6 @@
     }
 
     @Test
-    fun loadEverythingRawQuery() {
-        // open db
-        val items = createItems(startId = 15, count = 50)
-        db.dao.insert(items)
-        val query = SimpleSQLiteQuery(
-            "SELECT * FROM PagingEntity ORDER BY id ASC"
-        )
-        val pager = Pager(
-            config = CONFIG,
-            pagingSourceFactory = { db.dao.loadItemsRaw(query) }
-        )
-        runTest(pager = pager) {
-            val initialLoad = itemStore.awaitInitialLoad()
-            assertThat(
-                initialLoad
-            ).containsExactlyElementsIn(
-                items.createExpected(
-                    fromIndex = 0,
-                    toIndex = CONFIG.initialLoadSize
-                )
-            )
-            // now access more items that should trigger loading more
-            withContext(Dispatchers.Main) {
-                itemStore.get(20)
-            }
-            assertThat(itemStore.awaitItem(20)).isEqualTo(items[20])
-            // now access to the end of the list, it should load everything as we disabled jumping
-            withContext(Dispatchers.Main) {
-                itemStore.get(items.size - 1)
-            }
-            assertThat(itemStore.awaitItem(items.size - 1)).isEqualTo(items.last())
-            assertThat(itemStore.peekItems()).isEqualTo(items)
-            assertThat(itemStore.currentGenerationId).isEqualTo(1)
-        }
-    }
-
-    @Test
     fun loadEverything_inReverse() {
         // open db
         val items = createItems(startId = 0, count = 100)
@@ -240,134 +164,6 @@
     }
 
     @Test
-    fun loadEverythingRawQuery_inReverse() {
-        // open db
-        val items = createItems(startId = 0, count = 100)
-        db.dao.insert(items)
-        val query = SimpleSQLiteQuery(
-            "SELECT * FROM PagingEntity ORDER BY id ASC"
-        )
-        val pager = Pager(
-            config = CONFIG,
-            initialKey = 98,
-            pagingSourceFactory = { db.dao.loadItemsRaw(query) }
-        )
-        runTest(pager) {
-            val initialLoad = itemStore.awaitInitialLoad()
-            assertThat(
-                initialLoad
-            ).containsExactlyElementsIn(
-                items.createExpected(
-                    // Paging 3 implementation loads starting from initial key
-                    fromIndex = 98,
-                    toIndex = 100
-                )
-            )
-            // now access more items that should trigger loading more
-            withContext(Dispatchers.Main) {
-                itemStore.get(40)
-            }
-            assertThat(itemStore.awaitItem(40)).isEqualTo(items[40])
-            // now access to the beginning of the list, it should load everything as we don't
-            // support jumping
-            withContext(Dispatchers.Main) {
-                itemStore.get(0)
-            }
-            assertThat(itemStore.awaitItem(0)).isEqualTo(items[0])
-            assertThat(itemStore.peekItems()).isEqualTo(items)
-            assertThat(itemStore.currentGenerationId).isEqualTo(1)
-        }
-    }
-
-    @Test
-    fun rawQuery_userSuppliedLimitOffset() {
-        val items = createItems(startId = 15, count = 70)
-        db.dao.insert(items)
-
-        val query = SimpleSQLiteQuery(
-            "SELECT * FROM PagingEntity ORDER BY id ASC LIMIT 30 OFFSET 5"
-        )
-        val pager = Pager(
-            config = CONFIG,
-            pagingSourceFactory = { db.dao.loadItemsRaw(query) }
-        )
-
-        runTest(pager = pager) {
-            val initialLoad = itemStore.awaitInitialLoad()
-            assertThat(
-                initialLoad
-            ).containsExactlyElementsIn(
-                // returns items 20 to 28 with 21 null place holders after
-                items.createBoundedExpected(
-                    fromIndex = 5,
-                    toIndex = 5 + CONFIG.initialLoadSize,
-                    toPlaceholderIndex = 35,
-                )
-            )
-            // now access more items that should trigger loading more
-            withContext(Dispatchers.Main) {
-                itemStore.get(15)
-            }
-            // item 15 is offset by 5 = 20
-            assertThat(itemStore.awaitItem(15)).isEqualTo(items[20])
-            // normally itemStore.get(50) is valid, but user-set LIMIT should bound item count to 30
-            // itemStore.get(50) should now become invalid
-            val expectedException = assertFailsWith<IndexOutOfBoundsException> {
-                withContext(Dispatchers.Main) {
-                    itemStore.get(50)
-                }
-            }
-            assertThat(expectedException.message).isEqualTo("Index: 50, Size: 30")
-            assertThat(itemStore.currentGenerationId).isEqualTo(1)
-        }
-    }
-
-    @Test
-    fun rawQuery_multipleArguments() {
-        val items = createItems(startId = 0, count = 80)
-        db.dao.insert(items)
-        val query = SimpleSQLiteQuery(
-            "SELECT * " +
-                "FROM PagingEntity " +
-                "WHERE id > 49 AND id < 76 " +
-                "ORDER BY id ASC " +
-                "LIMIT 20"
-        )
-        val pager = Pager(
-            config = CONFIG,
-            pagingSourceFactory = { db.dao.loadItemsRaw(query) }
-        )
-
-        runTest(pager = pager) {
-            val initialLoad = itemStore.awaitInitialLoad()
-            assertThat(
-                initialLoad
-            ).containsExactlyElementsIn(
-                // returns items 50 to 58 with 11 null place holders after
-                items.createBoundedExpected(
-                    fromIndex = 50,
-                    toIndex = 50 + CONFIG.initialLoadSize,
-                    toPlaceholderIndex = 70,
-                )
-            )
-            // now access more items that should trigger loading more
-            withContext(Dispatchers.Main) {
-                itemStore.get(15)
-            }
-            // item 15 is offset by 50 because of `WHERE id > 49` arg
-            assertThat(itemStore.awaitItem(15)).isEqualTo(items[65])
-            // normally itemStore.get(50) is valid, but user-set LIMIT should bound item count to 20
-            val expectedException = assertFailsWith<IndexOutOfBoundsException> {
-                withContext(Dispatchers.Main) {
-                    itemStore.get(50)
-                }
-            }
-            assertThat(expectedException.message).isEqualTo("Index: 50, Size: 20")
-            assertThat(itemStore.currentGenerationId).isEqualTo(1)
-        }
-    }
-
-    @Test
     fun keyTooLarge_returnLastPage() {
         val items = createItems(startId = 0, count = 50)
         db.dao.insert(items)
@@ -606,27 +402,6 @@
         }
     }
 
-    private fun simple_emptyStart_thenAddAnItem(
-        preOpenDb: Boolean
-    ) {
-        if (preOpenDb) {
-            // trigger db open
-            db.openHelper.writableDatabase
-        }
-
-        runTest {
-            itemStore.awaitGeneration(1)
-            itemStore.awaitInitialLoad()
-            assertThat(itemStore.peekItems()).isEmpty()
-
-            val entity = PagingEntity(id = 1, value = "foo")
-            db.dao.insert(entity)
-            itemStore.awaitGeneration(2)
-            itemStore.awaitInitialLoad()
-            assertThat(itemStore.peekItems()).containsExactly(entity)
-        }
-    }
-
     @Test
     fun appendWithDelayedInvalidation() {
         val items = createItems(startId = 0, count = 90)
@@ -714,245 +489,335 @@
         }
     }
 
+    private fun simple_emptyStart_thenAddAnItem(
+        preOpenDb: Boolean
+    ) {
+        if (preOpenDb) {
+            // trigger db open
+            db.openHelper.writableDatabase
+        }
+
+        runTest {
+            itemStore.awaitGeneration(1)
+            itemStore.awaitInitialLoad()
+            assertThat(itemStore.peekItems()).isEmpty()
+
+            val entity = PagingEntity(id = 1, value = "foo")
+            db.dao.insert(entity)
+            itemStore.awaitGeneration(2)
+            itemStore.awaitInitialLoad()
+            assertThat(itemStore.peekItems()).containsExactly(entity)
+        }
+    }
+
     private fun runTest(
         pager: Pager<Int, PagingEntity> =
             Pager(
                 config = CONFIG,
-                pagingSourceFactory = { db.dao.loadItems().also { pagingSources.add(it) } }
+                pagingSourceFactory = { pagingSourceFactory(db.dao).also { pagingSources.add(it) } }
             ),
         block: suspend () -> Unit
     ) {
-        val collection = coroutineScope.launch(Dispatchers.Main) {
-            pager.flow.collectLatest {
-                itemStore.collectFrom(it)
-            }
-        }
-        runBlocking {
-            try {
-                block()
-            } finally {
-                collection.cancelAndJoin()
-            }
-        }
+        runTestWithPager(coroutineScope, itemStore, pager, block)
     }
 
-    private fun createItems(
-        startId: Int,
-        count: Int
-    ): List<PagingEntity> {
-        return List(count) { pos ->
-            PagingEntity(
-                id = pos + startId
-            )
-        }
+    private companion object {
+        /**
+         * Runs this test class against all variants of Room Paging
+         */
+        @Parameterized.Parameters(name = "pagingSourceFactory={0}")
+        @JvmStatic
+        fun parameters() = listOf(PagingEntityDao::loadItems)
+    }
+}
+
+/**
+ * Tests the secondary constructor of Room Paging implementations via RawQuery
+ */
+@RunWith(Parameterized::class)
+@SmallTest
+class MultiTypedPagingSourceTestWithRawQuery(
+    private val pagingSourceFactoryRaw:
+        (PagingEntityDao, SimpleSQLiteQuery) -> PagingSource<Int, PagingEntity>
+) {
+    private lateinit var coroutineScope: CoroutineScope
+    private lateinit var db: PagingDb
+    private lateinit var itemStore: ItemStore
+
+    // Multiple threads are necessary to prevent deadlock, since Room will acquire a thread to
+    // dispatch on, when using the query / transaction dispatchers.
+    private val queryExecutor = FilteringExecutor(Executors.newFixedThreadPool(2))
+    private val mainThreadQueries = mutableListOf<Pair<String, String>>()
+
+    @Before
+    fun init() {
+        coroutineScope = CoroutineScope(Dispatchers.Main)
+        itemStore = ItemStore(coroutineScope)
+        db = buildAndReturnDb(queryExecutor, mainThreadQueries)
     }
 
-    /**
-     * Created an expected elements list from the current list.
-     */
-    private fun List<PagingEntity?>.createExpected(
-        fromIndex: Int,
-        toIndex: Int,
-    ): List<PagingEntity?> {
-        val result = mutableListOf<PagingEntity?>()
-        (0 until fromIndex).forEach { _ -> result.add(null) }
-        result.addAll(this.subList(fromIndex, toIndex))
-        (toIndex until size).forEach { _ -> result.add(null) }
-        return result
+    @After
+    fun tearDown() {
+        // Check no mainThread queries happened.
+        assertThat(mainThreadQueries).isEmpty()
+        coroutineScope.cancel()
     }
 
-    private fun List<PagingEntity?>.createBoundedExpected(
-        fromIndex: Int,
-        toIndex: Int,
-        toPlaceholderIndex: Int,
-    ): List<PagingEntity?> {
-        val result = mutableListOf<PagingEntity?>()
-        result.addAll(this.subList(fromIndex, toIndex))
-        (toIndex until toPlaceholderIndex).forEach { _ -> result.add(null) }
-        return result
-    }
-
-    @Database(
-        version = 1,
-        exportSchema = false,
-        entities = [PagingEntity::class]
-    )
-    abstract class Paging3Db : RoomDatabase() {
-        abstract val dao: Paging3Dao
-    }
-
-    @Entity
-    data class PagingEntity(
-        @PrimaryKey
-        val id: Int,
-        val value: String = "item_$id"
-    ) {
-        companion object {
-            val DIFF_CALLBACK = object : DiffUtil.ItemCallback<PagingEntity>() {
-                override fun areItemsTheSame(
-                    oldItem: PagingEntity,
-                    newItem: PagingEntity
-                ): Boolean {
-                    return oldItem.id == newItem.id
-                }
-
-                override fun areContentsTheSame(
-                    oldItem: PagingEntity,
-                    newItem: PagingEntity
-                ): Boolean {
-                    return oldItem == newItem
-                }
-            }
-        }
-    }
-
-    @Dao
-    interface Paging3Dao {
-        @Insert
-        fun insert(items: List<PagingEntity>)
-
-        @Insert
-        fun insert(vararg items: PagingEntity)
-
-        @Query("DELETE FROM PagingEntity WHERE id IN (:ids)")
-        fun deleteItems(ids: List<Int>)
-
-        @Query("SELECT * FROM PagingEntity ORDER BY id ASC")
-        fun loadItems(): PagingSource<Int, PagingEntity>
-
-        @RawQuery(observedEntities = [PagingEntity::class])
-        fun loadItemsRaw(query: SupportSQLiteQuery): PagingSource<Int, PagingEntity>
-    }
-
-    /**
-     * Our fake adapter that holds the items.
-     */
-    private class ItemStore(private val coroutineScope: CoroutineScope) {
-        // We get a new generation each time list changes. This is used to await certain events
-        // happening. Each generation have an id that maps to a paging generation.
-        // This value is modified only on the main thread.
-        private val generation = MutableStateFlow(Generation(0))
-
-        val currentGenerationId
-            get() = generation.value.id
-
-        val asyncDiffer = AsyncPagingDataDiffer(
-            diffCallback = PagingEntity.DIFF_CALLBACK,
-            updateCallback = object : ListUpdateCallback {
-                override fun onInserted(position: Int, count: Int) {
-                    onDataSetChanged(generation.value.id)
-                }
-
-                override fun onRemoved(position: Int, count: Int) {
-                    onDataSetChanged(generation.value.id)
-                }
-
-                override fun onMoved(fromPosition: Int, toPosition: Int) {
-                    onDataSetChanged(generation.value.id)
-                }
-
-                override fun onChanged(position: Int, count: Int, payload: Any?) {
-                    onDataSetChanged(generation.value.id)
-                }
-            }
+    @Test
+    fun loadEverythingRawQuery() {
+        // open db
+        val items = createItems(startId = 15, count = 50)
+        db.dao.insert(items)
+        val query = SimpleSQLiteQuery(
+            "SELECT * FROM PagingEntity ORDER BY id ASC"
         )
-
-        init {
-            coroutineScope.launch {
-                asyncDiffer.loadStateFlow
-                    .drop(1) // Ignore initial state
-                    .distinctUntilChangedBy { it.source.refresh }
-                    .map { it.source.refresh }
-                    .filter { it is LoadState.NotLoading }
-                    .collect {
-                        val current = generation.value
-                        generation.value = current.copy(
-                            initialLoadCompleted = true,
-                        )
-                    }
-            }
-        }
-
-        private fun incrementGeneration() {
-            val current = generation.value
-            generation.value = current.copy(
-                initialLoadCompleted = false,
-                id = current.id + 1,
+        runTest(query) {
+            val initialLoad = itemStore.awaitInitialLoad()
+            assertThat(
+                initialLoad
+            ).containsExactlyElementsIn(
+                items.createExpected(
+                    fromIndex = 0,
+                    toIndex = CONFIG.initialLoadSize
+                )
             )
-        }
-
-        fun peekItems() = (0 until asyncDiffer.itemCount).map {
-            asyncDiffer.peek(it)
-        }
-
-        fun get(index: Int): PagingEntity? {
-            return asyncDiffer.getItem(index)
-        }
-
-        suspend fun awaitItem(index: Int): PagingEntity = withTestTimeout {
-            generation.mapLatest {
-                asyncDiffer.peek(index)
-            }.filterNotNull().first()
-        }
-
-        suspend fun collectFrom(data: PagingData<PagingEntity>) {
-            incrementGeneration()
-            asyncDiffer.submitData(data)
-        }
-
-        @MainThread
-        private fun onDataSetChanged(id: Int) {
-            coroutineScope.launch(Dispatchers.Main) {
-                // deferring this
-                yield()
-                val curGen = generation.value
-                if (curGen.id == id) {
-                    generation.value = curGen.copy(
-                        initialLoadCompleted = true,
-                        changeCount = curGen.changeCount + 1
-                    )
-                }
-            }
-        }
-
-        suspend fun awaitInitialLoad(): ItemSnapshotList<PagingEntity> =
-            withTestTimeout {
-                 withContext(Dispatchers.Main) {
-                    generation.filter { it.initialLoadCompleted }.first()
-                    asyncDiffer.snapshot()
-                }
-            }
-
-        suspend fun awaitGeneration(id: Int) = withTestTimeout {
+            // now access more items that should trigger loading more
             withContext(Dispatchers.Main) {
-                generation.filter { it.id == id }.first()
+                itemStore.get(20)
             }
+            assertThat(itemStore.awaitItem(20)).isEqualTo(items[20])
+            // now access to the end of the list, it should load everything as we disabled jumping
+            withContext(Dispatchers.Main) {
+                itemStore.get(items.size - 1)
+            }
+            assertThat(itemStore.awaitItem(items.size - 1)).isEqualTo(items.last())
+            assertThat(itemStore.peekItems()).isEqualTo(items)
+            assertThat(itemStore.currentGenerationId).isEqualTo(1)
         }
     }
 
-    /**
-     * Holds some metadata about the backing paging list
-     */
-    private data class Generation(
-        /**
-         * Generation id, incremented each time data source is invalidated
-         */
-        val id: Int,
-        /**
-         * True when the data source completes its initial load
-         */
-        val initialLoadCompleted: Boolean = false,
-        /**
-         * Incremented each time we receive some update events.
-         */
-        val changeCount: Int = 0
-    )
+    @Test
+    fun loadEverythingRawQuery_inReverse() {
+        // open db
+        val items = createItems(startId = 0, count = 100)
+        db.dao.insert(items)
+        val query = SimpleSQLiteQuery(
+            "SELECT * FROM PagingEntity ORDER BY id ASC"
+        )
+        val pager = Pager(config = CONFIG, initialKey = 98) {
+            pagingSourceFactoryRaw(db.dao, query)
+        }
+        runTest(query, pager) {
+            val initialLoad = itemStore.awaitInitialLoad()
+            assertThat(
+                initialLoad
+            ).containsExactlyElementsIn(
+                items.createExpected(
+                    // Paging 3 implementation loads starting from initial key
+                    fromIndex = 98,
+                    toIndex = 100
+                )
+            )
+            // now access more items that should trigger loading more
+            withContext(Dispatchers.Main) {
+                itemStore.get(40)
+            }
+            assertThat(itemStore.awaitItem(40)).isEqualTo(items[40])
+            // now access to the beginning of the list, it should load everything as we don't
+            // support jumping
+            withContext(Dispatchers.Main) {
+                itemStore.get(0)
+            }
+            assertThat(itemStore.awaitItem(0)).isEqualTo(items[0])
+            assertThat(itemStore.peekItems()).isEqualTo(items)
+            assertThat(itemStore.currentGenerationId).isEqualTo(1)
+        }
+    }
 
-    companion object {
-        private val CONFIG = PagingConfig(
-            pageSize = 3,
-            initialLoadSize = 9,
-            enablePlaceholders = true,
+    @Test
+    fun rawQuery_userSuppliedLimitOffset() {
+        val items = createItems(startId = 15, count = 70)
+        db.dao.insert(items)
+
+        val query = SimpleSQLiteQuery(
+            "SELECT * FROM PagingEntity ORDER BY id ASC LIMIT 30 OFFSET 5"
+        )
+        runTest(query) {
+            val initialLoad = itemStore.awaitInitialLoad()
+            assertThat(
+                initialLoad
+            ).containsExactlyElementsIn(
+                // returns items 20 to 28 with 21 null place holders after
+                items.createBoundedExpected(
+                    fromIndex = 5,
+                    toIndex = 5 + CONFIG.initialLoadSize,
+                    toPlaceholderIndex = 35,
+                )
+            )
+            // now access more items that should trigger loading more
+            withContext(Dispatchers.Main) {
+                itemStore.get(15)
+            }
+            // item 15 is offset by 5 = 20
+            assertThat(itemStore.awaitItem(15)).isEqualTo(items[20])
+            // normally itemStore.get(50) is valid, but user-set LIMIT should bound item count to 30
+            // itemStore.get(50) should now become invalid
+            val expectedException = assertFailsWith<IndexOutOfBoundsException> {
+                withContext(Dispatchers.Main) {
+                    itemStore.get(50)
+                }
+            }
+            assertThat(expectedException.message).isEqualTo("Index: 50, Size: 30")
+            assertThat(itemStore.currentGenerationId).isEqualTo(1)
+        }
+    }
+
+    @Test
+    fun rawQuery_multipleArguments() {
+        val items = createItems(startId = 0, count = 80)
+        db.dao.insert(items)
+        val query = SimpleSQLiteQuery(
+            "SELECT * " +
+                "FROM PagingEntity " +
+                "WHERE id > 49 AND id < 76 " +
+                "ORDER BY id ASC " +
+                "LIMIT 20"
+        )
+        runTest(query) {
+            val initialLoad = itemStore.awaitInitialLoad()
+            assertThat(
+                initialLoad
+            ).containsExactlyElementsIn(
+                // returns items 50 to 58 with 11 null place holders after
+                items.createBoundedExpected(
+                    fromIndex = 50,
+                    toIndex = 50 + CONFIG.initialLoadSize,
+                    toPlaceholderIndex = 70,
+                )
+            )
+            // now access more items that should trigger loading more
+            withContext(Dispatchers.Main) {
+                itemStore.get(15)
+            }
+            // item 15 is offset by 50 because of `WHERE id > 49` arg
+            assertThat(itemStore.awaitItem(15)).isEqualTo(items[65])
+            // normally itemStore.get(50) is valid, but user-set LIMIT should bound item count to 20
+            val expectedException = assertFailsWith<IndexOutOfBoundsException> {
+                withContext(Dispatchers.Main) {
+                    itemStore.get(50)
+                }
+            }
+            assertThat(expectedException.message).isEqualTo("Index: 50, Size: 20")
+            assertThat(itemStore.currentGenerationId).isEqualTo(1)
+        }
+    }
+
+    private fun runTest(
+        query: SimpleSQLiteQuery,
+        pager: Pager<Int, PagingEntity> =
+            Pager(
+                config = CONFIG,
+                pagingSourceFactory = { pagingSourceFactoryRaw(db.dao, query) }
+            ),
+        block: suspend () -> Unit
+    ) {
+        runTestWithPager(coroutineScope, itemStore, pager, block)
+    }
+
+    private companion object {
+        /**
+         * Runs this test class against all variants of Room Paging
+         */
+        @Parameterized.Parameters(name = "pagingSourceFactory={0}")
+        @JvmStatic
+        fun parameters() = listOf(PagingEntityDao::loadItemsRaw)
+    }
+}
+
+private fun buildAndReturnDb(
+    queryExecutor: FilteringExecutor,
+    mainThreadQueries: MutableList<Pair<String, String>>
+): PagingDb {
+    val mainThread: Thread = runBlocking(Dispatchers.Main) {
+        Thread.currentThread()
+    }
+    return Room.inMemoryDatabaseBuilder(
+        ApplicationProvider.getApplicationContext(),
+        PagingDb::class.java
+    ).setQueryCallback(
+        { sqlQuery, _ ->
+            if (Thread.currentThread() === mainThread) {
+                mainThreadQueries.add(
+                    sqlQuery to Throwable().stackTraceToString()
+                )
+            }
+        },
+        {
+            // instantly execute the log callback so that we can check the thread.
+            it.run()
+        }
+    ).setQueryExecutor(queryExecutor)
+        .build()
+}
+
+private fun runTestWithPager(
+    coroutineScope: CoroutineScope,
+    itemStore: ItemStore,
+    pager: Pager<Int, PagingEntity>,
+    block: suspend () -> Unit
+) {
+    val collection = coroutineScope.launch(Dispatchers.Main) {
+        pager.flow.collectLatest {
+            itemStore.collectFrom(it)
+        }
+    }
+    runBlocking {
+        try {
+            block()
+        } finally {
+            collection.cancelAndJoin()
+        }
+    }
+}
+
+internal fun createItems(
+    startId: Int,
+    count: Int
+): List<PagingEntity> {
+    return List(count) { pos ->
+        PagingEntity(
+            id = pos + startId
         )
     }
-}
\ No newline at end of file
+}
+
+/**
+ * Created an expected elements list from the current list.
+ */
+internal fun List<PagingEntity>.createExpected(
+    fromIndex: Int,
+    toIndex: Int,
+): List<PagingEntity?> {
+    val result = mutableListOf<PagingEntity?>()
+    (0 until fromIndex).forEach { _ -> result.add(null) }
+    result.addAll(this.subList(fromIndex, toIndex))
+    (toIndex until size).forEach { _ -> result.add(null) }
+    return result
+}
+
+internal fun List<PagingEntity>.createBoundedExpected(
+    fromIndex: Int,
+    toIndex: Int,
+    toPlaceholderIndex: Int,
+): List<PagingEntity?> {
+    val result = mutableListOf<PagingEntity?>()
+    result.addAll(this.subList(fromIndex, toIndex))
+    (toIndex until toPlaceholderIndex).forEach { _ -> result.add(null) }
+    return result
+}
+
+internal val CONFIG = PagingConfig(
+    pageSize = 3,
+    initialLoadSize = 9,
+    enablePlaceholders = true,
+)
\ No newline at end of file
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt
new file mode 100644
index 0000000..0cacc49
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/ItemStore.kt
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.testutil
+
+import androidx.paging.AsyncPagingDataDiffer
+import androidx.paging.ItemSnapshotList
+import androidx.paging.LoadState
+import androidx.paging.PagingData
+import androidx.recyclerview.widget.ListUpdateCallback
+import androidx.test.espresso.base.MainThread
+import androidx.testutils.withTestTimeout
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.distinctUntilChangedBy
+import kotlinx.coroutines.flow.filter
+import kotlinx.coroutines.flow.filterNotNull
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.mapLatest
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.yield
+
+/**
+ * An item store that contains a mock differ for multi-generational pagination
+ */
+class ItemStore(private val coroutineScope: CoroutineScope) {
+    // We get a new generation each time list changes. This is used to await certain events
+    // happening. Each generation have an id that maps to a paging generation.
+    // This value is modified only on the main thread.
+    private val generation = MutableStateFlow(Generation(0))
+
+    val currentGenerationId
+        get() = generation.value.id
+
+    private val asyncDiffer = AsyncPagingDataDiffer(
+        diffCallback = PagingEntity.DIFF_CALLBACK,
+        updateCallback = object : ListUpdateCallback {
+            override fun onInserted(position: Int, count: Int) {
+                onDataSetChanged(generation.value.id)
+            }
+
+            override fun onRemoved(position: Int, count: Int) {
+                onDataSetChanged(generation.value.id)
+            }
+
+            override fun onMoved(fromPosition: Int, toPosition: Int) {
+                onDataSetChanged(generation.value.id)
+            }
+
+            override fun onChanged(position: Int, count: Int, payload: Any?) {
+                onDataSetChanged(generation.value.id)
+            }
+        }
+    )
+
+    init {
+        coroutineScope.launch {
+            asyncDiffer.loadStateFlow
+                .distinctUntilChangedBy { it.source.refresh }
+                .map { it.source.refresh }
+                .filter { it is LoadState.NotLoading }
+                .collect {
+                    val current = generation.value
+                    generation.value = current.copy(
+                        initialLoadCompleted = true,
+                    )
+                }
+        }
+    }
+
+    private fun incrementGeneration() {
+        val current = generation.value
+        generation.value = current.copy(
+            initialLoadCompleted = false,
+            id = current.id + 1,
+        )
+    }
+
+    fun peekItems() = (0 until asyncDiffer.itemCount).map {
+        asyncDiffer.peek(it)
+    }
+
+    fun get(index: Int): PagingEntity? {
+        return asyncDiffer.getItem(index)
+    }
+
+    @OptIn(ExperimentalCoroutinesApi::class)
+    suspend fun awaitItem(index: Int): PagingEntity = withTestTimeout {
+        generation.mapLatest {
+            asyncDiffer.peek(index)
+        }.filterNotNull().first()
+    }
+
+    suspend fun collectFrom(data: PagingData<PagingEntity>) {
+        incrementGeneration()
+        asyncDiffer.submitData(data)
+    }
+
+    @MainThread
+    private fun onDataSetChanged(id: Int) {
+        coroutineScope.launch(Dispatchers.Main) {
+            // deferring this
+            yield()
+            val curGen = generation.value
+            if (curGen.id == id) {
+                generation.value = curGen.copy(
+                    initialLoadCompleted = true,
+                    changeCount = curGen.changeCount + 1
+                )
+            }
+        }
+    }
+
+    suspend fun awaitInitialLoad(): ItemSnapshotList<PagingEntity> =
+        withTestTimeout {
+            withContext(Dispatchers.Main) {
+                generation.filter { it.initialLoadCompleted }.first()
+                asyncDiffer.snapshot()
+            }
+        }
+
+    suspend fun awaitGeneration(id: Int) = withTestTimeout {
+        withContext(Dispatchers.Main) {
+            generation.filter { it.id == id }.first()
+        }
+    }
+}
+
+/**
+ * Holds some metadata about the backing paging list
+ */
+data class Generation(
+    /**
+     * Generation id, incremented each time data source is invalidated
+     */
+    val id: Int,
+    /**
+     * True when the data source completes its initial load
+     */
+    val initialLoadCompleted: Boolean = false,
+    /**
+     * Incremented each time we receive some update events.
+     */
+    val changeCount: Int = 0
+)
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingDb.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingDb.kt
new file mode 100644
index 0000000..2ab2da8
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingDb.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.testutil
+
+import androidx.room.Database
+import androidx.room.RoomDatabase
+
+@Database(
+    version = 1,
+    exportSchema = false,
+    entities = [PagingEntity::class]
+)
+abstract class PagingDb : RoomDatabase() {
+    abstract val dao: PagingEntityDao
+}
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntity.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntity.kt
new file mode 100644
index 0000000..a75ccda
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntity.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.testutil
+
+import androidx.recyclerview.widget.DiffUtil
+import androidx.room.Entity
+import androidx.room.PrimaryKey
+
+@Entity
+data class PagingEntity(
+    @PrimaryKey
+    val id: Int,
+    val value: String = "item_$id"
+) {
+    companion object {
+        val DIFF_CALLBACK = object : DiffUtil.ItemCallback<PagingEntity>() {
+            override fun areItemsTheSame(
+                oldItem: PagingEntity,
+                newItem: PagingEntity
+            ): Boolean {
+                return oldItem.id == newItem.id
+            }
+
+            override fun areContentsTheSame(
+                oldItem: PagingEntity,
+                newItem: PagingEntity
+            ): Boolean {
+                return oldItem == newItem
+            }
+        }
+    }
+}
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt
new file mode 100644
index 0000000..658d150
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/testutil/PagingEntityDao.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.testutil
+
+import androidx.paging.PagingSource
+import androidx.room.Dao
+import androidx.room.Insert
+import androidx.room.Query
+import androidx.room.RawQuery
+import androidx.sqlite.db.SupportSQLiteQuery
+
+@Dao
+interface PagingEntityDao {
+    @Insert
+    fun insert(items: List<PagingEntity>)
+
+    @Insert
+    fun insert(vararg items: PagingEntity)
+
+    @Query("DELETE FROM PagingEntity WHERE id IN (:ids)")
+    fun deleteItems(ids: List<Int>)
+
+    @Query("SELECT * FROM PagingEntity ORDER BY id ASC")
+    fun loadItems(): PagingSource<Int, PagingEntity>
+
+    @RawQuery(observedEntities = [PagingEntity::class])
+    fun loadItemsRaw(query: SupportSQLiteQuery): PagingSource<Int, PagingEntity>
+}