Merge "Handle reentrant case in Room's acquireTransactionThread()" into snap-temp-L60900000959093030
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt
index b3cb2b9..589d690 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/TestDatabase.kt
@@ -18,7 +18,9 @@
 
 import androidx.room.Database
 import androidx.room.RoomDatabase
+import androidx.room.androidx.room.integration.kotlintestapp.dao.CounterDao
 import androidx.room.androidx.room.integration.kotlintestapp.dao.UsersDao
+import androidx.room.androidx.room.integration.kotlintestapp.vo.Counter
 import androidx.room.androidx.room.integration.kotlintestapp.vo.User
 import androidx.room.integration.kotlintestapp.dao.AbstractDao
 import androidx.room.integration.kotlintestapp.dao.BooksDao
@@ -37,7 +39,7 @@
     entities = [
         Book::class, Author::class, Publisher::class, BookAuthor::class,
         NoArgClass::class, DataClassFromDependency::class, JavaEntity::class,
-        EntityWithJavaPojoList::class, User::class
+        EntityWithJavaPojoList::class, User::class, Counter::class
     ],
     version = 1,
     exportSchema = false
@@ -53,4 +55,6 @@
     abstract fun dependencyDao(): DependencyDao
 
     abstract fun abstractDao(): AbstractDao
+
+    abstract fun counterDao(): CounterDao
 }
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
index d53132c..7ec25bd 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/BooksDao.kt
@@ -136,6 +136,9 @@
     @Query("SELECT * FROM book")
     suspend fun getBooksSuspend(): List<Book>
 
+    @Query("SELECT * FROM publisher")
+    suspend fun getPublishersSuspend(): List<Publisher>
+
     @Query("UPDATE book SET salesCnt = salesCnt + 1 WHERE bookId = :bookId")
     fun increaseBookSales(bookId: String)
 
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/CounterDao.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/CounterDao.kt
new file mode 100644
index 0000000..62e7709
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/dao/CounterDao.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.dao
+
+import androidx.room.Dao
+import androidx.room.Query
+import androidx.room.Upsert
+import androidx.room.androidx.room.integration.kotlintestapp.vo.Counter
+
+@Dao
+interface CounterDao {
+    @Upsert
+    suspend fun upsert(c: Counter)
+
+    @Query("SELECT * FROM Counter WHERE id = :id")
+    suspend fun getCounter(id: Long): Counter
+}
\ No newline at end of file
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt
index 48efc91..29a48db 100644
--- a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/test/SuspendingQueryTest.kt
@@ -16,10 +16,13 @@
 
 package androidx.room.integration.kotlintestapp.test
 
+import android.content.Context
 import android.os.Build
 import androidx.arch.core.executor.ArchTaskExecutor
+import androidx.arch.core.executor.TaskExecutor
 import androidx.room.Room
 import androidx.room.RoomDatabase
+import androidx.room.androidx.room.integration.kotlintestapp.vo.Counter
 import androidx.room.integration.kotlintestapp.NewThreadDispatcher
 import androidx.room.integration.kotlintestapp.TestDatabase
 import androidx.room.integration.kotlintestapp.vo.Book
@@ -32,8 +35,22 @@
 import androidx.test.filters.LargeTest
 import com.google.common.truth.Truth.assertThat
 import com.google.common.truth.Truth.assertWithMessage
+import java.io.IOException
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.intrinsics.intercepted
+import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
+import kotlin.coroutines.resume
+import kotlinx.coroutines.DelicateCoroutinesApi
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.ObsoleteCoroutinesApi
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.asCoroutineDispatcher
 import kotlinx.coroutines.async
 import kotlinx.coroutines.cancelAndJoin
 import kotlinx.coroutines.coroutineScope
@@ -41,18 +58,13 @@
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.newSingleThreadContext
 import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.runTest
 import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeout
 import org.junit.After
 import org.junit.Assert.fail
 import org.junit.Test
 import org.junit.runner.RunWith
-import java.io.IOException
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-import kotlinx.coroutines.DelicateCoroutinesApi
 
 @LargeTest
 @RunWith(AndroidJUnit4::class)
@@ -158,9 +170,7 @@
                 database.endTransaction()
             }
         }
-        runBlocking {
-            assertThat(booksDao.getBooksSuspend()).isEqualTo(listOf(TestUtil.BOOK_2))
-        }
+        assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
     }
 
     @Test
@@ -182,9 +192,7 @@
                 database.endTransaction()
             }
         }
-        runBlocking {
-            assertThat(booksDao.getBooksSuspend()).isEqualTo(listOf(TestUtil.BOOK_2))
-        }
+        assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
     }
 
     @Test
@@ -206,9 +214,7 @@
                 database.endTransaction()
             }
         }
-        runBlocking(NewThreadDispatcher()) {
-            assertThat(booksDao.getBooksSuspend()).isEqualTo(listOf(TestUtil.BOOK_2))
-        }
+        assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
     }
 
     @Test
@@ -256,10 +262,25 @@
                 booksDao.deleteUnsoldBooks()
             }
         }
-        runBlocking(NewThreadDispatcher()) {
-            assertThat(booksDao.getBooksSuspend())
-                .isEqualTo(listOf(TestUtil.BOOK_2))
+        assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
+    }
+
+    @Test
+    fun withTransaction_withContext_newThreadDispatcher() {
+        runBlocking {
+            withContext(NewThreadDispatcher()) {
+                database.withTransaction {
+                    booksDao.insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                    booksDao.insertBookSuspend(TestUtil.BOOK_1.copy(salesCnt = 0))
+                    booksDao.insertBookSuspend(TestUtil.BOOK_2)
+                    booksDao.deleteUnsoldBooks()
+                }
+            }
         }
+        assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
     }
 
     @Test
@@ -275,10 +296,7 @@
                 booksDao.deleteUnsoldBooks()
             }
         }
-        runBlocking(NewThreadDispatcher()) {
-            assertThat(booksDao.getBooksSuspend())
-                .isEqualTo(listOf(TestUtil.BOOK_2))
-        }
+        assertThat(booksDao.allBooks).isEqualTo(listOf(TestUtil.BOOK_2))
     }
 
     @Test
@@ -301,6 +319,31 @@
     }
 
     @Test
+    fun withTransaction_contextSwitch_exception() {
+        runBlocking {
+            try {
+                database.withTransaction {
+                    booksDao.insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                    withContext(Dispatchers.IO) {
+                        booksDao.insertBookSuspend(TestUtil.BOOK_1.copy(salesCnt = 0))
+                        booksDao.insertBookSuspend(TestUtil.BOOK_2)
+                    }
+                    booksDao.deleteUnsoldBooks()
+                    throw IOException("Boom!")
+                }
+            } catch (ex: IOException) {
+                assertThat(ex).hasMessageThat()
+                    .contains("Boom")
+            }
+            assertThat(booksDao.getPublishersSuspend()).isEmpty()
+            assertThat(booksDao.getBooksSuspend()).isEmpty()
+        }
+    }
+
+    @Test
     fun withTransaction_exception() {
         runBlocking {
             database.withTransaction {
@@ -314,6 +357,7 @@
             try {
                 database.withTransaction {
                     booksDao.insertBookSuspend(TestUtil.BOOK_2)
+                    booksDao.insertBookSuspend(TestUtil.BOOK_3)
                     throw IOException("Boom!")
                 }
                 @Suppress("UNREACHABLE_CODE")
@@ -369,8 +413,8 @@
                 }
             }
 
-            assertThat(booksDao.getBooksSuspend())
-                .isEqualTo(emptyList<Book>())
+            assertThat(booksDao.getPublishersSuspend()).isEmpty()
+            assertThat(booksDao.getBooksSuspend()).isEmpty()
         }
     }
 
@@ -464,6 +508,7 @@
 
     @Test
     fun withTransaction_cancelCoroutine() {
+
         runBlocking {
             booksDao.insertPublisherSuspend(
                 TestUtil.PUBLISHER.publisherId,
@@ -496,6 +541,42 @@
     }
 
     @Test
+    fun withTransaction_busyExecutor_cancelCoroutine() {
+        val executorService = Executors.newSingleThreadExecutor()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executorService)
+            .build()
+
+        // Simulate a busy executor, no thread to acquire for transaction.
+        val busyLatch = CountDownLatch(1)
+        executorService.execute {
+            busyLatch.await()
+        }
+        runBlocking {
+            val startedRunning = CountDownLatch(1)
+            val job = launch(Dispatchers.IO) {
+                startedRunning.countDown()
+                delay(200) // yield and delay to queue the runnable in transaction executor
+                localDatabase.withTransaction {
+                    fail("Transaction block should have never run!")
+                }
+            }
+
+            assertThat(startedRunning.await(1, TimeUnit.SECONDS)).isTrue()
+            job.cancelAndJoin()
+        }
+
+        // free busy thread
+        busyLatch.countDown()
+        executorService.shutdown()
+        assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+
+        assertThat(localDatabase.booksDao().getPublishers()).isEmpty()
+    }
+
+    @Test
     fun withTransaction_blockingDaoMethods() {
         runBlocking {
             database.withTransaction {
@@ -655,15 +736,12 @@
             }
         }
 
-        runBlocking {
-            // as Set since insertion order is undefined
-            assertThat(booksDao.getBooksSuspend().toSet())
-                .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
-        }
+        // as Set since insertion order is undefined
+        assertThat(booksDao.allBooks.toSet())
+            .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
     }
 
     @Test
-    @ObsoleteCoroutinesApi
     @Suppress("DeferredResultUnused")
     fun withTransaction_multipleTransactions_multipleThreads() {
         runBlocking {
@@ -689,31 +767,16 @@
             }
         }
 
-        runBlocking {
-            // as Set since insertion order is undefined
-            assertThat(booksDao.getBooksSuspend().toSet())
-                .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
-        }
+        // as Set since insertion order is undefined
+        assertThat(booksDao.allBooks.toSet())
+            .isEqualTo(setOf(TestUtil.BOOK_1, TestUtil.BOOK_2))
     }
 
     @Test
     @Suppress("DeferredResultUnused")
     fun withTransaction_multipleTransactions_verifyThreadUsage() {
         val busyThreadsCount = AtomicInteger()
-        // Executor wrapper that counts threads that are busy executing commands.
-        class WrappedService(val delegate: ExecutorService) : ExecutorService by delegate {
-            override fun execute(command: Runnable) {
-                delegate.execute {
-                    busyThreadsCount.incrementAndGet()
-                    try {
-                        command.run()
-                    } finally {
-                        busyThreadsCount.decrementAndGet()
-                    }
-                }
-            }
-        }
-        val wrappedExecutor = WrappedService(Executors.newCachedThreadPool())
+        val wrappedExecutor = BusyCountingService(busyThreadsCount, Executors.newCachedThreadPool())
         val localDatabase = Room.inMemoryDatabaseBuilder(
             ApplicationProvider.getApplicationContext(), TestDatabase::class.java
         )
@@ -740,57 +803,67 @@
             }
         }
 
-        wrappedExecutor.awaitTermination(1, TimeUnit.SECONDS)
+        assertThat(busyThreadsCount.get()).isEqualTo(0)
+        wrappedExecutor.shutdown()
+        assertThat(wrappedExecutor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
     }
 
     @Test
     fun withTransaction_busyExecutor() {
+        val executorService = Executors.newSingleThreadExecutor()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executorService)
+            .build()
+
+        // Simulate a busy executor, no thread to acquire for transaction.
+        val busyLatch = CountDownLatch(1)
+        executorService.execute {
+            busyLatch.await()
+        }
         runBlocking {
-            val executorService = Executors.newSingleThreadExecutor()
-            val localDatabase = Room.inMemoryDatabaseBuilder(
-                ApplicationProvider.getApplicationContext(), TestDatabase::class.java
-            )
-                .setTransactionExecutor(executorService)
-                .build()
-
-            // Simulate a busy executor, no thread to acquire for transaction.
-            val busyLatch = CountDownLatch(1)
-            executorService.execute {
-                busyLatch.await()
-            }
-
             var asyncExecuted = false
-            val transactionLatch = CountDownLatch(1)
             val job = async(Dispatchers.IO) {
                 asyncExecuted = true
                 localDatabase.withTransaction {
-                    transactionLatch.countDown()
+                    localDatabase.booksDao().insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
                 }
             }
 
-            assertThat(transactionLatch.await(1000, TimeUnit.MILLISECONDS)).isFalse()
+            try {
+                withTimeout(1000) {
+                    job.join()
+                }
+                fail("A timeout should have occurred!")
+            } catch (_: TimeoutCancellationException) { }
             job.cancelAndJoin()
 
             assertThat(asyncExecuted).isTrue()
-
-            // free busy thread
-            busyLatch.countDown()
-            executorService.awaitTermination(1, TimeUnit.SECONDS)
         }
+        // free busy thread
+        busyLatch.countDown()
+        executorService.shutdown()
+        assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+
+        assertThat(localDatabase.booksDao().getPublishers()).isEmpty()
     }
 
     @Test
     fun withTransaction_shutdownExecutor() {
+        val executorService = Executors.newCachedThreadPool()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executorService)
+            .build()
+
+        executorService.shutdownNow()
+
         runBlocking {
-            val executorService = Executors.newCachedThreadPool()
-            val localDatabase = Room.inMemoryDatabaseBuilder(
-                ApplicationProvider.getApplicationContext(), TestDatabase::class.java
-            )
-                .setTransactionExecutor(executorService)
-                .build()
-
-            executorService.shutdownNow()
-
             try {
                 localDatabase.withTransaction {
                     fail("This coroutine should never run.")
@@ -801,22 +874,24 @@
                     .contains("Unable to acquire a thread to perform the database transaction")
             }
         }
+
+        executorService.shutdown()
+        assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
     }
 
     @Test
     fun withTransaction_databaseOpenError() {
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .addCallback(object : RoomDatabase.Callback() {
+                override fun onOpen(db: SupportSQLiteDatabase) {
+                    // this causes all transaction methods to throw, this can happen IRL
+                    throw RuntimeException("Error opening Database.")
+                }
+            })
+            .build()
         runBlocking {
-            val localDatabase = Room.inMemoryDatabaseBuilder(
-                ApplicationProvider.getApplicationContext(), TestDatabase::class.java
-            )
-                .addCallback(object : RoomDatabase.Callback() {
-                    override fun onOpen(db: SupportSQLiteDatabase) {
-                        // this causes all transaction methods to throw, this can happen IRL
-                        throw RuntimeException("Error opening Database.")
-                    }
-                })
-                .build()
-
             try {
                 localDatabase.withTransaction {
                     fail("This coroutine should never run.")
@@ -830,41 +905,40 @@
 
     @Test
     fun withTransaction_beginTransaction_error() {
-        runBlocking {
-            // delegate and delegate just so that we can throw in beginTransaction()
-            val localDatabase = Room.inMemoryDatabaseBuilder(
-                ApplicationProvider.getApplicationContext(), TestDatabase::class.java
-            )
-                .openHelperFactory(
-                    object : SupportSQLiteOpenHelper.Factory {
-                        val factoryDelegate = FrameworkSQLiteOpenHelperFactory()
-                        override fun create(
-                            configuration: SupportSQLiteOpenHelper.Configuration
-                        ): SupportSQLiteOpenHelper {
-                            val helperDelegate = factoryDelegate.create(configuration)
-                            return object : SupportSQLiteOpenHelper by helperDelegate {
-                                override val writableDatabase: SupportSQLiteDatabase
-                                    get() {
-                                        val databaseDelegate = helperDelegate.writableDatabase
-                                        return object : SupportSQLiteDatabase by databaseDelegate {
-                                            override fun beginTransaction() {
-                                                throw RuntimeException(
-                                                    "Error beginning transaction."
-                                                )
-                                            }
-                                            override fun beginTransactionNonExclusive() {
-                                                throw RuntimeException(
-                                                    "Error beginning transaction."
-                                                )
-                                            }
+        // delegate and delegate just so that we can throw in beginTransaction()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .openHelperFactory(
+                object : SupportSQLiteOpenHelper.Factory {
+                    val factoryDelegate = FrameworkSQLiteOpenHelperFactory()
+                    override fun create(
+                        configuration: SupportSQLiteOpenHelper.Configuration
+                    ): SupportSQLiteOpenHelper {
+                        val helperDelegate = factoryDelegate.create(configuration)
+                        return object : SupportSQLiteOpenHelper by helperDelegate {
+                            override val writableDatabase: SupportSQLiteDatabase
+                                get() {
+                                    val databaseDelegate = helperDelegate.writableDatabase
+                                    return object : SupportSQLiteDatabase by databaseDelegate {
+                                        override fun beginTransaction() {
+                                            throw RuntimeException(
+                                                "Error beginning transaction."
+                                            )
+                                        }
+                                        override fun beginTransactionNonExclusive() {
+                                            throw RuntimeException(
+                                                "Error beginning transaction."
+                                            )
                                         }
                                     }
-                            }
+                                }
                         }
                     }
-                )
-                .build()
-
+                }
+            )
+            .build()
+        runBlocking {
             try {
                 localDatabase.withTransaction {
                     fail("This coroutine should never run.")
@@ -1023,4 +1097,286 @@
         assertThat(booksDao.getBooksSuspend())
             .contains(addedBook)
     }
+
+    @Test
+    fun withTransaction_instantTaskExecutorRule() = runBlocking {
+        // Not the actual InstantTaskExecutorRule since this test class already uses
+        // CountingTaskExecutorRule but same behaviour.
+        ArchTaskExecutor.getInstance().setDelegate(object : TaskExecutor() {
+            override fun executeOnDiskIO(runnable: Runnable) {
+                runnable.run()
+            }
+
+            override fun postToMainThread(runnable: Runnable) {
+                runnable.run()
+            }
+
+            override fun isMainThread(): Boolean {
+                return false
+            }
+        })
+        database.withTransaction {
+            booksDao.insertPublisherSuspend(
+                TestUtil.PUBLISHER.publisherId,
+                TestUtil.PUBLISHER.name
+            )
+        }
+        assertThat(booksDao.getPublishers().size).isEqualTo(1)
+    }
+
+    @Test
+    fun withTransaction_singleExecutorDispatcher() {
+        val executor = Executors.newSingleThreadExecutor()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executor)
+            .build()
+        runBlocking {
+            withContext(executor.asCoroutineDispatcher()) {
+                localDatabase.withTransaction {
+                    localDatabase.booksDao().insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                }
+            }
+        }
+        assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+
+        executor.shutdown()
+        assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+    }
+
+    @Test
+    fun withTransaction_reentrant_nested() {
+        val executor = Executors.newSingleThreadExecutor()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executor)
+            .build()
+        runBlocking {
+            withContext(executor.asCoroutineDispatcher()) {
+                localDatabase.withTransaction {
+                    localDatabase.booksDao().insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                    localDatabase.withTransaction {
+                        localDatabase.booksDao().insertBookSuspend(TestUtil.BOOK_1)
+                    }
+                }
+            }
+        }
+        assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+        assertThat(localDatabase.booksDao().allBooks.size).isEqualTo(1)
+
+        executor.shutdown()
+        assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+    }
+
+    @Test
+    fun withTransaction_reentrant_nested_exception() {
+        val executor = Executors.newSingleThreadExecutor()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executor)
+            .build()
+        runBlocking {
+            withContext(executor.asCoroutineDispatcher()) {
+                localDatabase.withTransaction {
+                    localDatabase.booksDao().insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                    try {
+                        localDatabase.withTransaction {
+                            localDatabase.booksDao().insertBookSuspend(TestUtil.BOOK_1)
+                            throw IOException("Boom!")
+                        }
+                        @Suppress("UNREACHABLE_CODE")
+                        fail("An exception should have been thrown.")
+                    } catch (ex: IOException) {
+                        assertThat(ex).hasMessageThat()
+                            .contains("Boom")
+                    }
+                }
+            }
+        }
+        assertThat(localDatabase.booksDao().getPublishers()).isEmpty()
+        assertThat(localDatabase.booksDao().allBooks).isEmpty()
+
+        executor.shutdown()
+        assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+    }
+
+    @Test
+    fun withTransaction_reentrant_nested_contextSwitch() {
+        val executor = Executors.newSingleThreadExecutor()
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(executor)
+            .build()
+
+        runBlocking {
+            withContext(executor.asCoroutineDispatcher()) {
+                localDatabase.withTransaction {
+                    localDatabase.booksDao().insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                    withContext(Dispatchers.IO) {
+                        localDatabase.withTransaction {
+                            localDatabase.booksDao().insertBookSuspend(TestUtil.BOOK_1)
+                        }
+                    }
+                }
+            }
+        }
+        assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+        assertThat(localDatabase.booksDao().allBooks.size).isEqualTo(1)
+
+        executor.shutdown()
+        assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+    }
+
+    @Test
+    fun withTransaction_reentrant_busyExecutor() {
+        val activeWorkers = AtomicInteger()
+        // Two-thread pool: one thread is parked below, the other is left for the coroutine.
+        val pool = BusyCountingService(activeWorkers, Executors.newFixedThreadPool(2))
+        val localDatabase = Room.inMemoryDatabaseBuilder(
+            ApplicationProvider.getApplicationContext(), TestDatabase::class.java
+        )
+            .setTransactionExecutor(pool)
+            .build()
+
+        // Occupy one of the pool threads to simulate busy work.
+        val releaseWorker = CountDownLatch(1)
+        pool.execute {
+            releaseWorker.await()
+        }
+
+        runBlocking {
+            // Running on the pool's other thread makes withTransaction reentrant.
+            withContext(pool.asCoroutineDispatcher()) {
+                localDatabase.withTransaction {
+                    val txnThread = Thread.currentThread()
+                    // Suspend the transaction thread while freeing the parked pool thread.
+                    withContext(Dispatchers.IO) {
+                        releaseWorker.countDown()
+                        delay(200)
+                        // Only one thread is busy, the transaction thread
+                        assertThat(activeWorkers.get()).isEqualTo(1)
+                    }
+                    // Must resume on the transaction thread; the pool thread that was just freed
+                    // is not in a transaction and should not be used.
+                    assertThat(Thread.currentThread()).isEqualTo(txnThread)
+                    localDatabase.booksDao().insertPublisherSuspend(
+                        TestUtil.PUBLISHER.publisherId,
+                        TestUtil.PUBLISHER.name
+                    )
+                }
+            }
+        }
+
+        assertThat(localDatabase.booksDao().getPublishers().size).isEqualTo(1)
+
+        pool.shutdown()
+        assertThat(pool.awaitTermination(1, TimeUnit.SECONDS)).isTrue()
+    }
+
+    @Test
+    @OptIn(ExperimentalCoroutinesApi::class)
+    fun withTransaction_runTest() {
+        runTest {
+            database.withTransaction {
+                booksDao.insertPublisherSuspend(
+                    TestUtil.PUBLISHER.publisherId,
+                    TestUtil.PUBLISHER.name
+                )
+                booksDao.insertBookSuspend(TestUtil.BOOK_1.copy(salesCnt = 0))
+                booksDao.insertBookSuspend(TestUtil.BOOK_2)
+                booksDao.deleteUnsoldBooks()
+            }
+            assertThat(booksDao.getBooksSuspend())
+                .isEqualTo(listOf(TestUtil.BOOK_2))
+        }
+    }
+
+    @Test
+    fun withTransaction_stress_testMutation() {
+        // Each transaction records its index, really suspends, then records the index again.
+        val recorded = mutableListOf<String>()
+        runBlocking {
+            for (iteration in 0 until 5000) {
+                database.withTransaction {
+                    recorded.add("$iteration")
+                    suspendHere()
+                    recorded.add("$iteration")
+                }
+            }
+        }
+
+        // The transactions run one after another, so every index appears twice in a row.
+        val expected = (0 until 5000).flatMap { iteration ->
+            val label = "$iteration"
+            listOf(label, label)
+        }
+        assertThat(recorded).isEqualTo(expected)
+    }
+
+    @Test
+    fun withTransaction_stress_dbMutation() {
+        // 5000 coroutines on Dispatchers.IO each run a read-modify-write transaction on one row.
+        // One shared name guarantees the file deleted here is the file the builder opens.
+        val dbName = "test_stress_dbMutation.db"
+        val context: Context = ApplicationProvider.getApplicationContext()
+        context.deleteDatabase(dbName)
+        val db = Room.databaseBuilder(context, TestDatabase::class.java, dbName).build()
+        // runBlocking returns only once every coroutine launched inside it has completed.
+        runBlocking {
+            db.counterDao().upsert(Counter(1, 0))
+            repeat(5000) {
+                launch(Dispatchers.IO) {
+                    db.withTransaction {
+                        val current = db.counterDao().getCounter(1)
+                        suspendHere()
+                        db.counterDao().upsert(current.copy(value = current.value + 1))
+                    }
+                }
+            }
+        }
+        runBlocking {
+            val count = db.counterDao().getCounter(1)
+            assertThat(count.value).isEqualTo(5000)
+        }
+        db.close()
+    }
+
+    // Forces a real suspension: resumes the intercepted continuation, then reports suspended.
+    private suspend fun suspendHere(): Unit = suspendCoroutineUninterceptedOrReturn {
+        it.intercepted().resume(Unit)
+        COROUTINE_SUSPENDED
+    }
+
+    // Executor wrapper that counts threads that are busy executing commands.
+    class BusyCountingService(
+        val count: AtomicInteger,
+        val delegate: ExecutorService
+    ) : ExecutorService by delegate {
+        override fun execute(command: Runnable) {
+            delegate.execute {
+                count.incrementAndGet()
+                try {
+                    command.run()
+                } finally {
+                    count.decrementAndGet()
+                }
+            }
+        }
+    }
 }
diff --git a/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/vo/Counter.kt b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/vo/Counter.kt
new file mode 100644
index 0000000..ce8fd1c
--- /dev/null
+++ b/room/integration-tests/kotlintestapp/src/androidTest/java/androidx/room/integration/kotlintestapp/vo/Counter.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.androidx.room.integration.kotlintestapp.vo
+
+import androidx.room.Entity
+import androidx.room.PrimaryKey
+
+@Entity // Room entity: one row per id carrying a single integer value
+data class Counter(
+    @PrimaryKey val id: Long,
+    val value: Int
+)
\ No newline at end of file
diff --git a/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt b/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt
index 4edabab..9fc8d5d 100644
--- a/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt
+++ b/room/room-ktx/src/main/java/androidx/room/RoomDatabaseExt.kt
@@ -18,18 +18,17 @@
 package androidx.room
 
 import androidx.annotation.RestrictTo
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.asContextElement
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlinx.coroutines.withContext
-import java.util.concurrent.Executor
 import java.util.concurrent.RejectedExecutionException
 import java.util.concurrent.atomic.AtomicInteger
 import kotlin.coroutines.ContinuationInterceptor
 import kotlin.coroutines.CoroutineContext
 import kotlin.coroutines.coroutineContext
 import kotlin.coroutines.resume
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asContextElement
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlinx.coroutines.withContext
 
 /**
  * Calls the specified suspending [block] in a database transaction. The transaction will be
@@ -43,13 +42,11 @@
  * one received by the suspending block. It is recommended that all [Dao] function invoked within
  * the [block] be suspending functions.
  *
- * The dispatcher used to execute the given [block] will utilize threads from Room's query executor.
+ * The internal dispatcher used to execute the given [block] will block and utilize a thread from
+ * Room's transaction executor until the [block] is complete.
  */
 public suspend fun <R> RoomDatabase.withTransaction(block: suspend () -> R): R {
-    // Use inherited transaction context if available, this allows nested suspending transactions.
-    val transactionContext =
-        coroutineContext[TransactionElement]?.transactionDispatcher ?: createTransactionContext()
-    return withContext(transactionContext) {
+    val transactionBlock: suspend CoroutineScope.() -> R = transaction@{
         val transactionElement = coroutineContext[TransactionElement]!!
         transactionElement.acquire()
         try {
@@ -59,7 +56,7 @@
                 val result = block.invoke()
                 @Suppress("DEPRECATION")
                 setTransactionSuccessful()
-                return@withContext result
+                return@transaction result
             } finally {
                 @Suppress("DEPRECATION")
                 endTransaction()
@@ -68,6 +65,51 @@
             transactionElement.release()
         }
     }
+    // Use inherited transaction context if available, this allows nested suspending transactions.
+    val transactionDispatcher = coroutineContext[TransactionElement]?.transactionDispatcher
+    return if (transactionDispatcher != null) {
+        withContext(transactionDispatcher, transactionBlock)
+    } else {
+        startTransactionCoroutine(coroutineContext, transactionBlock)
+    }
+}
+
+/**
+ * Suspends the caller coroutine and starts the transaction coroutine on a thread from the
+ * [RoomDatabase.transactionExecutor], resuming the caller coroutine with the result once done.
+ * The [context] (minus its dispatcher) is given to the started coroutine to propagate
+ * cancellation and release the thread when cancelled.
+ */
+private suspend fun <R> RoomDatabase.startTransactionCoroutine(
+    context: CoroutineContext,
+    transactionBlock: suspend CoroutineScope.() -> R
+): R = suspendCancellableCoroutine { continuation ->
+    try {
+        transactionExecutor.execute {
+            try {
+                // Thread acquired, start the transaction coroutine using the parent context.
+                // The started coroutine will have an event loop dispatcher that we'll use for the
+                // transaction context.
+                runBlocking(context.minusKey(ContinuationInterceptor)) {
+                    val dispatcher = coroutineContext[ContinuationInterceptor]!!
+                    val transactionContext = createTransactionContext(dispatcher)
+                    continuation.resume(
+                        withContext(transactionContext, transactionBlock)
+                    )
+                }
+            } catch (ex: Throwable) {
+                // On any failure, cancel the calling coroutine with that exception.
+                continuation.cancel(ex)
+            }
+        }
+    } catch (ex: RejectedExecutionException) {
+        // The executor rejected the task, so no thread was acquired; cancel the caller.
+        continuation.cancel(
+            IllegalStateException(
+                "Unable to acquire a thread to perform the database transaction.", ex
+            )
+        )
+    }
 }
 
 /**
@@ -76,8 +118,9 @@
  * The context is a combination of a dispatcher, a [TransactionElement] and a thread local element.
  *
  * * The dispatcher will dispatch coroutines to a single thread that is taken over from the Room
- * query executor. If the coroutine context is switched, suspending DAO functions will be able to
- * dispatch to the transaction thread.
+ * transaction executor. If the coroutine context is switched, suspending DAO functions will be able
+ * to dispatch to the transaction thread. In reality the dispatcher is the event loop of a
+ * [runBlocking] started on the dedicated thread.
  *
  * * The [TransactionElement] serves as an indicator for inherited context, meaning, if there is a
  * switch of context, suspending DAO methods will be able to use the indicator to dispatch the
@@ -88,61 +131,22 @@
  * if a blocking DAO method is invoked within the transaction coroutine. Never assign meaning to
  * this value, for now all we care is if its present or not.
  */
-private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext {
-    val controlJob = Job()
-    // make sure to tie the control job to this context to avoid blocking the transaction if
-    // context get cancelled before we can even start using this job. Otherwise, the acquired
-    // transaction thread will forever wait for the controlJob to be cancelled.
-    // see b/148181325
-    coroutineContext[Job]?.invokeOnCompletion {
-        controlJob.cancel()
-    }
-    val dispatcher = transactionExecutor.acquireTransactionThread(controlJob)
-    val transactionElement = TransactionElement(controlJob, dispatcher)
+private fun RoomDatabase.createTransactionContext(
+    dispatcher: ContinuationInterceptor
+): CoroutineContext {
+    val transactionElement = TransactionElement(dispatcher)
     val threadLocalElement =
-        suspendingTransactionId.asContextElement(System.identityHashCode(controlJob))
+        suspendingTransactionId.asContextElement(System.identityHashCode(transactionElement))
     return dispatcher + transactionElement + threadLocalElement
 }
 
 /**
- * Acquires a thread from the executor and returns a [ContinuationInterceptor] to dispatch
- * coroutines to the acquired thread. The [controlJob] is used to control the release of the
- * thread by cancelling the job.
- */
-private suspend fun Executor.acquireTransactionThread(controlJob: Job): ContinuationInterceptor {
-    return suspendCancellableCoroutine { continuation ->
-        continuation.invokeOnCancellation {
-            // We got cancelled while waiting to acquire a thread, we can't stop our attempt to
-            // acquire a thread, but we can cancel the controlling job so once it gets acquired it
-            // is quickly released.
-            controlJob.cancel()
-        }
-        try {
-            execute {
-                runBlocking {
-                    // Thread acquired, resume coroutine.
-                    continuation.resume(coroutineContext[ContinuationInterceptor]!!)
-                    controlJob.join()
-                }
-            }
-        } catch (ex: RejectedExecutionException) {
-            // Couldn't acquire a thread, cancel coroutine.
-            continuation.cancel(
-                IllegalStateException(
-                    "Unable to acquire a thread to perform the database transaction.", ex
-                )
-            )
-        }
-    }
-}
-/**
  * A [CoroutineContext.Element] that indicates there is an on-going database transaction.
  *
  * @hide
  */
 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
 internal class TransactionElement(
-    private val transactionThreadControlJob: Job,
     internal val transactionDispatcher: ContinuationInterceptor
 ) : CoroutineContext.Element {
 
@@ -153,9 +157,7 @@
 
     /**
      * Number of transactions (including nested ones) started with this element.
-     * Call [acquire] to increase the count and [release] to decrease it. If the count reaches zero
-     * when [release] is invoked then the transaction job is cancelled and the transaction thread
-     * is released.
+     * Call [acquire] to increase the count and [release] to decrease it.
      */
     private val referenceCount = AtomicInteger(0)
 
@@ -167,9 +169,6 @@
         val count = referenceCount.decrementAndGet()
         if (count < 0) {
             throw IllegalStateException("Transaction was never started or was already released.")
-        } else if (count == 0) {
-            // Cancel the job that controls the transaction thread, causing it to be released.
-            transactionThreadControlJob.cancel()
         }
     }
 }
\ No newline at end of file
diff --git a/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt b/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt
index bd253f2..820d26a 100644
--- a/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt
+++ b/sqlite/sqlite/src/main/java/androidx/sqlite/db/SupportSQLiteDatabase.kt
@@ -53,8 +53,8 @@
      *
      * ```
      *  db.beginTransaction()
-     *      try {
-     *          ...
+     *  try {
+     *      ...
      *      db.setTransactionSuccessful()
      *  } finally {
      *      db.endTransaction()
@@ -75,10 +75,10 @@
      *
      * ```
      *  db.beginTransactionNonExclusive()
-     *      try {
-     *          ...
+     *  try {
+     *      ...
      *      db.setTransactionSuccessful()
-     *          } finally {
+     *  } finally {
      *      db.endTransaction()
      *  }
      *  ```
@@ -126,7 +126,7 @@
      *  db.beginTransactionWithListenerNonExclusive(listener)
      *  try {
      *      ...
-     *  db.setTransactionSuccessful()
+     *      db.setTransactionSuccessful()
      *  } finally {
      *      db.endTransaction()
      *  }