Implemented as a seperate struct SeekableCachedChunkReader that contains the original as an Arc, since the read_at future captures the CachedChunkReader, which would otherwise not work with the lifetimes required by AsyncRead. This is also the reason we cannot use a shared read buffer and have to allocate a new one for every read. It also means that the struct items required for AsyncRead/Seek do not need to be included in a regular CachedChunkReader.
This is intended as a replacement for AsyncIndexReader, so we have less code duplication and can utilize the LRU cache there too (even though actual request concurrency is not supported in these traits). Signed-off-by: Stefan Reiter <s.rei...@proxmox.com> --- src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 2 deletions(-) diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs index fd5a049f..9b56fd14 100644 --- a/src/backup/cached_chunk_reader.rs +++ b/src/backup/cached_chunk_reader.rs @@ -1,12 +1,19 @@ //! An async and concurrency safe data reader backed by a local LRU cache. use anyhow::Error; +use futures::future::Future; +use futures::ready; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; -use std::future::Future; +use std::io::SeekFrom; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::backup::{AsyncReadChunk, IndexFile}; +use super::{AsyncReadChunk, IndexFile}; use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; +use proxmox::io_format_err; +use proxmox::sys::error::io_err_other; struct AsyncChunkCacher<T> { reader: Arc<T>, @@ -85,3 +92,108 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader< Ok(read) } } + +impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static> + CachedChunkReader<I, R> +{ + /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and + /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred + /// otherwise. + pub fn seekable(self) -> SeekableCachedChunkReader<I, R> { + SeekableCachedChunkReader { + index_bytes: self.index.index_bytes(), + reader: Arc::new(self), + position: 0, + seek_to_pos: 0, + read_future: None, + } + } +} + +pub struct SeekableCachedChunkReader< + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +> { + reader: Arc<CachedChunkReader<I, R>>, + index_bytes: u64, + position: u64, + seek_to_pos: i64, + read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>, +} + +impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R> +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> { + let this = Pin::get_mut(self); + this.seek_to_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => this.index_bytes as i64 + offset, + SeekFrom::Current(offset) => this.position as i64 + offset, + }; + Ok(()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> { + let this = Pin::get_mut(self); + + let index_bytes = this.index_bytes; + if this.seek_to_pos < 0 { + return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); + } else if this.seek_to_pos > index_bytes as i64 { + this.position = index_bytes; + } else { + this.position = this.seek_to_pos as u64; + } + + Poll::Ready(Ok(this.position)) + } +} + +impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R> +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut ReadBuf, + ) -> Poll<tokio::io::Result<()>> { + let this = Pin::get_mut(self); + + let fut = match this.read_future { + Some(ref mut fut) => fut, + None => { + let offset = this.position; + let wanted = buf.capacity(); + let reader = Arc::clone(&this.reader); + let fut = Box::pin(async move { + let mut read_buf = vec![0u8; wanted]; + let read = reader.read_at(&mut read_buf[..wanted], offset).await?; + Ok((read_buf, read)) + }); + this.read_future = Some(fut); + this.read_future.as_mut().unwrap() + } + }; + + let ret = match ready!(fut.as_mut().poll(cx)) { + Ok((read_buf, read)) => { + buf.put_slice(&read_buf[..read]); + this.position += read as u64; + Ok(()) + } + Err(err) => { + Err(io_err_other(err)) + } + }; + + // future completed, drop + let _drop = this.read_future.take(); + + Poll::Ready(ret) + } +} -- 2.30.2 _______________________________________________ pve-devel mailing list pve-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel