Closeable Recomposer

Add a close() method to Recomposer to initiate a graceful shutdown that
awaits the completion of any currently running child coroutines -
LaunchedEffects and jobs launched from rememberCoroutineScope() scopes
in any currently managed compositions. Recomposition of invalidated
compositions will continue until all child effect coroutines join.

Change Recomposer.join() to await full shutdown rather than just effect
parent job completion.

Test: RecomposerTests.kt
Relnote: "Recomposers can now be closed. Closed recomposers will
continue recomposition until composition child coroutines complete.
Recomposer.shutDown renamed to cancel to contrast with close."

Change-Id: Ib6d766b91381ee45af41a14b7951c48f794f0a90
diff --git a/compose/foundation/foundation/src/test/kotlin/androidx/compose/foundation/gestures/SuspendingGestureTestUtil.kt b/compose/foundation/foundation/src/test/kotlin/androidx/compose/foundation/gestures/SuspendingGestureTestUtil.kt
index 3d4fe80..c0ba66b 100644
--- a/compose/foundation/foundation/src/test/kotlin/androidx/compose/foundation/gestures/SuspendingGestureTestUtil.kt
+++ b/compose/foundation/foundation/src/test/kotlin/androidx/compose/foundation/gestures/SuspendingGestureTestUtil.kt
@@ -106,6 +106,8 @@
             }
             yield()
             block()
+            // Pointer input effects will loop indefinitely; fully cancel them.
+            recomposer.cancel()
         }
     }
 
diff --git a/compose/runtime/runtime/api/current.txt b/compose/runtime/runtime/api/current.txt
index 833d1e5..0d23c10f 100644
--- a/compose/runtime/runtime/api/current.txt
+++ b/compose/runtime/runtime/api/current.txt
@@ -362,13 +362,15 @@
     ctor public Recomposer(kotlin.coroutines.CoroutineContext effectCoroutineContext);
     method public androidx.compose.runtime.RecomposerInfo asRecomposerInfo();
     method public suspend Object? awaitIdle(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
+    method public void cancel();
+    method public void close();
     method public long getChangeCount();
     method public boolean getHasPendingWork();
     method public kotlinx.coroutines.flow.Flow<androidx.compose.runtime.Recomposer.State> getState();
     method @Deprecated public boolean hasInvalidations();
     method public suspend Object? join(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
-    method public suspend Object? runRecomposeAndApplyChanges(kotlin.coroutines.Continuation<?> p);
-    method public void shutDown();
+    method public suspend Object? runRecomposeAndApplyChanges(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
+    method @Deprecated public void shutDown();
     property public final long changeCount;
     property public final boolean hasPendingWork;
     property public final kotlinx.coroutines.flow.Flow<androidx.compose.runtime.Recomposer.State> state;
diff --git a/compose/runtime/runtime/api/public_plus_experimental_current.txt b/compose/runtime/runtime/api/public_plus_experimental_current.txt
index 833d1e5..0d23c10f 100644
--- a/compose/runtime/runtime/api/public_plus_experimental_current.txt
+++ b/compose/runtime/runtime/api/public_plus_experimental_current.txt
@@ -362,13 +362,15 @@
     ctor public Recomposer(kotlin.coroutines.CoroutineContext effectCoroutineContext);
     method public androidx.compose.runtime.RecomposerInfo asRecomposerInfo();
     method public suspend Object? awaitIdle(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
+    method public void cancel();
+    method public void close();
     method public long getChangeCount();
     method public boolean getHasPendingWork();
     method public kotlinx.coroutines.flow.Flow<androidx.compose.runtime.Recomposer.State> getState();
     method @Deprecated public boolean hasInvalidations();
     method public suspend Object? join(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
-    method public suspend Object? runRecomposeAndApplyChanges(kotlin.coroutines.Continuation<?> p);
-    method public void shutDown();
+    method public suspend Object? runRecomposeAndApplyChanges(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
+    method @Deprecated public void shutDown();
     property public final long changeCount;
     property public final boolean hasPendingWork;
     property public final kotlinx.coroutines.flow.Flow<androidx.compose.runtime.Recomposer.State> state;
diff --git a/compose/runtime/runtime/api/restricted_current.txt b/compose/runtime/runtime/api/restricted_current.txt
index 07e1499..bb6fd9e 100644
--- a/compose/runtime/runtime/api/restricted_current.txt
+++ b/compose/runtime/runtime/api/restricted_current.txt
@@ -386,13 +386,15 @@
     ctor public Recomposer(kotlin.coroutines.CoroutineContext effectCoroutineContext);
     method public androidx.compose.runtime.RecomposerInfo asRecomposerInfo();
     method public suspend Object? awaitIdle(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
+    method public void cancel();
+    method public void close();
     method public long getChangeCount();
     method public boolean getHasPendingWork();
     method public kotlinx.coroutines.flow.Flow<androidx.compose.runtime.Recomposer.State> getState();
     method @Deprecated public boolean hasInvalidations();
     method public suspend Object? join(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
-    method public suspend Object? runRecomposeAndApplyChanges(kotlin.coroutines.Continuation<?> p);
-    method public void shutDown();
+    method public suspend Object? runRecomposeAndApplyChanges(kotlin.coroutines.Continuation<? super kotlin.Unit> p);
+    method @Deprecated public void shutDown();
     property public final long changeCount;
     property public final boolean hasPendingWork;
     property public final kotlinx.coroutines.flow.Flow<androidx.compose.runtime.Recomposer.State> state;
diff --git a/compose/runtime/runtime/compose-runtime-benchmark/src/androidTest/java/androidx/compose/runtime/benchmark/ComposeBenchmarkBase.kt b/compose/runtime/runtime/compose-runtime-benchmark/src/androidTest/java/androidx/compose/runtime/benchmark/ComposeBenchmarkBase.kt
index 95f78ad..fcf16d4 100644
--- a/compose/runtime/runtime/compose-runtime-benchmark/src/androidTest/java/androidx/compose/runtime/benchmark/ComposeBenchmarkBase.kt
+++ b/compose/runtime/runtime/compose-runtime-benchmark/src/androidTest/java/androidx/compose/runtime/benchmark/ComposeBenchmarkBase.kt
@@ -74,7 +74,7 @@
         } finally {
             activity.setContentView(emptyView)
             advanceUntilIdle()
-            recomposer.shutDown()
+            recomposer.cancel()
         }
     }
 
@@ -163,7 +163,7 @@
         }
 
         activity.setContentView(emptyView)
-        recomposer.shutDown()
+        recomposer.cancel()
     }
 }
 
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt
index 8cc6928..1b3f20b 100644
--- a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/Recomposer.kt
@@ -33,6 +33,7 @@
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.StateFlow
 import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.flow.takeWhile
 import kotlinx.coroutines.job
 import kotlinx.coroutines.launch
@@ -47,6 +48,9 @@
 
 /**
  * Runs [block] with a new, active [Recomposer] applying changes in the calling [CoroutineContext].
+ * The [Recomposer] will be [closed][Recomposer.close] after [block] returns.
+ * [withRunningRecomposer] will return once the [Recomposer] is [Recomposer.State.ShutDown]
+ * and all child jobs launched by [block] have [joined][Job.join].
  */
 suspend fun <R> withRunningRecomposer(
     block: suspend CoroutineScope.(recomposer: Recomposer) -> R
@@ -55,7 +59,8 @@
     // Will be cancelled when recomposerJob cancels
     launch { recomposer.runRecomposeAndApplyChanges() }
     block(recomposer).also {
-        recomposer.shutDown()
+        recomposer.close()
+        recomposer.join()
     }
 }
 
@@ -88,7 +93,8 @@
  */
 // RedundantVisibilityModifier suppressed because metalava picks up internal function overrides
 // if 'internal' is not explicitly specified - b/171342041
-@Suppress("RedundantVisibilityModifier")
+// NotCloseable suppressed because this is Kotlin-only common code; [Auto]Closeable not available.
+@Suppress("RedundantVisibilityModifier", "NotCloseable")
 @OptIn(
     ExperimentalComposeApi::class,
     InternalComposeApi::class
@@ -125,12 +131,20 @@
             // kick it out and make sure no new ones start if we have one.
             val cancellation = CancellationException("Recomposer effect job completed", throwable)
 
+            var continuationToResume: CancellableContinuation<Unit>? = null
             synchronized(stateLock) {
                 val runnerJob = runnerJob
                 if (runnerJob != null) {
                     _state.value = State.ShuttingDown
-                    // This will cancel frameContinuation if needed
-                    runnerJob.cancel(cancellation)
+                    // If the recomposer is closed we will let the runnerJob return from
+                    // runRecomposeAndApplyChanges normally and consider ourselves shut down
+                    // immediately.
+                    if (!isClosed) {
+                        // This is the job hosting frameContinuation; no need to resume it otherwise
+                        runnerJob.cancel(cancellation)
+                    } else if (frameContinuation != null) {
+                        continuationToResume = frameContinuation
+                    }
                     frameContinuation = null
                     runnerJob.invokeOnCompletion { runnerJobCause ->
                         synchronized(stateLock) {
@@ -147,6 +161,7 @@
                     _state.value = State.ShutDown
                 }
             }
+            continuationToResume?.resume(Unit)
         }
     }
 
@@ -161,13 +176,13 @@
      */
     enum class State {
         /**
-         * [shutDown] was called on the [Recomposer] and all cleanup work has completed.
+         * [cancel] was called on the [Recomposer] and all cleanup work has completed.
          * The [Recomposer] is no longer available for use.
          */
         ShutDown,
 
         /**
-         * [shutDown] was called on the [Recomposer] and it is no longer available for use.
+         * [cancel] was called on the [Recomposer] and it is no longer available for use.
          * Cleanup work has not yet been fully completed and composition effect coroutines may
          * still be running.
          */
@@ -205,12 +220,17 @@
     }
 
     private val stateLock = Any()
+
+    // Begin properties guarded by stateLock
     private var runnerJob: Job? = null
     private var closeCause: Throwable? = null
     private val knownCompositions = mutableListOf<ControlledComposition>()
     private val snapshotInvalidations = mutableListOf<Set<Any>>()
     private val compositionInvalidations = mutableListOf<ControlledComposition>()
     private var frameContinuation: CancellableContinuation<Unit>? = null
+    private var isClosed: Boolean = false
+    // End properties guarded by stateLock
+
     private val _state = MutableStateFlow(State.Inactive)
 
     /**
@@ -247,6 +267,13 @@
     }
 
     /**
+     * `true` if there is still work to do for an active caller of [runRecomposeAndApplyChanges]
+     */
+    private val shouldKeepRecomposing: Boolean
+        get() = synchronized(stateLock) { !isClosed } ||
+            effectJob.children.any { it.isActive }
+
+    /**
      * The current [State] of this [Recomposer]. See each [State] value for its meaning.
      */
     public val state: Flow<State>
@@ -301,10 +328,11 @@
      * While [runRecomposeAndApplyChanges] is running, [awaitIdle] will suspend until there are no
      * more invalid composers awaiting recomposition.
      *
-     * This method never returns. Cancel the calling [CoroutineScope] to stop.
+     * This method will not return unless the [Recomposer] is [close]d and all effects in managed
+     * compositions complete.
      * Unhandled failure exceptions from child coroutines will be thrown by this method.
      */
-    suspend fun runRecomposeAndApplyChanges(): Nothing {
+    suspend fun runRecomposeAndApplyChanges() {
         val parentFrameClock = coroutineContext[MonotonicFrameClock] ?: DefaultMonotonicFrameClock
         withContext(broadcastFrameClock) {
             // Enforce mutual exclusion of callers; register self as current runner
@@ -335,12 +363,15 @@
 
                 val toRecompose = mutableListOf<ControlledComposition>()
                 val toApply = mutableListOf<ControlledComposition>()
-                while (true) {
+                while (shouldKeepRecomposing) {
                     // Await something to do
                     if (_state.value < State.PendingWork) {
                         suspendCancellableCoroutine<Unit> { co ->
                             synchronized(stateLock) {
-                                if (_state.value == State.PendingWork) {
+                                val currentState = _state.value
+                                if (currentState == State.PendingWork ||
+                                    currentState <= State.ShuttingDown
+                                ) {
                                     co.resume(Unit)
                                 } else {
                                     frameContinuation = co
@@ -433,15 +464,32 @@
      * [Recomposer] will also be cancelled. See [join] to await the completion of all of these
      * outstanding tasks.
      */
-    fun shutDown() {
+    fun cancel() {
         effectJob.cancel()
     }
 
+    @Deprecated("renamed to cancel(); consider close() for your use case", ReplaceWith("cancel()"))
+    fun shutDown() = cancel()
+
     /**
-     * Await the completion of a [shutDown] operation.
+     * Close this [Recomposer]. Once all effects launched by managed compositions complete,
+     * any active call to [runRecomposeAndApplyChanges] will return normally and this [Recomposer]
+     * will be [State.ShutDown]. See [join] to await the completion of all of these outstanding
+     * tasks.
+     */
+    fun close() {
+        if (effectJob.complete()) {
+            synchronized(stateLock) {
+                isClosed = true
+            }
+        }
+    }
+
+    /**
+     * Await the completion of a [cancel] operation.
      */
     suspend fun join() {
-        effectJob.join()
+        state.first { it == State.ShutDown }
     }
 
     internal override fun composeInitial(
diff --git a/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/CompositionTests.kt b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/CompositionTests.kt
index fc8855b..69e8be0 100644
--- a/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/CompositionTests.kt
+++ b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/CompositionTests.kt
@@ -2806,7 +2806,7 @@
     block(recomposer)
     // This call doesn't need to be in a finally; everything it does will be torn down
     // in exceptional cases by the coroutineScope failure
-    recomposer.shutDown()
+    recomposer.cancel()
 }
 
 @Composable fun Wrap(content: @Composable () -> Unit) {
diff --git a/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/RecomposerTests.kt b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/RecomposerTests.kt
new file mode 100644
index 0000000..db19167
--- /dev/null
+++ b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/RecomposerTests.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.compose.runtime
+
+import androidx.compose.runtime.mock.TestMonotonicFrameClock
+import androidx.compose.runtime.snapshots.withMutableSnapshot
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.withTimeoutOrNull
+import org.junit.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
+
+@OptIn(ExperimentalCoroutinesApi::class, ExperimentalComposeApi::class)
+class RecomposerTests {
+
+    @Test
+    fun recomposerRecomposesWhileOpen() = runBlockingTest {
+        val testClock = TestMonotonicFrameClock(this)
+        withContext(testClock) {
+            val recomposer = Recomposer(coroutineContext)
+            val runner = launch {
+                recomposer.runRecomposeAndApplyChanges()
+            }
+            val composition = Composition(UnitApplier(), recomposer)
+            var state by mutableStateOf(0)
+            var lastRecomposedState = -1
+            composition.setContent {
+                lastRecomposedState = state
+            }
+            assertEquals(0, lastRecomposedState, "initial composition")
+            withMutableSnapshot { state = 1 }
+            assertNotNull(
+                withTimeoutOrNull(3_000) { recomposer.awaitIdle() },
+                "timed out waiting for recomposer idle for recomposition"
+            )
+            assertEquals(1, lastRecomposedState, "recomposition")
+            recomposer.close()
+            assertNotNull(
+                withTimeoutOrNull(3_000) { recomposer.join() },
+                "timed out waiting for recomposer.join"
+            )
+            assertNotNull(
+                withTimeoutOrNull(3_000) { runner.join() },
+                "timed out waiting for recomposer runner job"
+            )
+            withMutableSnapshot { state = 2 }
+            assertNotNull(
+                withTimeoutOrNull(3_000) {
+                    recomposer.state.first { it <= Recomposer.State.PendingWork }
+                },
+                "timed out waiting for recomposer to not have active pending work"
+            )
+            assertEquals(1, lastRecomposedState, "expected no recomposition by closed recomposer")
+        }
+    }
+
+    @Test
+    fun recomposerRemainsOpenUntilEffectsJoin() = runBlockingTest {
+        val testClock = TestMonotonicFrameClock(this)
+        withContext(testClock) {
+            val recomposer = Recomposer(coroutineContext)
+            val runner = launch {
+                recomposer.runRecomposeAndApplyChanges()
+            }
+            val composition = Composition(UnitApplier(), recomposer)
+            val completer = Job()
+            composition.setContent {
+                LaunchedEffect(completer) {
+                    completer.join()
+                }
+            }
+            recomposer.awaitIdle()
+            recomposer.close()
+            recomposer.awaitIdle()
+            assertTrue(runner.isActive, "runner is still active")
+            completer.complete()
+            assertNotNull(
+                withTimeoutOrNull(5_000) { recomposer.join() },
+                "Expected recomposer join"
+            )
+            assertEquals(Recomposer.State.ShutDown, recomposer.state.first(), "recomposer state")
+            assertNotNull(
+                withTimeoutOrNull(5_000) { runner.join() },
+                "Expected runner join"
+            )
+        }
+    }
+}
+
+private class UnitApplier : Applier<Unit> {
+    override val current: Unit
+        get() = Unit
+
+    override fun down(node: Unit) {
+    }
+
+    override fun up() {
+    }
+
+    override fun insertTopDown(index: Int, instance: Unit) {
+    }
+
+    override fun insertBottomUp(index: Int, instance: Unit) {
+    }
+
+    override fun remove(index: Int, count: Int) {
+    }
+
+    override fun move(from: Int, to: Int, count: Int) {
+    }
+
+    override fun clear() {
+    }
+}
diff --git a/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/mock/CompositionTest.kt b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/mock/CompositionTest.kt
index 611e945..67a7da2 100644
--- a/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/mock/CompositionTest.kt
+++ b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/mock/CompositionTest.kt
@@ -67,7 +67,7 @@
         }
         scope.block()
         scope.composition?.dispose()
-        recomposer.shutDown()
+        recomposer.cancel()
         recomposer.join()
     }
 }
diff --git a/compose/ui/ui-test-junit4/src/androidMain/kotlin/androidx/compose/ui/test/junit4/AndroidComposeTestRule.kt b/compose/ui/ui-test-junit4/src/androidMain/kotlin/androidx/compose/ui/test/junit4/AndroidComposeTestRule.kt
index a58576e..d43bd21 100644
--- a/compose/ui/ui-test-junit4/src/androidMain/kotlin/androidx/compose/ui/test/junit4/AndroidComposeTestRule.kt
+++ b/compose/ui/ui-test-junit4/src/androidMain/kotlin/androidx/compose/ui/test/junit4/AndroidComposeTestRule.kt
@@ -341,7 +341,7 @@
                 }
                 base.evaluate()
             } finally {
-                recomposer.shutDown()
+                recomposer.cancel()
                 // FYI: Not canceling these scope below would end up cleanupTestCoroutines
                 // throwing errors on active coroutines
                 recomposerApplyCoroutineScope.cancel()
diff --git a/compose/ui/ui/src/androidAndroidTest/kotlin/androidx/compose/ui/platform/WindowRecomposerTest.kt b/compose/ui/ui/src/androidAndroidTest/kotlin/androidx/compose/ui/platform/WindowRecomposerTest.kt
index 26d6a75..646f377 100644
--- a/compose/ui/ui/src/androidAndroidTest/kotlin/androidx/compose/ui/platform/WindowRecomposerTest.kt
+++ b/compose/ui/ui/src/androidAndroidTest/kotlin/androidx/compose/ui/platform/WindowRecomposerTest.kt
@@ -76,7 +76,7 @@
                 assertNull("expected Activity to have been collected", weakActivityRef.get())
             }
         } finally {
-            localRecomposer.shutDown()
+            localRecomposer.cancel()
             runBlocking {
                 recomposerJob.join()
             }
diff --git a/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/platform/WindowRecomposer.kt b/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/platform/WindowRecomposer.kt
index c90661e..94aba7e 100644
--- a/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/platform/WindowRecomposer.kt
+++ b/compose/ui/ui/src/androidMain/kotlin/androidx/compose/ui/platform/WindowRecomposer.kt
@@ -76,7 +76,7 @@
     /**
      * Get a [Recomposer] for the window where [windowRootView] is at the root of the window's
      * [View] hierarchy. The factory is responsible for establishing a policy for
-     * [shutting down][Recomposer.shutDown] the returned [Recomposer]. [windowRootView] will
+     * [shutting down][Recomposer.cancel] the returned [Recomposer]. [windowRootView] will
      * hold a hard reference to the returned [Recomposer] until it [joins][Recomposer.join]
      * after shutting down.
      */
@@ -228,7 +228,7 @@
                 Lifecycle.Event.ON_START -> pausableClock?.resume()
                 Lifecycle.Event.ON_STOP -> pausableClock?.pause()
                 Lifecycle.Event.ON_DESTROY -> {
-                    recomposer.shutDown()
+                    recomposer.cancel()
                 }
             }
         }