Thanks for investigating and looking through this.  Your understanding
of how things work is pretty much spot on, and I think the points you
are making are valid.  Our ExecNode/ExecPlan interfaces are extremely
bare-bones, and similar nodes have had to reimplement the same
solutions (e.g. many nodes use things like AtomicCounter,
ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
most significant short-term impact of cleaning this up would be to
avoid things like the race condition in [1], which happened because
one node was doing things in a slightly older way.  If anyone is
particularly interested in tackling this problem, I'd be happy to go
into more detail.

However, I think you are slightly overselling the potential benefits.
I don't think this refactor would make it easier to adopt morsel/batch
execution, implement asymmetric backpressure, improve scheduling, add
work stealing, or handle sequencing (all of which I agree are good
ideas, with the exception of work stealing, which I don't think we
would significantly benefit from).  What's more, we don't have very
many nodes today, and I think there is a risk of over-generalizing
from this small sample size.  For example, this sequencing discussion
is very interesting.  I think an asof join node is not a pipeline
breaker, but it also does not fit the mold of a standard pipeline
node: it has multiple inputs, and there is no clear 1:1 mapping
between input and output batches.  I don't know the Velox driver model
well enough to comment on it specifically, but if you were to put this
node in the middle of a pipeline you might end up generating empty
batches, too-large batches, or too few thread tasks to saturate the
cores.  If you were to put it between pipeline drivers, you would
potentially lose cache locality.
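To illustrate the mismatch, here is a toy model (hypothetical
interfaces, not Velox's or our actual API) of an operator without a
1:1 input/output mapping dropped into a naive one-output-per-input
driver loop:

```cpp
#include <optional>
#include <vector>

// Toy operator that buffers rows and emits only once at least 3 are
// queued, loosely mimicking an asof join that must wait on its other
// inputs before it can emit anything.
struct BufferingOp {
  std::vector<int> buffer;

  void AddInput(int row) { buffer.push_back(row); }

  // Produces a batch only when enough input has accumulated.
  std::optional<std::vector<int>> GetOutput() {
    if (buffer.size() < 3) return std::nullopt;
    std::vector<int> out;
    out.swap(buffer);
    return out;
  }
};

// A driver loop that expects one output per input gets nothing back on
// two of every three pulls -- the "empty batches" problem above.
inline int CountEmptyPulls(BufferingOp& op, const std::vector<int>& rows) {
  int empty = 0;
  for (int r : rows) {
    op.AddInput(r);
    if (!op.GetOutput().has_value()) ++empty;
  }
  return empty;
}
```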

Regarding morsel/batch: the main thing preventing us from moving to
this model is the overhead cost of running small batches.  This is due
to things like the problem you described in [2], and is somewhat
demonstrated by benchmarks like [3].  As a result, as soon as we
shrink the batch size enough to fit into L2, the overhead (not just
CPU overhead but also thread contention) grows enough to cancel out
the benefits we get from better cache utilization.  Unfortunately,
some of the fixes here could involve changes to ExecBatch & Datum,
which are used extensively in the kernel infrastructure.  From my
profiling, this underutilization of cache is one of the most
significant performance issues we have today.
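The back-of-the-envelope arithmetic looks like this (illustrative
cache size and row width, not measured numbers):

```cpp
#include <cstdint>

// To keep a batch resident in L2, row count is bounded by cache size /
// bytes per row.  Shrinking batches this far multiplies the number of
// batches, so any fixed per-batch overhead (scheduling, ExecBatch /
// Datum bookkeeping, thread contention) is paid proportionally more
// often.
constexpr int64_t BatchRowsForCache(int64_t cache_bytes, int64_t row_width) {
  return cache_bytes / row_width;
}

// e.g. an assumed 512 KiB L2 and 64-byte rows cap a batch at 8192
// rows, turning a 1M-row morsel into over a hundred batches' worth of
// per-batch overhead.
```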

[1] https://github.com/apache/arrow/pull/12894
[2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
[3] https://github.com/apache/arrow/pull/12755
On Mon, May 2, 2022 at 1:20 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
> hi all,
>
> I've been catching up on the C++ execution engine codebase after a
> fairly long development hiatus.
>
> I have several questions / comments about the current design of the
> ExecNode and their implementations (currently: source / scan, filter,
> project, union, aggregate, sink, hash join).
>
> My current understanding of how things work is the following:
>
> * Scan/Source nodes initiate execution through the StartProducing()
> function, which spawns an asynchronous generator that yields a
> sequence of input data batches. When each batch is available, it is
> passed to child operators by calling their InputReceived methods
>
> * When InputReceived is called
>     * For non-blocking operators (e.g. Filter, Project), the unit of
> work is performed immediately and the result is passed to the child
> operator by calling its InputReceived method
>     * For blocking operators (e.g. HashAggregate, HashJoin), partial
> results are accumulated until the operator can begin producing output
> (all input for aggregation, or until the HT has been built for the
> HashJoin)
>
> * When an error occurs, a signal to abort will be propagated up and
> down the execution tree
>
> * Eventually output lands in a Sink node, which is the desired result
>
> One concern I have about the current structure is the way in which
> ExecNode implementations are responsible for downstream control flow,
> and the extent to which operator pipelining (the same thread advancing
> input-output chains until reaching a pipeline breaker) is implicit
> versus explicit. To give a couple examples:
>
> * In hash aggregations (GroupByNode), when the input has been
> exhausted, the GroupByNode splits the result into the desired
> execution chunk size (e.g. splitting a 1M row aggregate into batches
> of 64K rows) and then spawns future tasks that push these chunks
> through the child output exec node (by calling InputReceived)
>
> * In hash joins, the ExecNode accumulates batches to be inserted into
> the hash table (the build input) until that input is exhausted, and
> then starts asynchronously spawning tasks to probe the completed hash
> table, passing the probed results into the child output node
>
> I would suggest that we consider a different design that decouples
> task control flow from the ExecNode implementation. The purpose would
> be to give the user of the C++ engine more control over task
> scheduling (including the order of execution) and prioritization.
>
> * One system that does things differently from the Arrow C++ Engine
> is Meta's Velox project, whose operators work like this (slightly
> simplified and colored by my own imperfect understanding):
>
> * The Driver class (which is associated with a single thread) is
> responsible for execution control flow. A driver moves input batches
> through an operator pipeline.
>
> * The Driver calls the Operator::addInput function with an input
> batch. Operators are blocking vs. non-blocking based on whether the
> Operator::needsMoreInput() function returns true. Simple operators
> like Project can produce their output immediately by calling
> Operator::getOutput
>
> * When the Driver hits a blocking operator in a pipeline, it returns
> control to the calling thread so the thread can switch to doing work
> for a different driver
>
> * One artifact of this design is that hash joins are split into a
> HashBuild operator and a HashProbe operator so that the build and
> probe stages of the hash join can be scheduled and executed more
> precisely (for example: work for the pipeline that feeds the build
> operator can be prioritized over the pipeline feeding the other input
> to the probe).
>
> The idea in refactoring the Arrow C++ Engine would be that, instead
> of having a tree of ExecNodes, each of which has its own internal
> control flow (including the ability to spawn downstream tasks),
> pipelinable operators could be grouped into PipelineExecutors (which
> correspond roughly to Velox's Driver concept), which would be
> responsible for control flow and for invoking the ExecNodes in
> sequence.  This would make it much easier for users to customize the
> control flow for particular needs (for example, the recent discussion
> of adding time series joins to the C++ engine shows that the current
> eager-push / "local" control flow can create input ordering
> problems).  I think this might also make the codebase easier to
> understand, test, and perhaps profile / trace, but that is just
> conjecture.
>
> As a separate matter, the C++ Engine does not have a separation
> between input batches (what are called "morsels" in the HyPer paper)
> and pipeline tasks (smaller cache-friendly units to move through the
> pipeline), nor the ability (AFAICT) to do nested parallelism / work
> stealing within pipelines (this concept is discussed in [1]).
>
> Hopefully the above makes sense and I look forward to others' thoughts.
>
> Thanks,
> Wes
>
> [1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
