hi all,

I've been catching up on the C++ execution engine codebase after a fairly long development hiatus.
I have several questions / comments about the current design of the ExecNode interface and its implementations (currently: source / scan, filter, project, union, aggregate, sink, hash join). My current understanding of how things work is the following:

* Scan/Source nodes initiate execution through the StartProducing() function, which spawns an asynchronous generator that yields a sequence of input data batches. As each batch becomes available, it is passed to child operators by calling their InputReceived methods
* When InputReceived is called:
  * For non-blocking operators (e.g. Filter, Project), the unit of work is performed immediately and the result is passed to the child operator by calling its InputReceived method
  * For blocking operators (e.g. HashAggregate, HashJoin), partial results are accumulated until the operator can begin producing output (all of the input for aggregation, or the completed hash table for the HashJoin)
* When an error occurs, a signal to abort is propagated up and down the execution tree
* Eventually output lands in a Sink node, which holds the desired result

One concern I have about the current structure is the way in which ExecNode implementations are responsible for downstream control flow, and the extent to which operator pipelining (the same thread advancing input-output chains until reaching a pipeline breaker) is implicit rather than explicit. To give a couple of examples:

* In hash aggregations (GroupByNode), when the input has been exhausted, the GroupByNode splits the result into the desired execution chunk size (e.g. splitting a 1M row aggregate into batches of 64K rows) and then spawns future tasks that push these chunks into the child output exec node (by calling InputReceived)
* In hash joins, the ExecNode accumulates the batches to be inserted into the hash table (the build input) until that input is exhausted, and then starts asynchronously spawning tasks that probe the completed hash table and pass the results into the child output node

I would suggest that we consider a different design that decouples task control flow from the ExecNode implementations. The purpose would be to give the user of the C++ engine more control over task scheduling (including the order of execution) and prioritization. One system that does things differently from the Arrow C++ Engine is Meta's Velox project, whose operators work like this (slightly simplified and colored by my own imperfect understanding):

* The Driver class (which is associated with a single thread) is responsible for execution control flow. A driver moves input batches through an operator pipeline.
* The Driver calls the Operator::addInput function with an input batch. Whether an operator is blocking or non-blocking is signaled by whether Operator::needsMoreInput() returns true. Simple operators like Project can produce their output immediately when the Driver calls Operator::getOutput
* When the Driver hits a blocking operator in a pipeline, it returns control to the calling thread so the thread can switch to doing work for a different driver
* One artifact of this design is that hash joins are split into a HashBuild operator and a HashProbe operator so that the build and probe stages of the hash join can be scheduled and executed more precisely (for example, work for the pipeline that feeds the build operator can be prioritized over the pipeline feeding the other input to the probe). A rough sketch of the driver-loop structure I have in mind follows below.
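To make the contrast concrete, here is a hypothetical C++ sketch of that driver-loop idea. To be clear, this is my paraphrase of the concept and not Velox's actual API: the class and method names are approximations, and a real implementation would need futures/continuations to resume blocked pipelines.

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Batch { /* stand-in for a columnar batch of rows */ };

class Operator {
 public:
  virtual ~Operator() = default;
  // Push one input batch into the operator.
  virtual void addInput(std::shared_ptr<Batch> batch) = 0;
  // True if the operator can accept input right now; false if it is blocked
  // (e.g. a hash build still waiting on the rest of its input).
  virtual bool needsMoreInput() const = 0;
  // Return an output batch if one is ready, nullptr otherwise.
  virtual std::shared_ptr<Batch> getOutput() = 0;
};

// A driver is bound to a single thread and explicitly moves batches through
// one pipeline of operators; control flow lives here, not in the operators.
class Driver {
 public:
  explicit Driver(std::vector<std::unique_ptr<Operator>> ops)
      : ops_(std::move(ops)) {}

  // Advance the pipeline with one source batch. Returns true if the batch
  // was pushed all the way through; returns false if a blocked operator was
  // hit, in which case the caller can switch to running a different driver
  // (a real implementation would keep the batch around and retry later).
  bool RunOnce(std::shared_ptr<Batch> batch) {
    for (std::size_t i = 0; i < ops_.size(); ++i) {
      Operator& op = *ops_[i];
      if (!op.needsMoreInput()) {
        return false;  // pipeline breaker / blocked: yield to the caller
      }
      op.addInput(std::move(batch));
      batch = op.getOutput();
      if (batch == nullptr) {
        return true;  // operator absorbed the batch (e.g. filtered it away)
      }
    }
    // A real driver would hand `batch` to the pipeline's sink here.
    return true;
  }

 private:
  std::vector<std::unique_ptr<Operator>> ops_;
};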
The idea in refactoring the Arrow C++ Engine would be that, instead of having a tree of ExecNodes, each of which has its own internal control flow (including the ability to spawn downstream tasks), pipelinable operators would be grouped into PipelineExecutors (which correspond roughly to Velox's Driver concept) that are responsible for control flow and invoke the ExecNodes in sequence. (I've appended a rough, purely hypothetical sketch of what such a loop might look like at the end of this email.) This would make it much easier for users to customize the control flow for particular needs (for example, the recent discussion of adding time series joins to the C++ engine suggests that the current eager-push / "local" control flow can create input ordering problems). I think this might also make the codebase easier to understand, test, and perhaps profile / trace, but that is just conjecture.

As a separate matter, the C++ Engine does not have a separation between input batches (what are called "morsels" in the HyPer paper) and pipeline tasks (smaller cache-friendly units to move through the pipeline), nor the ability (AFAICT) to do nested parallelism / work stealing within pipelines (this concept is discussed in [1]).

Hopefully the above makes sense; I look forward to others' thoughts.

Thanks,
Wes

[1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
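P.S. To make the PipelineExecutor idea a bit more concrete, here is a purely hypothetical sketch of what the control-flow loop might look like, including the morsel-to-smaller-task-batch split mentioned above. None of these classes exist in the codebase today: ExecBatch is the real Arrow type (though I am assuming its length field and Slice() method here), while PipelineOp, MorselSource, PipelineSink, and PipelineExecutor are made up for illustration.

#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "arrow/compute/exec.h"  // arrow::compute::ExecBatch
#include "arrow/result.h"
#include "arrow/status.h"

using arrow::Result;
using arrow::Status;
using arrow::compute::ExecBatch;

// A pipelinable unit of work: Filter, Project, or the probe side of a hash
// join once its hash table has been built.
class PipelineOp {
 public:
  virtual ~PipelineOp() = default;
  virtual Result<ExecBatch> Execute(const ExecBatch& input) = 0;
};

// Produces "morsels": large input batches, e.g. one scanned row group.
class MorselSource {
 public:
  virtual ~MorselSource() = default;
  // Returns the next morsel, or nullopt when the input is exhausted.
  virtual Result<std::optional<ExecBatch>> Next() = 0;
};

// Consumes the pipeline's output: a sink node, an aggregation's build phase,
// or the build side of a downstream hash join.
class PipelineSink {
 public:
  virtual ~PipelineSink() = default;
  virtual Status Consume(ExecBatch batch) = 0;
};

// Runs one pipeline on the calling thread: pull a morsel, slice it into
// smaller cache-friendly task batches, and push each slice through the whole
// operator chain before touching the next slice. Scheduling decisions (which
// pipeline runs when, and on which thread) belong to whoever calls Run(),
// not to the operators themselves.
class PipelineExecutor {
 public:
  PipelineExecutor(MorselSource* source,
                   std::vector<std::unique_ptr<PipelineOp>> ops,
                   PipelineSink* sink, int64_t task_batch_size)
      : source_(source),
        ops_(std::move(ops)),
        sink_(sink),
        task_batch_size_(task_batch_size) {}

  Status Run() {
    while (true) {
      ARROW_ASSIGN_OR_RAISE(auto morsel, source_->Next());
      if (!morsel.has_value()) return Status::OK();
      for (int64_t offset = 0; offset < morsel->length;
           offset += task_batch_size_) {
        int64_t length = std::min(task_batch_size_, morsel->length - offset);
        // Assumes ExecBatch::Slice(offset, length) for zero-copy slicing.
        ExecBatch batch = morsel->Slice(offset, length);
        for (const auto& op : ops_) {
          ARROW_ASSIGN_OR_RAISE(batch, op->Execute(batch));
        }
        ARROW_RETURN_NOT_OK(sink_->Consume(std::move(batch)));
      }
    }
  }

 private:
  MorselSource* source_;
  std::vector<std::unique_ptr<PipelineOp>> ops_;
  PipelineSink* sink_;
  int64_t task_batch_size_;
};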