Merge "Handle reentrant case in Room's acquireTransactionThread()" into snap-temp-L60900000959093030
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt
index b3cb2b9..589d690 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt
@@ -18,7 +18,9 @@
import androidx.room.Database
import androidx.room.RoomDatabase
+import androidx.room.androidx.room.integration.kotlintestapp.dao.CounterDao
import androidx.room.androidx.room.integration.kotlintestapp.dao.UsersDao
+import androidx.room.androidx.room.integration.kotlintestapp.vo.Counter
import androidx.room.androidx.room.integration.kotlintestapp.vo.User
import androidx.room.integration.kotlintestapp.dao.AbstractDao
import androidx.room.integration.kotlintestapp.dao.BooksDao
@@ -37,7 +39,7 @@
entities = [
Book::class, Author::class, Publisher::class, BookAuthor::class,
NoArgClass::class, DataClassFromDependency::class, JavaEntity::class,
- EntityWithJavaPojoList::class, User::class
+ EntityWithJavaPojoList::class, User::class, Counter::class
],
version = 1,
exportSchema = false
@@ -53,4 +55,6 @@
abstract fun dependencyDao(): DependencyDao
abstract fun abstractDao(): AbstractDao
+
+ abstract fun counterDao(): CounterDao
}
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
index d53132c..7ec25bd 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
@@ -136,6 +136,9 @@
@Query("SELECT * FROM book")
suspend fun getBooksSuspend(): List<Book>
+ @Query("SELECT * FROM publisher")
+ suspend fun getPublishersSuspend(): List<Publisher>
+
@Query("UPDATE book SET salesCnt = salesCnt + 1 WHERE bookId = :bookId")
fun increaseBookSales(bookId: String)
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/CounterDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/CounterDao.kt
new file mode 100644
index 0000000..62e7709
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/CounterDao.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.dao
+
+import androidx.room.Dao
+import androidx.room.Query
+import androidx.room.Upsert
+import androidx.room.androidx.room.integration.kotlintestapp.vo.Counter
+
+@Dao
+interface CounterDao {
+ @Upsert
+ suspend fun upsert(c: Counter)
+
+ @Query("SELECT * FROM Counter WHERE id = :id")
+ suspend fun getCounter(id: Long): Counter
+}
\ No newline at end of file
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt
index 48efc91..29a48db 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt
@@ -16,10 +16,13 @@
package androidx.room.integration.kotlintestapp.test
+import android.content.Context
import android.os.Build
import androidx.arch.core.executor.ArchTaskExecutor
+import androidx.arch.core.executor.TaskExecutor
import androidx.room.Room
import androidx.room.RoomDatabase
+import androidx.room.androidx.room.integration.kotlintestapp.vo.Counter
import androidx.room.integration.kotlintestapp.NewThreadDispatcher
import androidx.room.integration.kotlintestapp.TestDatabase
import androidx.room.integration.kotlintestapp.vo.Book
@@ -32,8 +35,22 @@
import androidx.test.filters.LargeTest
import com.google.common.truth.Truth.assertThat
import com.google.common.truth.Truth.assertWithMessage
+import java.io.IOException
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.intrinsics.intercepted
+import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
+import kotlin.coroutines.resume
+import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.ObsoleteCoroutinesApi
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
@@ -41,18 +58,13 @@
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
import org.junit.After
import org.junit.Assert.fail
import org.junit.Test
import org.junit.runner.RunWith
-import java.io.IOException
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-import kotlinx.coroutines.DelicateCoroutinesApi
@LargeTest
@RunWith(AndroidJUnit4::class)
@@ -158,9 +170,7 @@
database.endTransaction()
}
}
- runBlocking {
- assertThat(booksDao.getBooksSuspend()).isEqualTo(listOf(TestUtil.BOOK_2))
- }
+ assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
}
@Test
@@ -182,9 +192,7 @@
database.endTransaction()
}
}
- runBlocking {
- assertThat(booksDao.getBooksSuspend()).isEqualTo(listOf(TestUtil.BOOK_2))
- }
+ assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
}
@Test
@@ -206,9 +214,7 @@
database.endTransaction()
}
}
- runBlocking(NewThreadDispatcher()) {
- assertThat(booksDao.getBooksSuspend()).isEqualTo(listOf(TestUtil.BOOK_2))
- }
+ assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
}
@Test
@@ -256,10 +262,25 @@
booksDao.deleteUnsoldBooks()
}
}
- runBlocking(NewThreadDispatcher()) {
- assertThat(booksDao.getBooksSuspend())
- .isEqualTo(listOf(TestUtil.BOOK_2))
+ assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
+ }
+
+ @Test
+ fun withTransaction_withContext_newThreadDispatcher() {
+ runBlocking {
+ withContext(NewThreadDispatcher()) {
+ database.withTransaction {
+ booksDao.insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ booksDao.insertBookSuspend(TestUtil.BOOK_1.copy(salesCnt = 0))
+ booksDao.insertBookSuspend(TestUtil.BOOK_2)
+ booksDao.deleteUnsoldBooks()
+ }
+ }
}
+ assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
}
@Test
@@ -275,10 +296,7 @@
booksDao.deleteUnsoldBooks()
}
}
- runBlocking(NewThreadDispatcher()) {
- assertThat(booksDao.getBooksSuspend())
- .isEqualTo(listOf(TestUtil.BOOK_2))
- }
+ assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
}
@Test
@@ -301,6 +319,31 @@
}
@Test
+ fun withTransaction_contextSwitch_exception() {
+ runBlocking {
+ try {
+ database.withTransaction {
+ booksDao.insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ withContext(Dispatchers.IO) {
+ booksDao.insertBookSuspend(TestUtil.BOOK_1.copy(salesCnt = 0))
+ booksDao.insertBookSuspend(TestUtil.BOOK_2)
+ }
+ booksDao.deleteUnsoldBooks()
+ throw IOException("Boom!")
+ }
+ } catch (ex: IOException) {
+ assertThat(ex).hasMessageThat()
+ .contains("Boom")
+ }
+ assertThat(booksDao.getPublishersSuspend()).isEmpty()
+ assertThat(booksDao.getBooksSuspend()).isEmpty()
+ }
+ }
+
+ @Test
fun withTransaction_exception() {
runBlocking {
database.withTransaction {
@@ -314,6 +357,7 @@
try {
database.withTransaction {
booksDao.insertBookSuspend(TestUtil.BOOK_2)
+ booksDao.insertBookSuspend(TestUtil.BOOK_3)
throw IOException("Boom!")
}
@Suppress("UNREACHABLE_CODE")
@@ -369,8 +413,8 @@
}
}
- assertThat(booksDao.getBooksSuspend())
- .isEqualTo(emptyList<Book>())
+ assertThat(booksDao.getPublishersSuspend()).isEmpty()
+ assertThat(booksDao.getBooksSuspend()).isEmpty()
}
}
@@ -464,6 +508,7 @@
@Test
fun withTransaction_cancelCoroutine() {
+
runBlocking {
booksDao.insertPublisherSuspend(
TestUtil.PUBLISHER.publisherId,
@@ -496,6 +541,42 @@
}
@Test
+ fun withTransaction_busyExecutor_cancelCoroutine() {
+ val executorService = Executors.newSingleThreadExecutor()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executorService)
+ .build()
+
+ // Simulate a busy executor, no thread to acquire for transaction.
+ val busyLatch = CountDownLatch(1)
+ executorService.execute {
+ busyLatch.await()
+ }
+ runBlocking {
+ val startedRunning = CountDownLatch(1)
+ val job = launch(Dispatchers.IO) {
+ startedRunning.countDown()
+ delay(200) // yield and delay to queue the runnable in transaction executor
+ localDatabase.withTransaction {
+ fail("Transaction block should have never run!")
+ }
+ }
+
+ assertThat(startedRunning.await(1, TimeUnit.SECONDS)).isTrue()
+ job.cancelAndJoin()
+ }
+
+ // free busy thread
+ busyLatch.countDown()
+ executorService.shutdown()
+ assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+
+ assertThat(localDatabase.booksDao().getPublishers()).isEmpty()
+ }
+
+ @Test
fun withTransaction_blockingDaoMethods() {
runBlocking {
database.withTransaction {
@@ -655,15 +736,12 @@
}
}
- runBlocking {
- // as Set since insertion order is undefined
- assertThat(booksDao.getBooksSuspend().toSet())
- .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
- }
+ // as Set since insertion order is undefined
+ assertThat(booksDao.allBooks.toSet())
+ .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
}
@Test
- @ObsoleteCoroutinesApi
@Suppress("DeferredResultUnused")
fun withTransaction_multipleTransactions_multipleThreads() {
runBlocking {
@@ -689,31 +767,16 @@
}
}
- runBlocking {
- // as Set since insertion order is undefined
- assertThat(booksDao.getBooksSuspend().toSet())
- .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
- }
+ // as Set since insertion order is undefined
+ assertThat(booksDao.allBooks.toSet())
+ .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
}
@Test
@Suppress("DeferredResultUnused")
fun withTransaction_multipleTransactions_verifyThreadUsage() {
val busyThreadsCount = AtomicInteger()
- // Executor wrapper that counts threads that are busy executing commands.
- class WrappedService(val delegate: ExecutorService) : ExecutorService by delegate {
- override fun execute(command: Runnable) {
- delegate.execute {
- busyThreadsCount.incrementAndGet()
- try {
- command.run()
- } finally {
- busyThreadsCount.decrementAndGet()
- }
- }
- }
- }
- val wrappedExecutor = WrappedService(Executors.newCachedThreadPool())
+ val wrappedExecutor = BusyCountingService(busyThreadsCount, Executors.newCachedThreadPool())
val localDatabase = Room.inMemoryDatabaseBuilder(
ApplicationProvider.getApplicationContext(), TestDatabase::class.java
)
@@ -740,57 +803,67 @@
}
}
- wrappedExecutor.awaitTermination(1, TimeUnit.SECONDS)
+ assertThat(busyThreadsCount.get()).isEqualTo(0)
+ wrappedExecutor.shutdown()
+ assertThat(wrappedExecutor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
}
@Test
fun withTransaction_busyExecutor() {
+ val executorService = Executors.newSingleThreadExecutor()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executorService)
+ .build()
+
+ // Simulate a busy executor, no thread to acquire for transaction.
+ val busyLatch = CountDownLatch(1)
+ executorService.execute {
+ busyLatch.await()
+ }
runBlocking {
- val executorService = Executors.newSingleThreadExecutor()
- val localDatabase = Room.inMemoryDatabaseBuilder(
- ApplicationProvider.getApplicationContext(), TestDatabase::class.java
- )
- .setTransactionExecutor(executorService)
- .build()
-
- // Simulate a busy executor, no thread to acquire for transaction.
- val busyLatch = CountDownLatch(1)
- executorService.execute {
- busyLatch.await()
- }
-
var asyncExecuted = false
- val transactionLatch = CountDownLatch(1)
val job = async(Dispatchers.IO) {
asyncExecuted = true
localDatabase.withTransaction {
- transactionLatch.countDown()
+ booksDao.insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
}
}
- assertThat(transactionLatch.await(1000, TimeUnit.MILLISECONDS)).isFalse()
+ try {
+ withTimeout(1000) {
+ job.join()
+ }
+ fail("A timeout should have occurred!")
+ } catch (_: TimeoutCancellationException) { }
job.cancelAndJoin()
assertThat(asyncExecuted).isTrue()
-
- // free busy thread
- busyLatch.countDown()
- executorService.awaitTermination(1, TimeUnit.SECONDS)
}
+ // free busy thread
+ busyLatch.countDown()
+ executorService.shutdown()
+ assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+
+ assertThat(booksDao.getPublishers()).isEmpty()
}
@Test
fun withTransaction_shutdownExecutor() {
+ val executorService = Executors.newCachedThreadPool()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executorService)
+ .build()
+
+ executorService.shutdownNow()
+
runBlocking {
- val executorService = Executors.newCachedThreadPool()
- val localDatabase = Room.inMemoryDatabaseBuilder(
- ApplicationProvider.getApplicationContext(), TestDatabase::class.java
- )
- .setTransactionExecutor(executorService)
- .build()
-
- executorService.shutdownNow()
-
try {
localDatabase.withTransaction {
fail("This coroutine should never run.")
@@ -801,22 +874,24 @@
.contains("Unable to acquire a thread to perform the database transaction")
}
}
+
+ executorService.shutdown()
+ assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
}
@Test
fun withTransaction_databaseOpenError() {
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .addCallback(object : RoomDatabase.Callback() {
+ override fun onOpen(db: SupportSQLiteDatabase) {
+ // this causes all transaction methods to throw, this can happen IRL
+ throw RuntimeException("Error opening Database.")
+ }
+ })
+ .build()
runBlocking {
- val localDatabase = Room.inMemoryDatabaseBuilder(
- ApplicationProvider.getApplicationContext(), TestDatabase::class.java
- )
- .addCallback(object : RoomDatabase.Callback() {
- override fun onOpen(db: SupportSQLiteDatabase) {
- // this causes all transaction methods to throw, this can happen IRL
- throw RuntimeException("Error opening Database.")
- }
- })
- .build()
-
try {
localDatabase.withTransaction {
fail("This coroutine should never run.")
@@ -830,41 +905,40 @@
@Test
fun withTransaction_beginTransaction_error() {
- runBlocking {
- // delegate and delegate just so that we can throw in beginTransaction()
- val localDatabase = Room.inMemoryDatabaseBuilder(
- ApplicationProvider.getApplicationContext(), TestDatabase::class.java
- )
- .openHelperFactory(
- object : SupportSQLiteOpenHelper.Factory {
- val factoryDelegate = FrameworkSQLiteOpenHelperFactory()
- override fun create(
- configuration: SupportSQLiteOpenHelper.Configuration
- ): SupportSQLiteOpenHelper {
- val helperDelegate = factoryDelegate.create(configuration)
- return object : SupportSQLiteOpenHelper by helperDelegate {
- override val writableDatabase: SupportSQLiteDatabase
- get() {
- val databaseDelegate = helperDelegate.writableDatabase
- return object : SupportSQLiteDatabase by databaseDelegate {
- override fun beginTransaction() {
- throw RuntimeException(
- "Error beginning transaction."
- )
- }
- override fun beginTransactionNonExclusive() {
- throw RuntimeException(
- "Error beginning transaction."
- )
- }
+ // delegate and delegate just so that we can throw in beginTransaction()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .openHelperFactory(
+ object : SupportSQLiteOpenHelper.Factory {
+ val factoryDelegate = FrameworkSQLiteOpenHelperFactory()
+ override fun create(
+ configuration: SupportSQLiteOpenHelper.Configuration
+ ): SupportSQLiteOpenHelper {
+ val helperDelegate = factoryDelegate.create(configuration)
+ return object : SupportSQLiteOpenHelper by helperDelegate {
+ override val writableDatabase: SupportSQLiteDatabase
+ get() {
+ val databaseDelegate = helperDelegate.writableDatabase
+ return object : SupportSQLiteDatabase by databaseDelegate {
+ override fun beginTransaction() {
+ throw RuntimeException(
+ "Error beginning transaction."
+ )
+ }
+ override fun beginTransactionNonExclusive() {
+ throw RuntimeException(
+ "Error beginning transaction."
+ )
}
}
- }
+ }
}
}
- )
- .build()
-
+ }
+ )
+ .build()
+ runBlocking {
try {
localDatabase.withTransaction {
fail("This coroutine should never run.")
@@ -1023,4 +1097,286 @@
assertThat(booksDao.getBooksSuspend())
.contains(addedBook)
}
+
+ @Test
+ fun withTransaction_instantTaskExecutorRule() = runBlocking {
+ // Not the actual InstantTaskExecutorRule since this test class already uses
+ // CountingTaskExecutorRule but same behaviour.
+ ArchTaskExecutor.getInstance().setDelegate(object : TaskExecutor() {
+ override fun executeOnDiskIO(runnable: Runnable) {
+ runnable.run()
+ }
+
+ override fun postToMainThread(runnable: Runnable) {
+ runnable.run()
+ }
+
+ override fun isMainThread(): Boolean {
+ return false
+ }
+ })
+ database.withTransaction {
+ booksDao.insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ }
+ assertThat(booksDao.getPublishers().size).isEqualTo(1)
+ }
+
+ @Test
+ fun withTransaction_singleExecutorDispatcher() {
+ val executor = Executors.newSingleThreadExecutor()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executor)
+ .build()
+ runBlocking {
+ withContext(executor.asCoroutineDispatcher()) {
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ }
+ }
+ }
+ assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+
+ executor.shutdown()
+ assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+ }
+
+ @Test
+ fun withTransaction_reentrant_nested() {
+ val executor = Executors.newSingleThreadExecutor()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executor)
+ .build()
+ runBlocking {
+ withContext(executor.asCoroutineDispatcher()) {
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertBookSuspend(TestUtil.BOOK_1)
+ }
+ }
+ }
+ }
+ assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+ assertThat(localDatabase.booksDao().allBooks.size).isEqualTo(1)
+
+ executor.shutdown()
+ assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+ }
+
+ @Test
+ fun withTransaction_reentrant_nested_exception() {
+ val executor = Executors.newSingleThreadExecutor()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executor)
+ .build()
+ runBlocking {
+ withContext(executor.asCoroutineDispatcher()) {
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ try {
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertBookSuspend(TestUtil.BOOK_1)
+ throw IOException("Boom!")
+ }
+ @Suppress("UNREACHABLE_CODE")
+ fail("An exception should have been thrown.")
+ } catch (ex: IOException) {
+ assertThat(ex).hasMessageThat()
+ .contains("Boom")
+ }
+ }
+ }
+ }
+ assertThat(localDatabase.booksDao().getPublishers()).isEmpty()
+ assertThat(localDatabase.booksDao().allBooks).isEmpty()
+
+ executor.shutdown()
+ assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+ }
+
+ @Test
+ fun withTransaction_reentrant_nested_contextSwitch() {
+ val executor = Executors.newSingleThreadExecutor()
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executor)
+ .build()
+
+ runBlocking {
+ withContext(executor.asCoroutineDispatcher()) {
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ withContext(Dispatchers.IO) {
+ localDatabase.withTransaction {
+ localDatabase.booksDao().insertBookSuspend(TestUtil.BOOK_1)
+ }
+ }
+ }
+ }
+ }
+ assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+ assertThat(localDatabase.booksDao().allBooks.size).isEqualTo(1)
+
+ executor.shutdown()
+ assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+ }
+
+ @Test
+ fun withTransaction_reentrant_busyExecutor() {
+ val busyThreadsCount = AtomicInteger()
+ val executor =
+ BusyCountingService(busyThreadsCount, Executors.newFixedThreadPool(2))
+ val localDatabase = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+ )
+ .setTransactionExecutor(executor)
+ .build()
+
+ // Grab one of the thread and simulate busy work
+ val busyLatch = CountDownLatch(1)
+ executor.execute {
+ busyLatch.await()
+ }
+
+ runBlocking {
+ // Using the other thread in the pool this will cause a reentrant situation
+ withContext(executor.asCoroutineDispatcher()) {
+ localDatabase.withTransaction {
+ val transactionThread = Thread.currentThread()
+ // Suspend transaction thread while freeing the busy thread from the pool
+ withContext(Dispatchers.IO) {
+ busyLatch.countDown()
+ delay(200)
+ // Only one thread is busy, the transaction thread
+ assertThat(busyThreadsCount.get()).isEqualTo(1)
+ }
+ // Resume in the transaction thread, the recently free thread in the pool that
+ // is not in a transaction should not be used.
+ assertThat(Thread.currentThread()).isEqualTo(transactionThread)
+ localDatabase.booksDao().insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ }
+ }
+ }
+
+ assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+
+ executor.shutdown()
+ assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+ }
+
+ @Test
+ @OptIn(ExperimentalCoroutinesApi::class)
+ fun withTransaction_runTest() {
+ runTest {
+ database.withTransaction {
+ booksDao.insertPublisherSuspend(
+ TestUtil.PUBLISHER.publisherId,
+ TestUtil.PUBLISHER.name
+ )
+ booksDao.insertBookSuspend(TestUtil.BOOK_1.copy(salesCnt = 0))
+ booksDao.insertBookSuspend(TestUtil.BOOK_2)
+ booksDao.deleteUnsoldBooks()
+ }
+ assertThat(booksDao.getBooksSuspend())
+ .isEqualTo(listOf(TestUtil.BOOK_2))
+ }
+ }
+
+ @Test
+ fun withTransaction_stress_testMutation() {
+ val output = mutableListOf<String>()
+ runBlocking {
+ repeat(5000) { count ->
+ database.withTransaction {
+ output.add("$count")
+ suspendHere()
+ output.add("$count")
+ }
+ }
+ }
+
+ val expectedOutput = buildList {
+ repeat(5000) { count ->
+ add("$count")
+ add("$count")
+ }
+ }
+ assertThat(output).isEqualTo(expectedOutput)
+ }
+
+ @Test
+ fun withTransaction_stress_dbMutation() {
+ val context: Context = ApplicationProvider.getApplicationContext()
+ context.deleteDatabase("test_stress_dbMutation.db")
+ val db = Room.databaseBuilder(
+ context,
+ TestDatabase::class.java,
+ "test.db"
+ ).build()
+ runBlocking {
+ db.counterDao().upsert(Counter(1, 0))
+ repeat(5000) {
+ launch(Dispatchers.IO) {
+ db.withTransaction {
+ val current = db.counterDao().getCounter(1)
+ suspendHere()
+ db.counterDao().upsert(current.copy(value = current.value + 1))
+ }
+ }
+ }
+ }
+ runBlocking {
+ val count = db.counterDao().getCounter(1)
+ assertThat(count.value).isEqualTo(5000)
+ }
+ db.close()
+ }
+
+ // Utility function to _really_ suspend.
+ private suspend fun suspendHere(): Unit = suspendCoroutineUninterceptedOrReturn {
+ it.intercepted().resume(Unit)
+ COROUTINE_SUSPENDED
+ }
+
+ // Executor wrapper that counts threads that are busy executing commands.
+ class BusyCountingService(
+ val count: AtomicInteger,
+ val delegate: ExecutorService
+ ) : ExecutorService by delegate {
+ override fun execute(command: Runnable) {
+ delegate.execute {
+ count.incrementAndGet()
+ try {
+ command.run()
+ } finally {
+ count.decrementAndGet()
+ }
+ }
+ }
+ }
}
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/vo/Counter.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/vo/Counter.kt
new file mode 100644
index 0000000..ce8fd1c
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/vo/Counter.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.vo
+
+import androidx.room.Entity
+import androidx.room.PrimaryKey
+
+@Entity
+data class Counter(
+ @PrimaryKey val id: Long,
+ val value: Int
+)
\ No newline at end of file
diff --git a/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt b/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt
index 4edabab..9fc8d5d 100644
--- a/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt
+++ b/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt
@@ -18,18 +18,17 @@
package androidx.room
import androidx.annotation.RestrictTo
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.asContextElement
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
-import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.resume
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asContextElement
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.withContext
/**
* Calls the specified suspending [block] in a database transaction. The transaction will be
@@ -43,13 +42,11 @@
* one received by the suspending block. It is recommended that all [Dao] function invoked within
* the [block] be suspending functions.
*
- * The dispatcher used to execute the given [block] will utilize threads from Room's query executor.
+ * The internal dispatcher used to execute the given [block] will block an utilize a thread from
+ * Room's transaction executor until the [block] is complete.
*/
public suspend fun <R> RoomDatabase.withTransaction(block: suspend () -> R): R {
- // Use inherited transaction context if available, this allows nested suspending transactions.
- val transactionContext =
- coroutineContext[TransactionElement]?.transactionDispatcher ?: createTransactionContext()
- return withContext(transactionContext) {
+ val transactionBlock: suspend CoroutineScope.() -> R = transaction@{
val transactionElement = coroutineContext[TransactionElement]!!
transactionElement.acquire()
try {
@@ -59,7 +56,7 @@
val result = block.invoke()
@Suppress("DEPRECATION")
setTransactionSuccessful()
- return@withContext result
+ return@transaction result
} finally {
@Suppress("DEPRECATION")
endTransaction()
@@ -68,6 +65,51 @@
transactionElement.release()
}
}
+ // Use inherited transaction context if available, this allows nested suspending transactions.
+ val transactionDispatcher = coroutineContext[TransactionElement]?.transactionDispatcher
+ return if (transactionDispatcher != null) {
+ withContext(transactionDispatcher, transactionBlock)
+ } else {
+ startTransactionCoroutine(coroutineContext, transactionBlock)
+ }
+}
+
+/**
+ * Suspend caller coroutine and start the transaction coroutine in a thread from the
+ * [RoomDatabase.transactionExecutor], resuming the caller coroutine with the result once done.
+ * The [context] will be a parent of the started coroutine to propagating cancellation and release
+ * the thread when cancelled.
+ */
+private suspend fun <R> RoomDatabase.startTransactionCoroutine(
+ context: CoroutineContext,
+ transactionBlock: suspend CoroutineScope.() -> R
+): R = suspendCancellableCoroutine { continuation ->
+ try {
+ transactionExecutor.execute {
+ try {
+ // Thread acquired, start the transaction coroutine using the parent context.
+ // The started coroutine will have an event loop dispatcher that we'll use for the
+ // transaction context.
+ runBlocking(context.minusKey(ContinuationInterceptor)) {
+ val dispatcher = coroutineContext[ContinuationInterceptor]!!
+ val transactionContext = createTransactionContext(dispatcher)
+ continuation.resume(
+ withContext(transactionContext, transactionBlock)
+ )
+ }
+ } catch (ex: Throwable) {
+ // If anything goes wrong, propagate exception to the calling coroutine.
+ continuation.cancel(ex)
+ }
+ }
+ } catch (ex: RejectedExecutionException) {
+ // Couldn't acquire a thread, cancel coroutine.
+ continuation.cancel(
+ IllegalStateException(
+ "Unable to acquire a thread to perform the database transaction.", ex
+ )
+ )
+ }
}
/**
@@ -76,8 +118,9 @@
* The context is a combination of a dispatcher, a [TransactionElement] and a thread local element.
*
* * The dispatcher will dispatch coroutines to a single thread that is taken over from the Room
- * query executor. If the coroutine context is switched, suspending DAO functions will be able to
- * dispatch to the transaction thread.
+ * transaction executor. If the coroutine context is switched, suspending DAO functions will be able
+ * to dispatch to the transaction thread. In reality the dispatcher is the event loop of a
+ * [runBlocking] started on the dedicated thread.
*
* * The [TransactionElement] serves as an indicator for inherited context, meaning, if there is a
* switch of context, suspending DAO methods will be able to use the indicator to dispatch the
@@ -88,61 +131,22 @@
* if a blocking DAO method is invoked within the transaction coroutine. Never assign meaning to
* this value, for now all we care is if its present or not.
*/
-private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext {
- val controlJob = Job()
- // make sure to tie the control job to this context to avoid blocking the transaction if
- // context get cancelled before we can even start using this job. Otherwise, the acquired
- // transaction thread will forever wait for the controlJob to be cancelled.
- // see b/148181325
- coroutineContext[Job]?.invokeOnCompletion {
- controlJob.cancel()
- }
- val dispatcher = transactionExecutor.acquireTransactionThread(controlJob)
- val transactionElement = TransactionElement(controlJob, dispatcher)
+private fun RoomDatabase.createTransactionContext(
+ dispatcher: ContinuationInterceptor
+): CoroutineContext {
+ val transactionElement = TransactionElement(dispatcher)
val threadLocalElement =
- suspendingTransactionId.asContextElement(System.identityHashCode(controlJob))
+ suspendingTransactionId.asContextElement(System.identityHashCode(transactionElement))
return dispatcher + transactionElement + threadLocalElement
}
/**
- * Acquires a thread from the executor and returns a [ContinuationInterceptor] to dispatch
- * coroutines to the acquired thread. The [controlJob] is used to control the release of the
- * thread by cancelling the job.
- */
-private suspend fun Executor.acquireTransactionThread(controlJob: Job): ContinuationInterceptor {
- return suspendCancellableCoroutine { continuation ->
- continuation.invokeOnCancellation {
- // We got cancelled while waiting to acquire a thread, we can't stop our attempt to
- // acquire a thread, but we can cancel the controlling job so once it gets acquired it
- // is quickly released.
- controlJob.cancel()
- }
- try {
- execute {
- runBlocking {
- // Thread acquired, resume coroutine.
- continuation.resume(coroutineContext[ContinuationInterceptor]!!)
- controlJob.join()
- }
- }
- } catch (ex: RejectedExecutionException) {
- // Couldn't acquire a thread, cancel coroutine.
- continuation.cancel(
- IllegalStateException(
- "Unable to acquire a thread to perform the database transaction.", ex
- )
- )
- }
- }
-}
-/**
* A [CoroutineContext.Element] that indicates there is an on-going database transaction.
*
* @hide
*/
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
internal class TransactionElement(
- private val transactionThreadControlJob: Job,
internal val transactionDispatcher: ContinuationInterceptor
) : CoroutineContext.Element {
@@ -153,9 +157,7 @@
/**
* Number of transactions (including nested ones) started with this element.
- * Call [acquire] to increase the count and [release] to decrease it. If the count reaches zero
- * when [release] is invoked then the transaction job is cancelled and the transaction thread
- * is released.
+ * Call [acquire] to increase the count and [release] to decrease it.
*/
private val referenceCount = AtomicInteger(0)
@@ -167,9 +169,6 @@
val count = referenceCount.decrementAndGet()
if (count < 0) {
throw IllegalStateException("Transaction was never started or was already released.")
- } else if (count == 0) {
- // Cancel the job that controls the transaction thread, causing it to be released.
- transactionThreadControlJob.cancel()
}
}
}
\ No newline at end of file
diff --git a/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt b/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt
index bd253f2..820d26a 100644
--- a/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt
+++ b/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt
@@ -53,8 +53,8 @@
*
* ```
* db.beginTransaction()
- * try {
- * ...
+ * try {
+ * ...
* db.setTransactionSuccessful()
* } finally {
* db.endTransaction()
@@ -75,10 +75,10 @@
*
* ```
* db.beginTransactionNonExclusive()
- * try {
- * ...
+ * try {
+ * ...
* db.setTransactionSuccessful()
- * } finally {
+ * } finally {
* db.endTransaction()
* }
* ```
@@ -126,7 +126,7 @@
* db.beginTransactionWithListenerNonExclusive(listener)
* try {
* ...
- * db.setTransactionSuccessful()
+ * db.setTransactionSuccessful()
* } finally {
* db.endTransaction()
* }