I talked about these problems with my colleague Michal Nowakiewicz who
has been developing some of the C++ engine implementation over the
last year and a half, and he wrote up this document with some ideas
about task scheduling and control flow in the query engine for
everyone to look at and comment:

https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#

Feedback also welcome from the Rust developers to compare/contrast
with how DataFusion works

On Tue, May 3, 2022 at 1:05 AM Weston Pace <weston.p...@gmail.com> wrote:
>
> Thanks for investigating and looking through this.  Your understanding
> of how things work is pretty much spot on.  In addition, I think the
> points you are making are valid.  Our ExecNode/ExecPlan interfaces are
> extremely bare bones and similar nodes have had to reimplement the
> same solutions (e.g. many nodes are using things like AtomicCounter,
> ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
> most significant short term impact of cleaning this up would be to
> avoid things like the race condition in [1] which happened because one
> node was doing things in a slightly older way.  If anyone is
> particularly interested in tackling this problem I'd be happy to go
> into more details.
>
> However, I think you are slightly overselling the potential benefits.
> I don't think this would make it easier to adopt morsel/batch,
> implement asymmetric backpressure, better scheduling, work stealing,
> or sequencing (all of which I agree are good ideas with the exception
> of work stealing which I don't think we would significantly benefit
> from).  What's more, we don't have very many nodes today and I think
> there is a risk of over-learning from this small sample size.  For
> example, this sequencing discussion is very interesting.  I think an
> asof join node is not a pipeline breaker, but it also does not fit the
> mold of a standard pipeline node.  It has multiple inputs and there is
> not a clear 1:1 mapping between input and output batches.  I don't
> know the Velox driver model well enough to comment on it specifically
> but if you were to put this node in the middle of a pipeline you might
> end up generating empty batches, too-large batches, or not enough
> thread tasks to saturate the cores.  If you were to put it between
> pipeline drivers you would potentially lose cache locality.
>
> Regarding morsel/batch.  The main thing really preventing us from
> moving to this model is the overhead cost of running small batches.
> This is due to things like the problem you described in [2] and
> somewhat demonstrated by benchmarks like [3].  As a result, as soon as
> we shrink the batch size small enough to fit into L2, we start to see
> overhead increase to eliminate the benefits we get from better cache
> utilization (not just CPU overhead but also thread contention).
> Unfortunately, some of the fixes here could possibly involve changes
> to ExecBatch & Datum, which are used extensively in the kernel
> infrastructure.  From my profiling, this underutilization of cache is
> one of the most significant performance issues we have today.
>
> [1] https://github.com/apache/arrow/pull/12894
> [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> [3] https://github.com/apache/arrow/pull/12755
> On Mon, May 2, 2022 at 1:20 PM Wes McKinney <wesmck...@gmail.com> wrote:
> >
> > hi all,
> >
> > I've been catching up on the C++ execution engine codebase after a
> > fairly long development hiatus.
> >
> > I have several questions / comments about the current design of the
> > ExecNode and their implementations (currently: source / scan, filter,
> > project, union, aggregate, sink, hash join).
> >
> > My current understanding of how things work is the following:
> >
> > * Scan/Source nodes initiate execution through the StartProducing()
> > function, which spawns an asynchronous generator that yields a
> > sequence of input data batches. When each batch is available, it is
> > passed to child operators by calling their InputReceived methods
> >
> > * When InputReceived is called
> >     * For non-blocking operators (e.g. Filter, Project), the unit of
> > work is performed immediately and the result is passed to the child
> > operator by calling its InputReceived method
> >     * For blocking operators (e.g. HashAggregate, HashJoin), partial
> > results are accumulated until the operator can begin producing output
> > (all input for aggregation, or until the HT has been built for the
> > HashJoin)
> >
> > * When an error occurs, a signal to abort will be propagated up and
> > down the execution tree
> >
> > * Eventually output lands in a Sink node, which is the desired result
> >
> > One concern I have about the current structure is the way in which
> > ExecNode implementations are responsible for downstream control flow,
> > and the extent to which operator pipelining (the same thread advancing
> > input-output chains until reaching a pipeline breaker) is implicit
> > versus explicit. To give a couple examples:
> >
> > * In hash aggregations (GroupByNode), when the input has been
> > exhausted, the GroupByNode splits the result into the desired
> > execution chunk size (e.g. splitting a 1M row aggregate into batches
> > of 64K rows) and then spawns future tasks that push these chunks
> > through the child output exec node (by calling InputReceived)
> >
> > * In hash joins, the ExecNode accumulates batches to be inserted into
> > the hash table (the "probed" input), until the probed input is
> > exhausted, and then start asynchronously spawning tasks to probe the
> > completed hash table and passing the probed results into the child
> > output node
> >
> > I would suggest that we consider a different design that decouples
> > task control flow from the ExecNode implementation. The purpose would
> > be to give the user of the C++ engine more control over task
> > scheduling (including the order of execution) and prioritization.
> >
> > One system that does things different from the Arrow C++ Engine is
> > Meta's Velox project, whose operators work like this (slightly
> > simplified and colored by my own imperfect understanding):
> >
> > * The Driver class (which is associated with a single thread) is
> > responsible for execution control flow. A driver moves input batches
> > through an operator pipeline.
> >
> > * The Driver calls the Operator::addInput function with an input
> > batch. Operators are blocking vs. non-blocking based on whether the
> > Operator::needsMoreInput() function returns true. Simple operators
> > like Project can produce their output immediately by calling
> > Operator::getOutput
> >
> > * When the Driver hits a blocking operator in a pipeline, it returns
> > control to the calling thread so the thread can switch to doing work
> > for a different driver
> >
> > * One artifact of this design is that hash joins are split into a
> > HashBuild operator and a HashProbe operator so that the build and
> > probe stages of the hash join can be scheduled and executed more
> > precisely (for example: work for the pipeline that feeds the build
> > operator can be prioritized over the pipeline feeding the other input
> > to the probe).
> >
> > The idea in refactoring the Arrow C++ Engine would be instead of
> > having a tree of ExecNodes, each of which has its own internal control
> > flow (including the ability to spawn downstream tasks), instead
> > pipelinable operators can be grouped into PipelineExecutors (which
> > correspond roughly to Velox's Driver concept) which are responsible
> > for control flow and invoking the ExecNodes in sequence. This would
> > make it much easier for users to customize the control flow for
> > particular needs (for example, the recent discussion of adding time
> > series joins to the C++ engine means that the current eager-push /
> > "local" control flow can create problematic input ordering problems).
> > I think this might make the codebase easier to understand and test
> > also (and profile / trace, maybe, too), but that is just conjecture.
> >
> > As a separate matter, the C++ Engine does not have a separation
> > between input batches (what are called "morsels" in the HyPer paper)
> > and pipeline tasks (smaller cache-friendly units to move through the
> > pipeline), nor the ability (AFAICT) to do nested parallelism / work
> > stealing within pipelines (this concept is discussed in [1]).
> >
> > Hopefully the above makes sense and I look forward to others' thoughts.
> >
> > Thanks,
> > Wes
> >
> > [1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf

Reply via email to