]> git.proxmox.com Git - pxar.git/commitdiff
make ReadAt trait more correct
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 24 Jun 2020 08:11:41 +0000 (10:11 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 24 Jun 2020 09:53:32 +0000 (11:53 +0200)
A simple `poll_read_at` with an immutable self reference
lacks the information which poll *operation* is being
polled. An associated type won't work well enough
(particularly with trait objects and lifetimes), and GATs
are unstable (and not advanced enough yet), so we need to
improvise.

To be more async-friendly, the `start_read_at()` method's
Pending now includes a reference to the operation which also
grabs the buffer lifetime, and has to be completed with
`poll_complete()`.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/accessor/aio.rs
src/accessor/mod.rs
src/accessor/read_at.rs [new file with mode: 0644]
src/accessor/sync.rs
src/util.rs

index 5f16d73f4ab015e3ddf56eebef0609a152378168..8e20dd2f092face9bc2517f50b710d2a7d8005f5 100644 (file)
@@ -4,7 +4,9 @@
 //!
 //! TODO: Implement a locking version for AsyncSeek+AsyncRead files?
 
+use std::future::Future;
 use std::io;
+use std::mem;
 use std::ops::Range;
 use std::os::unix::fs::FileExt;
 use std::path::Path;
@@ -12,10 +14,10 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::accessor::{self, cache::Cache, ReadAt};
+use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
 use crate::decoder::aio::Decoder;
 use crate::format::GoodbyeItem;
-use crate::poll_fn::poll_fn;
+use crate::util;
 use crate::Entry;
 
 use super::sync::{FileReader, FileRefReader};
@@ -134,6 +136,8 @@ impl<T: Clone + ReadAt> Accessor<T> {
         FileContents {
             inner: self.inner.open_contents_at_range(range),
             at: 0,
+            buffer: Vec::new(),
+            future: None,
         }
     }
 
@@ -218,6 +222,8 @@ impl<T: Clone + ReadAt> FileEntry<T> {
         Ok(FileContents {
             inner: self.inner.contents().await?,
             at: 0,
+            buffer: Vec::new(),
+            future: None,
         })
     }
 
@@ -311,6 +317,60 @@ impl<'a, T: Clone + ReadAt> DirEntry<'a, T> {
 pub struct FileContents<T> {
     inner: accessor::FileContentsImpl<T>,
     at: u64,
+    buffer: Vec<u8>,
+    future: Option<Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>> + 'static>>>,
+}
+
+// We lose `Send` via the boxed trait object and don't want to force the trait object to
+// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync
+// depending on T.
+unsafe impl<T: Send> Send for FileContents<T> {}
+unsafe impl<T: Sync> Sync for FileContents<T> {}
+
+#[cfg(any(feature = "futures-io", feature = "tokio-io"))]
+impl<T: Clone + ReadAt> FileContents<T> {
+    /// Similar implementation exists for SeqReadAtAdapter in mod.rs
+    fn do_poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        dest: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        let this = unsafe { Pin::into_inner_unchecked(self) };
+        loop {
+            match this.future.take() {
+                None => {
+                    let mut buffer = mem::take(&mut this.buffer);
+                    util::scale_read_buffer(&mut buffer, dest.len());
+                    let reader: accessor::FileContentsImpl<T> = this.inner.clone();
+                    let at = this.at;
+                    let future: Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>>>> =
+                        Box::pin(async move {
+                            let got = reader.read_at(&mut buffer, at).await?;
+                            io::Result::Ok((got, buffer))
+                        });
+                    // This future has the lifetime from T. Self also has this lifetime and we
+                    // store this in a pinned self. T maybe a reference with a non-'static life
+                    // time, but then it cannot be a self-reference into Self, so this should be
+                    // valid in all cases:
+                    this.future = Some(unsafe { mem::transmute(future) });
+                }
+                Some(mut fut) => match fut.as_mut().poll(cx) {
+                    Poll::Pending => {
+                        this.future = Some(fut);
+                        return Poll::Pending;
+                    }
+                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+                    Poll::Ready(Ok((got, buffer))) => {
+                        this.buffer = buffer;
+                        this.at += got as u64;
+                        let len = got.min(dest.len());
+                        dest[..len].copy_from_slice(&this.buffer[..len]);
+                        return Poll::Ready(Ok(len));
+                    }
+                },
+            }
+        }
+    }
 }
 
 #[cfg(feature = "futures-io")]
@@ -320,16 +380,7 @@ impl<T: Clone + ReadAt> futures::io::AsyncRead for FileContents<T> {
         cx: &mut Context,
         buf: &mut [u8],
     ) -> Poll<io::Result<usize>> {
-        let inner = unsafe { Pin::new_unchecked(&self.inner) };
-        match inner.poll_read_at(cx, buf, self.at) {
-            Poll::Ready(Ok(got)) => {
-                unsafe {
-                    self.get_unchecked_mut().at += got as u64;
-                }
-                Poll::Ready(Ok(got))
-            }
-            other => other,
-        }
+        Self::do_poll_read(self, cx, buf)
     }
 }
 
@@ -340,34 +391,31 @@ impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
         cx: &mut Context,
         buf: &mut [u8],
     ) -> Poll<io::Result<usize>> {
-        let inner = unsafe { Pin::new_unchecked(&self.inner) };
-        match inner.poll_read_at(cx, buf, self.at) {
-            Poll::Ready(Ok(got)) => {
-                unsafe {
-                    self.get_unchecked_mut().at += got as u64;
-                }
-                Poll::Ready(Ok(got))
-            }
-            other => other,
-        }
+        Self::do_poll_read(self, cx, buf)
     }
 }
 
 impl<T: Clone + ReadAt> ReadAt for FileContents<T> {
-    fn poll_read_at(
-        self: Pin<&Self>,
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
         cx: &mut Context,
-        buf: &mut [u8],
+        buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
-        unsafe { self.map_unchecked(|this| &this.inner) }.poll_read_at(cx, buf, offset)
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset)
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op)
     }
 }
 
 impl<T: Clone + ReadAt> FileContents<T> {
     /// Convenience helper for `read_at`:
     pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
-        let this = unsafe { Pin::new_unchecked(self) };
-        poll_fn(move |cx| this.poll_read_at(cx, buf, offset)).await
+        self.inner.read_at(buf, offset).await
     }
 }
index 71bbd6bfce0b4853ddb87641f83f7cb43b95fe01..cf3409af2212100468deb80933ec87aae428979b 100644 (file)
@@ -1,6 +1,7 @@
 //! Random access for PXAR files.
 
 use std::ffi::{OsStr, OsString};
+use std::future::Future;
 use std::io;
 use std::mem::{self, size_of, size_of_val, MaybeUninit};
 use std::ops::Range;
@@ -15,7 +16,6 @@ use endian_trait::Endian;
 use crate::binary_tree_array;
 use crate::decoder::{self, DecoderImpl};
 use crate::format::{self, GoodbyeItem};
-use crate::poll_fn::poll_fn;
 use crate::util;
 use crate::{Entry, EntryKind};
 
@@ -23,9 +23,14 @@ pub mod aio;
 pub mod cache;
 pub mod sync;
 
+pub mod read_at;
+
 #[doc(inline)]
 pub use sync::{Accessor, DirEntry, Directory, FileEntry, ReadDir};
 
+#[doc(inline)]
+pub use read_at::{MaybeReady, ReadAt, ReadAtExt, ReadAtOperation};
+
 use cache::Cache;
 
 /// Range information used for unsafe raw random access:
@@ -44,28 +49,18 @@ impl EntryRangeInfo {
     }
 }
 
-/// Random access read implementation.
-pub trait ReadAt {
-    fn poll_read_at(
-        self: Pin<&Self>,
-        cx: &mut Context,
-        buf: &mut [u8],
-        offset: u64,
-    ) -> Poll<io::Result<usize>>;
-}
-
-/// awaitable version of `poll_read_at`.
+/// awaitable version of `ReadAt`.
 async fn read_at<T>(input: &T, buf: &mut [u8], offset: u64) -> io::Result<usize>
 where
-    T: ReadAt + ?Sized,
+    T: ReadAtExt,
 {
-    poll_fn(|cx| unsafe { Pin::new_unchecked(input).poll_read_at(cx, buf, offset) }).await
+    input.read_at(buf, offset).await
 }
 
 /// `read_exact_at` - since that's what we _actually_ want most of the time.
 async fn read_exact_at<T>(input: &T, mut buf: &mut [u8], mut offset: u64) -> io::Result<()>
 where
-    T: ReadAt + ?Sized,
+    T: ReadAt,
 {
     while !buf.is_empty() {
         match read_at(input, buf, offset).await? {
@@ -82,7 +77,7 @@ where
 /// Helper to read into an `Endian`-implementing `struct`.
 async fn read_entry_at<T, E: Endian>(input: &T, offset: u64) -> io::Result<E>
 where
-    T: ReadAt + ?Sized,
+    T: ReadAt,
 {
     let mut data = MaybeUninit::<E>::uninit();
     let buf =
@@ -94,7 +89,7 @@ where
 /// Helper to read into an allocated byte vector.
 async fn read_exact_data_at<T>(input: &T, size: usize, offset: u64) -> io::Result<Vec<u8>>
 where
-    T: ReadAt + ?Sized,
+    T: ReadAt,
 {
     let mut data = util::vec_new(size);
     read_exact_at(input, &mut data[..], offset).await?;
@@ -102,14 +97,21 @@ where
 }
 
 /// Allow using trait objects for `T: ReadAt`
-impl<'a> ReadAt for &(dyn ReadAt + 'a) {
-    fn poll_read_at(
-        self: Pin<&Self>,
+impl<'d> ReadAt for &(dyn ReadAt + 'd) {
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
         cx: &mut Context,
-        buf: &mut [u8],
+        buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
-        unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) }
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe { Pin::new_unchecked(&**self).start_read_at(cx, buf, offset) }
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe { Pin::new_unchecked(&**self).poll_complete(op) }
     }
 }
 
@@ -117,13 +119,23 @@ impl<'a> ReadAt for &(dyn ReadAt + 'a) {
 /// immutable `&self`, this adds some convenience by allowing to just `Arc` any `'static` type that
 /// implemments `ReadAt` for type monomorphization.
 impl ReadAt for Arc<dyn ReadAt + Send + Sync + 'static> {
-    fn poll_read_at(
-        self: Pin<&Self>,
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
         cx: &mut Context,
-        buf: &mut [u8],
+        buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
-        unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) }
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe {
+            self.map_unchecked(|this| &**this)
+                .start_read_at(cx, buf, offset)
+        }
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe { self.map_unchecked(|this| &**this).poll_complete(op) }
     }
 }
 
@@ -212,7 +224,10 @@ async fn get_decoder_at_filename<T: ReadAt>(
         io_bail!("filename exceeds current file range");
     }
 
-    Ok((get_decoder(input, entry_offset..entry_range.end, path).await?, entry_offset))
+    Ok((
+        get_decoder(input, entry_offset..entry_range.end, path).await?,
+        entry_offset,
+    ))
 }
 
 impl<T: Clone + ReadAt> AccessorImpl<T> {
@@ -777,6 +792,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
 }
 
 /// A reader for file contents.
+#[derive(Clone)]
 pub(crate) struct FileContentsImpl<T> {
     input: T,
 
@@ -810,15 +826,15 @@ impl<T: Clone + ReadAt> FileContentsImpl<T> {
 }
 
 impl<T: Clone + ReadAt> ReadAt for FileContentsImpl<T> {
-    fn poll_read_at(
-        self: Pin<&Self>,
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
         cx: &mut Context,
-        mut buf: &mut [u8],
+        mut buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
         let size = self.file_size();
         if offset >= size {
-            return Poll::Ready(Ok(0));
+            return MaybeReady::Ready(Ok(0));
         }
         let remaining = size - offset;
 
@@ -827,7 +843,14 @@ impl<T: Clone + ReadAt> ReadAt for FileContentsImpl<T> {
         }
 
         let offset = self.range.start + offset;
-        unsafe { self.map_unchecked(|this| &this.input) }.poll_read_at(cx, buf, offset)
+        unsafe { self.map_unchecked(|this| &this.input) }.start_read_at(cx, buf, offset)
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        unsafe { self.map_unchecked(|this| &this.input) }.poll_complete(op)
     }
 }
 
@@ -835,6 +858,21 @@ impl<T: Clone + ReadAt> ReadAt for FileContentsImpl<T> {
 pub struct SeqReadAtAdapter<T> {
     input: T,
     range: Range<u64>,
+    buffer: Vec<u8>,
+    future: Option<Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>> + 'static>>>,
+}
+
+// We lose `Send` via the boxed trait object and don't want to force the trait object to
+// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync
+// depending on T.
+unsafe impl<T: Send> Send for SeqReadAtAdapter<T> {}
+unsafe impl<T: Sync> Sync for SeqReadAtAdapter<T> {}
+
+impl<T> Drop for SeqReadAtAdapter<T> {
+    fn drop(&mut self) {
+        // drop order
+        self.future = None;
+    }
 }
 
 impl<T: ReadAt> SeqReadAtAdapter<T> {
@@ -842,7 +880,12 @@ impl<T: ReadAt> SeqReadAtAdapter<T> {
         if range.end < range.start {
             panic!("BAD SEQ READ AT ADAPTER");
         }
-        Self { input, range }
+        Self {
+            input,
+            range,
+            buffer: Vec::new(),
+            future: None,
+        }
     }
 
     #[inline]
@@ -855,18 +898,48 @@ impl<T: ReadAt> decoder::SeqRead for SeqReadAtAdapter<T> {
     fn poll_seq_read(
         self: Pin<&mut Self>,
         cx: &mut Context,
-        buf: &mut [u8],
+        dest: &mut [u8],
     ) -> Poll<io::Result<usize>> {
-        let len = buf.len().min(self.remaining());
-        let buf = &mut buf[..len];
+        let len = dest.len().min(self.remaining());
+        let dest = &mut dest[..len];
 
         let this = unsafe { self.get_unchecked_mut() };
-
-        let got = ready!(unsafe {
-            Pin::new_unchecked(&this.input).poll_read_at(cx, buf, this.range.start)
-        })?;
-        this.range.start += got as u64;
-        Poll::Ready(Ok(got))
+        loop {
+            match this.future.take() {
+                None => {
+                    let mut buffer = mem::take(&mut this.buffer);
+                    util::scale_read_buffer(&mut buffer, dest.len());
+
+                    // Note that we're pinned and we have a drop-handler which forces self.future
+                    // to be dropped before `input`, so putting a reference to self.input into the
+                    // future should be ok!
+                    let reader = &this.input;
+
+                    let at = this.range.start;
+                    let future: Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>>>> =
+                        Box::pin(async move {
+                            let got = reader.read_at(&mut buffer, at).await?;
+                            io::Result::Ok((got, buffer))
+                        });
+                    // Ditch the self-reference life-time now:
+                    this.future = Some(unsafe { mem::transmute(future) });
+                }
+                Some(mut fut) => match fut.as_mut().poll(cx) {
+                    Poll::Pending => {
+                        this.future = Some(fut);
+                        return Poll::Pending;
+                    }
+                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+                    Poll::Ready(Ok((got, buffer))) => {
+                        this.buffer = buffer;
+                        this.range.start += got as u64;
+                        let len = got.min(dest.len());
+                        dest[..len].copy_from_slice(&this.buffer[..len]);
+                        return Poll::Ready(Ok(len));
+                    }
+                },
+            }
+        }
     }
 
     fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
diff --git a/src/accessor/read_at.rs b/src/accessor/read_at.rs
new file mode 100644 (file)
index 0000000..d49d080
--- /dev/null
@@ -0,0 +1,92 @@
+use std::any::Any;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Like Poll but Pending yields a value.
+pub enum MaybeReady<T, F> {
+    Ready(T),
+    Pending(F),
+}
+
+/// Random access read implementation.
+pub trait ReadAt {
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
+        cx: &mut Context,
+        buf: &'a mut [u8],
+        offset: u64,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>>;
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>>;
+}
+
+pub struct ReadAtOperation<'a> {
+    pub cookie: Box<dyn Any + Send + Sync>,
+    _marker: PhantomData<&'a mut [u8]>,
+}
+
+impl<'a> ReadAtOperation<'a> {
+    pub fn new<T: Into<Box<dyn Any + Send + Sync>>>(cookie: T) -> Self {
+        Self {
+            cookie: cookie.into(),
+            _marker: PhantomData,
+        }
+    }
+}
+
+// awaitable helper:
+
+pub trait ReadAtExt: ReadAt {
+    fn read_at<'a>(&'a self, buf: &'a mut [u8], offset: u64) -> ReadAtImpl<'a, Self>
+    where
+        Self: Sized,
+    {
+        ReadAtImpl::New(self, buf, offset)
+    }
+}
+
+impl<T: ReadAt> ReadAtExt for T {}
+
+pub enum ReadAtImpl<'a, T: ReadAt> {
+    Invalid,
+    New(&'a T, &'a mut [u8], u64),
+    Pending(&'a T, ReadAtOperation<'a>),
+    Ready(io::Result<usize>),
+}
+
+impl<T: ReadAt> ReadAtImpl<'_, T> {
+    fn take(&mut self) -> Self {
+        std::mem::replace(self, ReadAtImpl::Invalid)
+    }
+}
+
+impl<'a, T: ReadAt> Future for ReadAtImpl<'a, T> {
+    type Output = io::Result<usize>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = unsafe { Pin::into_inner_unchecked(self) };
+        loop {
+            match match this.take() {
+                ReadAtImpl::New(reader, buf, offset) => {
+                    let pin = unsafe { Pin::new_unchecked(reader) };
+                    (pin.start_read_at(cx, buf, offset), reader)
+                }
+                ReadAtImpl::Pending(reader, op) => {
+                    let pin = unsafe { Pin::new_unchecked(reader) };
+                    (pin.poll_complete(op), reader)
+                }
+                ReadAtImpl::Ready(out) => return Poll::Ready(out),
+                ReadAtImpl::Invalid => panic!("poll after ready"),
+            } {
+                (MaybeReady::Ready(out), _reader) => return Poll::Ready(out),
+                (MaybeReady::Pending(op), reader) => *this = ReadAtImpl::Pending(reader, op),
+            }
+        }
+    }
+}
index 3eb08cc44f141fd5801e3ff8d31b678a08ffc0f1..50334b9bd319113d577d9ff5c934339310e80abe 100644 (file)
@@ -6,9 +6,9 @@ use std::os::unix::fs::FileExt;
 use std::path::Path;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::task::Context;
 
-use crate::accessor::{self, cache::Cache, ReadAt};
+use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
 use crate::decoder::Decoder;
 use crate::format::GoodbyeItem;
 use crate::util::poll_result_once;
@@ -153,13 +153,20 @@ impl<T: FileExt> FileReader<T> {
 }
 
 impl<T: FileExt> ReadAt for FileReader<T> {
-    fn poll_read_at(
-        self: Pin<&Self>,
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
         _cx: &mut Context,
-        buf: &mut [u8],
+        buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
-        Poll::Ready(self.get_ref().inner.read_at(buf, offset))
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        MaybeReady::Ready(self.get_ref().inner.read_at(buf, offset))
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        _op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        panic!("start_read_at on sync file returned Pending");
     }
 }
 
@@ -180,13 +187,20 @@ where
     T: Clone + std::ops::Deref,
     T::Target: FileExt,
 {
-    fn poll_read_at(
-        self: Pin<&Self>,
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
         _cx: &mut Context,
-        buf: &mut [u8],
+        buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
-        Poll::Ready(self.get_ref().inner.read_at(buf, offset))
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        MaybeReady::Ready(self.get_ref().inner.read_at(buf, offset))
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        _op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        panic!("start_read_at on sync file returned Pending");
     }
 }
 
@@ -383,12 +397,22 @@ impl<T: Clone + ReadAt> FileExt for FileContents<T> {
 }
 
 impl<T: Clone + ReadAt> ReadAt for FileContents<T> {
-    fn poll_read_at(
-        self: Pin<&Self>,
-        _cx: &mut Context,
-        buf: &mut [u8],
+    fn start_read_at<'a>(
+        self: Pin<&'a Self>,
+        cx: &mut Context,
+        buf: &'a mut [u8],
         offset: u64,
-    ) -> Poll<io::Result<usize>> {
-        Poll::Ready(poll_result_once(self.get_ref().inner.read_at(buf, offset)))
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        match unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset) {
+            MaybeReady::Ready(ready) => MaybeReady::Ready(ready),
+            MaybeReady::Pending(_) => panic!("start_read_at on sync file returned Pending"),
+        }
+    }
+
+    fn poll_complete<'a>(
+        self: Pin<&'a Self>,
+        _op: ReadAtOperation<'a>,
+    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+        panic!("start_read_at on sync file returned Pending");
     }
 }
index 79a8785b96c97536fc25cf5ca65995bc23e7eb8d..7389cb2110feab489e521f21085bde16db33bc6d 100644 (file)
@@ -5,6 +5,22 @@ use std::io;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+pub const MAX_READ_BUF_LEN: usize = 4 * 1024 * 1024;
+
+pub fn scale_read_buffer(buffer: &mut Vec<u8>, target: usize) {
+    let target = target.min(MAX_READ_BUF_LEN);
+
+    if buffer.len() >= target {
+        buffer.truncate(target);
+        return;
+    }
+
+    buffer.reserve(target - buffer.len());
+    unsafe {
+        buffer.set_len(target);
+    }
+}
+
 // from /usr/include/linux/magic.h
 // and from casync util.h
 #[rustfmt::skip]