Eş yordamlarda, yalnızca tek bir değer döndüren askıya alma işlevlerinin aksine akış, birden fazla değeri sırayla yayınlayabilen bir türdür. Örneğin, bir veritabanından canlı güncellemeler almak için bir akış kullanabilirsiniz.
Akışlar eş yordamların üzerine kuruludur ve birden fazla değer sağlayabilir.
Akış, kavramsal olarak eşzamansız olarak hesaplanabilen bir veri akışıdır. Yayınlanan değerler aynı türde olmalıdır. Örneğin Flow<Int>
, tam sayı değerleri yayan bir akıştır.
Akış, bir değer dizisi oluşturan Iterator
işlevine çok benzer ancak eşzamansız olarak değer üretmek ve tüketmek için askıya alma işlevlerini kullanır. Bu, örneğin akışın ana iş parçacığını engellemeden bir sonraki değeri üretmek için güvenli bir şekilde ağ isteğinde bulunabileceği anlamına gelir.
Veri akışlarına dahil olan üç varlık vardır:
- Üretici, akışa eklenen veriler oluşturur. Eş yordamlar sayesinde akışlar, verileri eşzamansız olarak da üretebilir.
- (İsteğe bağlı) Aracılar, akışa veya akışın kendisine yayınlanan her bir değeri değiştirebilir.
- Tüketiciler, akıştaki değerleri tüketir.
Android'de depo, genellikle verileri görüntüleyen tüketici olarak kullanıcı arayüzüne sahip bir kullanıcı arayüzü verileri üreticisidir. Bazı durumlarda ise kullanıcı arayüzü katmanı, kullanıcı giriş etkinliklerinin üreticisidir ve hiyerarşinin diğer katmanları bunları tüketir. Üretici ve tüketici arasındaki katmanlar genellikle veri akışını değiştirerek bir sonraki katmanın gereksinimlerine göre düzenleyen aracı görevi görür.
Akış oluşturma
Akış oluşturmak için akış oluşturucu API'lerini kullanın. flow
oluşturucu işlevi, emit
işlevini kullanarak veri akışına manuel olarak yeni değerler yayınlayabileceğiniz yeni bir akış oluşturur.
Aşağıdaki örnekte bir veri kaynağı en son haberleri sabit bir aralıklarla otomatik olarak getirir. Askıya alma işlevi birden fazla ardışık değer döndüremediğinden veri kaynağı bu koşulu karşılamak için bir akış oluşturur ve döndürür. Bu durumda veri kaynağı, üretici rolünü üstlenir.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
oluşturucu, bir eş yordam içinde yürütüldü. Dolayısıyla, aynı eşzamansız API'lerden yararlanır ancak bazı kısıtlamalar geçerlidir:
- Akışlar sıralıdır. Üretici bir eş yordam içinde olduğundan, askıya alma işlevi çağrılırken askıya alma işlevi geri dönene kadar cihazı askıya alır. Örnekte üretici,
fetchLatestNews
ağ isteği tamamlanana kadar askıya alınır. Sonuç, ancak sonrasında akışa iletilir. flow
oluşturucuyla, üretici farklı birCoroutineContext
'denemit
değerleri alamaz. Bu nedenle, yeni eş yordamlar oluşturarak veyawithContext
kod blokları kullanarakemit
öğesini farklı birCoroutineContext
içinde çağırmayın. Bu durumlardacallbackFlow
gibi diğer akış oluşturucuları kullanabilirsiniz.
Akışı değiştirme
Aracılar, değerleri tüketmeden veri akışını değiştirmek için ara operatörler kullanabilir. Bu operatörler, bir veri akışına uygulandığında değerler gelecekte tüketilene kadar yürütülmeyen bir işlem zinciri oluşturan işlevlerdir. Akış referans belgelerinden ara operatörler hakkında daha fazla bilgi edinebilirsiniz.
Aşağıdaki örnekte, depo katmanı View
üzerinde görüntülenecek verileri dönüştürmek için map
ara operatörünü kullanır:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Ara operatörler birbiri ardına uygulanabilir. Böylece, bir öğe akışa yayınlandığında geç gerçekleştirilen bir işlem zinciri oluşturabilir. Bir akışa sadece ara operatör uygulamanın akış toplama işlemini başlatmadığını unutmayın.
Akıştan veri toplama
Değerleri dinlemeye başlamak üzere akışı tetiklemek için bir terminal operatörü kullanın. Yayınlandığı şekilde akıştaki tüm değerleri almak için collect
işlevini kullanın.
Resmi akış belgelerinde terminal operatörleri hakkında daha fazla bilgi edinebilirsiniz.
collect
bir askıya alma işlevi olduğundan, eş yordam içinde yürütülmesi gerekir. Her yeni değerde çağrılan parametre olarak lambda kullanılır. Bu bir askıya alma işlevi olduğundan, collect
çağrısı yapan eş düzey çalışan, akış kapatılana kadar askıya alınabilir.
Önceki örnekten devam edelim. Depo katmanındaki verileri tüketen bir ViewModel
şöyle basit bir şekilde uygulanabilir:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Akışın toplanması, en son haberleri yenileyen ve ağ isteğinin sonucunu sabit aralıklarla yayan üreticiyi tetikler. Üretici while(true)
döngüsüyle her zaman etkin olduğundan, ViewModel temizlendiğinde ve viewModelScope
iptal edildiğinde veri akışı kapatılır.
Akış toplama aşağıdaki nedenlerle durdurulabilir:
- Bir önceki örnekte gösterildiği gibi, toplanan eş yordam iptal edildi. Bu işlem, ana yapımcıyı da durdurur.
- Üretici, öğeleri yayınlamayı bitirir. Bu durumda veri akışı kapatılır ve
collect
adlı eş yordam yürütme devam eder.
Diğer ara operatörlerle belirtilmediği sürece akışlar soğuk ve geç olur. Bu, akışta bir terminal operatörü her çağrıldığında üretici kodunun yürütülmesi anlamına gelir. Önceki örnekte, birden fazla akış toplayıcının olması, veri kaynağının en son haberleri farklı sabit aralıklarla birden çok kez getirmesine neden olur. Birden fazla tüketici aynı anda toplama yaptığında akışı optimize etmek ve paylaşmak için shareIn
operatörünü kullanın.
Beklenmeyen istisnaları yakalama
Üretici uygulaması, üçüncü taraf kitaplığından gelebilir.
Bu, beklenmedik istisnalar oluşturabileceği anlamına gelir. Bu istisnaları işlemek için catch
ara operatörünü kullanın.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Yukarıdaki örnekte bir istisna oluştuğunda, yeni bir öğe alınmadığından collect
lambda çağrılmaz.
catch
, akışa emit
öğe de ekleyebilir. Örnek depo katmanı, bunun yerine önbelleğe alınan değerleri emit
:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Bu örnekte, bir istisna oluştuğunda istisna nedeniyle akışa yeni bir öğe yayınlandığı için collect
lambdası çağrılır.
Farklı bir CoroutineContext'te yürütme
Varsayılan olarak, flow
oluşturucunun üreticisi, kendisinden toplanan eş yordamın CoroutineContext
içinde yürütülür ve daha önce de belirtildiği gibi farklı bir CoroutineContext
öğesinden emit
değerleri alamaz. Bazı durumlarda bu istenmeyen bir davranış olabilir.
Örneğin, bu konu genelinde kullanılan örneklerde depo katmanı, viewModelScope
tarafından kullanılan Dispatchers.Main
üzerinde işlem yapmamalıdır.
Bir akışın CoroutineContext
değerini değiştirmek için flowOn
ara operatörünü kullanın.
flowOn
, yukarı akış akışının CoroutineContext
değerini değiştirir; yani üretici ve önce (veya üstü)
flowOn
uygulanan tüm ara operatörler. Aşağı akış akışı (tüketiciyle birlikte flowOn
tarihinden sonra ara operatörler) etkilenmez ve akıştan collect
için kullanılan CoroutineContext
üzerinde yürütülür. Birden fazla flowOn
operatörü varsa her biri yukarı akışı mevcut konumundan değiştirir.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Bu kodla, onEach
ve map
operatörleri defaultDispatcher
'ı kullanırken catch
operatörü ve tüketici, viewModelScope
tarafından kullanılan Dispatchers.Main
üzerinde yürütülür.
Veri kaynağı katmanı G/Ç çalışması yaptığından, G/Ç işlemleri için optimize edilmiş bir görev dağıtıcı kullanmanız gerekir:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack kitaplıklarındaki akışlar
Akış birçok Jetpack kitaplığına entegre edilmiştir ve Android üçüncü taraf kitaplıkları arasında popülerdir. Akış, canlı veri güncellemeleri ve sonsuz veri akışı için idealdir.
Veritabanındaki değişikliklerle ilgili bilgi almak için Flow with Room'u kullanabilirsiniz. Veri erişim nesnelerini (DAO) kullanırken canlı güncellemeler almak için bir Flow
türü döndürün.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Example
tablosunda her değişiklik olduğunda, veritabanındaki yeni öğeleri içeren yeni bir liste yayınlanır.
Geri çağırmaya dayalı API'leri akışlara dönüştürme
callbackFlow
, geri çağırmaya dayalı API'leri akışlara dönüştürmenizi sağlayan bir akış oluşturucudur.
Örneğin, Firebase Firestore Android API'lerinde geri çağırma işlevleri kullanılır.
Bu API'leri akışlara dönüştürmek ve Firestore veritabanı güncellemelerini dinlemek için aşağıdaki kodu kullanabilirsiniz:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
flow
oluşturucudan farklı olarak callbackFlow
, değerlerin send
işleviyle farklı bir CoroutineContext
'den veya trySend
işleviyle bir eş yordamın dışında yayınlanmasına izin verir.
callbackFlow
, dahili olarak bir kanal kullanır. Bu kanal, kavramsal olarak engelleme sırasına çok benzer.
Bir kanal, arabelleğe alınabilecek maksimum öğe sayısı olan bir kapasite ile yapılandırılır. callbackFlow
ürününde oluşturulan kanalın varsayılan
kapasitesi 64 öğedir. Tam kanala yeni bir öğe eklemeye çalıştığınızda, send
yeni öğe için yer ayrılana kadar yapımcıyı askıya alır. trySend
ise öğeyi kanala eklemez ve false
değerini hemen döndürür.
trySend
, yalnızca kapasite kısıtlamalarını ihlal etmediği sürece belirtilen öğeyi kanala hemen ekler ve daha sonra başarılı sonucu döndürür.
Ek akış kaynakları
- Android'de Kotlin akışlarını test etme
StateFlow
veSharedFlow
- Kotlin eş yordamları ve akışı için ek kaynaklar