I'll hijack this thread for a bit of road mapping.  There are a number
of significant infrastructure changes that are on my mind regarding
Acero.  I'll list them here in no particular order.

 * [1] The scanner needs to be updated to properly support cancellation
    I mainly mention this here as it is a prerequisite for the next item
 * [2] Acero needs a proper serial execution mode (using an event loop)
 * [3] Support for ordered execution (maybe not the most on-topic JIRA)
 * General scheduler rework (this thread)

I think it's probably important that we keep ordered execution
separate from general scheduler rework.  A "boil the ocean" rework of
all things scheduler-related is not likely to make much progress, but
we can incrementally tackle specific problems.

> any more progress on this since May

Not that I'm aware of, except that we did take a step towards the
morsel / batch model (somewhat by necessity) in [4].

> any future plans?

I don't think I can throw out any specific dates but I think it is
safe to say that these issues are important to Voltron Data as well.

[1] https://issues.apache.org/jira/browse/ARROW-16072
[2] https://issues.apache.org/jira/browse/ARROW-15732
[3] https://issues.apache.org/jira/browse/ARROW-10883
[4] https://github.com/apache/arrow/pull/13679

On Fri, Jul 22, 2022 at 11:38 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> Hi!
>
> Since the scheduler improvement work came up in some recent discussions
> about how backpressure is handled in Acero, I am curious if there has
> been any more progress on this since May or any future plans?
>
> Thanks,
> Li
>
> On Mon, May 23, 2022 at 10:37 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> > > About point 2: I have previously seen the pipeline prioritization
> > > you've described, with both sides running simultaneously. My
> > > experience was not good with that approach, and a one-side-first
> > > approach was much better.
> >
> > This is good insight, I appreciate it.  I hope we can run this kind of
> > experiment when we create the scheduler.  From my read of the
> > scheduler doc it looks like we can probably tweak the generator
> > priority.
> >
> > > About these points, if you are using queues, won't they
> > > automatically slow down?
> >
> > It's a non-blocking queue, so when the capacity is hit a backpressure
> > signal is sent towards the source and, yes, we eventually slow down.
> > This means the capacity is really a "soft capacity" and we may exceed
> > it by some small amount.
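> >
> > To make that concrete, here's a rough sketch of the idea (hypothetical
> > code, not our actual classes): a non-blocking queue that flips a
> > backpressure signal instead of blocking the producer.
> >
> > #include <cstddef>
> > #include <functional>
> > #include <mutex>
> > #include <queue>
> > #include <utility>
> >
> > template <typename T>
> > class SoftCapacityQueue {
> >  public:
> >   SoftCapacityQueue(std::size_t soft_capacity,
> >                     std::function<void(bool)> on_backpressure)
> >       : soft_capacity_(soft_capacity),
> >         on_backpressure_(std::move(on_backpressure)) {}
> >
> >   // Push never blocks.  When we cross the soft capacity we ask the
> >   // source to pause, but anything already in flight still lands, so
> >   // the queue may overshoot by a small amount.
> >   void Push(T item) {
> >     std::lock_guard<std::mutex> lock(mutex_);
> >     queue_.push(std::move(item));
> >     if (queue_.size() == soft_capacity_) {
> >       on_backpressure_(/*paused=*/true);
> >     }
> >   }
> >
> >   bool Pop(T* out) {
> >     std::lock_guard<std::mutex> lock(mutex_);
> >     if (queue_.empty()) return false;
> >     *out = std::move(queue_.front());
> >     queue_.pop();
> >     // Resume the source once we've drained to half capacity.
> >     if (queue_.size() == soft_capacity_ / 2) {
> >       on_backpressure_(/*paused=*/false);
> >     }
> >     return true;
> >   }
> >
> >  private:
> >   std::size_t soft_capacity_;
> >   std::function<void(bool)> on_backpressure_;
> >   std::mutex mutex_;
> >   std::queue<T> queue_;
> > };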
> >
> > > Even if we need back-pressure, join-like operations will break the
> > > graph into sections where we don't have to consider backpressure
> > > across these boundaries.
> >
> > This is possible; we can play around with it.  However, we are working
> > on adding spillover support to joins, so the path downstream of the
> > join could still be asynchronous, as we might be reading (slowly) from
> > the spilled partitions.
> >
> > We do have a few places where we utilize a synchronous parallel-for
> > without backpressure.  The scheduler doc refers to these as
> > non-pipeline tasks.  For example, once we've got all the build data we
> > synchronously divide and conquer to probe any queued batches.
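> >
> > As a rough sketch (hypothetical code, not the actual hash-join
> > implementation), that divide-and-conquer step is just a synchronous
> > parallel-for, with no backpressure involved:
> >
> > #include <algorithm>
> > #include <atomic>
> > #include <cstddef>
> > #include <thread>
> > #include <vector>
> >
> > // Probe all queued batches and block until every batch is done.
> > // Because the call is synchronous, nothing accumulates behind it and
> > // no backpressure signal is needed on this path.
> > template <typename Batch, typename Fn>
> > void SyncParallelFor(const std::vector<Batch>& batches, Fn probe_one) {
> >   std::size_t num_threads =
> >       std::max<std::size_t>(1, std::thread::hardware_concurrency());
> >   std::atomic<std::size_t> next{0};
> >   std::vector<std::thread> workers;
> >   for (std::size_t t = 0; t < num_threads; ++t) {
> >     workers.emplace_back([&] {
> >       // Each worker claims the next unprocessed batch until none remain.
> >       for (std::size_t i = next++; i < batches.size(); i = next++) {
> >         probe_one(batches[i]);
> >       }
> >     });
> >   }
> >   for (auto& w : workers) w.join();
> > }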
> >
> > On Sat, May 21, 2022 at 5:08 AM Supun Kamburugamuve <su...@apache.org>
> > wrote:
> > >
> > > Thanks, Weston. Now I understand a little more deeply.
> > >
> > > About point 2: I have previously seen the pipeline prioritization
> > > you've described, with both sides running simultaneously. My
> > > experience was not good with that approach, and a one-side-first
> > > approach was much better.
> > >
> > > About these points, if you are using queues, won't they
> > > automatically slow down?
> > >
> > > Further, for joins, we need to read the complete tables and apply
> > > some operations like partitioning before we can do the actual join.
> > > Also, a join is a synchronous operation across the threads
> > > (partitions). So we have to synchronize the tasks/threads (doing the
> > > join in parallel) at that time, and downstream tasks cannot really
> > > start working until everything is read up to the join and it starts
> > > producing some partitions. Even if we need back-pressure, join-like
> > > operations will break the graph into sections where we don't have to
> > > consider backpressure across these boundaries.
> > >
> > > Best,
> > > Supun.
> > >
> > > On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com>
> > > wrote:
> > >
> > > > I think I understand what you are saying now.  There are a few
> > > > different things going on that lead to a need for backpressure.
> > > >
> > > > 1. Write backpressure
> > > >
> > > > We support asynchronous writes.  Filesystems like AWS S3 want many
> > > > parallel I/O operations.  On a two-core system you might have 8
> > > > different parallel writes.  So we do exactly what you described: we
> > > > create a queue with a defined capacity.  When that queue fills up we
> > > > stop reading.
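> > > >
> > > > Conceptually it behaves like this hypothetical sketch (made-up
> > > > names, not our actual writer code): the reader blocks only when too
> > > > many writes are outstanding.
> > > >
> > > > #include <condition_variable>
> > > > #include <cstddef>
> > > > #include <mutex>
> > > >
> > > > class WriteThrottle {
> > > >  public:
> > > >   explicit WriteThrottle(std::size_t max_in_flight)
> > > >       : max_in_flight_(max_in_flight) {}
> > > >
> > > >   // Called before submitting an async write; pauses the reader
> > > >   // while the write queue is full.
> > > >   void BeforeWrite() {
> > > >     std::unique_lock<std::mutex> lock(mutex_);
> > > >     cv_.wait(lock, [&] { return in_flight_ < max_in_flight_; });
> > > >     ++in_flight_;
> > > >   }
> > > >
> > > >   // Called from the write-completion callback.
> > > >   void AfterWrite() {
> > > >     std::lock_guard<std::mutex> lock(mutex_);
> > > >     --in_flight_;
> > > >     cv_.notify_one();
> > > >   }
> > > >
> > > >  private:
> > > >   std::size_t max_in_flight_;
> > > >   std::size_t in_flight_ = 0;
> > > >   std::mutex mutex_;
> > > >   std::condition_variable cv_;
> > > > };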
> > > >
> > > > 2. Pipeline prioritization
> > > >
> > > > The method you are describing requires that we prioritize the order of
> > > > pipeline processing.  For example, in a join node, we would fully
> > > > process the shorter build side and then process the probe side in a
> > > > streaming fashion.  We don't currently do that (and it is a separate
> > > > discussion whether we would even want to).  Instead, we read from both
> > > > sides simultaneously.  If the build-side read takes too long, then we
> > > > need to pause reading from the probe side.
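> > > >
> > > > A hypothetical sketch of that pause (made-up names, not the actual
> > > > join node):
> > > >
> > > > #include <cstddef>
> > > > #include <utility>
> > > > #include <vector>
> > > >
> > > > struct Batch { /* columnar data elided */ };
> > > >
> > > > // Minimal stand-in for an input that accepts pause/resume signals.
> > > > struct Input {
> > > >   void Pause() { paused = true; }
> > > >   void Resume() { paused = false; }
> > > >   bool paused = false;
> > > > };
> > > >
> > > > class JoinBackpressure {
> > > >  public:
> > > >   JoinBackpressure(Input* probe_input, std::size_t probe_queue_limit)
> > > >       : probe_input_(probe_input),
> > > >         probe_queue_limit_(probe_queue_limit) {}
> > > >
> > > >   void OnProbeBatch(Batch batch) {
> > > >     queued_.push_back(std::move(batch));
> > > >     // If the build side is slow, stop accumulating probe batches.
> > > >     if (!build_done_ && queued_.size() >= probe_queue_limit_) {
> > > >       probe_input_->Pause();
> > > >     }
> > > >   }
> > > >
> > > >   void OnBuildFinished() {
> > > >     build_done_ = true;
> > > >     probe_input_->Resume();
> > > >     // Probe everything that queued up while the hash table was built.
> > > >     for (auto& b : queued_) Probe(b);
> > > >     queued_.clear();
> > > >   }
> > > >
> > > >  private:
> > > >   void Probe(const Batch&) { /* hash-table lookup elided */ }
> > > >
> > > >   Input* probe_input_;
> > > >   std::size_t probe_queue_limit_;
> > > >   bool build_done_ = false;
> > > >   std::vector<Batch> queued_;
> > > > };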
> > > >
> > > > 3. Non-typical operators
> > > >
> > > > The as-of join node is an interesting operator.  It can have many
> > > > different inputs and it requires us to read from all inputs at
> > > > roughly the same rate.  However, if some inputs are denser than
> > > > other inputs (e.g. more rows per unit of time) then we need to pause
> > > > the other inputs while we catch up on the long tail.
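> > > >
> > > > For example, something like this hypothetical pacing check (not
> > > > code from the actual as-of join node):
> > > >
> > > > #include <algorithm>
> > > > #include <cstdint>
> > > > #include <limits>
> > > > #include <vector>
> > > >
> > > > struct PacedInput {
> > > >   int64_t watermark = std::numeric_limits<int64_t>::min();
> > > >   bool paused = false;
> > > > };
> > > >
> > > > // Pause any input whose timestamps have raced too far ahead of the
> > > > // slowest (long tail) input; resume it once the tail catches up.
> > > > void RebalanceInputs(std::vector<PacedInput>* inputs, int64_t tolerance) {
> > > >   int64_t slowest = std::numeric_limits<int64_t>::max();
> > > >   for (const auto& in : *inputs) slowest = std::min(slowest, in.watermark);
> > > >   for (auto& in : *inputs) {
> > > >     in.paused = (in.watermark > slowest + tolerance);
> > > >   }
> > > > }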
> > > >
> > > > On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org>
> > > > wrote:
> > > > >
> > > > > Hi Sasha,
> > > > >
> > > > > For case 2, I don't see why we need a back-pressure mechanism.
> > > > > Let's say there is an IO thread. All we need is a queue with a
> > > > > defined capacity that feeds data from the IO thread to the Read
> > > > > task.
> > > > >
> > > > > Supun.
> > > > >
> > > > > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky
> > > > > <krassovskysa...@gmail.com> wrote:
> > > > >
> > > > > > Hi Supun,
> > > > > > Roughly what happens now is #2. However, in your example, it may
> > > > > > be the case that we are reading CSV data from disk faster than we
> > > > > > are transcoding it into Parquet and writing it. Note that we
> > > > > > attempt to use the full disk bandwidth and assign batches to cores
> > > > > > once the reads are done, so ideally a core is never blocked on IO.
> > > > > > In other words, even if we have only 2 cores, we may kick off 100
> > > > > > batch reads and process them when the reads complete and a core is
> > > > > > available. This is where backpressure is needed: to prevent this
> > > > > > huge number of reads from piling up and filling memory faster than
> > > > > > we can process them.
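> > > > > >
> > > > > > As a hypothetical sketch (made-up names, not the actual scanner
> > > > > > code), the backpressure amounts to capping the reads in flight:
> > > > > >
> > > > > > #include <atomic>
> > > > > > #include <cstddef>
> > > > > >
> > > > > > class ReadThrottle {
> > > > > >  public:
> > > > > >   explicit ReadThrottle(std::size_t max_in_flight)
> > > > > >       : max_(max_in_flight) {}
> > > > > >
> > > > > >   // The scanner calls this before kicking off another batch read.
> > > > > >   bool TryStartRead() {
> > > > > >     std::size_t cur = in_flight_.load();
> > > > > >     while (cur < max_) {
> > > > > >       if (in_flight_.compare_exchange_weak(cur, cur + 1)) return true;
> > > > > >     }
> > > > > >     return false;  // back off until a batch has been processed
> > > > > >   }
> > > > > >
> > > > > >   // Called once a batch is fully processed (not merely read).
> > > > > >   void FinishBatch() { in_flight_.fetch_sub(1); }
> > > > > >
> > > > > >  private:
> > > > > >   const std::size_t max_;
> > > > > >   std::atomic<std::size_t> in_flight_{0};
> > > > > > };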
> > > > > >
> > > > > > Sasha Krassovsky
> > > > > >
> > > > > > > On May 20, 2022, at 20:09, Supun Kamburugamuve
> > > > > > > <su...@apache.org> wrote:
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Supun Kamburugamuve
> > > >
> > >
> > >
> > > --
> > > Supun Kamburugamuve
> >
