> About point 2. I have previously seen the pipeline prioritization you've
> described with both sides running simultaneously. My experience with that
> approach was not good, and a one-side-first approach was much better.

This is good insight, I appreciate it. I hope we can run this kind of
experiment when we create the scheduler. From my read of the scheduler
doc it looks like we can probably tweak the generator priority.

> About these points, if you are using queues, won't they automatically slow
> down?

It's a non-blocking queue, so when the capacity is hit a backpressure
signal is sent towards the source and, yes, we eventually slow down.
This means the capacity is really a "soft capacity" and we may exceed
it by some small amount.

> Even if we need
> back-pressure, join-like operations will break the graph into sections where
> we don't have to consider backpressure across these boundaries.

This is possible, we can play around with it. However, we are working
on adding spillover support to joins, so the path downstream of the
join could still be asynchronous: we might be reading (slowly) from
the spilled partitions.

We do have a few places where we utilize a synchronous parallel-for
without backpressure. The scheduler doc refers to these as
non-pipeline tasks. For example, once we've got all the build data we
synchronously divide and conquer to probe any queued batches.
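
For a concrete picture of the "soft capacity" behavior described above,
here is a minimal C++ sketch (all names are hypothetical; this is not
Arrow's actual implementation): pushes never block, but crossing the
capacity threshold raises a backpressure flag for the source to poll.

    #include <atomic>
    #include <cstddef>
    #include <mutex>
    #include <queue>

    // Illustrative only: a bounded queue whose Push never blocks.
    // Hitting the capacity raises a backpressure flag for the source
    // to poll, so the bound is a "soft capacity" that may briefly be
    // exceeded.
    template <typename T>
    class SoftCapacityQueue {
     public:
      explicit SoftCapacityQueue(std::size_t capacity)
          : capacity_(capacity) {}

      // Non-blocking: always accepts the item, even past capacity.
      void Push(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(item));
        if (queue_.size() >= capacity_) {
          backpressure_.store(true, std::memory_order_relaxed);
        }
      }

      // Returns false if the queue is empty.
      bool Pop(T* out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;
        *out = std::move(queue_.front());
        queue_.pop();
        if (queue_.size() < capacity_) {
          backpressure_.store(false, std::memory_order_relaxed);
        }
        return true;
      }

      // The source checks this and pauses reading while it is set.
      bool PausedByBackpressure() const {
        return backpressure_.load(std::memory_order_relaxed);
      }

     private:
      const std::size_t capacity_;
      std::queue<T> queue_;
      std::mutex mutex_;
      std::atomic<bool> backpressure_{false};
    };

A real engine would also deliver the pause/resume signal to the source
asynchronously rather than relying on polling, but the key property is
the same: Push never fails, so the queue may exceed its capacity by a
small amount.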

On Sat, May 21, 2022 at 5:08 AM Supun Kamburugamuve <su...@apache.org> wrote:
>
> Thanks, Weston. Now I understand a little deeper.
>
> About point 2. I have previously seen the pipeline prioritization you've
> described with both sides running simultaneously. My experience with that
> approach was not good, and a one-side-first approach was much better.
>
> About these points, if you are using queues, won't they automatically slow
> down?
>
> Further, for joins, we need to read the complete tables and apply some
> operations like partitioning before we can do the actual join. Also, a
> join is a synchronous operation across the threads (partitions). So we
> have to synchronize the tasks/threads (doing the join in parallel) at
> that time, and downstream tasks cannot really start working until
> everything is read up to the join and they start producing some
> partitions. Even if we need back-pressure, join-like operations will
> break the graph into sections where we don't have to consider
> backpressure across these boundaries.
>
> Best,
> Supun.
>
> On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> > I think I understand what you are saying now. There are a few
> > different things going on that lead to a need for backpressure.
> >
> > 1. Write backpressure
> >
> > We support asynchronous writes. Filesystems like AWS S3 want many
> > parallel I/O operations. On a two-core system you might have 8
> > different parallel writes. So we do exactly what you described and we
> > create a queue with a defined capacity. When that queue fills up we
> > stop reading.
> >
> > 2. Pipeline prioritization
> >
> > The method you are describing requires that we prioritize the order of
> > pipeline processing. For example, in a join node, we would fully
> > process the shorter build side and then process the probe side in a
> > streaming fashion. We don't currently do that (and it is a separate
> > discussion whether we would even want to). Instead we read from both
> > sides simultaneously. If the build side read takes too long then we
> > need to pause reading from the probe side.
> >
> > 3. Non-typical operators
> >
> > The as-of join node is an interesting operator. It can have many
> > different inputs and it requires us to read from all inputs at roughly
> > the same rate. However, if some inputs are denser than other inputs
> > (e.g. more rows per unit of time) then we need to pause the other
> > inputs while we catch up on the long tail.
> >
> > On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org> wrote:
> > >
> > > Hi Sasha,
> > >
> > > For case 2, I don't see why we need a back-pressure mechanism. Let's
> > > say there is an IO thread. All we need is a queue with a defined
> > > capacity that feeds data from the IO thread to the read task.
> > >
> > > Supun.
> > >
> > > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> > >
> > > > Hi Supun,
> > > > Roughly what happens now is #2. However, in your example, it may be
> > > > the case that we are reading CSV data from disk faster than we are
> > > > transcoding it into Parquet and writing it. Note that we attempt to
> > > > use the full disk bandwidth and assign batches to cores once the
> > > > reads are done, so ideally a core is never blocked on IO. In other
> > > > words, even if we have only 2 cores, we may kick off 100 batch
> > > > reads and process each one when its read completes and a core is
> > > > available. This is where backpressure is needed: to prevent this
> > > > huge number of reads from piling up and filling up memory faster
> > > > than we can process them.
> > > >
> > > > Sasha Krassovsky
> > > >
> > > > > On May 20, 2022, at 8:09 PM, Supun Kamburugamuve <su...@apache.org> wrote:
> > >
> > > --
> > > Supun Kamburugamuve
>
> --
> Supun Kamburugamuve
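
To make the read-ahead discussion above concrete, here is a minimal C++
sketch (all names are hypothetical; this is not Arrow's actual code) of
one way to bound the number of batch reads in flight, so completions
cannot pile up in memory faster than the cores can drain them:

    #include <condition_variable>
    #include <mutex>

    // Illustrative only: bounds the number of batch reads in flight.
    // The driver acquires a slot before issuing an async read, and a
    // worker releases the slot after the batch has been processed and
    // its memory freed.
    class ReadAheadLimiter {
     public:
      explicit ReadAheadLimiter(int max_in_flight)
          : budget_(max_in_flight) {}

      // Blocks the driver (i.e. applies backpressure) once too many
      // batches are buffered in memory.
      void AcquireSlot() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return budget_ > 0; });
        --budget_;
      }

      // Frees a slot, letting the driver issue the next read.
      void ReleaseSlot() {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          ++budget_;
        }
        cv_.notify_one();
      }

     private:
      int budget_;
      std::mutex mutex_;
      std::condition_variable cv_;
    };

With a limit of, say, 100 slots, the engine can keep the disk saturated
while guaranteeing that no more than 100 unprocessed batches ever sit
in memory at once, which is the backpressure behavior Sasha describes.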