Nelle coroutine, un flusso è un tipo che può emettere più valori in sequenza, a differenza delle funzioni di sospensione che restituiscono un solo valore. Ad esempio, puoi utilizzare un flusso per ricevere aggiornamenti in tempo reale da un database.
I flussi si basano sulle coroutine e possono fornire più valori.
Un flusso è concettualmente un flusso di dati che può essere calcolato in modo asincrono. I valori emessi devono essere dello stesso tipo. Ad
esempio, un Flow<Int>
è un flusso che emette valori interi.
Un flusso è molto simile a un Iterator
che produce una sequenza di
valori, ma utilizza funzioni di sospensione per produrre e consumare valori
in modo asincrono. Ciò significa, ad esempio, che il flusso può effettuare in modo sicuro una
richiesta di rete per produrre il valore successivo senza bloccare il thread
principale.
Nei flussi di dati sono coinvolte tre entità:
- Un producer produce i dati che vengono aggiunti allo stream. Grazie alle coroutine, i flussi possono anche produrre dati in modo asincrono.
- (Facoltativo) Gli intermediari possono modificare ogni valore emesso nello stream o nello stream stesso.
- Un consumer consuma i valori dello stream.
In Android, un repository è in genere un produttore di dati dell'interfaccia utente che ha l'interfaccia utente (UI) come consumer che visualizza i dati. Altre volte, il livello UI è un produttore di eventi di input dell'utente, mentre altri livelli della gerarchia li utilizzano. I livelli tra il produttore e il consumatore di solito agiscono da intermediari che modificano il flusso di dati per adattarlo ai requisiti del livello successivo.
Creazione di un flusso
Per creare flussi, utilizza le API di flow Builder. La funzione di creazione di flow
crea un nuovo flusso in cui puoi emettere manualmente
nuovi valori nel flusso di dati utilizzando la funzione
emit
.
Nell'esempio seguente, un'origine dati recupera automaticamente le notizie più recenti a intervalli fissi. Poiché una funzione di sospensione non può restituire più valori consecutivi, l'origine dati crea e restituisce un flusso per soddisfare questo requisito. In questo caso, l'origine dati agisce come producer.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Il builder di flow
viene eseguito all'interno di una coroutine. Pertanto, beneficia
delle stesse API asincrone, ma si applicano alcune limitazioni:
- I flussi sono sequenziali. Poiché il producer è in una coroutine, quando richiama
una funzione di sospensione, il producer sospende finché non restituisce la funzione
di sospensione. Nell'esempio, il producer esegue la sospensione fino al completamento della richiesta di rete
fetchLatestNews
. Solo in questo caso il risultato viene emesso nello stream. - Con il builder
flow
, il producer non puòemit
valori da unCoroutineContext
diverso. Pertanto, non chiamareemit
in un altroCoroutineContext
creando nuove coroutine o utilizzandowithContext
blocchi di codice. In questi casi, puoi utilizzare altri generatori di flussi comecallbackFlow
.
Modifica dello stream
Gli intermediari possono utilizzare gli operatori intermedi per modificare il flusso di dati senza consumare i valori. Questi operatori sono funzioni che, applicate a un flusso di dati, configurano una catena di operazioni che non vengono eseguite finché i valori non vengono consumati in futuro. Scopri di più sugli operatori intermedi nella documentazione di riferimento di Flow.
Nell'esempio seguente, il livello di repository utilizza l'operatore intermedio
map
per trasformare i dati da visualizzare in View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Gli operatori intermedi possono essere applicati uno dopo l'altro, formando una catena di operazioni che vengono eseguite in modo lento quando un elemento viene emesso nel flusso. Tieni presente che la semplice applicazione di un operatore intermedio a un flusso non avvia la raccolta del flusso.
Raccolta da un flusso
Utilizza un operatore del terminale per attivare il flusso in modo che inizi ad ascoltare i valori. Per ottenere tutti i valori nello stream man mano che vengono emessi, utilizza collect
.
Per scoprire di più sugli operatori dei terminal, consulta la documentazione ufficiale sul flusso.
Poiché collect
è una funzione di sospensione, deve essere eseguita all'interno di una coroutine. Prende un parametro lambda che viene
chiamato su ogni nuovo valore. Poiché è una funzione di sospensione, la coroutina che
chiama collect
potrebbe essere sospesa fino alla chiusura del flusso.
Continuando con l'esempio precedente, ecco una semplice implementazione di un ViewModel
che utilizza i dati dal livello di repository:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
La raccolta del flusso attiva il producer che aggiorna le ultime notizie ed emette il risultato della richiesta di rete a intervalli fissi. Poiché il producer rimane sempre attivo con il loop while(true)
, il flusso di dati verrà chiuso quando il ViewModel viene cancellato e viewModelScope
viene annullato.
La raccolta dei flussi può interrompersi per i seguenti motivi:
- La coroutina che viene raccolta viene annullata, come mostrato nell'esempio precedente. In questo modo viene interrotto anche il producer sottostante.
- Il producer ha finito di emettere articoli. In questo caso, il flusso di dati
è chiuso e la coroutine che ha chiamato
collect
riprende l'esecuzione.
I flussi sono freddi e lazy a meno che non siano specificati con altri operatori
intermedi. Ciò significa che il codice producer viene eseguito ogni volta che
viene chiamato un operatore del terminale nel flusso. Nell'esempio precedente,
la presenza di più raccoglitori di flusso fa sì che l'origine dati recuperi le ultime notizie più volte a intervalli fissi diversi. Per ottimizzare e condividere un flusso quando più consumatori raccolgono contemporaneamente, utilizza l'operatore shareIn
.
Rilevamento di eccezioni impreviste
L'implementazione del produttore può provenire da una libreria di terze parti.
Ciò significa che può generare eccezioni impreviste. Per gestire queste
eccezioni, utilizza l'operatore intermedio
catch
.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Nell'esempio precedente, quando si verifica un'eccezione, collect
lambda non viene chiamato perché non è stato ricevuto un nuovo elemento.
catch
può anche emit
elementi nel flusso. Il livello del repository di esempio potrebbe invece emit
i valori memorizzati nella cache:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
In questo esempio, quando si verifica un'eccezione, viene chiamata la funzione lambda collect
, poiché un nuovo elemento è stato emesso nello stream a causa dell'eccezione.
Esecuzione in un contesto Coroutine diverso
Per impostazione predefinita, il producer di un builder flow
esegue l'elemento
CoroutineContext
della coroutine che lo raccoglie e, come
precedentemente accennato, non può emit
valori da un
CoroutineContext
diverso. Questo comportamento potrebbe essere indesiderato in alcuni casi.
Ad esempio, negli esempi utilizzati in questo argomento, il livello del repository non dovrebbe eseguire operazioni su Dispatchers.Main
utilizzato da viewModelScope
.
Per modificare CoroutineContext
di un flusso, utilizza l'operatore intermedio flowOn
.
flowOn
modifica il CoroutineContext
del flusso upstream, vale a dire che il producer e gli eventuali operatori intermedi applicati prima (o sopra)
flowOn
. Il flusso downstream (gli operatori intermedi dopo flowOn
insieme al consumatore) non è interessato e viene eseguito sul CoroutineContext
utilizzato a collect
dal flusso. Se sono presenti
più operatori flowOn
, ciascuno modifica l'upstream rispetto alla
posizione attuale.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Con questo codice, gli operatori onEach
e map
utilizzano defaultDispatcher
,
mentre l'operatore catch
e il consumatore vengono eseguiti su
Dispatchers.Main
utilizzato da viewModelScope
.
Poiché il livello dell'origine dati esegue le operazioni di I/O, devi utilizzare un supervisore ottimizzato per le operazioni di I/O:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Flussi nelle librerie Jetpack
Flow è integrato in molte librerie Jetpack ed è molto diffuso tra le librerie di terze parti Android. Flow è un'ottima soluzione per aggiornamenti di dati in tempo reale e flussi infiniti di dati.
Puoi utilizzare Flow with Room per ricevere notifiche sulle modifiche in un database. Quando utilizzi gli oggetti di accesso ai dati (DAO), restituisci un tipo Flow
per ricevere aggiornamenti in tempo reale.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Ogni volta che viene apportata una modifica alla tabella Example
, viene emesso un nuovo elenco
con i nuovi elementi nel database.
Converti le API basate su callback in flussi
callbackFlow
è un generatore di flussi che consente di convertire le API basate su callback in flussi.
Ad esempio, le API Android di Firebase Firestore
utilizzano i callback.
Per convertire queste API in flussi e ascoltare gli aggiornamenti del database Firestore, puoi utilizzare il seguente codice:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
A differenza del generatore di flow
, callbackFlow
consente l'emissione di valori da un valore CoroutineContext
diverso con la funzione
send
o al di fuori di una coroutina con la
funzione trySend
.
Internamente, callbackFlow
utilizza un
canale,
che è concettualmente molto simile a una
coda di blocco.
Un canale è configurato con una capacità, ovvero il numero massimo di elementi che possono essere inseriti nel buffer. Il canale creato in callbackFlow
ha una capacità
predefinita di 64 elementi. Quando provi ad aggiungere un nuovo elemento a un canale
intero, send
sospende il produttore finché non è disponibile spazio per il nuovo
elemento, mentre trySend
non aggiunge l'elemento al canale e restituisce
false
immediatamente.
trySend
aggiunge immediatamente l'elemento specificato al canale,
solo se ciò non viola le sue limitazioni della capacità, quindi restituisce il
risultato riuscito.
Risorse di flusso aggiuntive
- Testare i flussi Kotlin su Android
StateFlow
eSharedFlow
- Risorse aggiuntive per coroutine e flussi di Kotlin