Implemented integration test for upsert focusing on testing RxJava: Flowable and Observable

Bug: 117855270
Test: N/A
Change-Id: I4b83a816d2d2fba59ca024ad854aa6d2c7f83699
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
index fac123e..6e0e3fa 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
@@ -46,6 +46,7 @@
 import io.reactivex.Flowable
 import io.reactivex.Maybe
 import io.reactivex.Single
+import io.reactivex.Observable
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.runBlocking
 import java.util.Date
@@ -477,4 +478,16 @@
 
     @Upsert
     fun upsertBookCompletable(book: Book): Completable
+
+    @Upsert
+    fun upsertBook(book: Book)
+
+    @Upsert
+    fun upsertBooksWithReturns(books: List<Book>): Array<Long>
+
+    @Query("SELECT * FROM book")
+    fun getBooksFlowable(): Flowable<List<Book>>
+
+    @Query("SELECT * FROM book")
+    fun getBooksObservable(): Observable<List<Book>>
 }
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/UpsertTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/UpsertTest.kt
index 3ccd123..65b5fb3 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/UpsertTest.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/UpsertTest.kt
@@ -162,4 +162,50 @@
         testObserver.assertError(SQLiteConstraintException::class.java)
         assertThat(testObserver.errors().get(0).message).ignoringCase().contains("foreign key")
     }
+
+    @Test
+    fun upsertFlowable() {
+        booksDao.upsertPublishers(TestUtil.PUBLISHER)
+        booksDao.upsertBook(TestUtil.BOOK_1)
+        val subscriber = TestSubscriber<List<Book>>()
+        booksDao.getBooksFlowable().subscribeWith(subscriber)
+        drain()
+        assertThat(subscriber.values().size).isEqualTo(1)
+        assertThat(subscriber.values()[0]).isEqualTo(listOf(TestUtil.BOOK_1))
+        booksDao.upsertBook(TestUtil.BOOK_1.copy(title = "changed title"))
+        drain()
+        assertThat(subscriber.values()[1][0].title).isEqualTo("changed title")
+        booksDao.upsertBooksWithReturns(buildList {
+            add(TestUtil.BOOK_2)
+            add(TestUtil.BOOK_3)
+        })
+        drain()
+        assertThat(subscriber.values().size).isEqualTo(3)
+        assertThat(subscriber.values()[2]).isEqualTo(
+            listOf(TestUtil.BOOK_1.copy(title = "changed title"), TestUtil.BOOK_2, TestUtil.BOOK_3)
+        )
+    }
+
+    @Test
+    fun upsertObservable() {
+        booksDao.addPublishers(TestUtil.PUBLISHER)
+        booksDao.upsertBook(TestUtil.BOOK_1)
+        val observer = TestObserver<List<Book>>()
+        booksDao.getBooksObservable().subscribeWith(observer)
+        drain()
+        assertThat(observer.values().size).isEqualTo(1)
+        assertThat(observer.values()[0]).isEqualTo(listOf(TestUtil.BOOK_1))
+        booksDao.upsertBook(TestUtil.BOOK_1.copy(title = "changed title"))
+        drain()
+        assertThat(observer.values()[1][0].title).isEqualTo("changed title")
+        booksDao.upsertBooksWithReturns(buildList {
+            add(TestUtil.BOOK_2)
+            add(TestUtil.BOOK_3)
+        })
+        drain()
+        assertThat(observer.values().size).isEqualTo(3)
+        assertThat(observer.values()[2]).isEqualTo(
+            listOf(TestUtil.BOOK_1.copy(title = "changed title"), TestUtil.BOOK_2, TestUtil.BOOK_3)
+        )
+    }
 }