From: Wolfgang Bumiller Date: Wed, 24 Jun 2020 08:11:41 +0000 (+0200) Subject: make ReadAt trait more correct X-Git-Url: https://git.proxmox.com/?a=commitdiff_plain;h=e72062a97a35608539cb171b96296105bd830299;p=pxar.git make ReadAt trait more correct A simple `poll_read_at` with an immutable self reference lacks the information which poll *operation* is being polled. An associated type won't work well enough (particularly with trait objects and lifetimes), and GATs are unstable (and not advanced enough yet), so we need to improvise. To be more async-friendly, the `start_read_at()` method's Pending now includes a reference to the operation which also grabs the buffer lifetime, and has to be completed with `poll_complete()`. Signed-off-by: Wolfgang Bumiller --- diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs index 5f16d73..8e20dd2 100644 --- a/src/accessor/aio.rs +++ b/src/accessor/aio.rs @@ -4,7 +4,9 @@ //! //! TODO: Implement a locking version for AsyncSeek+AsyncRead files? +use std::future::Future; use std::io; +use std::mem; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::Path; @@ -12,10 +14,10 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::accessor::{self, cache::Cache, ReadAt}; +use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation}; use crate::decoder::aio::Decoder; use crate::format::GoodbyeItem; -use crate::poll_fn::poll_fn; +use crate::util; use crate::Entry; use super::sync::{FileReader, FileRefReader}; @@ -134,6 +136,8 @@ impl Accessor { FileContents { inner: self.inner.open_contents_at_range(range), at: 0, + buffer: Vec::new(), + future: None, } } @@ -218,6 +222,8 @@ impl FileEntry { Ok(FileContents { inner: self.inner.contents().await?, at: 0, + buffer: Vec::new(), + future: None, }) } @@ -311,6 +317,60 @@ impl<'a, T: Clone + ReadAt> DirEntry<'a, T> { pub struct FileContents { inner: accessor::FileContentsImpl, at: u64, + buffer: Vec, + future: Option)>> + 'static>>>, +} + +// We lose `Send` via the boxed trait object and don't want to force the trait object to +// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync +// depending on T. +unsafe impl Send for FileContents {} +unsafe impl Sync for FileContents {} + +#[cfg(any(feature = "futures-io", feature = "tokio-io"))] +impl FileContents { + /// Similar implementation exists for SeqReadAtAdapter in mod.rs + fn do_poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + dest: &mut [u8], + ) -> Poll> { + let this = unsafe { Pin::into_inner_unchecked(self) }; + loop { + match this.future.take() { + None => { + let mut buffer = mem::take(&mut this.buffer); + util::scale_read_buffer(&mut buffer, dest.len()); + let reader: accessor::FileContentsImpl = this.inner.clone(); + let at = this.at; + let future: Pin)>>>> = + Box::pin(async move { + let got = reader.read_at(&mut buffer, at).await?; + io::Result::Ok((got, buffer)) + }); + // This future has the lifetime from T. Self also has this lifetime and we + // store this in a pinned self. T maybe a reference with a non-'static life + // time, but then it cannot be a self-reference into Self, so this should be + // valid in all cases: + this.future = Some(unsafe { mem::transmute(future) }); + } + Some(mut fut) => match fut.as_mut().poll(cx) { + Poll::Pending => { + this.future = Some(fut); + return Poll::Pending; + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Ready(Ok((got, buffer))) => { + this.buffer = buffer; + this.at += got as u64; + let len = got.min(dest.len()); + dest[..len].copy_from_slice(&this.buffer[..len]); + return Poll::Ready(Ok(len)); + } + }, + } + } + } } #[cfg(feature = "futures-io")] @@ -320,16 +380,7 @@ impl futures::io::AsyncRead for FileContents { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let inner = unsafe { Pin::new_unchecked(&self.inner) }; - match inner.poll_read_at(cx, buf, self.at) { - Poll::Ready(Ok(got)) => { - unsafe { - self.get_unchecked_mut().at += got as u64; - } - Poll::Ready(Ok(got)) - } - other => other, - } + Self::do_poll_read(self, cx, buf) } } @@ -340,34 +391,31 @@ impl tokio::io::AsyncRead for FileContents { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let inner = unsafe { Pin::new_unchecked(&self.inner) }; - match inner.poll_read_at(cx, buf, self.at) { - Poll::Ready(Ok(got)) => { - unsafe { - self.get_unchecked_mut().at += got as u64; - } - Poll::Ready(Ok(got)) - } - other => other, - } + Self::do_poll_read(self, cx, buf) } } impl ReadAt for FileContents { - fn poll_read_at( - self: Pin<&Self>, + fn start_read_at<'a>( + self: Pin<&'a Self>, cx: &mut Context, - buf: &mut [u8], + buf: &'a mut [u8], offset: u64, - ) -> Poll> { - unsafe { self.map_unchecked(|this| &this.inner) }.poll_read_at(cx, buf, offset) + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset) + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op) } } impl FileContents { /// Convenience helper for `read_at`: pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { - let this = unsafe { Pin::new_unchecked(self) }; - poll_fn(move |cx| this.poll_read_at(cx, buf, offset)).await + self.inner.read_at(buf, offset).await } } diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs index 71bbd6b..cf3409a 100644 --- a/src/accessor/mod.rs +++ b/src/accessor/mod.rs @@ -1,6 +1,7 @@ //! Random access for PXAR files. use std::ffi::{OsStr, OsString}; +use std::future::Future; use std::io; use std::mem::{self, size_of, size_of_val, MaybeUninit}; use std::ops::Range; @@ -15,7 +16,6 @@ use endian_trait::Endian; use crate::binary_tree_array; use crate::decoder::{self, DecoderImpl}; use crate::format::{self, GoodbyeItem}; -use crate::poll_fn::poll_fn; use crate::util; use crate::{Entry, EntryKind}; @@ -23,9 +23,14 @@ pub mod aio; pub mod cache; pub mod sync; +pub mod read_at; + #[doc(inline)] pub use sync::{Accessor, DirEntry, Directory, FileEntry, ReadDir}; +#[doc(inline)] +pub use read_at::{MaybeReady, ReadAt, ReadAtExt, ReadAtOperation}; + use cache::Cache; /// Range information used for unsafe raw random access: @@ -44,28 +49,18 @@ impl EntryRangeInfo { } } -/// Random access read implementation. -pub trait ReadAt { - fn poll_read_at( - self: Pin<&Self>, - cx: &mut Context, - buf: &mut [u8], - offset: u64, - ) -> Poll>; -} - -/// awaitable version of `poll_read_at`. +/// awaitable version of `ReadAt`. async fn read_at(input: &T, buf: &mut [u8], offset: u64) -> io::Result where - T: ReadAt + ?Sized, + T: ReadAtExt, { - poll_fn(|cx| unsafe { Pin::new_unchecked(input).poll_read_at(cx, buf, offset) }).await + input.read_at(buf, offset).await } /// `read_exact_at` - since that's what we _actually_ want most of the time. async fn read_exact_at(input: &T, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> where - T: ReadAt + ?Sized, + T: ReadAt, { while !buf.is_empty() { match read_at(input, buf, offset).await? { @@ -82,7 +77,7 @@ where /// Helper to read into an `Endian`-implementing `struct`. async fn read_entry_at(input: &T, offset: u64) -> io::Result where - T: ReadAt + ?Sized, + T: ReadAt, { let mut data = MaybeUninit::::uninit(); let buf = @@ -94,7 +89,7 @@ where /// Helper to read into an allocated byte vector. async fn read_exact_data_at(input: &T, size: usize, offset: u64) -> io::Result> where - T: ReadAt + ?Sized, + T: ReadAt, { let mut data = util::vec_new(size); read_exact_at(input, &mut data[..], offset).await?; @@ -102,14 +97,21 @@ where } /// Allow using trait objects for `T: ReadAt` -impl<'a> ReadAt for &(dyn ReadAt + 'a) { - fn poll_read_at( - self: Pin<&Self>, +impl<'d> ReadAt for &(dyn ReadAt + 'd) { + fn start_read_at<'a>( + self: Pin<&'a Self>, cx: &mut Context, - buf: &mut [u8], + buf: &'a mut [u8], offset: u64, - ) -> Poll> { - unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) } + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { Pin::new_unchecked(&**self).start_read_at(cx, buf, offset) } + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { Pin::new_unchecked(&**self).poll_complete(op) } } } @@ -117,13 +119,23 @@ impl<'a> ReadAt for &(dyn ReadAt + 'a) { /// immutable `&self`, this adds some convenience by allowing to just `Arc` any `'static` type that /// implemments `ReadAt` for type monomorphization. impl ReadAt for Arc { - fn poll_read_at( - self: Pin<&Self>, + fn start_read_at<'a>( + self: Pin<&'a Self>, cx: &mut Context, - buf: &mut [u8], + buf: &'a mut [u8], offset: u64, - ) -> Poll> { - unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) } + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { + self.map_unchecked(|this| &**this) + .start_read_at(cx, buf, offset) + } + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { self.map_unchecked(|this| &**this).poll_complete(op) } } } @@ -212,7 +224,10 @@ async fn get_decoder_at_filename( io_bail!("filename exceeds current file range"); } - Ok((get_decoder(input, entry_offset..entry_range.end, path).await?, entry_offset)) + Ok(( + get_decoder(input, entry_offset..entry_range.end, path).await?, + entry_offset, + )) } impl AccessorImpl { @@ -777,6 +792,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> { } /// A reader for file contents. +#[derive(Clone)] pub(crate) struct FileContentsImpl { input: T, @@ -810,15 +826,15 @@ impl FileContentsImpl { } impl ReadAt for FileContentsImpl { - fn poll_read_at( - self: Pin<&Self>, + fn start_read_at<'a>( + self: Pin<&'a Self>, cx: &mut Context, - mut buf: &mut [u8], + mut buf: &'a mut [u8], offset: u64, - ) -> Poll> { + ) -> MaybeReady, ReadAtOperation<'a>> { let size = self.file_size(); if offset >= size { - return Poll::Ready(Ok(0)); + return MaybeReady::Ready(Ok(0)); } let remaining = size - offset; @@ -827,7 +843,14 @@ impl ReadAt for FileContentsImpl { } let offset = self.range.start + offset; - unsafe { self.map_unchecked(|this| &this.input) }.poll_read_at(cx, buf, offset) + unsafe { self.map_unchecked(|this| &this.input) }.start_read_at(cx, buf, offset) + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + unsafe { self.map_unchecked(|this| &this.input) }.poll_complete(op) } } @@ -835,6 +858,21 @@ impl ReadAt for FileContentsImpl { pub struct SeqReadAtAdapter { input: T, range: Range, + buffer: Vec, + future: Option)>> + 'static>>>, +} + +// We lose `Send` via the boxed trait object and don't want to force the trait object to +// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync +// depending on T. +unsafe impl Send for SeqReadAtAdapter {} +unsafe impl Sync for SeqReadAtAdapter {} + +impl Drop for SeqReadAtAdapter { + fn drop(&mut self) { + // drop order + self.future = None; + } } impl SeqReadAtAdapter { @@ -842,7 +880,12 @@ impl SeqReadAtAdapter { if range.end < range.start { panic!("BAD SEQ READ AT ADAPTER"); } - Self { input, range } + Self { + input, + range, + buffer: Vec::new(), + future: None, + } } #[inline] @@ -855,18 +898,48 @@ impl decoder::SeqRead for SeqReadAtAdapter { fn poll_seq_read( self: Pin<&mut Self>, cx: &mut Context, - buf: &mut [u8], + dest: &mut [u8], ) -> Poll> { - let len = buf.len().min(self.remaining()); - let buf = &mut buf[..len]; + let len = dest.len().min(self.remaining()); + let dest = &mut dest[..len]; let this = unsafe { self.get_unchecked_mut() }; - - let got = ready!(unsafe { - Pin::new_unchecked(&this.input).poll_read_at(cx, buf, this.range.start) - })?; - this.range.start += got as u64; - Poll::Ready(Ok(got)) + loop { + match this.future.take() { + None => { + let mut buffer = mem::take(&mut this.buffer); + util::scale_read_buffer(&mut buffer, dest.len()); + + // Note that we're pinned and we have a drop-handler which forces self.future + // to be dropped before `input`, so putting a reference to self.input into the + // future should be ok! + let reader = &this.input; + + let at = this.range.start; + let future: Pin)>>>> = + Box::pin(async move { + let got = reader.read_at(&mut buffer, at).await?; + io::Result::Ok((got, buffer)) + }); + // Ditch the self-reference life-time now: + this.future = Some(unsafe { mem::transmute(future) }); + } + Some(mut fut) => match fut.as_mut().poll(cx) { + Poll::Pending => { + this.future = Some(fut); + return Poll::Pending; + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Ready(Ok((got, buffer))) => { + this.buffer = buffer; + this.range.start += got as u64; + let len = got.min(dest.len()); + dest[..len].copy_from_slice(&this.buffer[..len]); + return Poll::Ready(Ok(len)); + } + }, + } + } } fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll>> { diff --git a/src/accessor/read_at.rs b/src/accessor/read_at.rs new file mode 100644 index 0000000..d49d080 --- /dev/null +++ b/src/accessor/read_at.rs @@ -0,0 +1,92 @@ +use std::any::Any; +use std::future::Future; +use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Like Poll but Pending yields a value. +pub enum MaybeReady { + Ready(T), + Pending(F), +} + +/// Random access read implementation. +pub trait ReadAt { + fn start_read_at<'a>( + self: Pin<&'a Self>, + cx: &mut Context, + buf: &'a mut [u8], + offset: u64, + ) -> MaybeReady, ReadAtOperation<'a>>; + + fn poll_complete<'a>( + self: Pin<&'a Self>, + op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>>; +} + +pub struct ReadAtOperation<'a> { + pub cookie: Box, + _marker: PhantomData<&'a mut [u8]>, +} + +impl<'a> ReadAtOperation<'a> { + pub fn new>>(cookie: T) -> Self { + Self { + cookie: cookie.into(), + _marker: PhantomData, + } + } +} + +// awaitable helper: + +pub trait ReadAtExt: ReadAt { + fn read_at<'a>(&'a self, buf: &'a mut [u8], offset: u64) -> ReadAtImpl<'a, Self> + where + Self: Sized, + { + ReadAtImpl::New(self, buf, offset) + } +} + +impl ReadAtExt for T {} + +pub enum ReadAtImpl<'a, T: ReadAt> { + Invalid, + New(&'a T, &'a mut [u8], u64), + Pending(&'a T, ReadAtOperation<'a>), + Ready(io::Result), +} + +impl ReadAtImpl<'_, T> { + fn take(&mut self) -> Self { + std::mem::replace(self, ReadAtImpl::Invalid) + } +} + +impl<'a, T: ReadAt> Future for ReadAtImpl<'a, T> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = unsafe { Pin::into_inner_unchecked(self) }; + loop { + match match this.take() { + ReadAtImpl::New(reader, buf, offset) => { + let pin = unsafe { Pin::new_unchecked(reader) }; + (pin.start_read_at(cx, buf, offset), reader) + } + ReadAtImpl::Pending(reader, op) => { + let pin = unsafe { Pin::new_unchecked(reader) }; + (pin.poll_complete(op), reader) + } + ReadAtImpl::Ready(out) => return Poll::Ready(out), + ReadAtImpl::Invalid => panic!("poll after ready"), + } { + (MaybeReady::Ready(out), _reader) => return Poll::Ready(out), + (MaybeReady::Pending(op), reader) => *this = ReadAtImpl::Pending(reader, op), + } + } + } +} diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs index 3eb08cc..50334b9 100644 --- a/src/accessor/sync.rs +++ b/src/accessor/sync.rs @@ -6,9 +6,9 @@ use std::os::unix::fs::FileExt; use std::path::Path; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::Context; -use crate::accessor::{self, cache::Cache, ReadAt}; +use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation}; use crate::decoder::Decoder; use crate::format::GoodbyeItem; use crate::util::poll_result_once; @@ -153,13 +153,20 @@ impl FileReader { } impl ReadAt for FileReader { - fn poll_read_at( - self: Pin<&Self>, + fn start_read_at<'a>( + self: Pin<&'a Self>, _cx: &mut Context, - buf: &mut [u8], + buf: &'a mut [u8], offset: u64, - ) -> Poll> { - Poll::Ready(self.get_ref().inner.read_at(buf, offset)) + ) -> MaybeReady, ReadAtOperation<'a>> { + MaybeReady::Ready(self.get_ref().inner.read_at(buf, offset)) + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + _op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + panic!("start_read_at on sync file returned Pending"); } } @@ -180,13 +187,20 @@ where T: Clone + std::ops::Deref, T::Target: FileExt, { - fn poll_read_at( - self: Pin<&Self>, + fn start_read_at<'a>( + self: Pin<&'a Self>, _cx: &mut Context, - buf: &mut [u8], + buf: &'a mut [u8], offset: u64, - ) -> Poll> { - Poll::Ready(self.get_ref().inner.read_at(buf, offset)) + ) -> MaybeReady, ReadAtOperation<'a>> { + MaybeReady::Ready(self.get_ref().inner.read_at(buf, offset)) + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + _op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + panic!("start_read_at on sync file returned Pending"); } } @@ -383,12 +397,22 @@ impl FileExt for FileContents { } impl ReadAt for FileContents { - fn poll_read_at( - self: Pin<&Self>, - _cx: &mut Context, - buf: &mut [u8], + fn start_read_at<'a>( + self: Pin<&'a Self>, + cx: &mut Context, + buf: &'a mut [u8], offset: u64, - ) -> Poll> { - Poll::Ready(poll_result_once(self.get_ref().inner.read_at(buf, offset))) + ) -> MaybeReady, ReadAtOperation<'a>> { + match unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset) { + MaybeReady::Ready(ready) => MaybeReady::Ready(ready), + MaybeReady::Pending(_) => panic!("start_read_at on sync file returned Pending"), + } + } + + fn poll_complete<'a>( + self: Pin<&'a Self>, + _op: ReadAtOperation<'a>, + ) -> MaybeReady, ReadAtOperation<'a>> { + panic!("start_read_at on sync file returned Pending"); } } diff --git a/src/util.rs b/src/util.rs index 79a8785..7389cb2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -5,6 +5,22 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +pub const MAX_READ_BUF_LEN: usize = 4 * 1024 * 1024; + +pub fn scale_read_buffer(buffer: &mut Vec, target: usize) { + let target = target.min(MAX_READ_BUF_LEN); + + if buffer.len() >= target { + buffer.truncate(target); + return; + } + + buffer.reserve(target - buffer.len()); + unsafe { + buffer.set_len(target); + } +} + // from /usr/include/linux/magic.h // and from casync util.h #[rustfmt::skip]