I talked about these problems with my colleague Michal Nowakiewicz, who has been developing some of the C++ engine implementation over the last year and a half, and he wrote up this document with some ideas about task scheduling and control flow in the query engine for everyone to look at and comment on:
https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#

Feedback is also welcome from the Rust developers, to compare/contrast with how DataFusion works.

On Tue, May 3, 2022 at 1:05 AM Weston Pace <weston.p...@gmail.com> wrote:
>
> Thanks for investigating and looking through this. Your understanding of how things work is pretty much spot on. In addition, I think the points you are making are valid. Our ExecNode/ExecPlan interfaces are extremely bare bones, and similar nodes have had to reimplement the same solutions (e.g. many nodes are using things like AtomicCounter, ThreadIndexer, AsyncTaskGroup, etc. in similar ways). Probably the most significant short-term impact of cleaning this up would be to avoid things like the race condition in [1], which happened because one node was doing things in a slightly older way. If anyone is particularly interested in tackling this problem I'd be happy to go into more details.
>
> However, I think you are slightly overselling the potential benefits. I don't think this would make it easier to adopt morsel/batch, implement asymmetric backpressure, better scheduling, work stealing, or sequencing (all of which I agree are good ideas, with the exception of work stealing, which I don't think we would significantly benefit from). What's more, we don't have very many nodes today, and I think there is a risk of over-learning from this small sample size. For example, this sequencing discussion is very interesting. I think an asof join node is not a pipeline breaker, but it also does not fit the mold of a standard pipeline node. It has multiple inputs, and there is not a clear 1:1 mapping between input and output batches. I don't know the Velox driver model well enough to comment on it specifically, but if you were to put this node in the middle of a pipeline you might end up generating empty batches, too-large batches, or not enough thread tasks to saturate the cores. If you were to put it between pipeline drivers you would potentially lose cache locality.
>
> Regarding morsel/batch: the main thing really preventing us from moving to this model is the overhead cost of running small batches. This is due to things like the problem you described in [2] and is somewhat demonstrated by benchmarks like [3]. As a result, as soon as we shrink the batch size enough to fit into L2, we start to see overhead increase enough to eliminate the benefits we get from better cache utilization (not just CPU overhead but also thread contention). Unfortunately, some of the fixes here could possibly involve changes to ExecBatch & Datum, which are used extensively in the kernel infrastructure. From my profiling, this underutilization of cache is one of the most significant performance issues we have today.
>
> [1] https://github.com/apache/arrow/pull/12894
> [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> [3] https://github.com/apache/arrow/pull/12755
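
[Editor's note: a minimal, self-contained C++ sketch of the morsel-to-batch split and the overhead tradeoff Weston describes above. All names (Batch, SplitMorsel, etc.) are hypothetical illustrations, not code from the Arrow codebase.]

    // Splitting a large input morsel into cache-sized batches. All names
    // here are hypothetical; this is not Arrow code.
    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct Batch {
      const int64_t* values;  // view into the parent morsel, no copy
      int64_t length;
    };

    // Split a large morsel (e.g. ~1M rows from a scan) into batches small
    // enough that a batch's working set fits in L2. Every batch also pays
    // a fixed scheduling/dispatch cost, which is the tension described
    // above: below some size, per-batch overhead cancels the cache win.
    std::vector<Batch> SplitMorsel(const std::vector<int64_t>& morsel,
                                   int64_t batch_rows) {
      std::vector<Batch> batches;
      const int64_t total_rows = static_cast<int64_t>(morsel.size());
      for (int64_t offset = 0; offset < total_rows; offset += batch_rows) {
        int64_t length = std::min(batch_rows, total_rows - offset);
        batches.push_back(Batch{morsel.data() + offset, length});
      }
      return batches;
    }

    int main() {
      std::vector<int64_t> morsel(1 << 20, 1);  // ~1M-row morsel
      // 64K rows of int64 is 512 KiB; a real choice would be tuned
      // against the L2 size and the number of columns actually touched.
      auto batches = SplitMorsel(morsel, 64 * 1024);
      int64_t total = 0;
      for (const auto& b : batches) {
        // Imagine a per-batch task dispatch here: that fixed cost is
        // what dominates once batches get small.
        for (int64_t i = 0; i < b.length; ++i) total += b.values[i];
      }
      return total == (1 << 20) ? 0 : 1;
    }

The tradeoff falls out of the loop above: shrinking batch_rows improves locality inside the inner loop, but multiplies the number of per-batch dispatches, so below some size the fixed cost dominates.
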
> On Mon, May 2, 2022 at 1:20 PM Wes McKinney <wesmck...@gmail.com> wrote:
> >
> > hi all,
> >
> > I've been catching up on the C++ execution engine codebase after a fairly long development hiatus.
> >
> > I have several questions / comments about the current design of the ExecNode interface and its implementations (currently: source / scan, filter, project, union, aggregate, sink, hash join).
> >
> > My current understanding of how things work is the following:
> >
> > * Scan/Source nodes initiate execution through the StartProducing() function, which spawns an asynchronous generator that yields a sequence of input data batches. When each batch is available, it is passed to child operators by calling their InputReceived methods.
> >
> > * When InputReceived is called:
> >   * For non-blocking operators (e.g. Filter, Project), the unit of work is performed immediately and the result is passed to the child operator by calling its InputReceived method.
> >   * For blocking operators (e.g. HashAggregate, HashJoin), partial results are accumulated until the operator can begin producing output (all of the input for aggregation, or until the hash table has been built for the HashJoin).
> >
> > * When an error occurs, a signal to abort is propagated up and down the execution tree.
> >
> > * Eventually, output lands in a Sink node, which holds the desired result.
> >
> > One concern I have about the current structure is the way in which ExecNode implementations are responsible for downstream control flow, and the extent to which operator pipelining (the same thread advancing input-output chains until reaching a pipeline breaker) is implicit versus explicit. To give a couple of examples:
> >
> > * In hash aggregations (GroupByNode), when the input has been exhausted, the GroupByNode splits the result into the desired execution chunk size (e.g. splitting a 1M-row aggregate into batches of 64K rows) and then spawns future tasks that push these chunks through the child output exec node (by calling InputReceived).
> >
> > * In hash joins, the ExecNode accumulates batches to be inserted into the hash table (the "probed" input) until that input is exhausted, and then starts asynchronously spawning tasks that probe the completed hash table and pass the probe results into the child output node.
> >
> > I would suggest that we consider a different design that decouples task control flow from the ExecNode implementation. The purpose would be to give the user of the C++ engine more control over task scheduling (including the order of execution) and prioritization.
> >
> > One system that does things differently from the Arrow C++ engine is Meta's Velox project, whose operators work like this (slightly simplified and colored by my own imperfect understanding):
> >
> > * The Driver class (which is associated with a single thread) is responsible for execution control flow. A driver moves input batches through an operator pipeline.
> >
> > * The Driver calls the Operator::addInput function with an input batch. Operators are blocking vs. non-blocking based on whether the Operator::needsMoreInput() function returns true. Simple operators like Project can produce their output immediately, via Operator::getOutput.
> >
> > * When the Driver hits a blocking operator in a pipeline, it returns control to the calling thread so the thread can switch to doing work for a different driver.
> >
> > * One artifact of this design is that hash joins are split into a HashBuild operator and a HashProbe operator, so that the build and probe stages of the hash join can be scheduled and executed more precisely (for example: work for the pipeline that feeds the build operator can be prioritized over the pipeline feeding the other input to the probe).
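
[Editor's note: a rough C++ sketch of the driver loop described in the bullets above. It paraphrases the mailing-list description; it is not the actual Velox API, and every name here is illustrative.]

    // Sketch of a Driver/Operator control flow as described above.
    // Illustrative names only; not the real Velox API.
    #include <cstdint>
    #include <memory>
    #include <utility>
    #include <vector>

    struct RowBatch {
      std::vector<int64_t> values;  // stand-in for a columnar payload
    };

    class Operator {
     public:
      virtual ~Operator() = default;
      virtual void addInput(RowBatch batch) = 0;
      // Blocking operators (hash build, full aggregation) return true
      // until they have seen enough input to start producing output.
      virtual bool needsMoreInput() const = 0;
      virtual RowBatch getOutput() = 0;
    };

    // A trivial non-blocking operator: doubles every value.
    class DoubleValues : public Operator {
     public:
      void addInput(RowBatch batch) override { pending_ = std::move(batch); }
      bool needsMoreInput() const override { return false; }
      RowBatch getOutput() override {
        for (auto& v : pending_.values) v *= 2;
        return std::move(pending_);
      }

     private:
      RowBatch pending_;
    };

    // One Driver is associated with one thread and owns control flow for
    // one pipeline: it moves a batch operator-by-operator until an
    // operator blocks, then returns so the thread can run another driver.
    class Driver {
     public:
      explicit Driver(std::vector<std::unique_ptr<Operator>> ops)
          : ops_(std::move(ops)) {}

      // Returns true if the batch made it through the whole pipeline;
      // false means an operator is blocked waiting for more input and
      // the caller should yield this thread to other drivers.
      bool RunOne(RowBatch batch) {
        for (auto& op : ops_) {
          op->addInput(std::move(batch));
          if (op->needsMoreInput()) return false;
          batch = op->getOutput();
        }
        return true;  // batch reached this pipeline's sink
      }

     private:
      std::vector<std::unique_ptr<Operator>> ops_;
    };

    int main() {
      std::vector<std::unique_ptr<Operator>> ops;
      ops.push_back(std::make_unique<DoubleValues>());
      Driver driver(std::move(ops));
      return driver.RunOne(RowBatch{{1, 2, 3}}) ? 0 : 1;
    }

In this shape the hash join split falls out naturally: HashBuild is a blocking operator that terminates one pipeline, while HashProbe is a non-blocking stage of another, which is what lets a scheduler prioritize the build-side pipeline as described above.
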
> > The idea in refactoring the Arrow C++ engine would be that, instead of having a tree of ExecNodes, each of which has its own internal control flow (including the ability to spawn downstream tasks), pipelinable operators could be grouped into PipelineExecutors (which correspond roughly to Velox's Driver concept), which would be responsible for control flow and for invoking the ExecNodes in sequence. This would make it much easier for users to customize the control flow for particular needs (for example, the recent discussion of adding time series joins to the C++ engine means that the current eager-push / "local" control flow can create input ordering problems). I think this might also make the codebase easier to understand, test, and perhaps profile / trace, but that is just conjecture.
> >
> > As a separate matter, the C++ engine does not have a separation between input batches (what are called "morsels" in the HyPer paper) and pipeline tasks (smaller, cache-friendly units to move through the pipeline), nor the ability (AFAICT) to do nested parallelism / work stealing within pipelines (this concept is discussed in [1]).
> >
> > Hopefully the above makes sense, and I look forward to others' thoughts.
> >
> > Thanks,
> > Wes
> >
> > [1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
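
[Editor's note: a rough C++ sketch of the PipelineExecutor idea proposed above, in which control flow lives outside the nodes and the nodes become passive batch-to-batch transforms. All names are hypothetical; this is not code from the Arrow C++ engine.]

    // Sketch of a PipelineExecutor owning control flow between nodes.
    // Hypothetical names; not code from the Arrow C++ engine.
    #include <cstdint>
    #include <memory>
    #include <optional>
    #include <utility>
    #include <vector>

    struct ExecBatchStub {
      std::vector<int64_t> values;  // stand-in for a real batch payload
    };

    // A pipelinable node no longer pushes to its output by calling
    // InputReceived; it transforms a batch, or absorbs it (e.g. a hash
    // build accumulating state) by returning nullopt.
    class PipelineNode {
     public:
      virtual ~PipelineNode() = default;
      virtual std::optional<ExecBatchStub> Process(ExecBatchStub batch) = 0;
    };

    class FilterPositive : public PipelineNode {
     public:
      std::optional<ExecBatchStub> Process(ExecBatchStub batch) override {
        ExecBatchStub out;
        for (int64_t v : batch.values)
          if (v > 0) out.values.push_back(v);
        return out;
      }
    };

    // Groups pipelinable nodes and owns the control flow between them,
    // so the caller (or a custom scheduler) decides ordering and
    // prioritization instead of each node spawning downstream work.
    class PipelineExecutor {
     public:
      explicit PipelineExecutor(
          std::vector<std::unique_ptr<PipelineNode>> nodes)
          : nodes_(std::move(nodes)) {}

      // Push one task-sized batch through the pipeline on the calling
      // thread. Because the executor owns ordering, it could, e.g., run
      // batches strictly in sequence for an order-sensitive time series
      // join.
      std::optional<ExecBatchStub> Push(ExecBatchStub batch) {
        std::optional<ExecBatchStub> current = std::move(batch);
        for (auto& node : nodes_) {
          current = node->Process(std::move(*current));
          if (!current) break;  // absorbed by a pipeline breaker
        }
        return current;  // would go to the sink / next pipeline
      }

     private:
      std::vector<std::unique_ptr<PipelineNode>> nodes_;
    };

    int main() {
      std::vector<std::unique_ptr<PipelineNode>> nodes;
      nodes.push_back(std::make_unique<FilterPositive>());
      PipelineExecutor pipeline(std::move(nodes));
      auto result = pipeline.Push(ExecBatchStub{{-1, 2, 3}});
      return (result && result->values.size() == 2) ? 0 : 1;
    }

Under this split, a scan would hand the executor a whole morsel, and the executor, rather than the scan node, would decide how to slice it into cache-sized tasks and which threads run them, which is also the natural hook for the nested parallelism / work stealing mentioned above.
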