Merge "snapshotFlow and withMutableSnapshot APIs" into androidx-master-dev
diff --git a/compose/runtime/runtime/api/current.txt b/compose/runtime/runtime/api/current.txt
index 2f40713..3744467 100644
--- a/compose/runtime/runtime/api/current.txt
+++ b/compose/runtime/runtime/api/current.txt
@@ -884,6 +884,11 @@
     field public static final androidx.compose.runtime.snapshots.SnapshotApplyResult.Success INSTANCE;
   }
 
+  public final class SnapshotFlowKt {
+    method @androidx.compose.runtime.ExperimentalComposeApi public static <T> kotlinx.coroutines.flow.Flow<T> snapshotFlow(kotlin.jvm.functions.Function0<? extends T> block);
+    method @androidx.compose.runtime.ExperimentalComposeApi public static inline <R> R! withMutableSnapshot(kotlin.jvm.functions.Function0<? extends R> block);
+  }
+
   public final class SnapshotIdSetKt {
   }
 
diff --git a/compose/runtime/runtime/api/public_plus_experimental_current.txt b/compose/runtime/runtime/api/public_plus_experimental_current.txt
index 2f40713..3744467 100644
--- a/compose/runtime/runtime/api/public_plus_experimental_current.txt
+++ b/compose/runtime/runtime/api/public_plus_experimental_current.txt
@@ -884,6 +884,11 @@
     field public static final androidx.compose.runtime.snapshots.SnapshotApplyResult.Success INSTANCE;
   }
 
+  public final class SnapshotFlowKt {
+    method @androidx.compose.runtime.ExperimentalComposeApi public static <T> kotlinx.coroutines.flow.Flow<T> snapshotFlow(kotlin.jvm.functions.Function0<? extends T> block);
+    method @androidx.compose.runtime.ExperimentalComposeApi public static inline <R> R! withMutableSnapshot(kotlin.jvm.functions.Function0<? extends R> block);
+  }
+
   public final class SnapshotIdSetKt {
   }
 
diff --git a/compose/runtime/runtime/api/restricted_current.txt b/compose/runtime/runtime/api/restricted_current.txt
index 1270ce0..7240288 100644
--- a/compose/runtime/runtime/api/restricted_current.txt
+++ b/compose/runtime/runtime/api/restricted_current.txt
@@ -918,6 +918,11 @@
     field public static final androidx.compose.runtime.snapshots.SnapshotApplyResult.Success INSTANCE;
   }
 
+  public final class SnapshotFlowKt {
+    method @androidx.compose.runtime.ExperimentalComposeApi public static <T> kotlinx.coroutines.flow.Flow<T> snapshotFlow(kotlin.jvm.functions.Function0<? extends T> block);
+    method @androidx.compose.runtime.ExperimentalComposeApi public static inline <R> R! withMutableSnapshot(kotlin.jvm.functions.Function0<? extends R> block);
+  }
+
   public final class SnapshotIdSetKt {
   }
 
diff --git a/compose/runtime/runtime/samples/src/main/java/androidx/compose/runtime/samples/SnapshotSamples.kt b/compose/runtime/runtime/samples/src/main/java/androidx/compose/runtime/samples/SnapshotSamples.kt
new file mode 100644
index 0000000..1e62c6d
--- /dev/null
+++ b/compose/runtime/runtime/samples/src/main/java/androidx/compose/runtime/samples/SnapshotSamples.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.compose.runtime.samples
+
+import androidx.annotation.Sampled
+import androidx.compose.runtime.ExperimentalComposeApi
+import androidx.compose.runtime.getValue
+import androidx.compose.runtime.mutableStateOf
+import androidx.compose.runtime.setValue
+import androidx.compose.runtime.snapshots.snapshotFlow
+import androidx.compose.runtime.snapshots.withMutableSnapshot
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.launch
+
+@Suppress("UNREACHABLE_CODE", "CanBeVal", "UNUSED_VARIABLE")
+@OptIn(ExperimentalComposeApi::class)
+@Sampled
+fun snapshotFlowSample() {
+    // Define Snapshot state objects
+    var greeting by mutableStateOf("Hello")
+    var person by mutableStateOf("Adam")
+
+    // ...
+
+    // Create a flow that will emit whenever our person-specific greeting changes
+    val greetPersonFlow = snapshotFlow { "$greeting, $person" }
+
+    // ...
+
+    val collectionScope: CoroutineScope = TODO("Use your scope here")
+
+    // Collect the flow and offer greetings!
+    collectionScope.launch {
+        greetPersonFlow.collect {
+            println(greeting)
+        }
+    }
+
+    // ...
+
+    // Change snapshot state; greetPersonFlow will emit a new greeting
+    withMutableSnapshot {
+        greeting = "Ahoy"
+        person = "Sean"
+    }
+}
diff --git a/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/snapshots/SnapshotFlow.kt b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/snapshots/SnapshotFlow.kt
new file mode 100644
index 0000000..3ecb3ee
--- /dev/null
+++ b/compose/runtime/runtime/src/commonMain/kotlin/androidx/compose/runtime/snapshots/SnapshotFlow.kt
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.compose.runtime.snapshots
+
+import androidx.compose.runtime.ExperimentalComposeApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flow
+
+/**
+ * Create a [Flow] from observable [Snapshot] state. (e.g. state holders returned by
+ * [mutableStateOf][androidx.compose.runtime.mutableStateOf].)
+ *
+ * [snapshotFlow] creates a [Flow] that runs [block] when collected and emits the result,
+ * recording any snapshot state that was accessed. While collection continues, if a new [Snapshot]
+ * is applied that changes state accessed by [block], the flow will run [block] again,
+ * re-recording the snapshot state that was accessed.
+ * If the result of [block] is not [equal to][Any.equals] the previous result, the flow will emit
+ * that new result. (This behavior is similar to that of
+ * [Flow.distinctUntilChanged][kotlinx.coroutines.flow.distinctUntilChanged].) Collection will
+ * continue indefinitely unless it is explicitly cancelled or limited by the use of other [Flow]
+ * operators.
+ *
+ * @sample androidx.compose.runtime.samples.snapshotFlowSample
+ *
+ * [block] is run in a **read-only** [Snapshot] and may not modify snapshot data. If [block]
+ * attempts to modify snapshot data, flow collection will fail with [IllegalStateException].
+ *
+ * [block] may run more than once for equal sets of inputs or only once after many rapid
+ * snapshot changes; it should be idempotent and free of side effects.
+ *
+ * When working with [Snapshot] state it is useful to keep the distinction between **events** and
+ * **state** in mind. [snapshotFlow] models snapshot changes as events, but events **cannot** be
+ * effectively modeled as observable state. Observable state is a lossy compression of the events
+ * that produced that state.
+ *
+ * An observable **event** happens at a point in time and is discarded. All registered observers
+ * at the time the event occurred are notified. All individual events in a stream are assumed
+ * to be relevant and may build on one another; repeated equal events have meaning and therefore
+ * a registered observer must observe all events without skipping.
+ *
+ * Observable **state** raises change events when the state changes from one value to a new,
+ * unequal value. State change events are **conflated;** only the most recent state matters.
+ * Observers of state changes must therefore be **idempotent;** given the same state value the
+ * observer should produce the same result. It is valid for a state observer to both skip
+ * intermediate states as well as run multiple times for the same state and the result should
+ * be the same.
+ */
+@ExperimentalComposeApi
+fun <T> snapshotFlow(
+    block: () -> T
+): Flow<T> = flow {
+    // Objects read the last time block was run
+    val readSet = mutableSetOf<Any>()
+    val readObserver: (Any) -> Unit = { readSet.add(it) }
+
+    // This channel may not block or lose data on an offer call.
+    val appliedChanges = Channel<Set<Any>>(Channel.UNLIMITED)
+
+    // Register the apply observer before running for the first time
+    // so that we don't miss updates.
+    val unregisterApplyObserver = Snapshot.registerApplyObserver { changed, _ ->
+        appliedChanges.offer(changed)
+    }
+
+    try {
+        var lastValue = takeSnapshot(readObserver).run {
+            try {
+                enter(block)
+            } finally {
+                dispose()
+            }
+        }
+        emit(lastValue)
+
+        while (true) {
+            var found = false
+            var changedObjects = appliedChanges.receive()
+
+            // Poll for any other changes before running block to minimize the number of
+            // additional times it runs for the same data
+            while (true) {
+                // Assumption: readSet will typically be smaller than changed
+                found = found || readSet.intersects(changedObjects)
+                changedObjects = appliedChanges.poll() ?: break
+            }
+
+            if (found) {
+                readSet.clear()
+                val newValue = takeSnapshot(readObserver).run {
+                    try {
+                        enter(block)
+                    } finally {
+                        dispose()
+                    }
+                }
+
+                if (newValue != lastValue) {
+                    lastValue = newValue
+                    emit(newValue)
+                }
+            }
+        }
+    } finally {
+        unregisterApplyObserver()
+    }
+}
+
+/**
+ * Return `true` if there are any elements shared between `this` and [other]
+ */
+private fun <T> Set<T>.intersects(other: Set<T>): Boolean =
+    if (size < other.size) any { it in other } else other.any { it in this }
+
+/**
+ * Take a [MutableSnapshot] and run [block] within it. When [block] returns successfully,
+ * attempt to [MutableSnapshot.apply] the snapshot. Returns the result of [block] or throws
+ * [SnapshotApplyConflictException] if snapshot changes attempted by [block] could not be applied.
+ *
+ * Prior to returning, any changes made to snapshot state (e.g. state holders returned by
+ * [androidx.compose.runtime.mutableStateOf] are not visible to other threads. When
+ * [withMutableSnapshot] returns successfully those changes will be made visible to other threads
+ * and any snapshot observers (e.g. [snapshotFlow]) will be notified of changes.
+ *
+ * [block] must not suspend if [withMutableSnapshot] is called from a suspend function.
+ */
+// TODO: determine a good way to prevent/discourage suspending in an inlined [block]
+@ExperimentalComposeApi
+inline fun <R> withMutableSnapshot(
+    block: () -> R
+): R = takeMutableSnapshot().run {
+    try {
+        enter(block).also { apply().check() }
+    } catch (t: Throwable) {
+        dispose()
+        throw t
+    }
+}
diff --git a/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/snapshots/SnapshotFlowTests.kt b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/snapshots/SnapshotFlowTests.kt
new file mode 100644
index 0000000..2bd56c8
--- /dev/null
+++ b/compose/runtime/runtime/src/test/kotlin/androidx/compose/runtime/snapshots/SnapshotFlowTests.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.compose.runtime.snapshots
+
+import androidx.compose.runtime.ExperimentalComposeApi
+import androidx.compose.runtime.getValue
+import androidx.compose.runtime.mutableStateOf
+import androidx.compose.runtime.setValue
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.plus
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.yield
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+@Suppress("RemoveExplicitTypeArguments")
+@OptIn(ExperimentalComposeApi::class)
+class SnapshotFlowTests {
+    @Test
+    fun observeBasicChanges() = runBlocking<Unit> {
+        var state by mutableStateOf(1)
+        var result = 0
+
+        // Use Dispatchers.Unconfined to cause the observer to run immediately for this test,
+        // both here and when we apply a change.
+        val collector = snapshotFlow { state * 2 }
+            .onEach { result = it }
+            .launchIn(this + Dispatchers.Unconfined)
+
+        assertEquals(2, result, "value after initial run")
+
+        withMutableSnapshot {
+            state = 5
+        }
+
+        assertEquals(10, result, "value after snapshot update")
+
+        collector.cancel()
+    }
+
+    @Test
+    fun coalesceChanges() = runBlocking<Unit> {
+        var state by mutableStateOf(1)
+        var runCount = 0
+
+        // This test uses the runBlocking single-threaded dispatcher for observation, which means
+        // we don't flush changes to the observer until we yield() intentionally.
+        val collector = snapshotFlow { state }
+            .onEach { runCount++ }
+            .launchIn(this)
+
+        assertEquals(0, runCount, "initial value - snapshot collector hasn't run yet")
+        yield()
+        assertEquals(1, runCount, "snapshot collector initial run")
+
+        withMutableSnapshot { state++ }
+        yield()
+
+        assertEquals(2, runCount, "made one change")
+
+        withMutableSnapshot { state++ }
+        withMutableSnapshot { state++ }
+        yield()
+
+        assertEquals(3, runCount, "coalesced two changes")
+
+        collector.cancel()
+    }
+
+    @Test
+    fun ignoreUnrelatedChanges() = runBlocking<Unit> {
+        val state by mutableStateOf(1)
+        var unrelatedState by mutableStateOf(1)
+        var runCount = 0
+
+        // This test uses the runBlocking single-threaded dispatcher for observation, which means
+        // we don't flush changes to the observer until we yield() intentionally.
+        val collector = snapshotFlow { state }
+            .onEach { runCount++ }
+            .launchIn(this)
+        yield()
+
+        assertEquals(1, runCount, "initial run")
+
+        withMutableSnapshot { unrelatedState++ }
+        yield()
+
+        assertEquals(1, runCount, "after changing unrelated state")
+
+        collector.cancel()
+    }
+}