Thanks, Weston. From your description, I can think about how the current
engine works. Let me try to map your example into execution. Then we can
explore a little bit more in detail.

WE have 300GB of data and we have a read CSV operator  (that creates an
Arrow Table) and a Write Parquet operator. Now let's say we have 2 CPU
cores (in the system) and we run both these operators at the same time.
We read; let's say 10GB of data and write that 10GB of data. Since we do
this continuously we essentially create a streaming-like system as shown
below.

Read (0 th CPU Core) ----> Write (1st CPU Core)

I believe what you are describing is something like the above. I may be
wrong.

There is another way to do this problem. That is

Read --> Write (0th Core)  (read a 10GB batch, write it and next go to the
next 10GB)
Read --> Write (1st Core)  (read a 10GB batch, write it them and next go to
the next 10GB)

The second one doesn't need back-pressure and is usually much more
efficient than the first one for batch processing.

Best,
Supun..





On Fri, May 20, 2022 at 7:38 PM Weston Pace <weston.p...@gmail.com> wrote:

> If the amount of batch data you are processing is larger than the RAM
> on the system then back pressure is needed.  A common use case is
> dataset repartitioning.  If you are repartitioning a large (e.g.
> 300GB) dataset from CSV to parquet then the bottleneck will typically
> be the "write" stage.  Backpressure must be applied or else the system
> will run out of RAM.
>
> I'm also not sure I would describe the engine as a "batch processing
> engine".  I think the C++ engine operates at a lower level than a
> typical Spark vs. Hadoop (e.g. batch vs. streaming) abstraction.  A
> streaming application and a batch application could both make use of
> the C++ engine.  If I had to pick one then I would pick a "streaming
> engine" because we do process data in a streaming fashion.
>
> On Fri, May 20, 2022 at 4:14 PM Supun Kamburugamuve <su...@apache.org>
> wrote:
> >
> > Looking at the proposal I couldn't understand why there is a need for
> > back-pressure handling. My understanding of the Arrow C++ engine is that
> it
> > is meant to process batch data. So I couldn't think of why we need to
> > handle back-pressure as it is normally needed in streaming engines.
> >
> > Best,
> > Supun.;
> >
> > On Thu, May 12, 2022 at 1:14 PM Andrew Lamb <al...@influxdata.com>
> wrote:
> >
> > > Thank you for sharing this document.
> > >
> > > Raphael Taylor-Davies is working on a similar exercise  scheduling
> > > execution for DataFusion plans. The design doc[1] and initial PR [2]
> may be
> > > an interesting reference.
> > >
> > > In the DataFusion case we were trying to improve performance in a few
> ways:
> > > 1. Within a pipeline (same definition as in C++ proposal) consume a
> batch
> > > that was produced in the same thread if possible
> > > 2. Restrict parallelism by the number of available workers rather than
> the
> > > plan structure (e.g. if reading 100 parquet files, with 8 workers,
> don't
> > > start reading all of them at once)
> > > 3. Segregate pools used  to do async IO and CPU bound work within the
> same
> > > plan execution
> > >
> > > I think the C++ proposal would achieve 1, but it isn't clear to me
> that it
> > > would achieve 2 (though I will admit to not fully understanding it)
> and I
> > > don't know about 3
> > >
> > > While there are many similarities with what is described in the C++
> > > proposal, I would say the Rust implementation is significantly less
> > > complicated than what I think is described. In particular:
> > > * There is no notion of generators
> > > * There is no notion of internal tasks (the operators themselves are
> single
> > > threaded and the parallelism is created by generating batches in
> parallel
> > > * The scheduler logic is run directly by the worker threads (rather
> than a
> > > separate thread with message queues) as the operators produce each new
> > > batch
> > >
> > > Andrew
> > >
> > > [1]
> > >
> > >
> https://docs.google.com/document/d/1txX60thXn1tQO1ENNT8rwfU3cXLofa7ZccnvP4jD6AA/edit#
> > > [2] https://github.com/apache/arrow-datafusion/pull/2226
> > >
> > >
> > >
> > > On Thu, May 12, 2022 at 3:24 PM Li Jin <ice.xell...@gmail.com> wrote:
> > >
> > > > Thanks Wes and Michal.
> > > >
> > > > We have similar concern about the current eager-push control flow
> with
> > > time
> > > > series / ordered data processing and am glad that we are not the
> only one
> > > > thinking about this.
> > > >
> > > > I have read the doc and so far just left some questions to make sure
> I
> > > > understand the proposal (admittedly the generator concept is
> somewhat new
> > > > to me) and also thinking about it in the context of streaming ordered
> > > data
> > > > processing.
> > > >
> > > > Excited to see where this goes,
> > > > Li
> > > >
> > > > On Wed, May 11, 2022 at 6:43 PM Wes McKinney <wesmck...@gmail.com>
> > > wrote:
> > > >
> > > > > I talked about these problems with my colleague Michal Nowakiewicz
> who
> > > > > has been developing some of the C++ engine implementation over the
> > > > > last year and a half, and he wrote up this document with some ideas
> > > > > about task scheduling and control flow in the query engine for
> > > > > everyone to look at and comment:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#
> > > > >
> > > > > Feedback also welcome from the Rust developers to compare/contrast
> > > > > with how DataFusion works
> > > > >
> > > > > On Tue, May 3, 2022 at 1:05 AM Weston Pace <weston.p...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > Thanks for investigating and looking through this.  Your
> > > understanding
> > > > > > of how things work is pretty much spot on.  In addition, I think
> the
> > > > > > points you are making are valid.  Our ExecNode/ExecPlan
> interfaces
> > > are
> > > > > > extremely bare bones and similar nodes have had to reimplement
> the
> > > > > > same solutions (e.g. many nodes are using things like
> AtomicCounter,
> > > > > > ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably
> the
> > > > > > most significant short term impact of cleaning this up would be
> to
> > > > > > avoid things like the race condition in [1] which happened
> because
> > > one
> > > > > > node was doing things in a slightly older way.  If anyone is
> > > > > > particularly interested in tackling this problem I'd be happy to
> go
> > > > > > into more details.
> > > > > >
> > > > > > However, I think you are slightly overselling the potential
> benefits.
> > > > > > I don't think this would make it easier to adopt morsel/batch,
> > > > > > implement asymmetric backpressure, better scheduling, work
> stealing,
> > > > > > or sequencing (all of which I agree are good ideas with the
> exception
> > > > > > of work stealing which I don't think we would significantly
> benefit
> > > > > > from).  What's more, we don't have very many nodes today and I
> think
> > > > > > there is a risk of over-learning from this small sample size.
> For
> > > > > > example, this sequencing discussion is very interesting.  I
> think an
> > > > > > asof join node is not a pipeline breaker, but it also does not
> fit
> > > the
> > > > > > mold of a standard pipeline node.  It has multiple inputs and
> there
> > > is
> > > > > > not a clear 1:1 mapping between input and output batches.  I
> don't
> > > > > > know the Velox driver model well enough to comment on it
> specifically
> > > > > > but if you were to put this node in the middle of a pipeline you
> > > might
> > > > > > end up generating empty batches, too-large batches, or not enough
> > > > > > thread tasks to saturate the cores.  If you were to put it
> between
> > > > > > pipeline drivers you would potentially lose cache locality.
> > > > > >
> > > > > > Regarding morsel/batch.  The main thing really preventing us from
> > > > > > moving to this model is the overhead cost of running small
> batches.
> > > > > > This is due to things like the problem you described in [2] and
> > > > > > somewhat demonstrated by benchmarks like [3].  As a result, as
> soon
> > > as
> > > > > > we shrink the batch size small enough to fit into L2, we start
> to see
> > > > > > overhead increase to eliminate the benefits we get from better
> cache
> > > > > > utilization (not just CPU overhead but also thread contention).
> > > > > > Unfortunately, some of the fixes here could possibly involve
> changes
> > > > > > to ExecBatch & Datum, which are used extensively in the kernel
> > > > > > infrastructure.  From my profiling, this underutilization of
> cache is
> > > > > > one of the most significant performance issues we have today.
> > > > > >
> > > > > > [1] https://github.com/apache/arrow/pull/12894
> > > > > > [2]
> https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> > > > > > [3] https://github.com/apache/arrow/pull/12755
> > > > > > On Mon, May 2, 2022 at 1:20 PM Wes McKinney <wesmck...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > hi all,
> > > > > > >
> > > > > > > I've been catching up on the C++ execution engine codebase
> after a
> > > > > > > fairly long development hiatus.
> > > > > > >
> > > > > > > I have several questions / comments about the current design
> of the
> > > > > > > ExecNode and their implementations (currently: source / scan,
> > > filter,
> > > > > > > project, union, aggregate, sink, hash join).
> > > > > > >
> > > > > > > My current understanding of how things work is the following:
> > > > > > >
> > > > > > > * Scan/Source nodes initiate execution through the
> StartProducing()
> > > > > > > function, which spawns an asynchronous generator that yields a
> > > > > > > sequence of input data batches. When each batch is available,
> it is
> > > > > > > passed to child operators by calling their InputReceived
> methods
> > > > > > >
> > > > > > > * When InputReceived is called
> > > > > > >     * For non-blocking operators (e.g. Filter, Project), the
> unit
> > > of
> > > > > > > work is performed immediately and the result is passed to the
> child
> > > > > > > operator by calling its InputReceived method
> > > > > > >     * For blocking operators (e.g. HashAggregate, HashJoin),
> > > partial
> > > > > > > results are accumulated until the operator can begin producing
> > > output
> > > > > > > (all input for aggregation, or until the HT has been built for
> the
> > > > > > > HashJoin)
> > > > > > >
> > > > > > > * When an error occurs, a signal to abort will be propagated
> up and
> > > > > > > down the execution tree
> > > > > > >
> > > > > > > * Eventually output lands in a Sink node, which is the desired
> > > result
> > > > > > >
> > > > > > > One concern I have about the current structure is the way in
> which
> > > > > > > ExecNode implementations are responsible for downstream control
> > > flow,
> > > > > > > and the extent to which operator pipelining (the same thread
> > > > advancing
> > > > > > > input-output chains until reaching a pipeline breaker) is
> implicit
> > > > > > > versus explicit. To give a couple examples:
> > > > > > >
> > > > > > > * In hash aggregations (GroupByNode), when the input has been
> > > > > > > exhausted, the GroupByNode splits the result into the desired
> > > > > > > execution chunk size (e.g. splitting a 1M row aggregate into
> > > batches
> > > > > > > of 64K rows) and then spawns future tasks that push these
> chunks
> > > > > > > through the child output exec node (by calling InputReceived)
> > > > > > >
> > > > > > > * In hash joins, the ExecNode accumulates batches to be
> inserted
> > > into
> > > > > > > the hash table (the "probed" input), until the probed input is
> > > > > > > exhausted, and then start asynchronously spawning tasks to
> probe
> > > the
> > > > > > > completed hash table and passing the probed results into the
> child
> > > > > > > output node
> > > > > > >
> > > > > > > I would suggest that we consider a different design that
> decouples
> > > > > > > task control flow from the ExecNode implementation. The purpose
> > > would
> > > > > > > be to give the user of the C++ engine more control over task
> > > > > > > scheduling (including the order of execution) and
> prioritization.
> > > > > > >
> > > > > > > One system that does things different from the Arrow C++
> Engine is
> > > > > > > Meta's Velox project, whose operators work like this (slightly
> > > > > > > simplified and colored by my own imperfect understanding):
> > > > > > >
> > > > > > > * The Driver class (which is associated with a single thread)
> is
> > > > > > > responsible for execution control flow. A driver moves input
> > > batches
> > > > > > > through an operator pipeline.
> > > > > > >
> > > > > > > * The Driver calls the Operator::addInput function with an
> input
> > > > > > > batch. Operators are blocking vs. non-blocking based on
> whether the
> > > > > > > Operator::needsMoreInput() function returns true. Simple
> operators
> > > > > > > like Project can produce their output immediately by calling
> > > > > > > Operator::getOutput
> > > > > > >
> > > > > > > * When the Driver hits a blocking operator in a pipeline, it
> > > returns
> > > > > > > control to the calling thread so the thread can switch to doing
> > > work
> > > > > > > for a different driver
> > > > > > >
> > > > > > > * One artifact of this design is that hash joins are split
> into a
> > > > > > > HashBuild operator and a HashProbe operator so that the build
> and
> > > > > > > probe stages of the hash join can be scheduled and executed
> more
> > > > > > > precisely (for example: work for the pipeline that feeds the
> build
> > > > > > > operator can be prioritized over the pipeline feeding the other
> > > input
> > > > > > > to the probe).
> > > > > > >
> > > > > > > The idea in refactoring the Arrow C++ Engine would be instead
> of
> > > > > > > having a tree of ExecNodes, each of which has its own internal
> > > > control
> > > > > > > flow (including the ability to spawn downstream tasks), instead
> > > > > > > pipelinable operators can be grouped into PipelineExecutors
> (which
> > > > > > > correspond roughly to Velox's Driver concept) which are
> responsible
> > > > > > > for control flow and invoking the ExecNodes in sequence. This
> would
> > > > > > > make it much easier for users to customize the control flow for
> > > > > > > particular needs (for example, the recent discussion of adding
> time
> > > > > > > series joins to the C++ engine means that the current
> eager-push /
> > > > > > > "local" control flow can create problematic input ordering
> > > problems).
> > > > > > > I think this might make the codebase easier to understand and
> test
> > > > > > > also (and profile / trace, maybe, too), but that is just
> > > conjecture.
> > > > > > >
> > > > > > > As a separate matter, the C++ Engine does not have a separation
> > > > > > > between input batches (what are called "morsels" in the HyPer
> > > paper)
> > > > > > > and pipeline tasks (smaller cache-friendly units to move
> through
> > > the
> > > > > > > pipeline), nor the ability (AFAICT) to do nested parallelism /
> work
> > > > > > > stealing within pipelines (this concept is discussed in [1]).
> > > > > > >
> > > > > > > Hopefully the above makes sense and I look forward to others'
> > > > thoughts.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Wes
> > > > > > >
> > > > > > > [1]:
> > > > https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
> > > > >
> > > >
> > >
> >
> >
> > --
> > Supun Kamburugamuve
>


-- 
Supun Kamburugamuve

Reply via email to