Hi Antoine and Wes,

Thanks for the feedback. Yes, we should definitely consider these as separate features.
I agree that it makes sense for the file API (or a derived API) to
expose a generic CacheRanges or PrebufferRanges API. It could then do
coalescing and prefetching as desired, based on the actual underlying
data store. (Or perhaps the actual prefetch/coalesce steps could be
implemented as wrappers that also implement the file API, with
parameters to tune for concurrency level, resource consumption, and
backend storage characteristics; plain underlying file implementations
would just treat this as a no-op. This is what we prototyped internally
in C++ before writing this up - a wrapper around S3File that naively
prefetched the entire file.)

As a separate step, prefetching/caching should also make use of a
global (or otherwise shared) I/O thread pool, so that parallel reads of
different files implicitly coordinate work with each other as well.
Then you could queue up reads of several Parquet files such that a slow
network call for one file doesn't block progress for other files,
without issuing reads for all of those files at once.

It's unclear to me what readahead at the record batch level would
accomplish - Parquet reads each column chunk in a row group as a whole,
and if the row groups are large, then multiple record batches would
fall in the same row group, so we wouldn't gain any parallelism, no?
(Admittedly, I'm not familiar with the internals here.)

The concurrency manager would apply mostly to networked filesystems,
yes. It's more an observation that some workloads issue a few reads of
large ranges, while others issue many reads of small ranges, so
concurrency should be limited based on estimated bandwidth, not purely
on the number of concurrent tasks.

In that case, it sounds like there are at least the following concrete
tasks:
- Adding a (no-op) CacheRanges method to the RandomAccessFile API.
- Providing a column range hint to Parquet, and computing the column
  chunk ranges to buffer via CacheRanges.
- Providing an implementation of CacheRanges (e.g. via a wrapper
  RandomAccessFile) that coalesces ranges based on backend storage
  characteristics.
- Providing an implementation of CacheRanges that actually prefetches
  ranges in the background.
- Providing a shared I/O pool to coordinate such I/O across multiple
  files.
- Providing some mechanism to limit concurrency in this shared pool.
- Sending down all these hints from the Datasets implementation.

Thanks,
David

On 2/5/20, Wes McKinney <wesmck...@gmail.com> wrote:
> I agree with separating the problem into its constituent concerns to
> make sure that we are developing appropriate abstractions.
>
> Speaking specifically about the Parquet codebase, the way that we
> access a particular ColumnChunk in a row group is fairly simplistic.
> See the ReaderProperties::GetStream method
>
> https://github.com/apache/arrow/blob/master/cpp/src/parquet/properties.cc#L28
>
> Rather than naively issuing a Read command (either buffered or
> unbuffered, both bad for filesystems like S3), I think we need to
> insert an abstraction at the point where the column reader class
> requests an InputStream for its serialized column chunk data, which is
> right here
>
> https://github.com/apache/arrow/blob/master/cpp/src/parquet/file_reader.cc#L123
>
> It seems like a stateful object that allows certain byte ranges of the
> file to be cached would do the trick.
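To make that concrete, here's a rough sketch of what such a stateful caching
wrapper might look like. This is purely illustrative - the class names and
signatures below are placeholders, not the actual arrow::io interfaces:

// Hypothetical sketch only - not the actual arrow::io API. A minimal
// RandomAccessFile-like interface is assumed purely for illustration.
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a RandomAccessFile-style interface (real signatures differ).
class RandomAccessFileLike {
 public:
  virtual ~RandomAccessFileLike() = default;
  virtual std::string ReadAt(int64_t offset, int64_t length) = 0;
  // Default: the hint is a no-op; plain local files can simply ignore it.
  virtual void CacheRanges(
      const std::vector<std::pair<int64_t, int64_t>>& /*ranges*/) {}
};

// Wrapper that eagerly fetches and buffers the hinted ranges, so that later
// ReadAt calls falling inside those ranges are served from memory instead of
// issuing new (potentially slow, e.g. S3) reads.
class CachingFile : public RandomAccessFileLike {
 public:
  explicit CachingFile(RandomAccessFileLike* wrapped) : wrapped_(wrapped) {}

  void CacheRanges(
      const std::vector<std::pair<int64_t, int64_t>>& ranges) override {
    // A real implementation would coalesce nearby ranges and fetch them
    // concurrently on a shared I/O pool; here they are fetched serially.
    for (const auto& range : ranges) {
      cache_[range.first] = wrapped_->ReadAt(range.first, range.second);
    }
  }

  std::string ReadAt(int64_t offset, int64_t length) override {
    // Serve the read from the cache when it falls inside a buffered range.
    for (const auto& entry : cache_) {
      const int64_t start = entry.first;
      const int64_t end = start + static_cast<int64_t>(entry.second.size());
      if (offset >= start && offset + length <= end) {
        return entry.second.substr(offset - start, length);
      }
    }
    return wrapped_->ReadAt(offset, length);  // Cache miss: fall through.
  }

 private:
  RandomAccessFileLike* wrapped_;
  std::map<int64_t, std::string> cache_;  // range start -> buffered bytes
};

In practice, CacheRanges would presumably coalesce nearby ranges and dispatch
the fetches on the shared I/O pool discussed above rather than reading them
serially, and the reads would return buffers rather than strings.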
>
> Next, an API needs to be provided for applications to indicate which
> column chunks they intend to read so that the contiguous byte ranges
> can be pre-buffered, preventing any column reader from issuing naked
> IO calls. Seems like this should happen at the ReaderProperties level.
>
> The IO scheduling seems like it should be abstracted away from the
> Parquet library itself. So there would be code similar to
>
> std::vector<std::pair<int64_t, int64_t>> read_ranges = ComputePrebufferedRanges();
> RETURN_NOT_OK(caching_file->CacheRanges(read_ranges));
>
> The IO scheduling can happen inside the implementation of CacheRanges.
> Then when the column reader is created it will grab a slice of the
> cached data rather than issuing IO calls.
>
> Let me know if this analysis makes sense
>
> - Wes
>
> On Wed, Feb 5, 2020 at 9:24 AM Antoine Pitrou <anto...@python.org> wrote:
>>
>> Hi David,
>>
>> I think we should discuss this as individual features.
>>
>> > Read Coalescing: from Parquet metadata, we know exactly which byte
>> > ranges of a file will be read, and can “cheat” in the S3 IO layer
>> > by fetching them in advance
>>
>> It seems there are two things here: coalescing individual reads, and
>> issuing them in advance. It seems those are separate concerns.
>>
>> - coalescing reads: should the IO layer expose a ReadRanges function
>> to issue several reads at once, which the Parquet layer can then
>> exploit?
>>
>> - issuing reads in advance: isn't that solved by readahead *at the
>> record batch level* (not the IO block level)?
>>
>> > Concurrency Manager: rather than limit parallelism by number of
>> > outstanding tasks, we can instead limit the estimated bandwidth
>> > consumption, allowing better performance when read sizes are small.
>>
>> - Concurrency Manager: is this a per-source optimization, applying
>> mainly to networked filesystems?
>>
>> I think we want to make sure that each feature brings progress,
>> instead of trying to lump everything at once in a big PR.
>>
>> Regards
>>
>> Antoine.
>>
>>
>> Le 05/02/2020 à 14:32, David Li a écrit :
>> > Hello all,
>> >
>> > We've been following the Arrow Datasets project with great interest,
>> > especially as we have an in-house library with similar goals built on
>> > top of PyArrow. Recently, we noticed some discussion around optimizing
>> > I/O for such use cases (e.g. PARQUET-1698), which is also where we had
>> > focused our efforts.
>> >
>> > Our long-term goal has been to open-source our library. However, our
>> > code is in Python, but it would be most useful to everyone in the C++
>> > core, so that R, Python, Ruby, etc. could benefit. Thus, we'd like to
>> > share our high-level design, and offer to work with the community on
>> > the implementation - at the very least, to avoid duplicating work.
>> > We've summarized our approach, and hope this can start a discussion on
>> > how to integrate such optimizations into Datasets:
>> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit#
>> >
>> > At a high level, we have three main optimizations:
>> > - Given a set of columns to read, and potentially a filter on a
>> > partition key, we can use Parquet metadata to compute exact byte
>> > ranges to read from remote storage, and coalesce/split up reads as
>> > necessary based on the characteristics of the storage platform.
>> > - Given byte ranges to read, we can read them in parallel, using a
>> > global thread pool and concurrency manager to limit parallelism and
>> > resource consumption.
>> > - By working at the level of a dataset, we can parallelize these
>> > operations across files, and pipeline steps like reading Parquet
>> > metadata with reading and deserialization.
>> >
>> > We focus on Parquet and S3/object storage here, but these concepts
>> > apply to other file formats and storage systems.
>> >
>> > The main questions here are whether we think the optimizations are
>> > useful for Arrow Datasets, and if so, how the API design and
>> > implementation would proceed - I'd appreciate any feedback on the
>> > approach here and potential API.
>> >
>> > David
>> >
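For illustration, the read-coalescing step described in the quoted design
above amounts to merging byte ranges whose gaps are small enough that reading
the hole is cheaper than issuing another request. A rough, self-contained
sketch - parameter and function names here are made up, not taken from the
design doc:

// Illustrative sketch of read coalescing: merge byte ranges separated by
// holes no larger than hole_size_limit, so that remote storage sees a few
// large reads instead of many small ones.
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

using ByteRange = std::pair<int64_t, int64_t>;  // (offset, length)

std::vector<ByteRange> CoalesceRanges(std::vector<ByteRange> ranges,
                                      int64_t hole_size_limit) {
  if (ranges.empty()) return ranges;
  std::sort(ranges.begin(), ranges.end());
  std::vector<ByteRange> coalesced;
  coalesced.push_back(ranges[0]);
  for (size_t i = 1; i < ranges.size(); ++i) {
    ByteRange& last = coalesced.back();
    const int64_t last_end = last.first + last.second;
    if (ranges[i].first - last_end <= hole_size_limit) {
      // Extend the previous range to cover this one (and the hole between).
      const int64_t new_end =
          std::max(last_end, ranges[i].first + ranges[i].second);
      last.second = new_end - last.first;
    } else {
      coalesced.push_back(ranges[i]);
    }
  }
  return coalesced;
}

A real implementation would also split overly large coalesced ranges so they
can be fetched in parallel, and would tune hole_size_limit (and a
corresponding range size limit) per storage backend - e.g. larger values for
S3 than for local disk.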