Hi David,

Yes, this sounds right to me. I would say that we should come up with
the public API for column prebuffering ASAP, and then get to work on
implementing it and maximizing throughput.
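
Purely as a strawman (the names below are illustrative, not a
committed API), I'm imagining an explicit prebuffering hint on the
Parquet reader, roughly:

    #include <vector>

    #include "arrow/status.h"

    // Strawman sketch, standing in for the actual Parquet reader
    // class: given the row groups and columns the caller intends to
    // read, compute the byte ranges they cover, coalesce them, and
    // fetch them in the background so later column reads hit memory.
    class Reader {
     public:
      virtual ~Reader() = default;
      virtual arrow::Status PreBuffer(
          const std::vector<int>& row_groups,
          const std::vector<int>& column_indices) = 0;
    };

The exact shape (an explicit method vs. a reader property) is up for
discussion.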
- Wes

On Wed, Mar 18, 2020 at 11:37 AM David Li <li.david...@gmail.com> wrote:
>
> Hi all,
>
> Thanks to Antoine for implementing the core read coalescing logic.
>
> We've taken a look at what else needs to be done to get this working,
> and it sounds like the following changes would be worthwhile,
> independent of the rest of the optimizations we discussed:
>
> - Add benchmarks of the current Parquet reader with the current
>   S3File (and other file implementations) so we can track
>   improvements/regressions
> - Use the coalescing inside the Parquet reader (even without a column
>   filter hint - this would subsume PARQUET-1698)
> - In coalescing, split large read ranges into smaller ones (this
>   would further improve on PARQUET-1698 by taking advantage of
>   parallel reads)
> - Accept a column filter hint in the Parquet reader and use that to
>   compute read ranges
>
> Does this sound reasonable?
>
> Thanks,
> David
>
> On 2/6/20, David Li <li.david...@gmail.com> wrote:
> > Catching up on questions here...
> >
> >> Typically you can solve this by having enough IO concurrency at once :-)
> >> I'm not sure having sophisticated global coordination (based on
> >> which algorithms) would bring anything. Would you care to elaborate?
> >
> > We aren't proposing *sophisticated* global coordination; rather,
> > just a global pool with a global limit, so that a user doesn't
> > unintentionally start hundreds of requests in parallel, and so that
> > you can adjust the resource consumption/performance tradeoff.
> >
> > Essentially, what our library does is maintain two pools (for I/O):
> > - One pool produces I/O requests, by going through the list of
> >   files, fetching the Parquet footers, and queuing up I/O requests
> >   on the main pool. (This uses a pool so we can fetch and parse
> >   metadata from multiple Parquet files at once.)
> > - One pool serves I/O requests, by fetching chunks and placing them
> >   in buffers inside the file object implementation.
> >
> > The global concurrency manager additionally limits the second pool
> > by not servicing I/O requests for a file until all of the I/O
> > requests for previous files have at least started. (By just having
> > lots of concurrency, you might end up starving yourself by reading
> > data you don't want quite yet.)
> >
> > Additionally, the global pool could still be a win for non-Parquet
> > files - an implementation can at least submit, say, an entire CSV
> > file as a "chunk" and have it read in the background.
> >
> >> Actually, on a more high-level basis, is the goal to prefetch for
> >> sequential consumption of row groups?
> >
> > At least for us, the query pattern is to sequentially consume row
> > groups from a large dataset, where we select a subset of columns
> > and a subset of the partition key range (usually a time range).
> > Prefetching speeds this up substantially - or, more generally,
> > pipelining file discovery, I/O, and deserialization does.
> >
> >> There are no situations where you would want to consume a scattered
> >> subset of row groups (e.g. predicate pushdown)?
> >
> > With coalescing, this "automatically" gets optimized. If you happen
> > to need column chunks from separate row groups that are adjacent or
> > close together on disk, coalescing will still fetch them in a
> > single I/O call.
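> >
> > For intuition, the core of the range merging is something like the
> > sketch below (illustrative only, not Antoine's actual
> > implementation):
> >
> > #include <algorithm>
> > #include <cstdint>
> > #include <vector>
> >
> > struct ReadRange {
> >   int64_t offset;
> >   int64_t length;
> > };
> >
> > // Sketch: merge ranges separated by less than hole_size_limit
> > // bytes, so nearby column chunks - even ones in different row
> > // groups - are fetched with a single I/O call.
> > std::vector<ReadRange> CoalesceRanges(std::vector<ReadRange> ranges,
> >                                       int64_t hole_size_limit) {
> >   std::sort(ranges.begin(), ranges.end(),
> >             [](const ReadRange& a, const ReadRange& b) {
> >               return a.offset < b.offset;
> >             });
> >   std::vector<ReadRange> merged;
> >   for (const ReadRange& r : ranges) {
> >     if (!merged.empty() &&
> >         r.offset - (merged.back().offset + merged.back().length) <=
> >             hole_size_limit) {
> >       // Extend the previous range to also cover this one.
> >       int64_t end = std::max(merged.back().offset + merged.back().length,
> >                              r.offset + r.length);
> >       merged.back().length = end - merged.back().offset;
> >     } else {
> >       merged.push_back(r);
> >     }
> >   }
> >   return merged;
> > }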
> >
> > We found that having large row groups was more beneficial than
> > small row groups, since when you combine small row groups with
> > column selection, you end up with a lot of small, non-adjacent
> > column chunks - which coalescing can't help with. The exact
> > tradeoff depends on the dataset and workload, of course.
> >
> >> This seems like too much to try to build into RandomAccessFile. I
> >> would suggest a class that wraps a random access file and manages
> >> cached segments and their lifetimes through explicit APIs.
> >
> > A wrapper class seems ideal, especially as the logic is agnostic to
> > the storage backend (except for some parameters, which can either
> > be hand-tuned or estimated on the fly). It also keeps the scope of
> > the changes down. (A rough sketch of what we have in mind is at the
> > bottom of this mail.)
> >
> >> Where to put the "async multiple range request" API is a separate
> >> question, though. It probably makes sense to start writing some
> >> working code and sort it out there.
> >
> > We haven't looked in this direction much. Our designs are based
> > around thread pools, partly because we wanted to avoid modifying
> > the Parquet and Arrow internals, instead choosing to modify the I/O
> > layer to "keep Parquet fed" as quickly as possible.
> >
> > Overall, I recall there's an issue open for async APIs in Arrow...
> > perhaps we want to move that to a separate discussion, or on the
> > contrary, explore some experimental APIs here to inform the overall
> > design.
> >
> > Thanks,
> > David
> >
> > On 2/6/20, Wes McKinney <wesmck...@gmail.com> wrote:
> >> On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <anto...@python.org> wrote:
> >>>
> >>> On 06/02/2020 at 20:20, Wes McKinney wrote:
> >>> >> Actually, on a more high-level basis, is the goal to prefetch
> >>> >> for sequential consumption of row groups?
> >>> >
> >>> > Essentially yes. One "easy" optimization is to prefetch the
> >>> > entire serialized row group. This is an evolution of that idea,
> >>> > where we want to prefetch only the needed parts of a row group
> >>> > in a minimum number of I/O calls (consider reading the first 10
> >>> > columns from a file with 1000 columns -- we want to do one I/O
> >>> > call instead of 10, as we do now).
> >>>
> >>> There are no situations where you would want to consume a
> >>> scattered subset of row groups (e.g. predicate pushdown)?
> >>
> >> There are. If it can be demonstrated that there are performance
> >> gains resulting from IO optimizations involving multiple row
> >> groups, then I see no reason not to implement them.
> >>
> >>> Regards
> >>>
> >>> Antoine.
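> >
> > For concreteness, here is the rough sketch of the wrapper-class
> > idea mentioned above. It is illustrative only - the names and
> > signatures are placeholders, not an agreed API - and it reuses the
> > ReadRange/CoalesceRanges sketch from earlier in this mail:
> >
> > #include <cstdint>
> > #include <map>
> > #include <memory>
> > #include <utility>
> > #include <vector>
> >
> > #include "arrow/buffer.h"
> > #include "arrow/io/interfaces.h"
> > #include "arrow/result.h"
> > #include "arrow/status.h"
> >
> > // Sketch: wraps a RandomAccessFile and manages cached segments
> > // through explicit APIs, leaving the underlying file untouched.
> > class CachingFile {
> >  public:
> >   CachingFile(std::shared_ptr<arrow::io::RandomAccessFile> file,
> >               int64_t hole_size_limit)
> >       : file_(std::move(file)), hole_size_limit_(hole_size_limit) {}
> >
> >   // Declare the ranges that will be read later; coalesce them and
> >   // fetch each coalesced segment eagerly into an in-memory buffer.
> >   arrow::Status Cache(std::vector<ReadRange> ranges) {
> >     for (const ReadRange& r :
> >          CoalesceRanges(std::move(ranges), hole_size_limit_)) {
> >       ARROW_ASSIGN_OR_RAISE(auto buf, file_->ReadAt(r.offset, r.length));
> >       cache_.emplace(r.offset, Entry{r, std::move(buf)});
> >     }
> >     return arrow::Status::OK();
> >   }
> >
> >   // Serve a read from a cached segment when one covers it, falling
> >   // back to the underlying file otherwise.
> >   arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t offset,
> >                                                        int64_t length) {
> >     auto it = cache_.upper_bound(offset);
> >     if (it != cache_.begin()) {
> >       --it;  // candidate segment starting at or before `offset`
> >       const Entry& entry = it->second;
> >       if (offset + length <= entry.range.offset + entry.range.length) {
> >         return arrow::SliceBuffer(entry.buffer,
> >                                   offset - entry.range.offset, length);
> >       }
> >     }
> >     return file_->ReadAt(offset, length);
> >   }
> >
> >  private:
> >   struct Entry {
> >     ReadRange range;
> >     std::shared_ptr<arrow::Buffer> buffer;
> >   };
> >
> >   std::shared_ptr<arrow::io::RandomAccessFile> file_;
> >   int64_t hole_size_limit_;
> >   std::map<int64_t, Entry> cache_;  // keyed by segment start offset
> > };
> >
> > In our library, the equivalent of the Cache() step is what the
> > background I/O pool services, so fetching overlaps with Parquet
> > decoding.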