This is partly a follow-up to https://issues.apache.org/jira/browse/ARROW-11782 and partly a consequence of my work on https://issues.apache.org/jira/browse/ARROW-7001 (nested scan parallelism).
I think the current dataset interface should be simplified. Currently we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch, with the components being:

- Dataset - binds together a format and fragment discovery
- Fragment - something that maps to an input stream (usually a file)
- ScanTask - created by a format; turns an input stream into record batches
- RecordBatch - I hope I don't need to define this one :)

The change I'm recommending (and starting to implement in ARROW-7001) is to change the cardinality of Fragment ->* ScanTask to Fragment -> ScanTask, i.e. one scan task per fragment instead of many. The IPC and CSV formats already do this. The only exception is Parquet, which maps "scan task" to "row group" (keeping in mind a row group may correspond to multiple batches). However, that feels like a detail that can be encapsulated in ParquetScanTask (I can implement this in https://issues.apache.org/jira/browse/ARROW-11843). In other words:

- The scanner is responsible for managing inter-fragment parallelism (how many files to read at once, pipelining file reads, etc.)
- The scan task is responsible for managing intra-fragment parallelism (how many row groups to read at once, whether to scan columns in parallel, etc.)

Then the scan task can be made fully internal (a la ARROW-11782) and the primary external interface would be a record batch iterator. This doesn't just simplify the external interface by removing a type; it changes the workflow requirements as well (admittedly, some of this is an inevitable benefit of ARROW-7001 and not directly related to removing scan task). Currently, if you want maximum performance from a dataset scan, you need to run the scan tasks in parallel yourself. For example:

    for scan_task in scanner.scan():
        for record_batch in scan_task.execute():
            # Do something, but do it very fast or do it on another thread,
            # because every ms you spend here is a ms you could be doing I/O.
            process(record_batch)

With the simplification it should simply be:

    for record_batch in scanner.scan():
        # While you are processing this record batch the scanner keeps running
        # on a different thread, queuing up a backlog of batches for you to
        # process. As long as you don't take "too long" you should be able to
        # keep up. In other words, as long as your processing time here, plus
        # the time it took to decode and prepare the batch, is less than the
        # time it takes to read the next batch, you will never have a break
        # in I/O.
        process(record_batch)
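To make "run the scan tasks in parallel" concrete, here is a minimal sketch of the fan-out that callers need today and that the simplification would remove. It assumes the pyarrow 3.0 Python dataset API (ds.dataset() for discovery, Scanner.scan() yielding ScanTasks with an execute() method); process() is a hypothetical stand-in for the consumer's per-batch work:

    import concurrent.futures

    import pyarrow.dataset as ds

    def process(batch):
        # Hypothetical per-batch work; stands in for whatever the consumer does.
        print(batch.num_rows)

    def consume(scan_task):
        # Drain one scan task; each task decodes one input stream (or, for
        # Parquet today, one row group) into record batches.
        for batch in scan_task.execute():
            process(batch)

    dataset = ds.dataset("/path/to/data", format="parquet")  # illustrative path
    scanner = ds.Scanner.from_dataset(dataset)

    # Without the simplification, saturating I/O is the caller's job: fan the
    # scan tasks out across a thread pool.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        list(pool.map(consume, scanner.scan()))

Under the proposed interface the thread pool disappears from user code entirely; the same overlap of I/O and processing would come from the scanner queuing batches internally.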
-Weston