Add BufferedAudioStream

The BufferedAudioStream can add functionality to another
AudioStream, the ability to buffer input audio data and to
decouple audio data producing with consuming.

Bug: 287935624
Test: BufferedAudioStreamTest
Change-Id: I4a32fc0e3d4deda964833836e455901985dc1d6f
diff --git a/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/AudioExecutor.java b/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/AudioExecutor.java
new file mode 100644
index 0000000..8359836
--- /dev/null
+++ b/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/AudioExecutor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.core.impl.utils.executor;
+
+import android.os.Process;
+
+import androidx.annotation.NonNull;
+import androidx.annotation.RequiresApi;
+import androidx.camera.core.CameraXThreads;
+
+import java.util.Locale;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A singleton executor which is suitable for audio I/O tasks.
+ */
+@RequiresApi(21) // TODO(b/200306659): Remove and replace with annotation on package-info.java
+public class AudioExecutor implements Executor {
+    private static volatile Executor sExecutor;
+
+    private final ExecutorService mAudioService =
+            Executors.newFixedThreadPool(
+                    2,
+                    new ThreadFactory() {
+                        private static final String THREAD_NAME_STEM =
+                                CameraXThreads.TAG + "camerax_audio_%d";
+
+                        private final AtomicInteger mThreadId = new AtomicInteger(0);
+
+                        @Override
+                        public Thread newThread(final Runnable r) {
+                            Runnable wrapper = () -> {
+                                Process.setThreadPriority(Process.THREAD_PRIORITY_AUDIO);
+                                r.run();
+                            };
+                            Thread t = new Thread(wrapper);
+                            t.setName(
+                                    String.format(
+                                            Locale.US,
+                                            THREAD_NAME_STEM,
+                                            mThreadId.getAndIncrement()));
+                            return t;
+                        }
+                    });
+
+    static Executor getInstance() {
+        if (sExecutor != null) {
+            return sExecutor;
+        }
+        synchronized (AudioExecutor.class) {
+            if (sExecutor == null) {
+                sExecutor = new AudioExecutor();
+            }
+        }
+
+        return sExecutor;
+    }
+
+    @Override
+    public void execute(@NonNull Runnable command) {
+        mAudioService.execute(command);
+    }
+}
diff --git a/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java b/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java
index 4d6d86d..8af3016 100644
--- a/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java
+++ b/camera/camera-core/src/main/java/androidx/camera/core/impl/utils/executor/CameraXExecutors.java
@@ -47,6 +47,12 @@
         return IoExecutor.getInstance();
     }
 
+    /** Returns a cached {@link Executor} suitable for audio I/O. */
+    @NonNull
+    public static Executor audioExecutor() {
+        return AudioExecutor.getInstance();
+    }
+
     /** Returns a cached executor that runs tasks directly from the calling thread. */
     @NonNull
     public static Executor directExecutor() {
diff --git a/camera/camera-video/src/main/java/androidx/camera/video/internal/audio/BufferedAudioStream.java b/camera/camera-video/src/main/java/androidx/camera/video/internal/audio/BufferedAudioStream.java
new file mode 100644
index 0000000..2c1c122
--- /dev/null
+++ b/camera/camera-video/src/main/java/androidx/camera/video/internal/audio/BufferedAudioStream.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.video.internal.audio;
+
+import static androidx.camera.video.internal.audio.AudioUtils.frameCountToDurationNs;
+import static androidx.camera.video.internal.audio.AudioUtils.sizeToFrameCount;
+import static androidx.core.util.Preconditions.checkArgument;
+import static androidx.core.util.Preconditions.checkState;
+
+import androidx.annotation.GuardedBy;
+import androidx.annotation.NonNull;
+import androidx.annotation.Nullable;
+import androidx.annotation.RequiresApi;
+import androidx.camera.core.Logger;
+import androidx.camera.core.impl.annotation.ExecutedBy;
+import androidx.camera.core.impl.utils.executor.CameraXExecutors;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The class implements a buffered AudioStream.
+ *
+ * <p>A BufferedAudioStream adds functionality to another AudioStream, the ability to buffer the
+ * input audio data and to decouple audio data producing with consuming. When the
+ * BufferedAudioStream is created, an internal buffer queue is created. The queue's size is limited
+ * to prevent memory from being overused. When the queue's size exceeds the limit, the oldest
+ * cached data will be discarded.
+ *
+ * <p>This class is not thread safe, it should be used on the same thread.
+ */
+@RequiresApi(21) // TODO(b/200306659): Remove and replace with annotation on package-info.java
+public class BufferedAudioStream implements AudioStream {
+
+    private static final String TAG = "BufferedAudioStream";
+    private static final int DEFAULT_BUFFER_SIZE_IN_FRAME = 1024;
+    private static final int DEFAULT_QUEUE_SIZE = 500;
+
+    private final AtomicBoolean mIsStarted = new AtomicBoolean(false);
+    private final AtomicBoolean mIsReleased = new AtomicBoolean(false);
+    @GuardedBy("mLock")
+    private final Queue<AudioData> mAudioDataQueue = new ConcurrentLinkedQueue<>();
+    private final Executor mProducerExecutor = CameraXExecutors.newSequentialExecutor(
+            CameraXExecutors.audioExecutor());
+    private final Object mLock = new Object();
+    @GuardedBy("mLock")
+    @Nullable
+    private AudioData mAudioDataNotFullyRead = null;
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////
+    //                      Members only accessed on mProducerExecutor                            //
+    ////////////////////////////////////////////////////////////////////////////////////////////////
+    private final AudioStream mAudioStream;
+    private final int mBytesPerFrame;
+    private final int mSampleRate;
+    private final int mQueueMaxSize;
+    private final AtomicBoolean mIsCollectingAudioData = new AtomicBoolean(false);
+    private int mBufferSize;
+
+    public BufferedAudioStream(@NonNull AudioStream audioStream,
+            @NonNull AudioSettings audioSettings) {
+        mAudioStream = audioStream;
+        mBytesPerFrame = audioSettings.getBytesPerFrame();
+        mSampleRate = audioSettings.getSampleRate();
+
+        checkArgument(mBytesPerFrame > 0L, "mBytesPerFrame must be greater than 0.");
+        checkArgument(mSampleRate > 0L, "mSampleRate must be greater than 0.");
+
+        mQueueMaxSize = DEFAULT_QUEUE_SIZE;
+        mBufferSize = DEFAULT_BUFFER_SIZE_IN_FRAME * mBytesPerFrame;
+    }
+
+    @Override
+    public void start() throws AudioStreamException, IllegalStateException {
+        checkNotReleasedOrThrow();
+        if (mIsStarted.getAndSet(true)) {
+            return;
+        }
+
+        // Start internal audio data collection.
+        RunnableFuture<Void> startTask = new FutureTask<>(() -> {
+            try {
+                mAudioStream.start();
+                startCollectingAudioData();
+            } catch (AudioStreamException e) {
+                throw new RuntimeException(e);
+            }
+        }, null);
+        mProducerExecutor.execute(startTask);
+
+        // Wait for the internal audio stream to start.
+        try {
+            startTask.get();
+        } catch (InterruptedException | ExecutionException e) {
+            mIsStarted.set(false);
+            throw new AudioStreamException(e);
+        }
+    }
+
+    @Override
+    public void stop() throws IllegalStateException {
+        checkNotReleasedOrThrow();
+        if (!mIsStarted.getAndSet(false)) {
+            return;
+        }
+
+        // Stop internal audio data collection.
+        mProducerExecutor.execute(() -> {
+            mIsCollectingAudioData.set(false);
+            mAudioStream.stop();
+            synchronized (mLock) {
+                mAudioDataNotFullyRead = null;
+                mAudioDataQueue.clear();
+            }
+        });
+    }
+
+    @Override
+    public void release() {
+        if (mIsReleased.getAndSet(true)) {
+            return;
+        }
+
+        mProducerExecutor.execute(() -> {
+            mIsCollectingAudioData.set(false);
+            mAudioStream.release();
+            synchronized (mLock) {
+                mAudioDataNotFullyRead = null;
+                mAudioDataQueue.clear();
+            }
+        });
+    }
+
+    @NonNull
+    @Override
+    public PacketInfo read(@NonNull ByteBuffer byteBuffer) {
+        checkNotReleasedOrThrow();
+        checkStartedOrThrow();
+
+        // Match collection buffer size and read buffer size to improve read efficiency.
+        updateCollectionBufferSizeAsync(byteBuffer.remaining());
+
+        PacketInfo packetInfo = PacketInfo.of(0, 0);
+        synchronized (mLock) {
+            AudioData audioData = mAudioDataNotFullyRead;
+            mAudioDataNotFullyRead = null;
+            if (audioData == null) {
+                audioData = mAudioDataQueue.poll();
+            }
+
+            if (audioData != null) {
+                packetInfo = audioData.read(byteBuffer);
+
+                if (audioData.getRemainingBufferSizeInBytes() > 0) {
+                    mAudioDataNotFullyRead = audioData;
+                }
+            } else {
+                Logger.d(TAG, "No data to read.");
+            }
+        }
+
+        return packetInfo;
+    }
+
+    @Override
+    public void setCallback(@Nullable AudioStreamCallback callback, @Nullable Executor executor) {
+        checkState(!mIsStarted.get(), "AudioStream can not be started when setCallback.");
+        checkNotReleasedOrThrow();
+        checkArgument(callback == null || executor != null,
+                "executor can't be null with non-null callback.");
+
+        mProducerExecutor.execute(() -> mAudioStream.setCallback(callback, executor));
+    }
+
+    private void checkNotReleasedOrThrow() {
+        checkState(!mIsReleased.get(), "AudioStream has been released.");
+    }
+
+    private void checkStartedOrThrow() {
+        checkState(mIsStarted.get(), "AudioStream has not been started.");
+    }
+
+    private void updateCollectionBufferSizeAsync(int bufferSize) {
+        mProducerExecutor.execute(() -> updateCollectionBufferSize(bufferSize));
+    }
+
+    @ExecutedBy("mProducerExecutor")
+    private void updateCollectionBufferSize(int bufferSize) {
+        if (mBufferSize == bufferSize) {
+            return;
+        }
+
+        // Ensure buffer size is multiple of the frame size.
+        int originalBufferSize = mBufferSize;
+        int newFrameSize = bufferSize / mBytesPerFrame;
+        mBufferSize = newFrameSize * mBytesPerFrame;
+
+        Logger.d(TAG, "Update buffer size from " + originalBufferSize + " to " + mBufferSize);
+    }
+
+    @ExecutedBy("mProducerExecutor")
+    private void startCollectingAudioData() {
+        if (mIsCollectingAudioData.getAndSet(true)) {
+            return;
+        }
+
+        collectAudioData();
+    }
+
+    @ExecutedBy("mProducerExecutor")
+    private void collectAudioData() {
+        if (!mIsCollectingAudioData.get()) {
+            return;
+        }
+
+        // Read audio data.
+        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(mBufferSize);
+        PacketInfo packetInfo = mAudioStream.read(byteBuffer);
+        AudioData audioData = new AudioData(byteBuffer, packetInfo, mBytesPerFrame, mSampleRate);
+
+        // Push audio data to the queue.
+        int queueMaxSize = mQueueMaxSize;
+        synchronized (mLock) {
+            mAudioDataQueue.offer(audioData);
+
+            // Pop audio data when the queue size exceeds the limit.
+            while (mAudioDataQueue.size() > queueMaxSize) {
+                mAudioDataQueue.poll();
+            }
+        }
+
+        // Start next data collection.
+        if (mIsCollectingAudioData.get()) {
+            mProducerExecutor.execute(this::collectAudioData);
+        }
+    }
+
+    @RequiresApi(21) // TODO(b/200306659): Remove and replace with annotation on package-info.java
+    private static class AudioData {
+
+        private final int mBytesPerFrame;
+        private final int mSampleRate;
+        private final ByteBuffer mByteBuffer;
+        private long mTimestampNs;
+
+        AudioData(@NonNull ByteBuffer byteBuffer, @NonNull PacketInfo packetInfo,
+                int bytesPerFrame, int sampleRate) {
+            // Make the buffer ready for reading.
+            byteBuffer.rewind();
+
+            // Check if byte buffer match with packet info.
+            int bufferSize = byteBuffer.limit() - byteBuffer.position();
+            if (bufferSize != packetInfo.getSizeInBytes()) {
+                throw new IllegalStateException(
+                        "Byte buffer size is not match with packet info: " + bufferSize + " != "
+                                + packetInfo.getSizeInBytes());
+            }
+
+            mBytesPerFrame = bytesPerFrame;
+            mSampleRate = sampleRate;
+            mByteBuffer = byteBuffer;
+            mTimestampNs = packetInfo.getTimestampNs();
+        }
+
+        public int getRemainingBufferSizeInBytes() {
+            return mByteBuffer.remaining();
+        }
+
+        public PacketInfo read(@NonNull ByteBuffer byteBuffer) {
+            long timestampNs = mTimestampNs;
+
+            // Check the read size, read data and handle timestamp for the remaining data.
+            int readSizeInBytes;
+            int originalSourcePosition = mByteBuffer.position();
+            int originalDestinationPosition = byteBuffer.position();
+            if (mByteBuffer.remaining() > byteBuffer.remaining()) {
+                readSizeInBytes = byteBuffer.remaining();
+
+                // Update the next timestamp to the start of the unread part.
+                long readFrames = sizeToFrameCount(readSizeInBytes, mBytesPerFrame);
+                long readDurationNs = frameCountToDurationNs(readFrames, mSampleRate);
+                mTimestampNs += readDurationNs;
+
+                // Use the duplicated byte buffer to put data into the destination to limit the
+                // read size and to not corrupt the source.
+                ByteBuffer duplicatedByteBuffer = mByteBuffer.duplicate();
+                duplicatedByteBuffer.position(originalSourcePosition)
+                        .limit(originalSourcePosition + readSizeInBytes);
+                byteBuffer.put(duplicatedByteBuffer)
+                        .limit(originalDestinationPosition + readSizeInBytes)
+                        .position(originalDestinationPosition);
+
+            } else {
+                readSizeInBytes = mByteBuffer.remaining();
+
+                // Put data into byte buffer.
+                byteBuffer.put(mByteBuffer)
+                        .limit(originalDestinationPosition + readSizeInBytes)
+                        .position(originalDestinationPosition);
+            }
+
+            // Point to the start of the unread part.
+            mByteBuffer.position(originalSourcePosition + readSizeInBytes);
+
+            return PacketInfo.of(readSizeInBytes, timestampNs);
+        }
+    }
+}
diff --git a/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/BufferedAudioStreamTest.kt b/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/BufferedAudioStreamTest.kt
new file mode 100644
index 0000000..8b8ce98
--- /dev/null
+++ b/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/BufferedAudioStreamTest.kt
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.video.internal.audio
+
+import android.media.AudioFormat
+import android.media.MediaRecorder
+import android.os.Build
+import androidx.camera.core.impl.utils.executor.CameraXExecutors
+import androidx.camera.testing.mocks.helpers.CallTimes
+import androidx.camera.testing.mocks.helpers.CallTimesAtLeast
+import com.google.common.truth.Truth.assertThat
+import java.nio.ByteBuffer
+import org.junit.After
+import org.junit.Assert.assertThrows
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.robolectric.RobolectricTestRunner
+import org.robolectric.annotation.Config
+import org.robolectric.annotation.internal.DoNotInstrument
+
+@RunWith(RobolectricTestRunner::class)
+@DoNotInstrument
+@Config(minSdk = Build.VERSION_CODES.LOLLIPOP)
+class BufferedAudioStreamTest {
+
+    companion object {
+        private const val COMMON_TIMEOUT_MS = 1000L
+        private const val SAMPLE_RATE = 44100
+        private const val AUDIO_SOURCE = MediaRecorder.AudioSource.CAMCORDER
+        private const val CHANNEL_COUNT = 1
+        private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT
+        private const val SOURCE_BYTE_BUFFER_CAPACITY = 16
+        private const val SOURCE_TIMESTAMP_OFFSET = 1L
+        private const val DESTINATION_BYTE_BUFFER_CAPACITY = 16
+    }
+
+    private lateinit var byteBuffer: ByteBuffer
+    private lateinit var baseAudioStream: FakeAudioStream
+    private lateinit var bufferedAudioStream: BufferedAudioStream
+    private lateinit var audioStreamCallback: FakeAudioStreamCallback
+
+    @Before
+    fun setUp() {
+        val audioSettings = AudioSettings.builder()
+            .setAudioSource(AUDIO_SOURCE)
+            .setSampleRate(SAMPLE_RATE)
+            .setChannelCount(CHANNEL_COUNT)
+            .setAudioFormat(AUDIO_FORMAT)
+            .build()
+        baseAudioStream = FakeAudioStream(createAudioDataProvider())
+        bufferedAudioStream = BufferedAudioStream(baseAudioStream, audioSettings)
+        audioStreamCallback = FakeAudioStreamCallback()
+        bufferedAudioStream.setCallback(audioStreamCallback, CameraXExecutors.ioExecutor())
+        byteBuffer = ByteBuffer.allocate(DESTINATION_BYTE_BUFFER_CAPACITY)
+    }
+
+    @After
+    fun tearDown() {
+        if (this::bufferedAudioStream.isInitialized) {
+            bufferedAudioStream.release()
+        }
+    }
+
+    @Test
+    fun readBeforeStart_throwException() {
+        assertThrows(IllegalStateException::class.java) {
+            bufferedAudioStream.read(byteBuffer)
+        }
+    }
+
+    @Test
+    fun readAfterStop_throwException() {
+        bufferedAudioStream.start()
+        bufferedAudioStream.stop()
+        assertThrows(IllegalStateException::class.java) {
+            bufferedAudioStream.read(byteBuffer)
+        }
+    }
+
+    @Test
+    fun startAfterReleased_throwException() {
+        bufferedAudioStream.release()
+        assertThrows(IllegalStateException::class.java) {
+            bufferedAudioStream.start()
+        }
+    }
+
+    @Test
+    fun setCallbackAfterStarted_throwException() {
+        bufferedAudioStream.start()
+        assertThrows(IllegalStateException::class.java) {
+            bufferedAudioStream.setCallback(audioStreamCallback, CameraXExecutors.ioExecutor())
+        }
+    }
+
+    @Test
+    fun setCallbackAfterReleased_throwException() {
+        bufferedAudioStream.release()
+        assertThrows(IllegalStateException::class.java) {
+            bufferedAudioStream.setCallback(audioStreamCallback, CameraXExecutors.ioExecutor())
+        }
+    }
+
+    @Test
+    fun canReadAudioStream() {
+        // Act.
+        bufferedAudioStream.start()
+
+        // Assert.
+        bufferedAudioStream.verifyMultipleReads(10, byteBuffer)
+
+        // Clean up.
+        bufferedAudioStream.stop()
+    }
+
+    @Test
+    fun canReadAudioStreamWhenDestinationBufferSizeIsSmaller() {
+        // Act.
+        bufferedAudioStream.start()
+
+        // Assert.
+        val destinationByteBuffer = ByteBuffer.allocate(SOURCE_BYTE_BUFFER_CAPACITY - 5)
+        bufferedAudioStream.verifyMultipleReads(10, destinationByteBuffer)
+
+        // Clean up.
+        bufferedAudioStream.stop()
+    }
+
+    @Test
+    fun canReadAudioStreamWhenDestinationBufferSizeIsLarger() {
+        // Act.
+        bufferedAudioStream.start()
+
+        // Assert.
+        val destinationByteBuffer = ByteBuffer.allocate(SOURCE_BYTE_BUFFER_CAPACITY + 5)
+        bufferedAudioStream.verifyMultipleReads(10, destinationByteBuffer)
+
+        // Clean up.
+        bufferedAudioStream.stop()
+    }
+
+    @Test
+    fun canRestartAudioStream() {
+        repeat(2) {
+            // Act.
+            bufferedAudioStream.start()
+
+            // Assert.
+            bufferedAudioStream.verifyMultipleReads(3, byteBuffer)
+
+            // Act.
+            bufferedAudioStream.stop()
+
+            // Assert.
+            assertThrows(IllegalStateException::class.java) {
+                byteBuffer.clear()
+                bufferedAudioStream.read(byteBuffer)
+            }
+        }
+    }
+
+    @Test
+    fun canReceiveOnSilenceStateChangedAfterStarted() {
+        // Act.
+        bufferedAudioStream.start()
+
+        // Assert: Initial isSilenced of FakeAudioStream is always false.
+        audioStreamCallback.verifyOnSilenceStateChangedCall(CallTimes(1), COMMON_TIMEOUT_MS) {
+            assertThat(it.first()).isFalse()
+        }
+
+        // Clean up.
+        bufferedAudioStream.stop()
+    }
+
+    private fun AudioStream.verifyMultipleReads(verifyTimes: Int, byteBuffer: ByteBuffer) {
+        repeat(verifyTimes) { index ->
+            // Since the audio data producer and consumer are not on the same thread, waiting for the
+            // baseAudioStream to be read to ensure that the BufferAudioStream has at least one
+            // AudioData that can be read.
+            baseAudioStream.verifyReadCall(CallTimesAtLeast(index + 2), COMMON_TIMEOUT_MS)
+
+            // Assert.
+            byteBuffer.clear()
+            this.readAndVerify(byteBuffer)
+        }
+    }
+
+    private fun AudioStream.readAndVerify(byteBuffer: ByteBuffer) {
+        val packetInfo = this.read(byteBuffer)
+
+        // Assert.
+        assertThat(packetInfo.sizeInBytes).isGreaterThan(0)
+        assertThat(packetInfo.timestampNs).isGreaterThan(0)
+    }
+
+    private fun createAudioDataProvider(): (Int) -> FakeAudioStream.AudioData = { index ->
+        val byteBuffer = ByteBuffer.allocate(SOURCE_BYTE_BUFFER_CAPACITY).put(0, index.toByte())
+        val timestampNs = (index + SOURCE_TIMESTAMP_OFFSET)
+        FakeAudioStream.AudioData(byteBuffer, timestampNs)
+    }
+}
\ No newline at end of file
diff --git a/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/FakeAudioStream.kt b/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/FakeAudioStream.kt
index 2b12cf6..74b6644 100644
--- a/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/FakeAudioStream.kt
+++ b/camera/camera-video/src/test/java/androidx/camera/video/internal/audio/FakeAudioStream.kt
@@ -23,6 +23,7 @@
 import androidx.core.util.Preconditions.checkArgument
 import java.nio.ByteBuffer
 import java.util.concurrent.Executor
+import kotlin.math.min
 
 @RequiresApi(21)
 class FakeAudioStream(
@@ -42,6 +43,7 @@
     private val startCalls = MockConsumer<Unit>()
     private val stopCalls = MockConsumer<Unit>()
     private val releaseCalls = MockConsumer<Unit>()
+    private val readCalls = MockConsumer<Unit>()
     private var bufferIndex = 0
     private var isReleased = false
     private var isStarted = false
@@ -74,6 +76,7 @@
         isStarted = false
         exceptionOnStartTimes = 0
         stopCalls.accept(Unit)
+        readCalls.clearAcceptCalls()
     }
 
     override fun release() {
@@ -90,13 +93,19 @@
         }
         val audioData = audioDataProvider.invoke(bufferIndex++)
         _audioDataList.add(audioData)
-        val packet = PacketInfo.of(audioData.byteBuffer.remaining(), audioData.timestampNs)
+        val readSizeInByte = min(audioData.byteBuffer.remaining(), byteBuffer.remaining())
+        val packet = PacketInfo.of(readSizeInByte, audioData.timestampNs)
         if (packet.sizeInBytes > 0) {
+            // Duplicate and limit source size to prevent BufferOverflowException.
+            val sourceByteBuffer = audioData.byteBuffer.duplicate()
+            sourceByteBuffer.limit(readSizeInByte)
+
             val originalPosition = byteBuffer.position()
-            byteBuffer.put(audioData.byteBuffer)
+            byteBuffer.put(sourceByteBuffer)
             byteBuffer.limit(byteBuffer.position())
             byteBuffer.position(originalPosition)
         }
+        readCalls.accept(Unit)
         return packet
     }
 
@@ -147,6 +156,17 @@
         callTimes,
     )
 
+    fun verifyReadCall(
+        callTimes: CallTimes,
+        timeoutMs: Long = MockConsumer.NO_TIMEOUT,
+        inOder: Boolean = false
+    ) = readCalls.verifyAcceptCall(
+        Unit::class.java,
+        inOder,
+        timeoutMs,
+        callTimes,
+    )
+
     private fun notifySilence() {
         if (!isStarted || isReleased) {
             return