This is a bit of a follow-up on
https://issues.apache.org/jira/browse/ARROW-11782 and also a bit of a
consequence of my work on
https://issues.apache.org/jira/browse/ARROW-7001 (nested scan
parallelism).

I think the current dataset interface should be simplified.
Currently, we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch
with the components being...

Dataset - Binds together a format & fragment discovery
Fragment - Something that maps to an input stream (usually a file)
ScanTask - Created by a format, turns an input stream into record batches.
RecordBatch - I hope I don't need to define this one :)

The change I'm recommending (and starting to implement in ARROW-7001)
is to change the cardinality of Fragment ->* ScanTask to Fragment ->
ScanTask (i.e. one scan task per fragment instead of many).

The IPC format and CSV format already do this (one scan task per
fragment).  The only exception is Parquet which maps "scan task" to
"row group" (keeping in mind row groups may correspond to multiple
batches).  However, that feels like it is a detail that can be
encapsulated in ParquetScanTask (I can implement this in
https://issues.apache.org/jira/browse/ARROW-11843).  In other words...

The scanner is responsible for managing inter-fragment parallelism
(how many files to read at once, pipelining file reads, etc.).
The scan task is responsible for managing intra-fragment parallelism
(how many row groups to read at once, whether to scan columns in
parallel, etc.).
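
To make that split concrete, here is a rough sketch in Python. It is not the Arrow implementation: Fragment, ScanTask, Scanner and both parallelism parameters are hypothetical stand-ins, and plain lists take the place of record batches and row groups.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List

# Hypothetical stand-in: a "batch" is just a list of ints here.
Batch = List[int]


class Fragment:
    """Hypothetical fragment: one 'file' made of several row groups."""

    def __init__(self, row_groups: List[Batch]):
        self.row_groups = row_groups


class ScanTask:
    """Exactly one per fragment; owns intra-fragment parallelism."""

    def __init__(self, fragment: Fragment, row_group_parallelism: int = 2):
        self.fragment = fragment
        self.row_group_parallelism = row_group_parallelism

    def _read_row_group(self, row_group: Batch) -> Batch:
        # Stand-in for decoding one row group into a batch.
        return list(row_group)

    def execute(self) -> Iterator[Batch]:
        with ThreadPoolExecutor(self.row_group_parallelism) as pool:
            # map() runs row groups concurrently but yields them in order.
            yield from pool.map(self._read_row_group, self.fragment.row_groups)


class Scanner:
    """Owns inter-fragment parallelism; exposes only a batch iterator."""

    def __init__(self, fragments: List[Fragment], fragment_parallelism: int = 2):
        self.fragments = fragments
        self.fragment_parallelism = fragment_parallelism

    def scan(self) -> Iterator[Batch]:
        with ThreadPoolExecutor(self.fragment_parallelism) as pool:
            # Each worker drains one fragment's scan task. Collecting into a
            # list keeps the sketch short; a real scanner would stream.
            per_fragment = pool.map(
                lambda f: list(ScanTask(f).execute()), self.fragments
            )
            for batches in per_fragment:
                yield from batches
```

The caller only ever sees scan(); how many row groups a fragment reads at once never leaks out of ScanTask.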

Then, scan task can be made fully internal (a la ARROW-11782) and the
primary external interface would be a record batch iterator.

This doesn't just simplify the external interface by removing a type,
it actually changes the workflow requirements as well (admittedly,
some of this is an inevitable benefit of ARROW-7001 and not directly
related to removing scan task).  Currently, if you want maximum
performance from a dataset scan, you need to run the scan tasks in
parallel.  For example...

for scan_task in scanner.scan():
  for record_batch in scan_task:
    # Do something, but do it very fast or do it on another thread,
    # because every ms you spend here is a ms you could be doing I/O.

With the simplification it should simply be...

for record_batch in scanner.scan():
  # While you are processing this record batch the scanner is going to continue
  # running on a different thread.  It will be queuing up a backlog of batches
  # for you to process.  As long as you don't take "too long" you should be
  # able to keep up.  In other words, as long as your processing time here +
  # the time it took to decode and prepare the batch is less than the time it
  # takes to read the batch, you will never have a break in I/O.
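
The "scanner keeps running on a different thread and queues up a backlog" behaviour can be sketched with a bounded queue and a producer thread. This is only an illustration of the idea, not how ARROW-7001 implements it; readahead and max_queued are hypothetical names.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def readahead(source: Iterable[T], max_queued: int = 4) -> Iterator[T]:
    """Yield items from source while a background thread reads ahead.

    Hypothetical helper: max_queued bounds the backlog so a slow consumer
    applies backpressure instead of buffering without limit.
    """
    q: "queue.Queue" = queue.Queue(maxsize=max_queued)

    def producer() -> None:
        try:
            for item in source:
                q.put(("item", item))  # blocks while the backlog is full
            q.put(("done", None))
        except BaseException as exc:  # hand errors over to the consumer
            q.put(("error", exc))

    # Daemon thread: if the consumer abandons the iterator early, the
    # blocked producer will not keep the process alive.
    threading.Thread(target=producer, daemon=True).start()

    while True:
        kind, value = q.get()
        if kind == "done":
            return
        if kind == "error":
            raise value
        yield value
```

Wrapping any batch iterator as readahead(batches) gives the consumer loop shown above: the producer keeps reading while the loop body runs, and only stalls once max_queued batches are waiting.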

-Weston
