//!
//! TODO: Implement a locking version for AsyncSeek+AsyncRead files?
+use std::future::Future;
use std::io;
+use std::mem;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::accessor::{self, cache::Cache, ReadAt};
+use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
use crate::decoder::aio::Decoder;
use crate::format::GoodbyeItem;
-use crate::poll_fn::poll_fn;
+use crate::util;
use crate::Entry;
use super::sync::{FileReader, FileRefReader};
FileContents {
inner: self.inner.open_contents_at_range(range),
at: 0,
+ buffer: Vec::new(),
+ future: None,
}
}
Ok(FileContents {
inner: self.inner.contents().await?,
at: 0,
+ buffer: Vec::new(),
+ future: None,
})
}
pub struct FileContents<T> {
inner: accessor::FileContentsImpl<T>,
at: u64,
+ buffer: Vec<u8>,
+ future: Option<Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>> + 'static>>>,
+}
+
+// We lose `Send` via the boxed trait object and don't want to force the trait object to
+// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync
+// depending on T.
+unsafe impl<T: Send> Send for FileContents<T> {}
+unsafe impl<T: Sync> Sync for FileContents<T> {}
+
+#[cfg(any(feature = "futures-io", feature = "tokio-io"))]
+impl<T: Clone + ReadAt> FileContents<T> {
+ /// Similar implementation exists for SeqReadAtAdapter in mod.rs
+ fn do_poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ dest: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ let this = unsafe { Pin::into_inner_unchecked(self) };
+ loop {
+ match this.future.take() {
+ None => {
+ let mut buffer = mem::take(&mut this.buffer);
+ util::scale_read_buffer(&mut buffer, dest.len());
+ let reader: accessor::FileContentsImpl<T> = this.inner.clone();
+ let at = this.at;
+ let future: Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>>>> =
+ Box::pin(async move {
+ let got = reader.read_at(&mut buffer, at).await?;
+ io::Result::Ok((got, buffer))
+ });
+ // This future has the lifetime from T. Self also has this lifetime and we
+ // store this in a pinned self. T maybe a reference with a non-'static life
+ // time, but then it cannot be a self-reference into Self, so this should be
+ // valid in all cases:
+ this.future = Some(unsafe { mem::transmute(future) });
+ }
+ Some(mut fut) => match fut.as_mut().poll(cx) {
+ Poll::Pending => {
+ this.future = Some(fut);
+ return Poll::Pending;
+ }
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Ready(Ok((got, buffer))) => {
+ this.buffer = buffer;
+ this.at += got as u64;
+ let len = got.min(dest.len());
+ dest[..len].copy_from_slice(&this.buffer[..len]);
+ return Poll::Ready(Ok(len));
+ }
+ },
+ }
+ }
+ }
}
#[cfg(feature = "futures-io")]
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- let inner = unsafe { Pin::new_unchecked(&self.inner) };
- match inner.poll_read_at(cx, buf, self.at) {
- Poll::Ready(Ok(got)) => {
- unsafe {
- self.get_unchecked_mut().at += got as u64;
- }
- Poll::Ready(Ok(got))
- }
- other => other,
- }
+ Self::do_poll_read(self, cx, buf)
}
}
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- let inner = unsafe { Pin::new_unchecked(&self.inner) };
- match inner.poll_read_at(cx, buf, self.at) {
- Poll::Ready(Ok(got)) => {
- unsafe {
- self.get_unchecked_mut().at += got as u64;
- }
- Poll::Ready(Ok(got))
- }
- other => other,
- }
+ Self::do_poll_read(self, cx, buf)
}
}
impl<T: Clone + ReadAt> ReadAt for FileContents<T> {
- fn poll_read_at(
- self: Pin<&Self>,
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
cx: &mut Context,
- buf: &mut [u8],
+ buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
- unsafe { self.map_unchecked(|this| &this.inner) }.poll_read_at(cx, buf, offset)
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset)
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op)
}
}
impl<T: Clone + ReadAt> FileContents<T> {
/// Convenience helper for `read_at`:
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
- let this = unsafe { Pin::new_unchecked(self) };
- poll_fn(move |cx| this.poll_read_at(cx, buf, offset)).await
+ self.inner.read_at(buf, offset).await
}
}
//! Random access for PXAR files.
use std::ffi::{OsStr, OsString};
+use std::future::Future;
use std::io;
use std::mem::{self, size_of, size_of_val, MaybeUninit};
use std::ops::Range;
use crate::binary_tree_array;
use crate::decoder::{self, DecoderImpl};
use crate::format::{self, GoodbyeItem};
-use crate::poll_fn::poll_fn;
use crate::util;
use crate::{Entry, EntryKind};
pub mod cache;
pub mod sync;
+pub mod read_at;
+
#[doc(inline)]
pub use sync::{Accessor, DirEntry, Directory, FileEntry, ReadDir};
+#[doc(inline)]
+pub use read_at::{MaybeReady, ReadAt, ReadAtExt, ReadAtOperation};
+
use cache::Cache;
/// Range information used for unsafe raw random access:
}
}
-/// Random access read implementation.
-pub trait ReadAt {
- fn poll_read_at(
- self: Pin<&Self>,
- cx: &mut Context,
- buf: &mut [u8],
- offset: u64,
- ) -> Poll<io::Result<usize>>;
-}
-
-/// awaitable version of `poll_read_at`.
+/// awaitable version of `ReadAt`.
async fn read_at<T>(input: &T, buf: &mut [u8], offset: u64) -> io::Result<usize>
where
- T: ReadAt + ?Sized,
+ T: ReadAtExt,
{
- poll_fn(|cx| unsafe { Pin::new_unchecked(input).poll_read_at(cx, buf, offset) }).await
+ input.read_at(buf, offset).await
}
/// `read_exact_at` - since that's what we _actually_ want most of the time.
async fn read_exact_at<T>(input: &T, mut buf: &mut [u8], mut offset: u64) -> io::Result<()>
where
- T: ReadAt + ?Sized,
+ T: ReadAt,
{
while !buf.is_empty() {
match read_at(input, buf, offset).await? {
/// Helper to read into an `Endian`-implementing `struct`.
async fn read_entry_at<T, E: Endian>(input: &T, offset: u64) -> io::Result<E>
where
- T: ReadAt + ?Sized,
+ T: ReadAt,
{
let mut data = MaybeUninit::<E>::uninit();
let buf =
/// Helper to read into an allocated byte vector.
async fn read_exact_data_at<T>(input: &T, size: usize, offset: u64) -> io::Result<Vec<u8>>
where
- T: ReadAt + ?Sized,
+ T: ReadAt,
{
let mut data = util::vec_new(size);
read_exact_at(input, &mut data[..], offset).await?;
}
/// Allow using trait objects for `T: ReadAt`
-impl<'a> ReadAt for &(dyn ReadAt + 'a) {
- fn poll_read_at(
- self: Pin<&Self>,
+impl<'d> ReadAt for &(dyn ReadAt + 'd) {
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
cx: &mut Context,
- buf: &mut [u8],
+ buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
- unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) }
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe { Pin::new_unchecked(&**self).start_read_at(cx, buf, offset) }
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe { Pin::new_unchecked(&**self).poll_complete(op) }
}
}
/// immutable `&self`, this adds some convenience by allowing to just `Arc` any `'static` type that
/// implemments `ReadAt` for type monomorphization.
impl ReadAt for Arc<dyn ReadAt + Send + Sync + 'static> {
- fn poll_read_at(
- self: Pin<&Self>,
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
cx: &mut Context,
- buf: &mut [u8],
+ buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
- unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) }
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe {
+ self.map_unchecked(|this| &**this)
+ .start_read_at(cx, buf, offset)
+ }
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe { self.map_unchecked(|this| &**this).poll_complete(op) }
}
}
io_bail!("filename exceeds current file range");
}
- Ok((get_decoder(input, entry_offset..entry_range.end, path).await?, entry_offset))
+ Ok((
+ get_decoder(input, entry_offset..entry_range.end, path).await?,
+ entry_offset,
+ ))
}
impl<T: Clone + ReadAt> AccessorImpl<T> {
}
/// A reader for file contents.
+#[derive(Clone)]
pub(crate) struct FileContentsImpl<T> {
input: T,
}
impl<T: Clone + ReadAt> ReadAt for FileContentsImpl<T> {
- fn poll_read_at(
- self: Pin<&Self>,
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
cx: &mut Context,
- mut buf: &mut [u8],
+ mut buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
let size = self.file_size();
if offset >= size {
- return Poll::Ready(Ok(0));
+ return MaybeReady::Ready(Ok(0));
}
let remaining = size - offset;
}
let offset = self.range.start + offset;
- unsafe { self.map_unchecked(|this| &this.input) }.poll_read_at(cx, buf, offset)
+ unsafe { self.map_unchecked(|this| &this.input) }.start_read_at(cx, buf, offset)
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ unsafe { self.map_unchecked(|this| &this.input) }.poll_complete(op)
}
}
pub struct SeqReadAtAdapter<T> {
input: T,
range: Range<u64>,
+ buffer: Vec<u8>,
+ future: Option<Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>> + 'static>>>,
+}
+
+// We lose `Send` via the boxed trait object and don't want to force the trait object to
+// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync
+// depending on T.
+unsafe impl<T: Send> Send for SeqReadAtAdapter<T> {}
+unsafe impl<T: Sync> Sync for SeqReadAtAdapter<T> {}
+
+impl<T> Drop for SeqReadAtAdapter<T> {
+ fn drop(&mut self) {
+ // drop order
+ self.future = None;
+ }
}
impl<T: ReadAt> SeqReadAtAdapter<T> {
if range.end < range.start {
panic!("BAD SEQ READ AT ADAPTER");
}
- Self { input, range }
+ Self {
+ input,
+ range,
+ buffer: Vec::new(),
+ future: None,
+ }
}
#[inline]
fn poll_seq_read(
self: Pin<&mut Self>,
cx: &mut Context,
- buf: &mut [u8],
+ dest: &mut [u8],
) -> Poll<io::Result<usize>> {
- let len = buf.len().min(self.remaining());
- let buf = &mut buf[..len];
+ let len = dest.len().min(self.remaining());
+ let dest = &mut dest[..len];
let this = unsafe { self.get_unchecked_mut() };
-
- let got = ready!(unsafe {
- Pin::new_unchecked(&this.input).poll_read_at(cx, buf, this.range.start)
- })?;
- this.range.start += got as u64;
- Poll::Ready(Ok(got))
+ loop {
+ match this.future.take() {
+ None => {
+ let mut buffer = mem::take(&mut this.buffer);
+ util::scale_read_buffer(&mut buffer, dest.len());
+
+ // Note that we're pinned and we have a drop-handler which forces self.future
+ // to be dropped before `input`, so putting a reference to self.input into the
+ // future should be ok!
+ let reader = &this.input;
+
+ let at = this.range.start;
+ let future: Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>>>> =
+ Box::pin(async move {
+ let got = reader.read_at(&mut buffer, at).await?;
+ io::Result::Ok((got, buffer))
+ });
+ // Ditch the self-reference life-time now:
+ this.future = Some(unsafe { mem::transmute(future) });
+ }
+ Some(mut fut) => match fut.as_mut().poll(cx) {
+ Poll::Pending => {
+ this.future = Some(fut);
+ return Poll::Pending;
+ }
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Ready(Ok((got, buffer))) => {
+ this.buffer = buffer;
+ this.range.start += got as u64;
+ let len = got.min(dest.len());
+ dest[..len].copy_from_slice(&this.buffer[..len]);
+ return Poll::Ready(Ok(len));
+ }
+ },
+ }
+ }
}
fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
--- /dev/null
+use std::any::Any;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Like Poll but Pending yields a value.
+pub enum MaybeReady<T, F> {
+ Ready(T),
+ Pending(F),
+}
+
+/// Random access read implementation.
+pub trait ReadAt {
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
+ cx: &mut Context,
+ buf: &'a mut [u8],
+ offset: u64,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>>;
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>>;
+}
+
+pub struct ReadAtOperation<'a> {
+ pub cookie: Box<dyn Any + Send + Sync>,
+ _marker: PhantomData<&'a mut [u8]>,
+}
+
+impl<'a> ReadAtOperation<'a> {
+ pub fn new<T: Into<Box<dyn Any + Send + Sync>>>(cookie: T) -> Self {
+ Self {
+ cookie: cookie.into(),
+ _marker: PhantomData,
+ }
+ }
+}
+
+// awaitable helper:
+
+pub trait ReadAtExt: ReadAt {
+ fn read_at<'a>(&'a self, buf: &'a mut [u8], offset: u64) -> ReadAtImpl<'a, Self>
+ where
+ Self: Sized,
+ {
+ ReadAtImpl::New(self, buf, offset)
+ }
+}
+
+impl<T: ReadAt> ReadAtExt for T {}
+
+pub enum ReadAtImpl<'a, T: ReadAt> {
+ Invalid,
+ New(&'a T, &'a mut [u8], u64),
+ Pending(&'a T, ReadAtOperation<'a>),
+ Ready(io::Result<usize>),
+}
+
+impl<T: ReadAt> ReadAtImpl<'_, T> {
+ fn take(&mut self) -> Self {
+ std::mem::replace(self, ReadAtImpl::Invalid)
+ }
+}
+
+impl<'a, T: ReadAt> Future for ReadAtImpl<'a, T> {
+ type Output = io::Result<usize>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let this = unsafe { Pin::into_inner_unchecked(self) };
+ loop {
+ match match this.take() {
+ ReadAtImpl::New(reader, buf, offset) => {
+ let pin = unsafe { Pin::new_unchecked(reader) };
+ (pin.start_read_at(cx, buf, offset), reader)
+ }
+ ReadAtImpl::Pending(reader, op) => {
+ let pin = unsafe { Pin::new_unchecked(reader) };
+ (pin.poll_complete(op), reader)
+ }
+ ReadAtImpl::Ready(out) => return Poll::Ready(out),
+ ReadAtImpl::Invalid => panic!("poll after ready"),
+ } {
+ (MaybeReady::Ready(out), _reader) => return Poll::Ready(out),
+ (MaybeReady::Pending(op), reader) => *this = ReadAtImpl::Pending(reader, op),
+ }
+ }
+ }
+}
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::task::Context;
-use crate::accessor::{self, cache::Cache, ReadAt};
+use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
use crate::decoder::Decoder;
use crate::format::GoodbyeItem;
use crate::util::poll_result_once;
}
impl<T: FileExt> ReadAt for FileReader<T> {
- fn poll_read_at(
- self: Pin<&Self>,
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
_cx: &mut Context,
- buf: &mut [u8],
+ buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
- Poll::Ready(self.get_ref().inner.read_at(buf, offset))
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ MaybeReady::Ready(self.get_ref().inner.read_at(buf, offset))
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ _op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ panic!("start_read_at on sync file returned Pending");
}
}
T: Clone + std::ops::Deref,
T::Target: FileExt,
{
- fn poll_read_at(
- self: Pin<&Self>,
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
_cx: &mut Context,
- buf: &mut [u8],
+ buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
- Poll::Ready(self.get_ref().inner.read_at(buf, offset))
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ MaybeReady::Ready(self.get_ref().inner.read_at(buf, offset))
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ _op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ panic!("start_read_at on sync file returned Pending");
}
}
}
impl<T: Clone + ReadAt> ReadAt for FileContents<T> {
- fn poll_read_at(
- self: Pin<&Self>,
- _cx: &mut Context,
- buf: &mut [u8],
+ fn start_read_at<'a>(
+ self: Pin<&'a Self>,
+ cx: &mut Context,
+ buf: &'a mut [u8],
offset: u64,
- ) -> Poll<io::Result<usize>> {
- Poll::Ready(poll_result_once(self.get_ref().inner.read_at(buf, offset)))
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ match unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset) {
+ MaybeReady::Ready(ready) => MaybeReady::Ready(ready),
+ MaybeReady::Pending(_) => panic!("start_read_at on sync file returned Pending"),
+ }
+ }
+
+ fn poll_complete<'a>(
+ self: Pin<&'a Self>,
+ _op: ReadAtOperation<'a>,
+ ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
+ panic!("start_read_at on sync file returned Pending");
}
}
use std::pin::Pin;
use std::task::{Context, Poll};
+pub const MAX_READ_BUF_LEN: usize = 4 * 1024 * 1024;
+
+pub fn scale_read_buffer(buffer: &mut Vec<u8>, target: usize) {
+ let target = target.min(MAX_READ_BUF_LEN);
+
+ if buffer.len() >= target {
+ buffer.truncate(target);
+ return;
+ }
+
+ buffer.reserve(target - buffer.len());
+ unsafe {
+ buffer.set_len(target);
+ }
+}
+
// from /usr/include/linux/magic.h
// and from casync util.h
#[rustfmt::skip]