Implement rxjava2 LimitOffsetRxPagingSource

Test: ./gradlew room:room-paging-rxjava2:cC
Bug: 203666906
Change-Id: Iefd883d45b18b032bb3a29d51209fc7de169c1aa
diff --git a/room/room-paging-rxjava2/build.gradle b/room/room-paging-rxjava2/build.gradle
index 0049b86..4e57d8d 100644
--- a/room/room-paging-rxjava2/build.gradle
+++ b/room/room-paging-rxjava2/build.gradle
@@ -22,11 +22,33 @@
     id("AndroidXPlugin")
     id("com.android.library")
     id("org.jetbrains.kotlin.android")
+    id("com.google.devtools.ksp")
 }
 
 dependencies {
     api(libs.kotlinStdlib)
-    // Add dependencies here
+    api('androidx.paging:paging-rxjava2:3.1.1')
+    implementation(project(":room:room-paging"))
+    implementation(project(":room:room-rxjava2"))
+
+    androidTestImplementation(libs.truth)
+    androidTestImplementation(libs.testExtJunitKtx)
+    androidTestImplementation(libs.testRunner)
+    androidTestImplementation(libs.kotlinTestJunit) //
+    androidTestImplementation(libs.kotlinCoroutinesTest)
+    androidTestImplementation(libs.kotlinCoroutinesRx2)
+    androidTestImplementation("androidx.arch.core:core-testing:2.0.1")
+    androidTestImplementation(project(":internal-testutils-common"))
+    kspAndroidTest(
+            project(path: ":room:room-compiler", configuration: "shadowAndImplementation")
+    )
+}
+
+// Allow usage of Kotlin's @OptIn.
+tasks.withType(KotlinCompile).configureEach {
+    kotlinOptions {
+        freeCompilerArgs += ["-opt-in=kotlin.RequiresOptIn"]
+    }
 }
 
 androidx {
diff --git a/room/room-paging-rxjava2/src/androidTest/kotlin/androidx/room/paging/rxjava2/LimitOffsetRxPagingSourceTest.kt b/room/room-paging-rxjava2/src/androidTest/kotlin/androidx/room/paging/rxjava2/LimitOffsetRxPagingSourceTest.kt
new file mode 100644
index 0000000..47c7485
--- /dev/null
+++ b/room/room-paging-rxjava2/src/androidTest/kotlin/androidx/room/paging/rxjava2/LimitOffsetRxPagingSourceTest.kt
@@ -0,0 +1,695 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.paging.rxjava2
+
+import android.database.Cursor
+import androidx.arch.core.executor.testing.CountingTaskExecutorRule
+import androidx.paging.LoadType
+import androidx.paging.PagingConfig
+import androidx.paging.PagingSource.LoadParams
+import androidx.paging.PagingSource.LoadResult
+import androidx.room.Dao
+import androidx.room.Database
+import androidx.room.Entity
+import androidx.room.Insert
+import androidx.room.PrimaryKey
+import androidx.room.Query
+import androidx.room.Room
+import androidx.room.RoomDatabase
+import androidx.room.RoomSQLiteQuery
+import androidx.room.paging.util.ThreadSafeInvalidationObserver
+import androidx.room.util.getColumnIndexOrThrow
+import androidx.sqlite.db.SimpleSQLiteQuery
+import androidx.test.core.app.ApplicationProvider
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import androidx.test.filters.MediumTest
+import androidx.testutils.TestExecutor
+import androidx.testutils.withTestTimeout
+import com.google.common.truth.Truth
+import com.google.common.truth.Truth.assertThat
+import io.reactivex.Single
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.test.assertFailsWith
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.rx2.await
+import kotlinx.coroutines.test.advanceUntilIdle
+import kotlinx.coroutines.test.runTest
+import org.junit.After
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+
+private const val tableName: String = "TestItem"
+
+@OptIn(ExperimentalCoroutinesApi::class)
+@RunWith(AndroidJUnit4::class)
+@MediumTest
+class LimitOffsetRxPagingSourceTest {
+
+    @JvmField
+    @Rule
+    val countingTaskExecutorRule = CountingTaskExecutorRule()
+
+    @After
+    fun tearDown() {
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+        assertTrue(countingTaskExecutorRule.isIdle)
+    }
+
+    @Test
+    fun initialLoad_empty() = setupAndRun { db ->
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.refresh()
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).isEmpty()
+    }
+
+    @Test
+    fun initialLoad() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.refresh()
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(0, 15)
+        )
+    }
+
+    @Test
+    fun simpleAppend() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.append(key = 15)
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(15, 20)
+        )
+    }
+
+    @Test
+    fun simplePrepend() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.prepend(key = 20)
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(15, 20)
+        )
+    }
+
+    @Test
+    fun initialLoad_invalidationTracker_isRegistered() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.refresh()
+        // run loadSingle to register InvalidationTracker
+        single.await()
+
+        assertTrue(pagingSource.observer.privateRegisteredState().get())
+    }
+
+    @Test
+    fun nonInitialLoad_invalidationTracker_isRegistered() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.prepend(key = 20)
+        // run loadSingle to register InvalidationTracker
+        single.await()
+
+        assertTrue(pagingSource.observer.privateRegisteredState().get())
+    }
+
+    @Test
+    fun refresh_singleImmediatelyReturn() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.refresh()
+
+        var observer = single.test()
+        observer.assertNotComplete()
+
+        // let room complete its tasks
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+
+        val result = observer.values().first() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(0, 15)
+        )
+        observer.assertComplete()
+        observer.assertNoErrors()
+        observer.dispose()
+    }
+
+    @Test
+    fun append_singleImmediatelyReturn() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.append(key = 10)
+
+        var observer = single.test()
+        observer.assertNotComplete()
+
+        // let room complete its tasks
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+
+        val result = observer.values().first() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(10, 15)
+        )
+        observer.assertComplete()
+        observer.assertNoErrors()
+        observer.dispose()
+    }
+
+    @Test
+    fun prepend_singleImmediatelyReturn() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.prepend(key = 15)
+
+        var observer = single.test()
+        observer.assertNotComplete()
+
+        // let room complete its tasks
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+
+        val result = observer.values().first() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(10, 15)
+        )
+        observer.assertComplete()
+        observer.assertNoErrors()
+        observer.dispose()
+    }
+
+    @Test
+    fun dbUpdate_invalidatesPagingSource() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.append(key = 50)
+
+        // trigger load to register observer
+        single.await()
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+
+        // make sure observer is registered and pagingSource is still valid at this point
+        assertTrue(pagingSource.observer.privateRegisteredState().get())
+        assertFalse(pagingSource.invalid)
+
+        // this should cause refreshVersionsSync to invalidate pagingSource
+        db.dao.addItem(TestItem(113))
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+
+        assertTrue(pagingSource.invalid)
+
+        val single2 = pagingSource.append(key = 55)
+        val result = single2.await()
+        Truth.assertThat(result).isInstanceOf(LoadResult.Invalid::class.java)
+    }
+
+    @Test
+    fun append_returnsInvalid() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.append(key = 50)
+
+        // this should cause load to return LoadResult.Invalid
+        pagingSource.invalidate()
+        assertTrue(pagingSource.invalid)
+
+        // trigger load
+        var result = single.await()
+
+        // let room complete its tasks
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+        Truth.assertThat(result).isInstanceOf(LoadResult.Invalid::class.java)
+    }
+
+    @Test
+    fun prepend_returnsInvalid() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.prepend(key = 50)
+
+        // this should cause load to return LoadResult.Invalid
+        pagingSource.invalidate()
+        assertTrue(pagingSource.invalid)
+
+        // trigger load
+        var observer = single.test()
+
+        // let room complete its tasks
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+        val result = observer.values().first()
+        Truth.assertThat(result).isInstanceOf(LoadResult.Invalid::class.java)
+        observer.dispose()
+    }
+
+    @Test
+    fun refresh_consecutively() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        val single = pagingSource.refresh()
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(0, 15)
+        )
+
+        val pagingSource2 = LimitOffsetRxPagingSourceImpl(db)
+        val single2 = pagingSource2.refresh()
+        val result2 = single2.await() as LoadResult.Page
+        Truth.assertThat(result2.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(0, 15)
+        )
+    }
+
+    @Test
+    fun append_consecutively() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+
+        val single = pagingSource.append(key = 15)
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(15, 20)
+        )
+
+        val single2 = pagingSource.append(key = 40)
+        val result2 = single2.await() as LoadResult.Page
+        Truth.assertThat(result2.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(40, 45)
+        )
+
+        val single3 = pagingSource.append(key = 45) // sequential append
+        val result3 = single3.await() as LoadResult.Page
+        Truth.assertThat(result3.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(45, 50)
+        )
+    }
+
+    @Test
+    fun prepend_consecutively() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+
+        val single = pagingSource.prepend(key = 15)
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(10, 15)
+        )
+
+        val single2 = pagingSource.prepend(key = 40)
+        val result2 = single2.await() as LoadResult.Page
+        Truth.assertThat(result2.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(35, 40)
+        )
+
+        val single3 = pagingSource.prepend(key = 45) // sequential prepend
+        val result3 = single3.await() as LoadResult.Page
+        Truth.assertThat(result3.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(40, 45)
+        )
+    }
+
+    @Test
+    fun refreshAgain_afterDispose() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+
+        var isDisposed = false
+        val single = pagingSource.refresh()
+            // dispose right after subscription
+            .doOnSubscribe { disposable -> disposable.dispose() }
+            .doOnSuccess { Truth.assertWithMessage("The single should not succeed").fail() }
+            .doOnError { Truth.assertWithMessage("The single should not error out").fail() }
+            .doOnDispose { isDisposed = true }
+
+        assertFailsWith<AssertionError> { withTestTimeout(2) { single.await() } }
+        assertTrue(isDisposed)
+        assertFalse(pagingSource.invalid)
+
+        // using same paging source
+        val single2 = pagingSource.refresh()
+        val result2 = single2.await() as LoadResult.Page
+        Truth.assertThat(result2.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(0, 15)
+        )
+    }
+
+    @Test
+    fun appendAgain_afterDispose() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+
+        var isDisposed = false
+        val single = pagingSource.append(key = 15)
+            // dispose right after subscription
+            .doOnSubscribe { disposable -> disposable.dispose() }
+            .doOnSuccess { Truth.assertWithMessage("The single should not succeed").fail() }
+            .doOnError { Truth.assertWithMessage("The single should not error out").fail() }
+            .doOnDispose { isDisposed = true }
+
+        assertFailsWith<AssertionError> { withTestTimeout(2) { single.await() } }
+        assertTrue(isDisposed)
+        assertFalse(pagingSource.invalid)
+
+        // try with same key same paging source
+        val single2 = pagingSource.append(key = 15)
+        val result2 = single2.await() as LoadResult.Page
+        Truth.assertThat(result2.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(15, 20)
+        )
+    }
+
+    @Test
+    fun prependAgain_afterDispose() = setupAndRun { db ->
+        db.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+
+        var isDisposed = false
+        val single = pagingSource.prepend(key = 40)
+            // dispose right after subscription
+            .doOnSubscribe { disposable -> disposable.dispose() }
+            .doOnSuccess { Truth.assertWithMessage("The single should not succeed").fail() }
+            .doOnError { Truth.assertWithMessage("The single should not error out").fail() }
+            .doOnDispose { isDisposed = true }
+
+        assertFailsWith<AssertionError> { withTestTimeout(2) { single.await() } }
+        assertTrue(isDisposed)
+        assertFalse(pagingSource.invalid)
+
+        // try with same key same paging source
+        val single2 = pagingSource.prepend(key = 40)
+        val result2 = single2.await() as LoadResult.Page
+        Truth.assertThat(result2.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(35, 40)
+        )
+    }
+
+    @Test
+    fun assert_usesQueryExecutor() {
+        val queryExecutor = TestExecutor()
+        val testDb = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(),
+            LimitOffsetTestDb::class.java
+        ).setQueryExecutor(queryExecutor)
+            .build()
+
+        testDb.dao.addAllItems(ITEMS_LIST)
+        queryExecutor.executeAll() // add items first
+
+        runTest {
+            assertFalse(queryExecutor.executeAll()) // make sure its idle now
+            val pagingSource = LimitOffsetRxPagingSourceImpl(testDb)
+            val single = pagingSource.append(key = 15)
+
+            var resultReceived = false
+            // subscribe to single
+            launch {
+                val result = single.await() as LoadResult.Page
+                assertThat(result.data).containsExactlyElementsIn(
+                    ITEMS_LIST.subList(15, 20)
+                )
+                resultReceived = true
+            }
+
+            advanceUntilIdle()
+
+            // execute Single's await()
+            assertTrue(queryExecutor.executeAll())
+
+            advanceUntilIdle()
+
+            assertTrue(resultReceived)
+            assertFalse(queryExecutor.executeAll())
+        }
+        testDb.close()
+    }
+
+    @Test
+    fun cancelledCoroutine_disposesSingle() {
+        val testDb = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(),
+            LimitOffsetTestDb::class.java
+        ).build()
+
+        testDb.dao.addAllItems(ITEMS_LIST)
+        val pagingSource = LimitOffsetRxPagingSourceImpl(testDb)
+
+        runBlocking {
+            var isDisposed = false
+            val single = pagingSource.refresh()
+                .doOnSubscribe { Thread.sleep(300) } // subscribe but delay the load
+                .doOnSuccess { Truth.assertWithMessage("The single should not succeed").fail() }
+                .doOnError { Truth.assertWithMessage("The single should not error out").fail() }
+                .doOnDispose { isDisposed = true }
+
+            val job = launch { single.await() }
+            job.start()
+            delay(100) // start single.await() to subscribe but don't let it complete
+            job.cancelAndJoin()
+
+            assertTrue(job.isCancelled)
+            assertTrue(isDisposed)
+        }
+
+        // need to drain before closing testDb or else will throw SQLiteConnectionPool exception
+        countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+        testDb.close()
+    }
+
+    @Test
+    fun refresh_secondaryConstructor() = setupAndRun { db ->
+        val pagingSource = object : LimitOffsetRxPagingSource<TestItem>(
+            db = db,
+            supportSQLiteQuery = SimpleSQLiteQuery("SELECT * FROM $tableName ORDER BY id ASC")
+        ) {
+            override fun convertRows(cursor: Cursor): List<TestItem> {
+                return convertRowsHelper(cursor)
+            }
+        }
+
+        db.dao.addAllItems(ITEMS_LIST)
+        val single = pagingSource.refresh()
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(0, 15)
+        )
+    }
+
+    @Test
+    fun append_secondaryConstructor() = setupAndRun { db ->
+        val pagingSource = object : LimitOffsetRxPagingSource<TestItem>(
+            db = db,
+            supportSQLiteQuery = SimpleSQLiteQuery("SELECT * FROM $tableName ORDER BY id ASC")
+        ) {
+            override fun convertRows(cursor: Cursor): List<TestItem> {
+                return convertRowsHelper(cursor)
+            }
+        }
+
+        db.dao.addAllItems(ITEMS_LIST)
+        val single = pagingSource.append(key = 15)
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(15, 20)
+        )
+    }
+
+    @Test
+    fun prepend_secondaryConstructor() = setupAndRun { db ->
+        val pagingSource = object : LimitOffsetRxPagingSource<TestItem>(
+            db = db,
+            supportSQLiteQuery = SimpleSQLiteQuery("SELECT * FROM $tableName ORDER BY id ASC")
+        ) {
+            override fun convertRows(cursor: Cursor): List<TestItem> {
+                return convertRowsHelper(cursor)
+            }
+        }
+
+        db.dao.addAllItems(ITEMS_LIST)
+        val single = pagingSource.prepend(key = 15)
+        val result = single.await() as LoadResult.Page
+        assertThat(result.data).containsExactlyElementsIn(
+            ITEMS_LIST.subList(10, 15)
+        )
+    }
+
+    @Test
+    fun jumping_enabled() = setupAndRun { db ->
+        val pagingSource = LimitOffsetRxPagingSourceImpl(db)
+        assertTrue(pagingSource.jumpingSupported)
+    }
+
+    private fun setupAndRun(
+        test: suspend (LimitOffsetTestDb) -> Unit
+    ) {
+        val db = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(),
+            LimitOffsetTestDb::class.java
+        ).build()
+
+        runTest {
+            test(db)
+        }
+       db.close()
+    }
+}
+
+private fun LimitOffsetRxPagingSource<TestItem>.refresh(
+    key: Int? = null,
+): Single<LoadResult<Int, TestItem>> {
+    return loadSingle(
+        createLoadParam(
+            loadType = LoadType.REFRESH,
+            key = key,
+        )
+    )
+}
+
+private fun LimitOffsetRxPagingSource<TestItem>.append(
+    key: Int? = -1,
+): Single<LoadResult<Int, TestItem>> {
+    itemCount.set(ITEMS_LIST.size) // to bypass check for initial load
+    return loadSingle(
+        createLoadParam(
+            loadType = LoadType.APPEND,
+            key = key,
+        )
+    )
+}
+
+private fun LimitOffsetRxPagingSource<TestItem>.prepend(
+    key: Int? = -1,
+): Single<LoadResult<Int, TestItem>> {
+    itemCount.set(ITEMS_LIST.size) // to bypass check for initial load
+    return loadSingle(
+        createLoadParam(
+            loadType = LoadType.PREPEND,
+            key = key,
+        )
+    )
+}
+
+private class LimitOffsetRxPagingSourceImpl(
+    db: RoomDatabase,
+    query: String = "SELECT * FROM $tableName ORDER BY id ASC",
+) : LimitOffsetRxPagingSource<TestItem>(
+    db = db,
+    sourceQuery = RoomSQLiteQuery.acquire(query, 0),
+    tables = arrayOf(tableName)
+) {
+    override fun convertRows(cursor: Cursor): List<TestItem> = convertRowsHelper(cursor)
+}
+
+private fun convertRowsHelper(cursor: Cursor): List<TestItem> {
+    val cursorIndexOfId = getColumnIndexOrThrow(cursor, "id")
+    val data = mutableListOf<TestItem>()
+    while (cursor.moveToNext()) {
+        val tmpId = cursor.getInt(cursorIndexOfId)
+        data.add(TestItem(tmpId))
+    }
+    return data
+}
+
+private val CONFIG = PagingConfig(
+    pageSize = 5,
+    enablePlaceholders = true,
+    initialLoadSize = 15,
+)
+
+private val ITEMS_LIST = createItemsForDb(0, 100)
+
+private fun createItemsForDb(startId: Int, count: Int): List<TestItem> {
+    return List(count) {
+        TestItem(
+            id = it + startId,
+        )
+    }
+}
+
+private fun createLoadParam(
+    loadType: LoadType,
+    key: Int? = null,
+    initialLoadSize: Int = CONFIG.initialLoadSize,
+    pageSize: Int = CONFIG.pageSize,
+    placeholdersEnabled: Boolean = CONFIG.enablePlaceholders
+): LoadParams<Int> {
+    return when (loadType) {
+        LoadType.REFRESH -> {
+            LoadParams.Refresh(
+                key = key,
+                loadSize = initialLoadSize,
+                placeholdersEnabled = placeholdersEnabled
+            )
+        }
+        LoadType.APPEND -> {
+            LoadParams.Append(
+                key = key ?: -1,
+                loadSize = pageSize,
+                placeholdersEnabled = placeholdersEnabled
+            )
+        }
+        LoadType.PREPEND -> {
+            LoadParams.Prepend(
+                key = key ?: -1,
+                loadSize = pageSize,
+                placeholdersEnabled = placeholdersEnabled
+            )
+        }
+    }
+}
+
+@Suppress("UNCHECKED_CAST")
+private fun ThreadSafeInvalidationObserver.privateRegisteredState(): AtomicBoolean {
+    return ThreadSafeInvalidationObserver::class.java
+        .getDeclaredField("registered")
+        .let {
+            it.isAccessible = true
+            it.get(this)
+        } as AtomicBoolean
+}
+
+@Database(entities = [TestItem::class], version = 1, exportSchema = false)
+abstract class LimitOffsetTestDb : RoomDatabase() {
+    abstract val dao: TestItemDao
+}
+
+@Entity(tableName = "TestItem")
+data class TestItem(
+    @PrimaryKey val id: Int,
+    val value: String = "item $id"
+)
+
+@Dao
+interface TestItemDao {
+    @Insert
+    fun addAllItems(testItems: List<TestItem>)
+
+    @Insert
+    fun addItem(testItem: TestItem)
+
+    @Query("SELECT COUNT(*) from $tableName")
+    fun itemCount(): Int
+}
diff --git a/room/room-paging-rxjava2/src/main/java/androidx/room/paging/rxjava2/LimitOffsetRxPagingSource.kt b/room/room-paging-rxjava2/src/main/java/androidx/room/paging/rxjava2/LimitOffsetRxPagingSource.kt
new file mode 100644
index 0000000..c4dab32
--- /dev/null
+++ b/room/room-paging-rxjava2/src/main/java/androidx/room/paging/rxjava2/LimitOffsetRxPagingSource.kt
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.paging.rxjava2
+
+import android.database.Cursor
+import androidx.annotation.NonNull
+import androidx.annotation.RestrictTo
+import androidx.annotation.VisibleForTesting
+import androidx.paging.PagingState
+import androidx.paging.rxjava2.RxPagingSource
+import androidx.room.RoomDatabase
+import androidx.room.RoomSQLiteQuery
+import androidx.room.RxRoom.createSingle
+import androidx.room.paging.util.INITIAL_ITEM_COUNT
+import androidx.room.paging.util.INVALID
+import androidx.room.paging.util.ThreadSafeInvalidationObserver
+import androidx.room.paging.util.getClippedRefreshKey
+import androidx.room.paging.util.queryDatabase
+import androidx.room.paging.util.queryItemCount
+import androidx.sqlite.db.SupportSQLiteQuery
+import io.reactivex.Single
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.Callable
+import java.util.concurrent.atomic.AtomicInteger
+
+@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
+abstract class LimitOffsetRxPagingSource<Value : Any>(
+    private val sourceQuery: RoomSQLiteQuery,
+    private val db: RoomDatabase,
+    vararg tables: String,
+) : RxPagingSource<Int, Value>() {
+
+    constructor(
+        supportSQLiteQuery: SupportSQLiteQuery,
+        db: RoomDatabase,
+        vararg tables: String
+    ) : this (
+        sourceQuery = RoomSQLiteQuery.copyFrom(supportSQLiteQuery),
+        db = db,
+        tables = tables
+    )
+
+    @VisibleForTesting
+    internal val itemCount: AtomicInteger = AtomicInteger(INITIAL_ITEM_COUNT)
+    @VisibleForTesting
+    internal val observer = ThreadSafeInvalidationObserver(tables = tables) {
+        invalidate()
+    }
+
+    override fun loadSingle(params: LoadParams<Int>): Single<LoadResult<Int, Value>> {
+        val scheduler = Schedulers.from(db.queryExecutor)
+        return createSingle {
+            observer.registerIfNecessary(db)
+            val tempCount = itemCount.get()
+            if (tempCount == INITIAL_ITEM_COUNT) {
+                initialLoad(params)
+            } else {
+                nonInitialLoad(tempCount, params)
+            }
+        }.subscribeOn(scheduler)
+    }
+
+    private fun initialLoad(params: LoadParams<Int>): LoadResult<Int, Value> {
+        return db.runInTransaction(
+            Callable {
+                val tempCount = queryItemCount(sourceQuery, db)
+                itemCount.set(tempCount)
+                queryDatabase(
+                    params = params,
+                    sourceQuery = sourceQuery,
+                    db = db,
+                    itemCount = tempCount,
+                    convertRows = ::convertRows
+                )
+            }
+        )
+    }
+
+    private fun nonInitialLoad(tempCount: Int, params: LoadParams<Int>): LoadResult<Int, Value> {
+        val result = queryDatabase(
+            params = params,
+            sourceQuery = sourceQuery,
+            db = db,
+            itemCount = tempCount,
+            convertRows = ::convertRows
+        )
+        // manually check if database has been updated. If so, the observer's
+        // invalidation callback will invalidate this paging source
+        db.invalidationTracker.refreshVersionsSync()
+        @Suppress("UNCHECKED_CAST")
+        return if (invalid) INVALID as LoadResult.Invalid<Int, Value> else result
+    }
+
+    @NonNull
+    protected abstract fun convertRows(cursor: Cursor): List<Value>
+
+    override fun getRefreshKey(state: PagingState<Int, Value>): Int? {
+        return state.getClippedRefreshKey()
+    }
+
+    override val jumpingSupported: Boolean
+        get() = true
+}