On Wed, Jun 02, 2021 at 04:38:28PM +0200, Stefan Reiter wrote: > Implemented as a separate struct SeekableCachedChunkReader that contains > the original as an Arc, since the read_at future captures the > CachedChunkReader, which would otherwise not work with the lifetimes > required by AsyncRead. This is also the reason we cannot use a shared > read buffer and have to allocate a new one for every read. It also means > that the struct items required for AsyncRead/Seek do not need to be > included in a regular CachedChunkReader. > > This is intended as a replacement for AsyncIndexReader, so we have less > code duplication and can utilize the LRU cache there too (even though > actual request concurrency is not supported in these traits). > > Signed-off-by: Stefan Reiter <s.rei...@proxmox.com> > --- > src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++- > 1 file changed, 114 insertions(+), 2 deletions(-) > > diff --git a/src/backup/cached_chunk_reader.rs > b/src/backup/cached_chunk_reader.rs > index fd5a049f..9b56fd14 100644 > --- a/src/backup/cached_chunk_reader.rs > +++ b/src/backup/cached_chunk_reader.rs > @@ -1,12 +1,19 @@ > //! An async and concurrency safe data reader backed by a local LRU cache. 
> > use anyhow::Error; > +use futures::future::Future; > +use futures::ready; > +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; > > -use std::future::Future; > +use std::io::SeekFrom; > +use std::pin::Pin; > use std::sync::Arc; > +use std::task::{Context, Poll}; > > -use crate::backup::{AsyncReadChunk, IndexFile}; > +use super::{AsyncReadChunk, IndexFile}; > use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; > +use proxmox::io_format_err; > +use proxmox::sys::error::io_err_other; > > struct AsyncChunkCacher<T> { > reader: Arc<T>, > @@ -85,3 +92,108 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + > 'static> CachedChunkReader< > Ok(read) > } > } > + > +impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + > 'static> > + CachedChunkReader<I, R> > +{ > + /// Returns a SeekableCachedChunkReader based on this instance, which > implements AsyncSeek and > + /// AsyncRead for use in interfaces which require that. Direct use of > read_at is preferred > + /// otherwise. 
> + pub fn seekable(self) -> SeekableCachedChunkReader<I, R> { > + SeekableCachedChunkReader { > + index_bytes: self.index.index_bytes(), > + reader: Arc::new(self), > + position: 0, > + seek_to_pos: 0, > + read_future: None, > + } > + } > +} > + > +pub struct SeekableCachedChunkReader< > + I: IndexFile + Send + Sync + 'static, > + R: AsyncReadChunk + Send + Sync + 'static, > +> { > + reader: Arc<CachedChunkReader<I, R>>, > + index_bytes: u64, > + position: u64, > + seek_to_pos: i64, > + read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), > Error>> + Send>>>, > +} > + > +impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R> > +where > + I: IndexFile + Send + Sync + 'static, > + R: AsyncReadChunk + Send + Sync + 'static, > +{ > + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> > tokio::io::Result<()> { > + let this = Pin::get_mut(self); > + this.seek_to_pos = match pos { > + SeekFrom::Start(offset) => offset as i64, > + SeekFrom::End(offset) => this.index_bytes as i64 + offset, > + SeekFrom::Current(offset) => this.position as i64 + offset, > + }; > + Ok(()) > + } > + > + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> > Poll<tokio::io::Result<u64>> { > + let this = Pin::get_mut(self); > + > + let index_bytes = this.index_bytes; > + if this.seek_to_pos < 0 { > + return Poll::Ready(Err(io_format_err!("cannot seek to negative > values")));
IMO we should ditch `seek_to_pos` altogether, move the error handling into `start_seek` and just always return `Poll::Ready(Ok(this.position))` straightaway. Unless there's a reason to split this up? Other resources don't guarantee much of anything if you start reading/writing *between* `start_seek`/`poll_complete` after all. > + } else if this.seek_to_pos > index_bytes as i64 { > + this.position = index_bytes; > + } else { > + this.position = this.seek_to_pos as u64; > + } > + > + Poll::Ready(Ok(this.position)) > + } > +} > + > +impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R> > +where > + I: IndexFile + Send + Sync + 'static, > + R: AsyncReadChunk + Send + Sync + 'static, > +{ > + fn poll_read( > + self: Pin<&mut Self>, > + cx: &mut Context, > + buf: &mut ReadBuf, > + ) -> Poll<tokio::io::Result<()>> { > + let this = Pin::get_mut(self); > + > + let fut = match this.read_future { > + Some(ref mut fut) => fut, > + None => { > + let offset = this.position; > + let wanted = buf.capacity(); > + let reader = Arc::clone(&this.reader); > + let fut = Box::pin(async move { > + let mut read_buf = vec![0u8; wanted]; > + let read = reader.read_at(&mut read_buf[..wanted], > offset).await?; > + Ok((read_buf, read)) > + }); > + this.read_future = Some(fut); > + this.read_future.as_mut().unwrap() > + } > + }; Your `None` case seems trivial enough that you could use the Option's `.get_or_insert_with()` instead of match with `ref mut` and `.as_mut().unwrap()` (since the `None` case has no error paths). > + > + let ret = match ready!(fut.as_mut().poll(cx)) { > + Ok((read_buf, read)) => { > + buf.put_slice(&read_buf[..read]); > + this.position += read as u64; > + Ok(()) > + } > + Err(err) => { > + Err(io_err_other(err)) > + } > + }; > + > + // future completed, drop > + let _drop = this.read_future.take(); Why not just `this.read_future = None;`? 
> + > + Poll::Ready(ret) > + } > +} > -- > 2.30.2 _______________________________________________ pve-devel mailing list pve-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel