Use mutex to implement queueing

With this, we can easily control the scope for running
the given task and handle errors easily.

It also updates awaitClose to wait remaining tasks
and close.

Bug: 267193989
Test: manually read characteritisc from a remote GATT server
Change-Id: I0afa1002a2f4d163dfe3f1d0f0a113a7565617e2
diff --git a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt
index 219af7b..4438963 100644
--- a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt
+++ b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/BluetoothLe.kt
@@ -124,9 +124,9 @@
     suspend fun <R> connectGatt(
         context: Context,
         device: BluetoothDevice,
-        block: GattClientScope.() -> R
-    ) {
-        GattClientImpl().connect(context, device, block)
+        block: suspend GattClientScope.() -> R
+    ): R? {
+        return GattClientImpl().connect(context, device, block)
     }
 
     @SuppressLint("MissingPermission")
diff --git a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt
index 1b38884..0acb5e9 100644
--- a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt
+++ b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/experimental/GattClientImpl.kt
@@ -32,13 +32,15 @@
 import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.Job
 import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.channels.awaitClose
-import kotlinx.coroutines.channels.consumeEach
 import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableSharedFlow
+import kotlinx.coroutines.flow.SharedFlow
 import kotlinx.coroutines.flow.callbackFlow
 import kotlinx.coroutines.flow.emptyFlow
+import kotlinx.coroutines.flow.filter
+import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.job
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.sync.Mutex
@@ -52,54 +54,44 @@
         private val CCCD_UID = UUID.fromString("00002902-0000-1000-8000-00805f9b34fb")
     }
 
-    private data class ClientTask(
-        val taskBlock: () -> Unit
-    ) {
-        val finished: CompletableDeferred<Boolean> = CompletableDeferred()
-        val callbackChannel: Channel<ClientCallback> = Channel()
-    }
-
-    private sealed interface ClientCallback {
+    private sealed interface CallbackResult {
         class OnCharacteristicRead(
             val characteristic: BluetoothGattCharacteristic,
             val value: ByteArray,
             val status: Int
-        ) : ClientCallback
+        ) : CallbackResult
 
         class OnCharacteristicWrite(
             val characteristic: BluetoothGattCharacteristic,
             val status: Int
-        ) : ClientCallback
+        ) : CallbackResult
 
         class OnDescriptorWrite(
             val descriptor: BluetoothGattDescriptor,
             val status: Int
-        ) : ClientCallback
+        ) : CallbackResult
     }
 
     private interface SubscribeListener {
         fun onCharacteristicNotification(value: ByteArray)
+        fun finish()
     }
 
     @SuppressLint("MissingPermission")
     suspend fun <R> connect(
         context: Context,
         device: BluetoothDevice,
-        block: BluetoothLe.GattClientScope.() -> R
-    ) = coroutineScope {
+        block: suspend BluetoothLe.GattClientScope.() -> R
+    ): R? = coroutineScope {
         val connectResult = CompletableDeferred<Boolean>(parent = coroutineContext.job)
         val finished = Job(parent = coroutineContext.job)
-        var currentTask: ClientTask? = null
+        val callbackResultsFlow =
+            MutableSharedFlow<CallbackResult>(extraBufferCapacity = Int.MAX_VALUE)
         val subscribeMap: MutableMap<BluetoothGattCharacteristic, SubscribeListener> = arrayMapOf()
         val subscribeMutex = Mutex()
 
         val callback = object : BluetoothGattCallback() {
             override fun onConnectionStateChange(gatt: BluetoothGatt?, status: Int, newState: Int) {
-                Log.d(
-                    TAG,
-                    "onConnectionStateChange() called with: gatt = $gatt, status = $status, " +
-                        "newState = $newState"
-                )
                 if (newState == BluetoothGatt.STATE_CONNECTED) {
                     gatt?.requestMtu(GATT_MAX_MTU)
                 } else {
@@ -130,8 +122,8 @@
                 value: ByteArray,
                 status: Int
             ) {
-                currentTask?.callbackChannel?.trySend(
-                    ClientCallback.OnCharacteristicRead(characteristic, value, status))
+                callbackResultsFlow.tryEmit(
+                    CallbackResult.OnCharacteristicRead(characteristic, value, status))
             }
 
             override fun onCharacteristicWrite(
@@ -139,8 +131,8 @@
                 characteristic: BluetoothGattCharacteristic,
                 status: Int
             ) {
-                currentTask?.callbackChannel?.trySend(
-                    ClientCallback.OnCharacteristicWrite(characteristic, status))
+                callbackResultsFlow.tryEmit(
+                    CallbackResult.OnCharacteristicWrite(characteristic, status))
             }
 
             override fun onDescriptorWrite(
@@ -148,8 +140,8 @@
                 descriptor: BluetoothGattDescriptor,
                 status: Int
             ) {
-                currentTask?.callbackChannel?.trySend(
-                    ClientCallback.OnDescriptorWrite(descriptor, status))
+                callbackResultsFlow.tryEmit(
+                    CallbackResult.OnDescriptorWrite(descriptor, status))
             }
 
             override fun onCharacteristicChanged(
@@ -165,25 +157,16 @@
             }
         }
         val bluetoothGatt = device.connectGatt(context, /*autoConnect=*/false, callback)
-        val tasks: Channel<ClientTask> = Channel(10)
 
         if (!connectResult.await()) {
             Log.w(TAG, "Failed to connect to the remote GATT server")
-            return@coroutineScope
+            return@coroutineScope null
         }
         val gattScope = object : BluetoothLe.GattClientScope {
-            suspend fun run() {
-                try {
-                    tasks.consumeEach { task ->
-                        currentTask = task
-                        task.taskBlock()
-                        task.finished.await()
-                        currentTask = null
-                    }
-                } finally {
-                    finished.complete()
-                    bluetoothGatt.close()
-                    bluetoothGatt.disconnect()
+            val taskMutex = Mutex()
+            suspend fun<R> runTask(block: suspend () -> R): R {
+                taskMutex.withLock {
+                    return block()
                 }
             }
 
@@ -197,17 +180,14 @@
 
             override suspend fun readCharacteristic(characteristic: BluetoothGattCharacteristic):
                 Result<ByteArray> {
-                val task = ClientTask {
+                return runTask {
                     bluetoothGatt.readCharacteristic(characteristic)
-                }
-                tasks.send(task)
-                while (true) {
-                    val res = task.callbackChannel.receive()
-                    if (res !is ClientCallback.OnCharacteristicRead) continue
-                    if (res.characteristic != characteristic) continue
+                    val res = takeMatchingResult<CallbackResult.OnCharacteristicRead>(
+                        callbackResultsFlow) {
+                        it.characteristic == characteristic
+                    }
 
-                    task.finished.complete(res.status == GATT_SUCCESS)
-                    return if (res.status == GATT_SUCCESS) Result.success(res.value)
+                    if (res.status == GATT_SUCCESS) Result.success(res.value)
                     else Result.failure(RuntimeException("fail"))
                 }
             }
@@ -217,17 +197,13 @@
                 value: ByteArray,
                 writeType: Int
             ): Result<Unit> {
-               val task = ClientTask {
-                   bluetoothGatt.writeCharacteristic(characteristic, value, writeType)
-                }
-                tasks.send(task)
-                while (true) {
-                    val res = task.callbackChannel.receive()
-                    if (res !is ClientCallback.OnCharacteristicWrite) continue
-                    if (res.characteristic.uuid != characteristic.uuid) continue
-
-                    task.finished.complete(res.status == GATT_SUCCESS)
-                    return if (res.status == GATT_SUCCESS) Result.success(Unit)
+                return runTask {
+                    bluetoothGatt.writeCharacteristic(characteristic, value, writeType)
+                    val res = takeMatchingResult<CallbackResult.OnCharacteristicWrite>(
+                        callbackResultsFlow) {
+                        it.characteristic == characteristic
+                    }
+                    if (res.status == GATT_SUCCESS) Result.success(Unit)
                     else Result.failure(RuntimeException("fail"))
                 }
             }
@@ -241,29 +217,27 @@
                         override fun onCharacteristicNotification(value: ByteArray) {
                             trySend(value)
                         }
+                        override fun finish() {
+                            cancel("finished")
+                        }
                     }
                     if (!registerSubscribeListener(characteristic, listener)) {
-                        cancel(CancellationException("already subscribed"))
+                        cancel("already subscribed")
                     }
 
-                    val task = ClientTask {
+                    runTask {
                         bluetoothGatt.setCharacteristicNotification(characteristic, /*enable=*/true)
                         bluetoothGatt.writeDescriptor(
                             cccd,
                             BluetoothGattDescriptor.ENABLE_NOTIFICATION_VALUE
                         )
-                    }
-                    tasks.send(task)
-                    while (true) {
-                        val res = task.callbackChannel.receive()
-                        if (res !is ClientCallback.OnDescriptorWrite) continue
-                        if (res.descriptor != cccd) continue
-
-                        task.finished.complete(res.status == GATT_SUCCESS)
+                        val res = takeMatchingResult<CallbackResult.OnDescriptorWrite>(
+                            callbackResultsFlow) {
+                            it.descriptor == cccd
+                        }
                         if (res.status != GATT_SUCCESS) {
                             cancel(CancellationException("failed to set notification"))
                         }
-                        break
                     }
 
                     this.awaitClose {
@@ -282,8 +256,12 @@
 
             override suspend fun awaitClose(onClosed: () -> Unit) {
                 try {
-                    tasks.close()
-                    finished.join()
+                    // Wait for queued tasks done
+                    taskMutex.withLock {
+                        subscribeMutex.withLock {
+                            subscribeMap.values.forEach { it.finish() }
+                        }
+                    }
                 } finally {
                     onClosed()
                 }
@@ -310,13 +288,13 @@
                 }
             }
         }
-        coroutineScope {
-            launch {
-                gattScope.run()
-            }
-            launch {
-                gattScope.block()
-            }
-        }
+        gattScope.block()
+    }
+
+    private suspend inline fun<reified R : CallbackResult> takeMatchingResult(
+        flow: SharedFlow<CallbackResult>,
+        crossinline predicate: (R) -> Boolean
+    ): R {
+        return flow.filter { it is R && predicate(it) }.first() as R
     }
 }
diff --git a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt
index ef8e022..24f3597 100644
--- a/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt
+++ b/bluetooth/integration-tests/testapp/src/main/java/androidx/bluetooth/integration/testapp/ui/home/HomeFragment.kt
@@ -41,7 +41,6 @@
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.Job
 import kotlinx.coroutines.flow.first
-import kotlinx.coroutines.joinAll
 import kotlinx.coroutines.launch
 
 class HomeFragment : Fragment() {
@@ -140,30 +139,26 @@
         connectJob?.cancel()
         connectJob = connectScope.launch {
             bluetoothLe.connectGatt(requireContext(), scanResult.device) {
-                launch {
-                    val jobs = ArrayList<Job>()
-                    for (srv in getServices()) {
-                        for (char in srv.characteristics) {
-                            if (char.properties.and(PROPERTY_READ) == 0) continue
-                            jobs.add(launch {
-                                val value = readCharacteristic(char).getOrNull()
-                                if (value != null) {
-                                    Log.d(TAG, "Successfully read characteristic value=$value")
-                                }
-                            })
+                for (srv in getServices()) {
+                    for (char in srv.characteristics) {
+                        if (char.properties.and(PROPERTY_READ) == 0) continue
+                        launch {
+                            val value = readCharacteristic(char).getOrNull()
+                            if (value != null) {
+                                Log.d(TAG, "Successfully read characteristic value=$value")
+                            }
+                        }
+                        launch {
                             if (char.properties.and(PROPERTY_NOTIFY) != 0) {
-                                jobs.add(launch {
-                                    val value = subscribeToCharacteristic(char).first()
-                                    Log.d(TAG, "Successfully get characteristic value=$value")
-                                })
+                                val value = subscribeToCharacteristic(char).first()
+                                Log.d(TAG, "Successfully get characteristic value=$value")
                             }
                         }
                     }
-                    jobs.joinAll()
-                    awaitClose {
-                        Log.d(TAG, "GATT client is closed")
-                        connectJob = null
-                    }
+                }
+                awaitClose {
+                    Log.d(TAG, "GATT client is closed")
+                    connectJob = null
                 }
             }
         }