Use mutex to implement queueing
With this, we can easily control the scope for running
the given task and handle errors easily.
It also updates awaitClose to wait remaining tasks
and close.
Bug: 267193989
Test: manually read characteritisc from a remote GATT server
Change-Id: I0afa1002a2f4d163dfe3f1d0f0a113a7565617e2
diff --git a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt
index 219af7b..4438963 100644
--- a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt
+++ b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt
@@ -124,9 +124,9 @@
suspend fun <R> connectGatt(
context: Context,
device: BluetoothDevice,
- block: GattClientScope.() -> R
- ) {
- GattClientImpl().connect(context, device, block)
+ block: suspend GattClientScope.() -> R
+ ): R? {
+ return GattClientImpl().connect(context, device, block)
}
@SuppressLint("MissingPermission")
diff --git a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt
index 1b38884..0acb5e9 100644
--- a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt
+++ b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt
@@ -32,13 +32,15 @@
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
-import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableSharedFlow
+import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.emptyFlow
+import kotlinx.coroutines.flow.filter
+import kotlinx.coroutines.flow.first
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
@@ -52,54 +54,44 @@
private val CCCD_UID = UUID.fromString("00002902-0000-1000-8000-00805f9b34fb")
}
- private data class ClientTask(
- val taskBlock: () -> Unit
- ) {
- val finished: CompletableDeferred<Boolean> = CompletableDeferred()
- val callbackChannel: Channel<ClientCallback> = Channel()
- }
-
- private sealed interface ClientCallback {
+ private sealed interface CallbackResult {
class OnCharacteristicRead(
val characteristic: BluetoothGattCharacteristic,
val value: ByteArray,
val status: Int
- ) : ClientCallback
+ ) : CallbackResult
class OnCharacteristicWrite(
val characteristic: BluetoothGattCharacteristic,
val status: Int
- ) : ClientCallback
+ ) : CallbackResult
class OnDescriptorWrite(
val descriptor: BluetoothGattDescriptor,
val status: Int
- ) : ClientCallback
+ ) : CallbackResult
}
private interface SubscribeListener {
fun onCharacteristicNotification(value: ByteArray)
+ fun finish()
}
@SuppressLint("MissingPermission")
suspend fun <R> connect(
context: Context,
device: BluetoothDevice,
- block: BluetoothLe.GattClientScope.() -> R
- ) = coroutineScope {
+ block: suspend BluetoothLe.GattClientScope.() -> R
+ ): R? = coroutineScope {
val connectResult = CompletableDeferred<Boolean>(parent = coroutineContext.job)
val finished = Job(parent = coroutineContext.job)
- var currentTask: ClientTask? = null
+ val callbackResultsFlow =
+ MutableSharedFlow<CallbackResult>(extraBufferCapacity = Int.MAX_VALUE)
val subscribeMap: MutableMap<BluetoothGattCharacteristic, SubscribeListener> = arrayMapOf()
val subscribeMutex = Mutex()
val callback = object : BluetoothGattCallback() {
override fun onConnectionStateChange(gatt: BluetoothGatt?, status: Int, newState: Int) {
- Log.d(
- TAG,
- "onConnectionStateChange() called with: gatt = $gatt, status = $status, " +
- "newState = $newState"
- )
if (newState == BluetoothGatt.STATE_CONNECTED) {
gatt?.requestMtu(GATT_MAX_MTU)
} else {
@@ -130,8 +122,8 @@
value: ByteArray,
status: Int
) {
- currentTask?.callbackChannel?.trySend(
- ClientCallback.OnCharacteristicRead(characteristic, value, status))
+ callbackResultsFlow.tryEmit(
+ CallbackResult.OnCharacteristicRead(characteristic, value, status))
}
override fun onCharacteristicWrite(
@@ -139,8 +131,8 @@
characteristic: BluetoothGattCharacteristic,
status: Int
) {
- currentTask?.callbackChannel?.trySend(
- ClientCallback.OnCharacteristicWrite(characteristic, status))
+ callbackResultsFlow.tryEmit(
+ CallbackResult.OnCharacteristicWrite(characteristic, status))
}
override fun onDescriptorWrite(
@@ -148,8 +140,8 @@
descriptor: BluetoothGattDescriptor,
status: Int
) {
- currentTask?.callbackChannel?.trySend(
- ClientCallback.OnDescriptorWrite(descriptor, status))
+ callbackResultsFlow.tryEmit(
+ CallbackResult.OnDescriptorWrite(descriptor, status))
}
override fun onCharacteristicChanged(
@@ -165,25 +157,16 @@
}
}
val bluetoothGatt = device.connectGatt(context, /*autoConnect=*/false, callback)
- val tasks: Channel<ClientTask> = Channel(10)
if (!connectResult.await()) {
Log.w(TAG, "Failed to connect to the remote GATT server")
- return@coroutineScope
+ return@coroutineScope null
}
val gattScope = object : BluetoothLe.GattClientScope {
- suspend fun run() {
- try {
- tasks.consumeEach { task ->
- currentTask = task
- task.taskBlock()
- task.finished.await()
- currentTask = null
- }
- } finally {
- finished.complete()
- bluetoothGatt.close()
- bluetoothGatt.disconnect()
+ val taskMutex = Mutex()
+ suspend fun<R> runTask(block: suspend () -> R): R {
+ taskMutex.withLock {
+ return block()
}
}
@@ -197,17 +180,14 @@
override suspend fun readCharacteristic(characteristic: BluetoothGattCharacteristic):
Result<ByteArray> {
- val task = ClientTask {
+ return runTask {
bluetoothGatt.readCharacteristic(characteristic)
- }
- tasks.send(task)
- while (true) {
- val res = task.callbackChannel.receive()
- if (res !is ClientCallback.OnCharacteristicRead) continue
- if (res.characteristic != characteristic) continue
+ val res = takeMatchingResult<CallbackResult.OnCharacteristicRead>(
+ callbackResultsFlow) {
+ it.characteristic == characteristic
+ }
- task.finished.complete(res.status == GATT_SUCCESS)
- return if (res.status == GATT_SUCCESS) Result.success(res.value)
+ if (res.status == GATT_SUCCESS) Result.success(res.value)
else Result.failure(RuntimeException("fail"))
}
}
@@ -217,17 +197,13 @@
value: ByteArray,
writeType: Int
): Result<Unit> {
- val task = ClientTask {
- bluetoothGatt.writeCharacteristic(characteristic, value, writeType)
- }
- tasks.send(task)
- while (true) {
- val res = task.callbackChannel.receive()
- if (res !is ClientCallback.OnCharacteristicWrite) continue
- if (res.characteristic.uuid != characteristic.uuid) continue
-
- task.finished.complete(res.status == GATT_SUCCESS)
- return if (res.status == GATT_SUCCESS) Result.success(Unit)
+ return runTask {
+ bluetoothGatt.writeCharacteristic(characteristic, value, writeType)
+ val res = takeMatchingResult<CallbackResult.OnCharacteristicWrite>(
+ callbackResultsFlow) {
+ it.characteristic == characteristic
+ }
+ if (res.status == GATT_SUCCESS) Result.success(Unit)
else Result.failure(RuntimeException("fail"))
}
}
@@ -241,29 +217,27 @@
override fun onCharacteristicNotification(value: ByteArray) {
trySend(value)
}
+ override fun finish() {
+ cancel("finished")
+ }
}
if (!registerSubscribeListener(characteristic, listener)) {
- cancel(CancellationException("already subscribed"))
+ cancel("already subscribed")
}
- val task = ClientTask {
+ runTask {
bluetoothGatt.setCharacteristicNotification(characteristic, /*enable=*/true)
bluetoothGatt.writeDescriptor(
cccd,
BluetoothGattDescriptor.ENABLE_NOTIFICATION_VALUE
)
- }
- tasks.send(task)
- while (true) {
- val res = task.callbackChannel.receive()
- if (res !is ClientCallback.OnDescriptorWrite) continue
- if (res.descriptor != cccd) continue
-
- task.finished.complete(res.status == GATT_SUCCESS)
+ val res = takeMatchingResult<CallbackResult.OnDescriptorWrite>(
+ callbackResultsFlow) {
+ it.descriptor == cccd
+ }
if (res.status != GATT_SUCCESS) {
cancel(CancellationException("failed to set notification"))
}
- break
}
this.awaitClose {
@@ -282,8 +256,12 @@
override suspend fun awaitClose(onClosed: () -> Unit) {
try {
- tasks.close()
- finished.join()
+ // Wait for queued tasks done
+ taskMutex.withLock {
+ subscribeMutex.withLock {
+ subscribeMap.values.forEach { it.finish() }
+ }
+ }
} finally {
onClosed()
}
@@ -310,13 +288,13 @@
}
}
}
- coroutineScope {
- launch {
- gattScope.run()
- }
- launch {
- gattScope.block()
- }
- }
+ gattScope.block()
+ }
+
+ private suspend inline fun<reified R : CallbackResult> takeMatchingResult(
+ flow: SharedFlow<CallbackResult>,
+ crossinline predicate: (R) -> Boolean
+ ): R {
+ return flow.filter { it is R && predicate(it) }.first() as R
}
}
diff --git a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt
index ef8e022..24f3597 100644
--- a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt
+++ b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt
@@ -41,7 +41,6 @@
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.first
-import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
class HomeFragment : Fragment() {
@@ -140,30 +139,26 @@
connectJob?.cancel()
connectJob = connectScope.launch {
bluetoothLe.connectGatt(requireContext(), scanResult.device) {
- launch {
- val jobs = ArrayList<Job>()
- for (srv in getServices()) {
- for (char in srv.characteristics) {
- if (char.properties.and(PROPERTY_READ) == 0) continue
- jobs.add(launch {
- val value = readCharacteristic(char).getOrNull()
- if (value != null) {
- Log.d(TAG, "Successfully read characteristic value=$value")
- }
- })
+ for (srv in getServices()) {
+ for (char in srv.characteristics) {
+ if (char.properties.and(PROPERTY_READ) == 0) continue
+ launch {
+ val value = readCharacteristic(char).getOrNull()
+ if (value != null) {
+ Log.d(TAG, "Successfully read characteristic value=$value")
+ }
+ }
+ launch {
if (char.properties.and(PROPERTY_NOTIFY) != 0) {
- jobs.add(launch {
- val value = subscribeToCharacteristic(char).first()
- Log.d(TAG, "Successfully get characteristic value=$value")
- })
+ val value = subscribeToCharacteristic(char).first()
+ Log.d(TAG, "Successfully get characteristic value=$value")
}
}
}
- jobs.joinAll()
- awaitClose {
- Log.d(TAG, "GATT client is closed")
- connectJob = null
- }
+ }
+ awaitClose {
+ Log.d(TAG, "GATT client is closed")
+ connectJob = null
}
}
}