I agree with separating the problem into its constituent concerns to
make sure that we are developing appropriate abstractions.

Speaking specifically about the Parquet codebase, the way that we
access a particular ColumnChunk in a row group is fairly simplistic.
See the ReaderProperties::GetStream method

https://github.com/apache/arrow/blob/master/cpp/src/parquet/properties.cc#L28

Rather than naively issuing a Read command (either buffered or
unbuffered, both bad for filesystems like S3), I think we need to
insert an abstraction at the point where the column reader class
requests an InputStream for its serialized column chunk data, which is
right here

https://github.com/apache/arrow/blob/master/cpp/src/parquet/file_reader.cc#L123

It seems like a stateful object that allows certain byte ranges of the
file to be cached would do the trick.
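
To make that concrete, here is a rough sketch of what such a stateful
object might look like. Every name below (RawReader, CachingReader) is
invented for illustration; none of these are existing Arrow or Parquet
classes, and the real thing would use Arrow's file interfaces and Status
handling.

#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for whatever low-level random-access reader the filesystem gives us.
class RawReader {
 public:
  virtual ~RawReader() = default;
  virtual std::string ReadAt(int64_t offset, int64_t length) = 0;
};

// Stateful object that pre-fetches selected byte ranges and then serves
// reads out of memory, so that column readers never touch the filesystem
// directly.
class CachingReader {
 public:
  explicit CachingReader(RawReader* raw) : raw_(raw) {}

  // Fetch the requested (offset, length) ranges up front; coalescing and
  // IO scheduling would live inside this call.
  void CacheRanges(const std::vector<std::pair<int64_t, int64_t>>& ranges) {
    for (const auto& range : ranges) {
      cache_[range.first] = raw_->ReadAt(range.first, range.second);
    }
  }

  // Serve a read from a previously cached range, falling back to raw IO.
  std::string ReadAt(int64_t offset, int64_t length) {
    auto it = cache_.upper_bound(offset);
    if (it != cache_.begin()) {
      --it;  // now points at the cached range starting at or before `offset`
      const int64_t start = it->first;
      const std::string& buffer = it->second;
      if (offset + length <= start + static_cast<int64_t>(buffer.size())) {
        return buffer.substr(offset - start, length);
      }
    }
    return raw_->ReadAt(offset, length);  // uncached: a "naked" IO call
  }

 private:
  RawReader* raw_;
  std::map<int64_t, std::string> cache_;  // range start offset -> bytes
};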

Next, an API needs to be provided for applications to indicate which
column chunks they intend to read so that the contiguous byte ranges
can be pre-buffered, preventing any column reader from issuing naked
IO calls. Seems like this should happen at the ReaderProperties level.
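
As a strawman for what that declaration API could look like (again, all
names here are hypothetical and the footer-metadata lookup is stubbed
out), something like:

#include <cstdint>
#include <utility>
#include <vector>

// A (row group, column index) pair identifying one column chunk.
struct ChunkLocation {
  int row_group;
  int column;
};

// Stub for the footer-metadata lookup the real implementation would do:
// returns the (offset, length) of the chunk's serialized bytes in the file.
std::pair<int64_t, int64_t> LookupChunkRange(const ChunkLocation& chunk);

class PreBufferPlan {
 public:
  // The application declares its intent before any column reader is created.
  void WillRead(int row_group, int column) {
    chunks_.push_back({row_group, column});
  }

  // Translate the declared column chunks into byte ranges to prefetch.
  // A real implementation would also coalesce adjacent or nearby ranges.
  std::vector<std::pair<int64_t, int64_t>> ComputePrebufferedRanges() const {
    std::vector<std::pair<int64_t, int64_t>> ranges;
    for (const auto& chunk : chunks_) {
      ranges.push_back(LookupChunkRange(chunk));
    }
    return ranges;
  }

 private:
  std::vector<ChunkLocation> chunks_;
};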

The IO scheduling seems like it should be abstracted away from the
Parquet library itself. So there would be code similar to

std::vector<std::pair<int64_t, int64_t>> read_ranges = ComputePrebufferedRanges();
RETURN_NOT_OK(caching_file->CacheRanges(read_ranges));

The IO scheduling can happen inside the implementation of CacheRanges.
Then when the column reader is created it will grab a slice of the
cached data rather than issuing IO calls.
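
Tying the hypothetical pieces above together, the flow for a single
column chunk might look roughly like this (just a sketch, not a proposal
for exact signatures):

// Reuses the illustrative CachingReader / PreBufferPlan types sketched above.
std::string ReadOneColumnChunk(RawReader* raw_reader, ChunkLocation chunk) {
  PreBufferPlan plan;
  plan.WillRead(chunk.row_group, chunk.column);
  // ... declare any other chunks the application intends to read ...

  CachingReader caching_file(raw_reader);
  caching_file.CacheRanges(plan.ComputePrebufferedRanges());

  // When the column reader is constructed it takes a slice of the cached
  // data instead of issuing its own Read/ReadAt against the filesystem.
  std::pair<int64_t, int64_t> range = LookupChunkRange(chunk);
  return caching_file.ReadAt(range.first, range.second);
}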

Let me know if this analysis makes sense.

- Wes

On Wed, Feb 5, 2020 at 9:24 AM Antoine Pitrou <anto...@python.org> wrote:
>
>
> Hi David,
>
> I think we should discuss this as individual features.
>
> > Read Coalescing: from Parquet metadata, we know exactly which byte
> > ranges of a file will be read, and can “cheat” in the S3 IO layer
> > by fetching them in advance
>
> It seems there are two things here: coalescing individual reads, and
> issuing them in advance.  Those look like separate concerns.
>
> - coalescing reads: should the IO layer expose a ReadRanges function to
> issue several reads at once, which the Parquet layer can then exploit?
>
> - issuing reads in advance: isn't that solved by readahead *at the
> record batch level* (not the IO block level)?
>
> > Concurrency Manager: rather than limit parallelism by number of
> > outstanding tasks, we can instead limit the estimated bandwidth
> > consumption, allowing better performance when read sizes are small.
>
> - Concurrency Manager: is this a per-source optimization, applying
> mainly to networked filesystems?
>
>
> I think we want to make sure that each feature brings progress, instead
> of trying to lump everything at once in a big PR.
>
> Regards
>
> Antoine.
>
>
>
> On 05/02/2020 at 14:32, David Li wrote:
> > Hello all,
> >
> > We've been following the Arrow Datasets project with great interest,
> > especially as we have an in-house library with similar goals built on
> > top of PyArrow. Recently, we noticed some discussion around optimizing
> > I/O for such use cases (e.g. PARQUET-1698), which is also where we had
> > focused our efforts.
> >
> > Our long-term goal has been to open-source our library. However, our
> > code is in Python, while it would be most useful to everyone in the
> > C++ core, so that R, Python, Ruby, etc. could benefit. Thus, we'd like to
> > share our high-level design, and offer to work with the community on
> > the implementation - at the very least, to avoid duplicating work.
> > We've summarized our approach, and hope this can start a discussion on
> > how to integrate such optimizations into Datasets:
> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit#
> >
> > At a high level, we have three main optimizations:
> > - Given a set of columns to read, and potentially a filter on a
> > partition key, we can use Parquet metadata to compute exact byte
> > ranges to read from remote storage, and coalesce/split up reads as
> > necessary based on the characteristics of the storage platform.
> > - Given byte ranges to read, we can read them in parallel, using a
> > global thread pool and concurrency manager to limit parallelism and
> > resource consumption.
> > - By working at the level of a dataset, we can parallelize these
> > operations across files, and pipeline steps like reading Parquet
> > metadata with reading and deserialization.
> >
> > We focus on Parquet and S3/object storage here, but these concepts
> > apply to other file formats and storage systems.
> >
> > The main questions here are whether we think the optimizations are
> > useful for Arrow Datasets, and if so, how the API design and
> > implementation would proceed - I'd appreciate any feedback on the
> > approach here and potential API.
> >
> > David
> >
