Exposed cached data via PagingData

When using Flow<PagingData<T>>.cachedIn(scope), data that has been collected within that generation is cached for as long as the scope remains alive.
Normally the cachedData can only be accessed by new subscriptions when they collect on the SharedFlow returned by cachedIn. Now we expose this cachedData via PagingData so that the cached data can be accessed synchronously without requiring collection.

Test: ./gradlew paging:paging-common:test
Bug: 177245496

Change-Id: I36a894d7d90f61396604e7847c5be6027b5e0991
diff --git a/paging/paging-common/src/main/kotlin/androidx/paging/CachedPageEventFlow.kt b/paging/paging-common/src/main/kotlin/androidx/paging/CachedPageEventFlow.kt
index f954fcf..da08a43 100644
--- a/paging/paging-common/src/main/kotlin/androidx/paging/CachedPageEventFlow.kt
+++ b/paging/paging-common/src/main/kotlin/androidx/paging/CachedPageEventFlow.kt
@@ -108,6 +108,12 @@
                 }
             }
     }
+
+    /**
+     * Returns cached data as PageEvent.Insert. Null if cached data is empty (for example on
+     * initial refresh).
+     */
+    internal fun getCachedEvent(): PageEvent.Insert<T>? = pageController.getCachedEvent()
 }
 
 private class FlattenedPageController<T : Any> {
@@ -141,6 +147,10 @@
             }
         }
     }
+
+    fun getCachedEvent(): PageEvent.Insert<T>? = list.getAsEvents().firstOrNull()?.let {
+        if (it is PageEvent.Insert && it.loadType == LoadType.REFRESH) it else null
+    }
 }
 
 /**
diff --git a/paging/paging-common/src/main/kotlin/androidx/paging/CachedPagingData.kt b/paging/paging-common/src/main/kotlin/androidx/paging/CachedPagingData.kt
index 08024a7..35e6aed 100644
--- a/paging/paging-common/src/main/kotlin/androidx/paging/CachedPagingData.kt
+++ b/paging/paging-common/src/main/kotlin/androidx/paging/CachedPagingData.kt
@@ -53,7 +53,8 @@
             tracker?.onComplete(PAGE_EVENT_FLOW)
         },
         uiReceiver = parent.uiReceiver,
-        hintReceiver = parent.hintReceiver
+        hintReceiver = parent.hintReceiver,
+        cachedPageEvent = { accumulated.getCachedEvent() }
     )
 
     suspend fun close() = accumulated.close()
diff --git a/paging/paging-common/src/main/kotlin/androidx/paging/PagingData.kt b/paging/paging-common/src/main/kotlin/androidx/paging/PagingData.kt
index 3049153..c14bb57 100644
--- a/paging/paging-common/src/main/kotlin/androidx/paging/PagingData.kt
+++ b/paging/paging-common/src/main/kotlin/androidx/paging/PagingData.kt
@@ -28,7 +28,15 @@
 public class PagingData<T : Any> internal constructor(
     internal val flow: Flow<PageEvent<T>>,
     internal val uiReceiver: UiReceiver,
-    internal val hintReceiver: HintReceiver
+    internal val hintReceiver: HintReceiver,
+
+    /**
+     * A lambda returning a nullable PageEvent.Insert containing data which can be accessed
+     * and displayed synchronously without requiring collection.
+     *
+     * For example, the data may be real loaded data that has been cached via [cachedIn].
+     */
+    private val cachedPageEvent: () -> PageEvent.Insert<T>? = { null }
 ) {
     public companion object {
         internal val NOOP_UI_RECEIVER = object : UiReceiver {
@@ -137,4 +145,6 @@
             hintReceiver = NOOP_HINT_RECEIVER,
         )
     }
-}
+
+    internal fun cachedEvent(): PageEvent.Insert<T>? = cachedPageEvent()
+}
\ No newline at end of file
diff --git a/paging/paging-common/src/test/kotlin/androidx/paging/CachingTest.kt b/paging/paging-common/src/test/kotlin/androidx/paging/CachingTest.kt
index 6b41869..5eb154c 100644
--- a/paging/paging-common/src/test/kotlin/androidx/paging/CachingTest.kt
+++ b/paging/paging-common/src/test/kotlin/androidx/paging/CachingTest.kt
@@ -26,6 +26,7 @@
 import kotlinx.coroutines.SupervisorJob
 import kotlinx.coroutines.cancelAndJoin
 import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.SharedFlow
 import kotlinx.coroutines.flow.catch
 import kotlinx.coroutines.flow.filterIsInstance
 import kotlinx.coroutines.flow.first
@@ -103,6 +104,36 @@
     }
 
     @Test
+    fun cachedData() = testScope.runTest {
+        val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
+        assertThat(pageFlow).isInstanceOf(SharedFlow::class.java)
+        assertThat((pageFlow as SharedFlow<PagingData<Item>>).replayCache).isEmpty()
+
+        pageFlow.collectItemsUntilSize(6)
+        val firstCachedData = pageFlow.cachedData()
+        assertThat(firstCachedData).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            )
+        )
+
+        pageFlow.collectItemsUntilSize(9)
+        val secondCachedEvent = pageFlow.cachedData()
+        assertThat(secondCachedEvent).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9
+            )
+        )
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
+    }
+
+    @Test
     fun cached_afterMapping() = testScope.runTest {
         var mappingCnt = 0
         val pageFlow = buildPageFlow().map { pagingData ->
@@ -138,6 +169,44 @@
     }
 
     @Test
+    fun cachedData_afterMapping() = testScope.runTest {
+        var mappingCnt = 0
+        val pageFlow = buildPageFlow().map { pagingData ->
+            val mappingIndex = mappingCnt++
+            pagingData.map {
+                it.copy(metadata = mappingIndex.toString())
+            }
+        }.cachedIn(backgroundScope, tracker)
+
+        pageFlow.collectItemsUntilSize(6)
+        val firstCachedData = pageFlow.cachedData()
+        assertThat(firstCachedData).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            ) {
+                it.copy(metadata = "0")
+            }
+        )
+
+        pageFlow.collectItemsUntilSize(9)
+        val secondCachedData = pageFlow.cachedData()
+        assertThat(secondCachedData).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9
+            ) {
+                it.copy(metadata = "0")
+            }
+        )
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
+    }
+
+    @Test
     fun cached_beforeMapping() = testScope.runTest {
         var mappingCnt = 0
         val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker).map { pagingData ->
@@ -173,6 +242,44 @@
     }
 
     @Test
+    fun cachedData_beforeMapping() = testScope.runTest {
+        var mappingCnt = 0
+        val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
+        val mappedFlow = pageFlow.map { pagingData ->
+            val mappingIndex = mappingCnt++
+            pagingData.map {
+                it.copy(metadata = mappingIndex.toString())
+            }
+        }
+        // Mapping converts SharedFlow to Flow and thereby blocks access to cachedIn's
+        // replayCache. You can still access latest cachedData directly from pre-mapped flow.
+        mappedFlow.collectItemsUntilSize(6)
+        val firstCachedData = pageFlow.cachedData()
+        assertThat(firstCachedData).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6,
+                modifier = null // before mapping
+            )
+        )
+
+        mappedFlow.collectItemsUntilSize(9)
+        val secondCachedEvent = pageFlow.cachedData()
+        assertThat(secondCachedEvent).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9,
+                modifier = null // before mapping
+            )
+        )
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
+    }
+
+    @Test
     fun cached_afterMapping_withMoreMappingAfterwards() = testScope.runTest {
         var mappingCnt = 0
         val pageFlow = buildPageFlow().map { pagingData ->
@@ -213,6 +320,51 @@
     }
 
     @Test
+    fun cachedData_afterMapping_withMoreMappingAfterwards() = testScope.runTest {
+        var mappingCnt = 0
+        val pageFlow = buildPageFlow().map { pagingData ->
+            val mappingIndex = mappingCnt++
+            pagingData.map {
+                it.copy(metadata = mappingIndex.toString())
+            }
+        }.cachedIn(backgroundScope, tracker)
+        val mappedFlow = pageFlow.map { pagingData ->
+            val mappingIndex = mappingCnt++
+            pagingData.map {
+                it.copy(metadata = "${it.metadata}_$mappingIndex")
+            }
+        }
+        // Mapping converts SharedFlow to Flow and thereby blocks access to cachedIn's
+        // replayCache. You can still access latest cachedData directly from pre-mapped flow.
+        mappedFlow.collectItemsUntilSize(6)
+        val firstCachedData = pageFlow.cachedData()
+        assertThat(firstCachedData).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            ) {
+                it.copy(metadata = "0") // with mapping before cache
+            }
+        )
+
+        mappedFlow.collectItemsUntilSize(9)
+        val secondCachedEvent = pageFlow.cachedData()
+        assertThat(secondCachedEvent).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9
+            ) {
+                it.copy(metadata = "0") // with mapping before cache
+            }
+        )
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
+    }
+
+    @Test
     fun pagesAreClosedProperty() {
         val job = SupervisorJob()
         val subScope = CoroutineScope(job + Dispatchers.Default)
@@ -364,6 +516,67 @@
         )
     }
 
+    @Test
+    public fun unusedPagingDataIsNeverCached(): Unit = testScope.runTest {
+        val factory = StringPagingSource.VersionedFactory()
+        val flow = buildPageFlow(factory).cachedIn(backgroundScope, tracker)
+        val collector = ItemCollector(flow)
+        val job = SupervisorJob()
+        val subScope = CoroutineScope(coroutineContext + job)
+        collector.collectPassivelyIn(subScope)
+        testScope.runCurrent()
+        // check that cachedData contains data from passive collector
+        assertThat(flow.cachedData()).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 3
+            )
+        )
+        // finish that collector
+        job.cancelAndJoin()
+        assertThat(factory.nextVersion).isEqualTo(1)
+        repeat(10) {
+            factory.invalidateLatest()
+            testScope.runCurrent()
+        }
+        runCurrent()
+        // next version is 11, the last paged data we've created has version 10
+        assertThat(factory.nextVersion).isEqualTo(11)
+
+        // the replayCache has paged data version 10 but no collection on this pagingData yet
+        // so it has no cachedEvent.
+        val cachedPagingData = (flow as SharedFlow<PagingData<Item>>).replayCache.first()
+        assertThat(cachedPagingData.cachedEvent()).isNull()
+
+        // create another collector from shared, should only receive 1 paging data and that
+        // should be the latest because previous PagingData is invalidated
+        val collector2 = ItemCollector(flow)
+        collector2.collectPassivelyIn(backgroundScope)
+        testScope.runCurrent()
+        // now this PagingData has cachedEvents from version 10
+        assertThat(flow.cachedData()).isEqualTo(
+            buildItems(
+                version = 10,
+                generation = 0,
+                start = 0,
+                size = 3
+            )
+        )
+        assertThat(factory.nextVersion).isEqualTo(11)
+        // collect some more and ensure cachedData is still up-to-date
+        flow.collectItemsUntilSize(9)
+        assertThat(flow.cachedData()).isEqualTo(
+            buildItems(
+                version = 10,
+                generation = 0,
+                start = 0,
+                size = 9
+            )
+        )
+    }
+
     private fun buildPageFlow(
         factory: StringPagingSource.VersionedFactory = StringPagingSource.VersionedFactory()
     ): Flow<PagingData<Item>> {
@@ -441,6 +654,20 @@
             }.first()
     }
 
+    private fun Flow<PagingData<Item>>.cachedData(): List<Item> {
+        assertThat(this).isInstanceOf(SharedFlow::class.java)
+        val flow = this as SharedFlow<PagingData<Item>>
+        assertThat(flow.replayCache).isNotEmpty()
+
+        val pagingData = flow.replayCache.firstOrNull()
+        assertThat(pagingData).isNotNull()
+
+        val event = pagingData!!.cachedEvent()
+        assertThat(event).isInstanceOf(PageEvent.Insert::class.java)
+
+        return (event as PageEvent.Insert<Item>).pages.flatMap { it.data }
+    }
+
     /**
      * Paged list collector that does not call any hints but always collects
      */