I'll hijack this thread for a bit of road mapping. There are a number of significant infrastructure changes that are on my mind regarding Acero. I'll list them here in no particular order.
* [1] The scanner needs to be updated to properly support cancellation. I
  mainly mention this here as it is a prerequisite for the next item.
* [2] Acero needs a proper serial execution mode (using an event loop).
* [3] Support for ordered execution (maybe not the most on-topic JIRA).
* General scheduler rework (this thread).

I think it's probably important that we keep ordered execution separate
from general scheduler rework. A "boil the ocean" rework of all things
scheduler related is not likely to make much progress, but we can
incrementally tackle specific problems.

> any more progress on this since May

Not that I'm aware of, except that we did take a step towards the
morsel/batch model (somewhat by necessity) in [4].

> any future plans?

I don't think I can throw out any specific dates, but I think it is safe
to say that these issues are important to Voltron Data as well.

[1] https://issues.apache.org/jira/browse/ARROW-16072
[2] https://issues.apache.org/jira/browse/ARROW-15732
[3] https://issues.apache.org/jira/browse/ARROW-10883
[4] https://github.com/apache/arrow/pull/13679

On Fri, Jul 22, 2022 at 11:38 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> Hi!
>
> Since the scheduler improvement work came up in some recent discussions
> about how backpressure is handled in Acero, I am curious if there has
> been any more progress on this since May, or any future plans?
>
> Thanks,
> Li
>
> On Mon, May 23, 2022 at 10:37 PM Weston Pace <weston.p...@gmail.com> wrote:
> >
> > > About point 2. I have previously seen the pipeline prioritization
> > > you've described with both sides running simultaneously. My
> > > experience was not good with that approach, and a one-side-first
> > > approach was much better.
> >
> > This is good insight, I appreciate it. I hope we can run this kind of
> > experiment when we create the scheduler. From my read of the
> > scheduler doc it looks like we can probably tweak the generator
> > priority.
> >
> > > About these points, if you are using queues, won't they
> > > automatically slow down?
> >
> > It's a non-blocking queue, so when the capacity is hit a backpressure
> > signal is sent towards the source and yes, we eventually slow down.
> > This means the capacity is really a "soft capacity" and we may exceed
> > it by some small amount.
> >
> > > Even if we need back-pressure, join-like operations will break the
> > > graph into sections where we don't have to consider backpressure
> > > across these boundaries
> >
> > This is possible, and we can play around with it. However, we are
> > working on adding spillover support to joins, so the path downstream
> > of the join could still be asynchronous as we might be reading
> > (slowly) from the spilled partitions.
> >
> > We do have a few places where we utilize a synchronous parallel-for
> > without backpressure. The scheduler doc refers to these as
> > non-pipeline tasks. For example, once we've got all the build data we
> > synchronously divide and conquer to probe any queued batches.
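The "soft capacity" queue described above might look roughly like the
minimal sketch below. This is an illustration only: SoftCapacityQueue and
PausedByBackpressure are hypothetical names, not Acero's actual classes,
and the half-capacity resume threshold is an assumption added here to keep
the signal from flapping.

#include <atomic>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Hypothetical sketch: a queue that never blocks the producer but raises
// a backpressure flag once its soft capacity is reached. The source is
// expected to poll the flag, so the queue can overshoot by whatever was
// already in flight when the flag flipped.
template <typename T>
class SoftCapacityQueue {
 public:
  explicit SoftCapacityQueue(std::size_t soft_capacity)
      : soft_capacity_(soft_capacity) {}

  // Never blocks; may push past soft_capacity_.
  void Push(T item) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::move(item));
    if (queue_.size() >= soft_capacity_) {
      backpressure_.store(true, std::memory_order_relaxed);
    }
  }

  std::optional<T> TryPop() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return std::nullopt;
    T item = std::move(queue_.front());
    queue_.pop();
    // Assumed hysteresis: resume once we drain below half capacity.
    if (queue_.size() < soft_capacity_ / 2) {
      backpressure_.store(false, std::memory_order_relaxed);
    }
    return item;
  }

  // The source polls this before scheduling more reads.
  bool PausedByBackpressure() const {
    return backpressure_.load(std::memory_order_relaxed);
  }

 private:
  const std::size_t soft_capacity_;
  std::atomic<bool> backpressure_{false};
  std::mutex mutex_;
  std::queue<T> queue_;
};

int main() {
  SoftCapacityQueue<int> q(/*soft_capacity=*/4);
  for (int i = 0; i < 6; ++i) q.Push(i);   // overshoots the soft capacity
  bool paused = q.PausedByBackpressure();  // true: source should back off
  while (q.TryPop()) {}                    // draining clears the signal
  return paused ? 0 : 1;
}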
> > On Sat, May 21, 2022 at 5:08 AM Supun Kamburugamuve <su...@apache.org> wrote:
> > >
> > > Thanks, Weston. Now I understand a little deeper.
> > >
> > > About point 2. I have previously seen the pipeline prioritization
> > > you've described with both sides running simultaneously. My
> > > experience was not good with that approach, and a one-side-first
> > > approach was much better.
> > >
> > > About these points, if you are using queues, won't they
> > > automatically slow down?
> > >
> > > Further, for joins, we need to read the complete tables and apply
> > > some operations like partitioning before they can do the actual
> > > join. Also, join is a synchronous operation across the threads
> > > (partitions). So we have to synchronize the tasks/threads (doing the
> > > join in parallel) at that time, and downstream tasks cannot really
> > > start working until everything is read up to the join and they start
> > > producing some partitions. Even if we need back-pressure, join-like
> > > operations will break the graph into sections where we don't have to
> > > consider backpressure across these boundaries.
> > >
> > > Best,
> > > Supun.
> > >
> > > On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > >
> > > > I think I understand what you are saying now. There are a few
> > > > different things going on that lead to a need for backpressure.
> > > >
> > > > 1. Write backpressure
> > > >
> > > > We support asynchronous writes. Filesystems like AWS want many
> > > > parallel I/O operations. On a two core system you might have 8
> > > > different parallel writes. So we do exactly what you described and
> > > > we create a queue with a defined capacity. When that queue fills
> > > > up we stop reading.
> > > >
> > > > 2. Pipeline prioritization
> > > >
> > > > The method you are describing requires that we prioritize the
> > > > order of pipeline processing. For example, in a join node, we
> > > > would fully process the shorter build side and then process the
> > > > probe side in a streaming fashion. We don't currently do that (and
> > > > it is a separate discussion whether we would even want to).
> > > > Instead we read from both sides simultaneously. If the build side
> > > > read takes too long then we need to pause reading from the probe
> > > > side.
> > > >
> > > > 3. Non-typical operators
> > > >
> > > > The as-of join node is an interesting operator. It can have many
> > > > different inputs, and it requires us to read from all inputs at
> > > > roughly the same rate. However, if some inputs are denser than
> > > > other inputs (e.g. more rows per unit of time) then we need to
> > > > pause the other inputs while we catch up on the long tail.
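As a toy illustration of case 3 above (reading all inputs at roughly the
same rate), here is one way input pacing by an event-time watermark could
look. InputPacer and its max_ahead tolerance are invented for this sketch;
the real as-of join node is considerably more involved.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch: track the latest timestamp seen on each input and
// pause any input that has raced too far ahead of the slowest one, so all
// inputs advance at roughly the same event-time rate.
class InputPacer {
 public:
  InputPacer(std::size_t num_inputs, int64_t max_ahead)
      : watermarks_(num_inputs, 0), max_ahead_(max_ahead) {}

  // Called when a batch arrives; max_time is its largest timestamp.
  void Advance(std::size_t input, int64_t max_time) {
    watermarks_[input] = std::max(watermarks_[input], max_time);
  }

  // True if this input should stop delivering batches until the long
  // tail catches up.
  bool ShouldPause(std::size_t input) const {
    int64_t slowest =
        *std::min_element(watermarks_.begin(), watermarks_.end());
    return watermarks_[input] - slowest > max_ahead_;
  }

 private:
  std::vector<int64_t> watermarks_;
  const int64_t max_ahead_;  // tolerance, in timestamp units
};

int main() {
  InputPacer pacer(/*num_inputs=*/2, /*max_ahead=*/100);
  pacer.Advance(0, 500);  // input 0 has raced ahead in event time
  pacer.Advance(1, 50);   // input 1 is the long tail
  return pacer.ShouldPause(0) ? 0 : 1;  // input 0 should pause
}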
> > > > On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org> wrote:
> > > > >
> > > > > Hi Sasha,
> > > > >
> > > > > For case 2, I don't see why we need a back-pressure mechanism.
> > > > > Let's say there is an IO thread. All we need is a queue with a
> > > > > defined capacity that feeds data from the IO thread to the Read
> > > > > task.
> > > > >
> > > > > Supun.
> > > > >
> > > > > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Supun,
> > > > > >
> > > > > > Roughly what happens now is #2. However, in your example, it
> > > > > > may be the case that we are reading CSV data from disk faster
> > > > > > than we are transcoding it into Parquet and writing it. Note
> > > > > > that we attempt to use the full disk bandwidth and assign
> > > > > > batches to cores once the reads are done, so ideally a core is
> > > > > > never blocked on IO. In other words, even if we have only 2
> > > > > > cores, we may kick off 100 batch reads and process them when
> > > > > > the read completes and a core is available. This is where
> > > > > > backpressure is needed: to prevent us from having this huge
> > > > > > number of reads piling up and filling up memory faster than we
> > > > > > can process them.
> > > > > >
> > > > > > Sasha Krassovsky
> > > > > >
> > > > > > > On 20 May 2022, at 20:09, Supun Kamburugamuve <su...@apache.org> wrote:
> > > > >
> > > > > --
> > > > > Supun Kamburugamuve
> > >
> > > --
> > > Supun Kamburugamuve
> >
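A closing sketch of the read pileup described above: many more reads are
started than there are cores, and a simple cap on in-flight reads stands
in for backpressure. ReadBatchAsync and kMaxInFlight are made-up stand-ins
(not Arrow APIs), and a real implementation would signal the source rather
than block as this sketch does.

#include <chrono>
#include <cstddef>
#include <cstdio>
#include <future>
#include <thread>
#include <vector>

constexpr std::size_t kMaxInFlight = 8;  // stand-in for a backpressure limit

// Stand-in for an asynchronous filesystem read that later yields a batch.
std::future<int> ReadBatchAsync(int batch_index) {
  return std::async(std::launch::async, [batch_index] {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    return batch_index * 1024;  // pretend this is a decoded batch size
  });
}

int main() {
  std::vector<std::future<int>> in_flight;
  for (int i = 0; i < 100; ++i) {
    // Without this cap, all 100 reads could be resident in memory at once
    // even though only a couple of cores can process them.
    if (in_flight.size() >= kMaxInFlight) {
      in_flight.front().get();           // wait for the oldest read
      in_flight.erase(in_flight.begin());
    }
    in_flight.push_back(ReadBatchAsync(i));
  }
  for (auto& f : in_flight) f.get();  // drain the tail
  std::printf("100 reads completed with at most %zu in flight\n",
              kMaxInFlight);
  return 0;
}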