Thanks Wes and Michal.

We have similar concern about the current eager-push control flow with time
series / ordered data processing and am glad that we are not the only one
thinking about this.

I have read the doc and so far just left some questions to make sure I
understand the proposal (admittedly the generator concept is somewhat new
to me) and also thinking about it in the context of streaming ordered data
processing.

Excited to see where this goes,
Li

On Wed, May 11, 2022 at 6:43 PM Wes McKinney <wesmck...@gmail.com> wrote:

> I talked about these problems with my colleague Michal Nowakiewicz who
> has been developing some of the C++ engine implementation over the
> last year and a half, and he wrote up this document with some ideas
> about task scheduling and control flow in the query engine for
> everyone to look at and comment:
>
>
> https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#
>
> Feedback also welcome from the Rust developers to compare/contrast
> with how DataFusion works
>
> On Tue, May 3, 2022 at 1:05 AM Weston Pace <weston.p...@gmail.com> wrote:
> >
> > Thanks for investigating and looking through this.  Your understanding
> > of how things work is pretty much spot on.  In addition, I think the
> > points you are making are valid.  Our ExecNode/ExecPlan interfaces are
> > extremely bare bones and similar nodes have had to reimplement the
> > same solutions (e.g. many nodes are using things like AtomicCounter,
> > ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
> > most significant short term impact of cleaning this up would be to
> > avoid things like the race condition in [1] which happened because one
> > node was doing things in a slightly older way.  If anyone is
> > particularly interested in tackling this problem I'd be happy to go
> > into more details.
> >
> > However, I think you are slightly overselling the potential benefits.
> > I don't think this would make it easier to adopt morsel/batch,
> > implement asymmetric backpressure, better scheduling, work stealing,
> > or sequencing (all of which I agree are good ideas with the exception
> > of work stealing which I don't think we would significantly benefit
> > from).  What's more, we don't have very many nodes today and I think
> > there is a risk of over-learning from this small sample size.  For
> > example, this sequencing discussion is very interesting.  I think an
> > asof join node is not a pipeline breaker, but it also does not fit the
> > mold of a standard pipeline node.  It has multiple inputs and there is
> > not a clear 1:1 mapping between input and output batches.  I don't
> > know the Velox driver model well enough to comment on it specifically
> > but if you were to put this node in the middle of a pipeline you might
> > end up generating empty batches, too-large batches, or not enough
> > thread tasks to saturate the cores.  If you were to put it between
> > pipeline drivers you would potentially lose cache locality.
> >
> > Regarding morsel/batch.  The main thing really preventing us from
> > moving to this model is the overhead cost of running small batches.
> > This is due to things like the problem you described in [2] and
> > somewhat demonstrated by benchmarks like [3].  As a result, as soon as
> > we shrink the batch size small enough to fit into L2, we start to see
> > overhead increase to eliminate the benefits we get from better cache
> > utilization (not just CPU overhead but also thread contention).
> > Unfortunately, some of the fixes here could possibly involve changes
> > to ExecBatch & Datum, which are used extensively in the kernel
> > infrastructure.  From my profiling, this underutilization of cache is
> > one of the most significant performance issues we have today.
> >
> > [1] https://github.com/apache/arrow/pull/12894
> > [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> > [3] https://github.com/apache/arrow/pull/12755
> > On Mon, May 2, 2022 at 1:20 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > hi all,
> > >
> > > I've been catching up on the C++ execution engine codebase after a
> > > fairly long development hiatus.
> > >
> > > I have several questions / comments about the current design of the
> > > ExecNode and their implementations (currently: source / scan, filter,
> > > project, union, aggregate, sink, hash join).
> > >
> > > My current understanding of how things work is the following:
> > >
> > > * Scan/Source nodes initiate execution through the StartProducing()
> > > function, which spawns an asynchronous generator that yields a
> > > sequence of input data batches. When each batch is available, it is
> > > passed to child operators by calling their InputReceived methods
> > >
> > > * When InputReceived is called
> > >     * For non-blocking operators (e.g. Filter, Project), the unit of
> > > work is performed immediately and the result is passed to the child
> > > operator by calling its InputReceived method
> > >     * For blocking operators (e.g. HashAggregate, HashJoin), partial
> > > results are accumulated until the operator can begin producing output
> > > (all input for aggregation, or until the HT has been built for the
> > > HashJoin)
> > >
> > > * When an error occurs, a signal to abort will be propagated up and
> > > down the execution tree
> > >
> > > * Eventually output lands in a Sink node, which is the desired result
> > >
> > > One concern I have about the current structure is the way in which
> > > ExecNode implementations are responsible for downstream control flow,
> > > and the extent to which operator pipelining (the same thread advancing
> > > input-output chains until reaching a pipeline breaker) is implicit
> > > versus explicit. To give a couple examples:
> > >
> > > * In hash aggregations (GroupByNode), when the input has been
> > > exhausted, the GroupByNode splits the result into the desired
> > > execution chunk size (e.g. splitting a 1M row aggregate into batches
> > > of 64K rows) and then spawns future tasks that push these chunks
> > > through the child output exec node (by calling InputReceived)
> > >
> > > * In hash joins, the ExecNode accumulates batches to be inserted into
> > > the hash table (the "probed" input), until the probed input is
> > > exhausted, and then start asynchronously spawning tasks to probe the
> > > completed hash table and passing the probed results into the child
> > > output node
> > >
> > > I would suggest that we consider a different design that decouples
> > > task control flow from the ExecNode implementation. The purpose would
> > > be to give the user of the C++ engine more control over task
> > > scheduling (including the order of execution) and prioritization.
> > >
> > > One system that does things different from the Arrow C++ Engine is
> > > Meta's Velox project, whose operators work like this (slightly
> > > simplified and colored by my own imperfect understanding):
> > >
> > > * The Driver class (which is associated with a single thread) is
> > > responsible for execution control flow. A driver moves input batches
> > > through an operator pipeline.
> > >
> > > * The Driver calls the Operator::addInput function with an input
> > > batch. Operators are blocking vs. non-blocking based on whether the
> > > Operator::needsMoreInput() function returns true. Simple operators
> > > like Project can produce their output immediately by calling
> > > Operator::getOutput
> > >
> > > * When the Driver hits a blocking operator in a pipeline, it returns
> > > control to the calling thread so the thread can switch to doing work
> > > for a different driver
> > >
> > > * One artifact of this design is that hash joins are split into a
> > > HashBuild operator and a HashProbe operator so that the build and
> > > probe stages of the hash join can be scheduled and executed more
> > > precisely (for example: work for the pipeline that feeds the build
> > > operator can be prioritized over the pipeline feeding the other input
> > > to the probe).
> > >
> > > The idea in refactoring the Arrow C++ Engine would be instead of
> > > having a tree of ExecNodes, each of which has its own internal control
> > > flow (including the ability to spawn downstream tasks), instead
> > > pipelinable operators can be grouped into PipelineExecutors (which
> > > correspond roughly to Velox's Driver concept) which are responsible
> > > for control flow and invoking the ExecNodes in sequence. This would
> > > make it much easier for users to customize the control flow for
> > > particular needs (for example, the recent discussion of adding time
> > > series joins to the C++ engine means that the current eager-push /
> > > "local" control flow can create problematic input ordering problems).
> > > I think this might make the codebase easier to understand and test
> > > also (and profile / trace, maybe, too), but that is just conjecture.
> > >
> > > As a separate matter, the C++ Engine does not have a separation
> > > between input batches (what are called "morsels" in the HyPer paper)
> > > and pipeline tasks (smaller cache-friendly units to move through the
> > > pipeline), nor the ability (AFAICT) to do nested parallelism / work
> > > stealing within pipelines (this concept is discussed in [1]).
> > >
> > > Hopefully the above makes sense and I look forward to others' thoughts.
> > >
> > > Thanks,
> > > Wes
> > >
> > > [1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
>

Reply via email to