Thanks Weston! Appreciate the information.

On Mon, Jul 25, 2022 at 10:04 PM Weston Pace <weston.p...@gmail.com> wrote:
> I'll hijack this thread for a bit of road mapping. There are a number of significant infrastructure changes that are on my mind regarding Acero. I'll list them here in no particular order.
>
> * [1] The scanner needs to be updated to properly support cancellation. I mainly mention this here as it is a prerequisite for the next item.
> * [2] Acero needs a proper serial execution mode (using an event loop)
> * [3] Support for ordered execution (maybe not the most on-topic JIRA)
> * General scheduler rework (this thread)
>
> I think it's probably important that we keep ordered execution separate from general scheduler rework. A "boil the ocean" rework of all things scheduler related is not likely to make much progress, but we can incrementally tackle specific problems.
>
> > any more progress on this since May
>
> Not that I'm aware of, except that we did take a step towards the morsel / batch model (somewhat by necessity) in [4].
>
> > any future plans?
>
> I don't think I can throw out any specific dates, but I think it is safe to say that these issues are important to Voltron Data as well.
>
> [1] https://issues.apache.org/jira/browse/ARROW-16072
> [2] https://issues.apache.org/jira/browse/ARROW-15732
> [3] https://issues.apache.org/jira/browse/ARROW-10883
> [4] https://github.com/apache/arrow/pull/13679
>
> On Fri, Jul 22, 2022 at 11:38 AM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Hi!
> >
> > Since the scheduler improvement work came up in some recent discussions about how backpressure is handled in Acero, I am curious if there has been any more progress on this since May or any future plans?
> >
> > Thanks,
> > Li
> >
> > On Mon, May 23, 2022 at 10:37 PM Weston Pace <weston.p...@gmail.com> wrote:
> > >
> > > > About point 2. I have previously seen the pipeline prioritization you've described with both sides running simultaneously. My experience was not good with that approach, and the one-side-first approach was much better.
> > >
> > > This is good insight, I appreciate it. I hope we can run this kind of experiment when we create the scheduler. From my read of the scheduler doc it looks like we can probably tweak the generator priority.
> > >
> > > > About these points, if you are using queues, won't they automatically slow down?
> > >
> > > It's a non-blocking queue, so when the capacity is hit a backpressure signal is sent towards the source and yes, we eventually slow down. This means the capacity is really a "soft capacity" and we may exceed it by some small amount.
> > >
> > > > Even if we need back-pressure, join-like operations will break the graph into sections where we don't have to consider backpressure across these boundaries.
> > >
> > > This is possible, we can play around with it. However, we are working on adding spillover support to joins, so the path downstream of the join could still be asynchronous as we might be reading (slowly) from the spilled partitions.
> > >
> > > We do have a few places where we utilize a synchronous parallel-for without backpressure. The scheduler doc refers to these as non-pipeline tasks. For example, once we've got all the build data, we synchronously divide and conquer to probe any queued batches.
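For illustration, here is a minimal sketch of the non-blocking "soft capacity" queue Weston describes above: the producer is never blocked, but once the configured capacity is exceeded a callback asks the source to pause, so the capacity can be overshot by a small amount before the signal takes effect. The names (SoftCapacityQueue, toggle_backpressure) are hypothetical, not Acero's actual classes or API.

    // Hypothetical sketch of a non-blocking queue with a "soft" capacity.
    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <optional>

    template <typename T>
    class SoftCapacityQueue {
     public:
      SoftCapacityQueue(std::size_t soft_capacity,
                        std::function<void(bool)> toggle_backpressure)
          : soft_capacity_(soft_capacity),
            toggle_backpressure_(std::move(toggle_backpressure)) {}

      // Never blocks; may exceed soft_capacity_ while the pause signal
      // propagates back towards the source.
      void Push(T item) {
        bool should_pause = false;
        {
          std::lock_guard<std::mutex> lock(mutex_);
          queue_.push_back(std::move(item));
          should_pause = !paused_ && queue_.size() > soft_capacity_;
          if (should_pause) paused_ = true;
        }
        if (should_pause) toggle_backpressure_(true);  // ask the source to pause
      }

      std::optional<T> Pop() {
        bool should_resume = false;
        std::optional<T> out;
        {
          std::lock_guard<std::mutex> lock(mutex_);
          if (queue_.empty()) return std::nullopt;
          out = std::move(queue_.front());
          queue_.pop_front();
          should_resume = paused_ && queue_.size() <= soft_capacity_ / 2;
          if (should_resume) paused_ = false;
        }
        if (should_resume) toggle_backpressure_(false);  // resume the source
        return out;
      }

     private:
      std::size_t soft_capacity_;
      std::function<void(bool)> toggle_backpressure_;
      std::mutex mutex_;
      std::deque<T> queue_;
      bool paused_ = false;
    };

Resuming at half the capacity (rather than exactly at it) is only a choice made in this sketch to avoid rapidly toggling the signal; the real implementation may use a different policy.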
> > > On Sat, May 21, 2022 at 5:08 AM Supun Kamburugamuve <su...@apache.org> wrote:
> > > >
> > > > Thanks, Weston. Now I understand a little deeper.
> > > >
> > > > About point 2. I have previously seen the pipeline prioritization you've described with both sides running simultaneously. My experience was not good with that approach, and the one-side-first approach was much better.
> > > >
> > > > About these points, if you are using queues, won't they automatically slow down?
> > > >
> > > > Further, for joins, we need to read the complete tables and apply some operations like partitioning before they can do the actual join. Also, a join is a synchronous operation across the threads (partitions). So we have to synchronize the tasks/threads (doing the join in parallel) at that time, and downstream tasks cannot really start working until everything is read up to the join and they start producing some partitions. Even if we need back-pressure, join-like operations will break the graph into sections where we don't have to consider backpressure across these boundaries.
> > > >
> > > > Best,
> > > > Supun.
> > > >
> > > > On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > >
> > > > > I think I understand what you are saying now. There are a few different things going on that lead to a need for backpressure.
> > > > >
> > > > > 1. Write backpressure
> > > > >
> > > > > We support asynchronous writes. Filesystems like AWS want many parallel I/O operations. On a two-core system you might have 8 different parallel writes. So we do exactly what you described and we create a queue with a defined capacity. When that queue fills up we stop reading.
> > > > >
> > > > > 2. Pipeline prioritization
> > > > >
> > > > > The method you are describing requires that we prioritize the order of pipeline processing. For example, in a join node, we would fully process the shorter build side and then process the probe side in a streaming fashion. We don't currently do that (and it is a separate discussion whether we would even want to). Instead we read from both sides simultaneously. If the build side read takes too long then we need to pause reading from the probe side.
> > > > >
> > > > > 3. Non-typical operators
> > > > >
> > > > > The as-of join node is an interesting operator. It can have many different inputs and it requires us to read from all inputs at roughly the same rate. However, if some inputs are denser than other inputs (e.g. more rows per time) then we need to pause the other inputs while we catch up on the long tail.
> > > > >
> > > > > On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org> wrote:
> > > > > >
> > > > > > Hi Sasha,
> > > > > >
> > > > > > For case 2, I don't see why we need a back-pressure mechanism. Let's say there is an IO thread. All we need is a queue with a defined capacity that feeds data from the IO thread to the Read task.
> > > > > >
> > > > > > Supun.
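As a point of comparison, here is a rough sketch of the fixed-capacity queue Supun suggests between an IO thread and the read task: the producer simply blocks when the consumer falls behind, so no explicit backpressure mechanism is needed. The class is illustrative only, not Acero code.

    // Illustrative fixed-capacity blocking queue between an IO thread (producer)
    // and a read task (consumer).
    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>

    template <typename T>
    class BoundedBlockingQueue {
     public:
      explicit BoundedBlockingQueue(std::size_t capacity) : capacity_(capacity) {}

      // Called by the IO thread; blocks while the queue is full.
      void Push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [&] { return queue_.size() < capacity_; });
        queue_.push_back(std::move(item));
        not_empty_.notify_one();
      }

      // Called by the read task; blocks while the queue is empty.
      T Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [&] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop_front();
        not_full_.notify_one();
        return item;
      }

     private:
      std::size_t capacity_;
      std::mutex mutex_;
      std::condition_variable not_full_;
      std::condition_variable not_empty_;
      std::deque<T> queue_;
    };

Weston's reply above notes that Acero's queues are non-blocking instead, so rather than blocking the producer, a backpressure signal has to travel back towards the source.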
> > > > > > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Supun,
> > > > > > >
> > > > > > > Roughly what happens now is #2. However, in your example, it may be the case that we are reading CSV data from disk faster than we are transcoding it into Parquet and writing it. Note that we attempt to use the full disk bandwidth and assign batches to cores once the reads are done, so ideally a core is never blocked on IO. In other words, even if we have only 2 cores, we may kick off 100 batch reads and process them when the read completes and a core is available. This is where backpressure is needed: to prevent us from having this huge number of reads piling up and filling up memory faster than we can process.
> > > > > > >
> > > > > > > Sasha Krassovsky
> > > > > > >
> > > > > > > > On 20 May 2022, at 20:09, Supun Kamburugamuve <su...@apache.org> wrote:
> > > > > >
> > > > > > --
> > > > > > Supun Kamburugamuve
> > > >
> > > > --
> > > > Supun Kamburugamuve
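To make Sasha's point above concrete, here is an illustrative sketch (not Acero code) of one way to cap the number of batch reads that can pile up: the scanner acquires a slot before issuing an asynchronous read, and the slot is released only after the batch has been processed, so memory held by completed-but-unprocessed reads stays bounded.

    // Hypothetical limiter on the number of in-flight batch reads.
    #include <condition_variable>
    #include <cstddef>
    #include <mutex>

    class InFlightReadLimiter {
     public:
      explicit InFlightReadLimiter(std::size_t max_in_flight)
          : max_in_flight_(max_in_flight) {}

      // Called before starting an asynchronous batch read; waits (i.e. applies
      // backpressure to the scanner) once too many batches are outstanding.
      void AcquireReadSlot() {
        std::unique_lock<std::mutex> lock(mutex_);
        slot_free_.wait(lock, [&] { return in_flight_ < max_in_flight_; });
        ++in_flight_;
      }

      // Called once a batch has been fully processed (not merely read), freeing
      // a slot so the scanner can issue another read.
      void ReleaseReadSlot() {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          --in_flight_;
        }
        slot_free_.notify_one();
      }

     private:
      std::size_t max_in_flight_;
      std::size_t in_flight_ = 0;
      std::mutex mutex_;
      std::condition_variable slot_free_;
    };

With max_in_flight set to, say, 100, the scanner can still keep the disk busy on a 2-core machine while the number of unprocessed batches held in memory stays bounded.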