I agree with making the decomposition of a fragment into tasks an internal detail of
the scan implementation. It seems we want to move toward a world where the consumer
receives a stream of Future<shared_ptr<RecordBatch>> and is not (necessarily) burdened
with the complexity of concurrency management. The nature of multithreading/scheduling
would be pushed higher in the stack -- for example, you might decide that a fragment
and all of its child parallel / nested tasks go into the task queue of a single CPU
core, where idle CPUs are able to steal work from that queue if they want.
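To make that concrete, here is a rough sketch of what the consumer-facing side of such
a stream could look like. This is only an illustration -- Batch and BatchStream below
are stand-ins, not the actual Arrow C++ classes; the real interface would presumably
hand out Future<shared_ptr<RecordBatch>> from the scanner:

#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>

struct Batch { int64_t num_rows = 0; };  // stand-in for arrow::RecordBatch

// One call per batch; a null pointer signals end-of-stream.  The producer is
// free to schedule I/O, decoding, and work stealing however it likes behind
// this interface.
using BatchStream = std::function<std::future<std::shared_ptr<Batch>>()>;

void Consume(BatchStream next) {
  // The consumer only blocks when the next batch isn't ready yet; it never
  // has to spawn threads or manage readahead itself.
  while (auto batch = next().get()) {
    std::cout << "got " << batch->num_rows << " rows\n";
  }
}

The point is that all of the scheduling decisions live behind the stream, so they can
change without touching consumer code.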
On Fri, Mar 26, 2021 at 11:32 AM David Li <lidav...@apache.org> wrote:
>
> I agree we should present a simplified interface, and then also make ScanTask
> internal, but I think that is orthogonal to whether a fragment produces one
> or multiple scan tasks.
>
> At first, my worry with having (Parquet)ScanTask handle concurrency itself
> was that it does need to coordinate with the overall scanner, right? If you
> have two files with 100 row groups each, that's much different than 100 files
> with two row groups each. With a scan task per row group, a single readahead
> limit naturally handles both cases, but with a single scan task per file, you
> have to juggle the exact amount of readahead on an inter- and intra-file level.
>
> That said, there is an issue for making readahead operate by amount of memory
> used instead of number of files/tasks, which would presumably handle that just
> as well. And right now, one (Parquet)ScanTask per row group does lead to some
> implementation nuisance elsewhere (since all scan tasks for a file have to
> share the same Parquet reader and pre-buffering task).
>
> Also, I realize my example is poor, because you do actually want to separate
> intra- and inter-fragment concurrency - you want to at least be buffering the
> next files (without decoding them) while decoding the current file. And the
> proposed model would make it easier to support a consumer that can process
> batches out of order while limiting memory usage (just limit the
> inter-scan-task readahead).
>
> So on balance I'm in favor of this.
>
> I'll also note that there could be other Fragments which may naturally have
> intra-fragment parallelism, if the concern is mostly that ParquetScanTask is
> a bit of an outlier. For instance, a hypothetical FlightFragment wrapping a
> FlightInfo struct could generate multiple scan tasks, one per FlightEndpoint
> in the FlightInfo.
>
> Best,
> David
>
> On Thu, Mar 25, 2021, at 19:48, Weston Pace wrote:
> > This is a bit of a follow-up on
> > https://issues.apache.org/jira/browse/ARROW-11782 and also a bit of a
> > consequence of my work on
> > https://issues.apache.org/jira/browse/ARROW-7001 (nested scan
> > parallelism).
> >
> > I think the current dataset interface should be simplified.
> > Currently, we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch
> > with the components being...
> >
> > Dataset - Binds together a format & fragment discovery
> > Fragment - Something that maps to an input stream (usually a file)
> > ScanTask - Created by a format, turns an input stream into record batches.
> > RecordBatch - I hope I don't need to define this one :)
> >
> > The change I'm recommending (and starting to implement in ARROW-7001)
> > is to change the cardinality of Fragment ->* ScanTask to Fragment ->
> > ScanTask (i.e. one scan task per fragment instead of many).
> >
> > The IPC format and CSV format already do this (one scan task per
> > fragment). The only exception is Parquet, which maps "scan task" to
> > "row group" (keeping in mind row groups may correspond to multiple
> > batches). However, that feels like a detail that can be encapsulated
> > in ParquetScanTask (I can implement this in
> > https://issues.apache.org/jira/browse/ARROW-11843). In other words...
> >
> > The scanner is responsible for managing inter-fragment parallelism
> > (how many files to read at once, pipelining file reads, etc.)
> > The scan task is responsible for managing intra-fragment parallelism
> > (how many row groups to read at once, whether to scan columns in
> > parallel, etc.)
> >
> > Then, the scan task can be made fully internal (a la ARROW-11782) and the
> > primary external interface would be a record batch iterator.
> >
> > This doesn't just simplify the external interface by removing a type,
> > it actually changes the workflow requirements as well (admittedly,
> > some of this is an inevitable benefit of ARROW-7001 and not directly
> > related to removing scan task). Currently, if you want maximum
> > performance from a dataset scan, you need to run the scan tasks in
> > parallel. For example...
> >
> > for scan_task in scanner.scan():
> >     for record_batch in scan_task:
> >         # Do something, but do it very fast or do it on another thread,
> >         # because every ms you spend here is a ms you could be doing I/O
> >
> > With the simplification it should simply be...
> >
> > for record_batch in scanner.scan():
> >     # While you are processing this record batch the scanner is going to
> >     # continue running on a different thread. It will be queuing up a
> >     # backlog of batches for you to process. As long as you don't take
> >     # "too long" you should be able to keep up. In other words, as long
> >     # as your processing time here + the time it took to decode and
> >     # prepare the batch is less than the time it takes to read the batch,
> >     # you will never have a break in I/O.
> >
> > -Weston
> >
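For what it's worth, the "queuing up a backlog of batches" behavior in that last
example is essentially a bounded producer/consumer queue between the scanner's
threads and the consumer. A minimal sketch of that idea, with the usual caveat that
Batch and BatchQueue are illustrative stand-ins and not the actual classes in the
scanner:

#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>

struct Batch { int64_t num_rows = 0; };  // stand-in for arrow::RecordBatch

// Scanner threads Push() decoded batches; the consumer Pop()s them in order.
// Producers block once `readahead` batches are buffered, which is what bounds
// memory use while keeping the I/O pipeline busy.
class BatchQueue {
 public:
  explicit BatchQueue(std::size_t readahead) : readahead_(readahead) {}

  void Push(std::shared_ptr<Batch> batch) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return queue_.size() < readahead_; });
    queue_.push_back(std::move(batch));
    not_empty_.notify_one();
  }

  // Returns std::nullopt once the scan is finished and the queue is drained.
  std::optional<std::shared_ptr<Batch>> Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty() || done_; });
    if (queue_.empty()) return std::nullopt;
    auto batch = std::move(queue_.front());
    queue_.pop_front();
    not_full_.notify_one();
    return batch;
  }

  // Called by the scanner after all fragments have been exhausted.
  void Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    done_ = true;
    not_empty_.notify_all();
  }

 private:
  const std::size_t readahead_;
  std::mutex mutex_;
  std::condition_variable not_full_, not_empty_;
  std::deque<std::shared_ptr<Batch>> queue_;
  bool done_ = false;
};

With something like this sitting inside the scanner, the readahead count becomes the
knob that trades memory for the ability to absorb slow consumer iterations.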