hi all,

I've been catching up on the C++ execution engine codebase after a
fairly long development hiatus.

I have several questions / comments about the current design of
ExecNode and its implementations (currently: source / scan, filter,
project, union, aggregate, sink, hash join).

My current understanding of how things work is the following:

* Scan/Source nodes initiate execution through the StartProducing()
function, which starts an asynchronous generator that yields a
sequence of input data batches. As each batch becomes available, it is
passed to child operators by calling their InputReceived methods

* When InputReceived is called
    * For non-blocking operators (e.g. Filter, Project), the unit of
work is performed immediately and the result is passed to the child
operator by calling its InputReceived method
    * For blocking operators (e.g. HashAggregate, HashJoin), partial
results are accumulated until the operator can begin producing output
(all input received for an aggregation, or the hash table fully built
for a hash join)

* When an error occurs, a signal to abort will be propagated up and
down the execution tree

* Eventually output lands in a Sink node, which is the desired result
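
To make that concrete, here is roughly the control-flow shape I have
in mind, written as a toy sketch (Batch, Node, and the method
signatures are simplified stand-ins, not the actual
arrow::compute::ExecNode API):

    #include <cstdint>
    #include <utility>
    #include <vector>

    // Toy stand-ins for the real types (not the actual Arrow signatures)
    struct Batch {
      int64_t length = 0;  // row count; real batches also carry column data
    };

    struct Node {
      Node* output = nullptr;  // the downstream ("child output") node
      virtual void InputReceived(Batch batch) = 0;
      virtual void InputFinished() = 0;
      virtual ~Node() = default;
    };

    // Non-blocking operator: performs its unit of work and eagerly pushes
    // the result downstream from within InputReceived
    struct FilterLikeNode : Node {
      void InputReceived(Batch batch) override {
        Batch filtered = std::move(batch);  // apply the filter expression here
        output->InputReceived(std::move(filtered));
      }
      void InputFinished() override { output->InputFinished(); }
    };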

One concern I have about the current structure is the way in which
ExecNode implementations are responsible for downstream control flow,
and the extent to which operator pipelining (the same thread advancing
input-output chains until reaching a pipeline breaker) is implicit
versus explicit. To give a couple of examples:

* In hash aggregations (GroupByNode), when the input has been
exhausted, the GroupByNode splits the result into the desired
execution chunk size (e.g. splitting a 1M row aggregate into batches
of 64K rows) and then spawns future tasks that push these chunks
through the child output exec node (by calling InputReceived)

* In hash joins, the ExecNode accumulates the batches to be inserted
into the hash table (the build input, i.e. the side the hash table is
probed against) until that input is exhausted, and then starts
asynchronously spawning tasks that probe the completed hash table and
pass the probe results into the child output node (a rough sketch of
this pattern is below)
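
Reusing the toy types from the sketch above, the pattern in both cases
looks roughly like this (FinalizeAndSplit is a stub standing in for
the real finalization logic, and std::async stands in for the engine's
task scheduler):

    #include <future>

    // Blocking operator: accumulate all input, then the node itself spawns
    // the tasks that push result chunks into the downstream node
    struct BreakerLikeNode : Node {
      std::vector<Batch> accumulated;
      std::vector<std::future<void>> tasks;

      void InputReceived(Batch batch) override {
        accumulated.push_back(std::move(batch));  // pipeline breaker: just accumulate
      }

      void InputFinished() override {
        // Finalize (aggregate, or build + probe) and split into ~64K-row chunks
        auto chunks = FinalizeAndSplit(accumulated, /*rows_per_chunk=*/64 * 1024);
        for (Batch& chunk : chunks) {
          tasks.push_back(std::async(std::launch::async,
              [this, c = std::move(chunk)]() mutable {
                output->InputReceived(std::move(c));
              }));
        }
        output->InputFinished();
      }

      // Stub so the sketch stands alone; the real logic lives in the node/kernels
      static std::vector<Batch> FinalizeAndSplit(std::vector<Batch>&, int64_t) {
        return {};
      }
    };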

I would suggest that we consider a different design that decouples
task control flow from the ExecNode implementation. The purpose would
be to give the user of the C++ engine more control over task
scheduling (including the order of execution) and prioritization.

One system that does things differently from the Arrow C++ Engine is
Meta's Velox project, whose operators work like this (slightly
simplified and colored by my own imperfect understanding):

* The Driver class (which is associated with a single thread) is
responsible for execution control flow. A driver moves input batches
through an operator pipeline.

* The Driver calls the Operator::addInput function with an input
batch. Operators are blocking vs. non-blocking based on whether the
Operator::needsMoreInput() function returns true. Simple operators
like Project can produce their output immediately when the Driver
calls Operator::getOutput

* When the Driver hits a blocking operator in a pipeline, it returns
control to the calling thread so the thread can switch to doing work
for a different driver

* One artifact of this design is that hash joins are split into a
HashBuild operator and a HashProbe operator so that the build and
probe stages of the hash join can be scheduled and executed more
precisely (for example: work for the pipeline that feeds the build
operator can be prioritized over the pipeline feeding the other input
to the probe).
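
In pseudo-C++, my mental model of the Driver loop is something like
the following (this is NOT the actual Velox API; the names and
details reflect my own reading and simplification):

    #include <memory>
    #include <optional>
    #include <vector>

    // Driver-style control flow over a chain of operators (pseudo-code)
    struct Operator {
      virtual void addInput(Batch batch) = 0;
      virtual bool needsMoreInput() const = 0;       // blocking if it still wants input
      virtual std::optional<Batch> getOutput() = 0;  // nullopt if nothing ready yet
      virtual ~Operator() = default;
    };

    // Advance one input batch as far through the pipeline as it will go;
    // returning early hands the thread back so it can serve another driver
    void AdvanceBatch(std::vector<std::unique_ptr<Operator>>& pipeline, Batch input) {
      std::optional<Batch> batch = std::move(input);
      for (auto& op : pipeline) {
        op->addInput(std::move(*batch));
        if (op->needsMoreInput()) return;  // blocking operator, nothing to push yet
        batch = op->getOutput();
        if (!batch) return;                // no output ready
      }
      // a batch surviving the whole loop would be handed to the pipeline's sink
    }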

The idea in refactoring the Arrow C++ Engine would be that, instead
of having a tree of ExecNodes, each of which has its own internal
control flow (including the ability to spawn downstream tasks),
pipelinable operators could be grouped into PipelineExecutors (which
correspond roughly to Velox's Driver concept), which would be
responsible for control flow and for invoking the ExecNodes in
sequence. This would make it much easier for users to customize the
control flow for particular needs (for example, the recent discussion
of adding time series joins to the C++ engine means that the current
eager-push / "local" control flow can create input ordering problems).
I think this might also make the codebase easier to understand and
test (and perhaps to profile / trace), but that is just conjecture.
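
As a very rough sketch of the shape I mean (PipelineExecutor and
PipelineOp are made-up names for illustration, reusing the toy Batch
type from above):

    // Hypothetical refactor: operators become passive, and a PipelineExecutor
    // owns the control flow for one chain of pipelinable operators
    struct PipelineOp {
      virtual Batch Process(Batch batch) = 0;  // Filter, Project, hash probe, ...
      virtual ~PipelineOp() = default;
    };

    class PipelineExecutor {
     public:
      explicit PipelineExecutor(std::vector<std::unique_ptr<PipelineOp>> ops)
          : ops_(std::move(ops)) {}

      // The caller (a scheduler the user controls) decides when, in what
      // order, and on which thread each batch advances through the pipeline
      Batch Execute(Batch batch) {
        for (auto& op : ops_) {
          batch = op->Process(std::move(batch));
        }
        return batch;
      }

     private:
      std::vector<std::unique_ptr<PipelineOp>> ops_;
    };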

As a separate matter, the C++ Engine does not have a separation
between input batches (what are called "morsels" in the HyPer paper)
and pipeline tasks (smaller cache-friendly units to move through the
pipeline), nor the ability (AFAICT) to do nested parallelism / work
stealing within pipelines (this concept is discussed in [1]).
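
Concretely, the distinction I have in mind looks something like this
(the sizes are illustrative, and constructing Batch{rows} stands in
for slicing the morsel; this reuses the PipelineExecutor sketch
above):

    #include <algorithm>

    constexpr int64_t kMorselRows = 1 << 20;   // coarse unit handed out by the scan
    constexpr int64_t kTaskRows   = 16 * 1024; // unit pushed through a pipeline

    // A worker picks up a whole morsel, then pushes it through the pipeline
    // in small slices; idle workers could steal a morsel's unprocessed slices
    void ProcessMorsel(const Batch& morsel, PipelineExecutor& pipeline) {
      for (int64_t offset = 0; offset < morsel.length; offset += kTaskRows) {
        int64_t rows = std::min(kTaskRows, morsel.length - offset);
        Batch task{rows};  // stand-in for slicing `rows` rows at `offset`
        pipeline.Execute(std::move(task));
      }
    }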

Hopefully the above makes sense and I look forward to others' thoughts.

Thanks,
Wes

[1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
