I agree with making the decomposition of a fragment into tasks an
internal detail of the scan implementation. It seems we want to move
toward a world where the consumer receives a stream of
Future<shared_ptr<RecordBatch>> and is not (necessarily) burdened with
the complexity of concurrency management. The nature of
multithreading/scheduling would move higher in the stack -- for
example, you might decide that a fragment and all its child parallel /
nested tasks go into the task queue of a single CPU core, with idle
CPUs able to steal work from that queue.
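
To make that concrete, here is a toy, self-contained sketch (Python
pseudocode with invented names -- not the eventual C++ API) of a
scanner handing the consumer a stream of batch futures:

from concurrent.futures import ThreadPoolExecutor

class Fragment:                        # toy stand-in, not the Arrow class
    def __init__(self, path):
        self.path = path
    def read(self):
        return f"batch from {self.path}"   # pretend this is a RecordBatch

def scan_batches_async(fragments, executor):
    # The scanner owns all scheduling; the consumer only sees futures.
    for fragment in fragments:
        yield executor.submit(fragment.read)

with ThreadPoolExecutor() as pool:
    batches = scan_batches_async([Fragment("a.parquet"), Fragment("b.parquet")], pool)
    for fut in batches:
        print(fut.result())            # blocks only if the batch isn't ready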

On Fri, Mar 26, 2021 at 11:32 AM David Li <lidav...@apache.org> wrote:
>
> I agree we should present a simplified interface, and then also make ScanTask 
> internal, but I think that is orthogonal to whether a fragment produces one 
> or multiple scan tasks.
>
> At first, my worry with having (Parquet)ScanTask handle concurrency itself 
> was that it does need to coordinate with the overall scanner, right? If you 
> have two files with 100 row groups each, that's much different than 100 files 
> with two row groups each. With a scan task per row group, a single
> readahead limit naturally handles both cases, but with a single scan
> task per file, you have to juggle the exact amount of readahead at
> both the inter- and intra-file level.
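>
> (As a toy illustration of that juggling -- limits and numbers invented
> -- the effective parallelism depends on the file layout:)
>
> def in_flight_row_groups(num_files, rgs_per_file, file_limit, rg_limit):
>     # Per-file scan tasks cap concurrency independently at each level.
>     return min(num_files, file_limit) * min(rgs_per_file, rg_limit)
>
> print(in_flight_row_groups(2, 100, 8, 4))   # 2 files x 100 row groups -> 8
> print(in_flight_row_groups(100, 2, 8, 4))   # 100 files x 2 row groups -> 16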
>
> That said, there is an issue to make readahead operate on the amount
> of memory used instead of the number of files/tasks, which would
> presumably handle that just as well. And right now, one
> (Parquet)ScanTask per row group does lead to some implementation
> nuisance elsewhere (since all scan tasks for a file have to share the
> same Parquet reader and pre-buffering task).
>
> Also I realize my example is poor, because you do actually want to separate 
> intra- and inter-fragment concurrency - you want to at least be buffering the 
> next files (without decoding them) while decoding the current file. And the 
> proposed model would make it easier to support a consumer that can process 
> batches out of order while limiting memory usage (just limit the 
> inter-scan-task readahead).
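>
> (A rough, self-contained sketch of that, names invented: inter-scan-task
> readahead modeled as a bounded queue, so upcoming files buffer while the
> current batch is processed, and memory stays capped even if the consumer
> handles batches out of order:)
>
> import queue, threading
>
> BACKLOG = 4                          # hypothetical inter-scan-task readahead
>
> def read_fragment(path):             # stand-in for reading/decoding one file
>     return f"batch from {path}"
>
> def producer(paths, out):
>     for p in paths:
>         out.put(read_fragment(p))    # blocks once BACKLOG batches are queued
>     out.put(None)                    # end-of-stream sentinel
>
> out = queue.Queue(maxsize=BACKLOG)
> threading.Thread(target=producer, args=(["a", "b", "c"], out), daemon=True).start()
> while (batch := out.get()) is not None:
>     pass                             # process the batch; memory stays bounded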
>
> So on balance I'm in favor of this.
>
> I'll also note that there could be other Fragments that naturally have
> intra-fragment parallelism, if the concern is mostly that ParquetScanTask
> is a bit of an outlier. For instance, a hypothetical FlightFragment
> wrapping a FlightInfo struct could generate multiple scan tasks, one per
> FlightEndpoint in the FlightInfo.
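>
> (A sketch of that, using the real pyarrow.flight do_get call but an
> invented fragment class, purely to illustrate:)
>
> class FlightFragment:                    # hypothetical, not an Arrow class
>     def __init__(self, client, info):
>         self.client = client             # a pyarrow.flight.FlightClient
>         self.info = info                 # a pyarrow.flight.FlightInfo
>     def scan_tasks(self):
>         # One scan task per FlightEndpoint in the FlightInfo.
>         for endpoint in self.info.endpoints:
>             yield lambda ep=endpoint: self.client.do_get(ep.ticket).read_all()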
>
> Best,
> David
>
> On Thu, Mar 25, 2021, at 19:48, Weston Pace wrote:
> > This is a bit of a follow-up on
> > https://issues.apache.org/jira/browse/ARROW-11782 and also a bit of a
> > consequence of my work on
> > https://issues.apache.org/jira/browse/ARROW-7001 (nested scan
> > parallelism).
> >
> > I think the current dataset interface should be simplified.
> > Currently, we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch
> > with the components being...
> >
> > Dataset - Binds together a format & fragment discovery
> > Fragment - Something that maps to an input stream (usually a file)
> > ScanTask - Created by a format, turns an input stream into record batches.
> > RecordBatch - I hope I don't need to define this one :)
> >
> > The change I'm recommending (and starting to implement in ARROW-7001)
> > is to change the cardinality of Fragment ->* ScanTask to Fragment ->
> > ScanTask (i.e. one scan task per fragment instead of many).
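> >
> > (Sketched with invented names, the cardinality change looks like:)
> >
> > class Fragment:                   # toy model, not the Arrow class
> >     def __init__(self, row_groups):
> >         self.row_groups = row_groups
> >     def scan_old(self):
> >         # before: one scan task per row group (the Parquet behavior)
> >         return [("task", rg) for rg in self.row_groups]
> >     def scan_new(self):
> >         # after: exactly one scan task, owning every row group
> >         return ("task", self.row_groups)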
> >
> > The IPC format and CSV format already do this (one scan task per
> > fragment).  The only exception is Parquet, which maps "scan task" to
> > "row group" (keeping in mind row groups may correspond to multiple
> > batches).  However, that feels like a detail that can be
> > encapsulated in ParquetScanTask (I can implement this in
> > https://issues.apache.org/jira/browse/ARROW-11843).  In other words...
> >
> > The scanner is responsible for managing inter-fragment parallelism
> > (how many files to read at once, pipelining file reads, etc.)
> > The scan task is responsible for managing intra-fragment parallelism
> > (how many row groups to read at once, whether to scan columns in
> > parallel, etc.)
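> >
> > (A toy, runnable sketch of that division of labor -- pool sizes and
> > names invented:)
> >
> > from concurrent.futures import ThreadPoolExecutor
> >
> > def decode_row_group(rg):                # stand-in for real decoding
> >     return f"batch from row group {rg}"
> >
> > def run_scan_task(row_groups):
> >     # Scan task: intra-fragment parallelism (row groups in parallel).
> >     with ThreadPoolExecutor(max_workers=4) as rg_pool:
> >         return list(rg_pool.map(decode_row_group, row_groups))
> >
> > def scan(fragments):
> >     # Scanner: inter-fragment parallelism (several files at once).
> >     with ThreadPoolExecutor(max_workers=8) as file_pool:
> >         for batches in file_pool.map(run_scan_task, fragments):
> >             yield from batches
> >
> > for batch in scan([[1, 2], [3, 4, 5]]):  # two fragments of row groups
> >     print(batch)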
> >
> > Then, the scan task can be made fully internal (à la ARROW-11782) and the
> > primary external interface would be a record batch iterator.
> >
> > This doesn't just simplify the external interface by removing a type;
> > it actually changes the workflow requirements as well (admittedly,
> > some of this is an inevitable benefit of ARROW-7001 and not directly
> > related to removing scan task).  Currently, if you want maximum
> > performance from a dataset scan, you need to run the scan tasks in
> > parallel.  For example...
> >
> > for scan_task in scanner.scan():
> >   for record_batch in scan_task:
> >     # Do something, but do it very fast or do it on another thread,
> >     # because every ms you spend here is a ms you could be doing I/O
> >
> > With the simplification it should simply be...
> >
> > for record_batch in scanner.scan():
> >   # While you are processing this record batch, the scanner is going
> >   # to continue running on a different thread.  It will be queuing up
> >   # a backlog of batches for you to process.  As long as you don't
> >   # take "too long" you should be able to keep up.  In other words,
> >   # as long as your processing time here + the time it took to decode
> >   # and prepare the batch is less than the time it takes to read the
> >   # batch, you will never have a break in I/O.
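> >   # (Illustrative numbers only: if reading a batch takes 10 ms while
> >   # decode + processing together take 9 ms, I/O never pauses; at 11 ms
> >   # the readahead backlog fills and reads eventually stall.)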
> >
> > -Weston
> >
