The attachments probably don't go through on the mailing list. Here are links to my drive:
Current Serial: https://drive.google.com/file/d/1URbDAqXVSYfHJixzHA00CI1f0aedms8W/view?usp=sharing
Proposed Serial: https://drive.google.com/file/d/1JpQiIVaGAL9mrderkid5uf888zGx21fO/view?usp=sharing
On Fri, Jul 22, 2022 at 12:32 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
>
> Hi Weston,
>
> Not sure if the diagrams came through here -- is there some other place I need to view them?
>
> Ivan
>
> -----Original Message-----
> From: Weston Pace <weston.p...@gmail.com>
> Sent: Thursday, July 21, 2022 10:59 PM
> To: dev@arrow.apache.org
> Subject: Re: [C++] ResumeProducing Future Causing Blocking
>
> > Do you have any suggestions for a temporary workaround?
>
> I don't have any great ideas at the moment but I will continue to think it over. Unfortunately, this is somewhat tied up with the ordering issue. Details...
>
> Our current implementation of "serial execution" is "executor==nullptr", and that makes it really hard to do a lot of things, for example asynchronous I/O (in that case we just cheat by running the I/O in the I/O thread pool anyway, so it's not truly serial). When we want to "resume producing on source X", what we really would like to do is "add a task to the scheduler to process the next batch on source X". However, since we don't have an executor, we can't do that. Our only choice is to hijack the calling thread and resume immediately. This makes for very confusing scheduling. It works okay for the very simple plans we process today but, for more complicated things like backpressure, it gets a bit silly, as you've observed.
>
> A better way to do "serial execution" is to use an event loop. When you want to "resume producing on source X", you "add a task to the scheduler to process the next batch on source X". This task goes at the end of the event loop and then you return control to the caller of ResumeProducing. Once that caller is done you can go back to the event queue. We have an event loop, it is called the SerialExecutor, and I came very close to switching the exec plan over to it in [1] but failed because of a limitation in the scanner [2].
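The event-loop idea described above can be sketched as a small self-contained example. The names here (SerialEventLoop, RunToySerialPlan) are invented for illustration and are not Arrow's actual SerialExecutor API: "resume producing" just enqueues a task at the back of a single-threaded queue and returns, instead of hijacking the calling thread.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Toy single-threaded event loop standing in for the SerialExecutor idea.
class SerialEventLoop {
 public:
  void Schedule(std::function<void()> task) { tasks_.push_back(std::move(task)); }
  // Drain the queue on the calling thread; tasks scheduled while running
  // go to the back, so "resume" never recurses into the source.
  void Run() {
    while (!tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
    }
  }

 private:
  std::deque<std::function<void()>> tasks_;
};

// Toy "source": each task processes one batch and then schedules the next
// one, which is the event-loop equivalent of ResumeProducing.
std::vector<std::string> RunToySerialPlan(int num_batches) {
  std::vector<std::string> log;
  SerialEventLoop loop;
  std::function<void(int)> process_batch = [&](int i) {
    log.push_back("batch " + std::to_string(i));
    if (i + 1 < num_batches) {
      loop.Schedule([&, i] { process_batch(i + 1); });
    }
  };
  loop.Schedule([&] { process_batch(0); });
  loop.Run();
  return log;
}
```

Control returns to Run() between batches, so the caller of the toy "resume" never blocks while the next batch is processed.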
> Fixing this mess is my personal top priority, but I haven't been able to get enough time to sit down and do it for a few months. I'm hoping I will have this time after this week, when 9.0.0 wraps up.
>
> > I tried a solution by making the ResumeProducing call with a
> > fire-and-forget std::thread, but this doesn't seem like a great idea;
> > it gets pretty hairy when things are getting deconstructed at the end
> > of execution (it seems like ResumeProducing still has some invalid
> > reads despite checking finished() on the source nodes), and I'm
> > assuming we don't want to spin up more threads anyway. I also looked
> > into the plan's executor (ScheduleTask, etc.), but I believe this
> > waits for the task to complete, so it causes blocking in the processing.
>
> This isn't all that bad of an idea. The plan's executor doesn't exist (executor==nullptr), and so ScheduleTask does indeed just run the task immediately. A slightly more "integrated" way of doing this would be to spin up a task to call ResumeProducing on the I/O thread pool. This will help limit the total number of threads, which is important for things like thread-local state. Maybe something like...
>
> ```
> ARROW_ASSIGN_OR_RAISE(Future<> task_lifetime, plan()->BeginExternalTask());
> if (task_lifetime.is_valid()) {
>   ::arrow::io::default_io_context().executor()->Spawn([this, task_lifetime]() mutable {
>     inputs()[0]->ResumeProducing(this, counter_);
>     // Signal that the external task is done so the plan can finish
>     task_lifetime.MarkFinished();
>   });
> } else {
>   // Plan has been aborted; just return and don't worry about resuming producing
> }
> ```
>
> The main downside here is that this is probably going to introduce parallelism back into the plan and mess up your ordering. The thread that submits this task is going to return, and then it is going to grab the next batch from whatever unpaused source you have, and this will run in parallel with the resume task, which continues.
>
> Another workaround is to just enable parallel execution.
> Backpressure is a lot more manageable then. You can submit your resume-producing task using ScheduleTask. Or, even better, we should probably change the source node so that it schedules a new task for the backpressure resumption automatically. I've created [3] to track this. Of course, parallel execution introduces out-of-order processing, which, if I understand correctly, is a problem.
>
> As promised, I'm attaching some example diagrams. The first shows the status quo; it's pretty messy. The second shows how things should work in the solutions proposed in https://github.com/apache/arrow/pull/12468
>
> I will try and make an example diagram for threaded execution (executor != nullptr) tomorrow and also make some diagrams on how sequencing might be tackled.
>
> [1] https://github.com/apache/arrow/pull/12468
> [2] https://issues.apache.org/jira/browse/ARROW-16072
> [3] https://issues.apache.org/jira/browse/ARROW-17180
>
> On Thu, Jul 21, 2022 at 2:31 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
> >
> > It seems like for this to work currently, we would want to call ResumeProducing on the source nodes, but outside of the processing thread.
> >
> > I tried a solution by making the ResumeProducing call with a fire-and-forget std::thread, but this doesn't seem like a great idea; it gets pretty hairy when things are getting deconstructed at the end of execution (it seems like ResumeProducing still has some invalid reads despite checking finished() on the source nodes), and I'm assuming we don't want to spin up more threads anyway. I also looked into the plan's executor (ScheduleTask, etc.), but I believe this waits for the task to complete, so it causes blocking in the processing.
> >
> > Do you have any suggestions for a temporary workaround?
> >
> > Ivan
> >
> > -----Original Message-----
> > From: Ivan Chau <ivan.c...@twosigma.com>
> > Sent: Thursday, July 21, 2022 9:28 AM
> > To: dev@arrow.apache.org
> > Subject: RE: [C++] ResumeProducing Future Causing Blocking
> >
> > Thanks Sasha and Weston! The diagrams would be helpful!
> >
> > Would the new first-class support in the scheduler be something similar to what's available currently in BackpressureMonitor? We are looking to implement some more custom backpressure schemes that depend on batch ordering/completion rather than memory size.
> >
> > Ivan
> >
> > -----Original Message-----
> > From: Weston Pace <weston.p...@gmail.com>
> > Sent: Wednesday, July 20, 2022 8:31 PM
> > To: dev@arrow.apache.org
> > Subject: Re: [C++] ResumeProducing Future Causing Blocking
> >
> > > 4) control is not returned to the processing thread
> >
> > Yes, it looks like the current implementation does not return control to the processing thread, but I think this is correct, or at least "as designed". The thread will be used to continue iterating the source.
> >
> > > control is not returned to the processing thread, and instead blocks
> > > when marking the backpressure_future_ as finished.
> >
> > As Sasha said, the call to "MarkFinished" will then run callbacks. One of those callbacks (the only one in this case) then continues to iterate from the source, doing the work that was originally started by the call to StartProducing.
> >
> > Generally, the code will only go so far before it creates a new thread task, and then control will eventually return. However, if you are running without an executor, then there are no thread tasks, all callbacks run immediately in the thread calling MarkFinished, and it can be rather hard to understand the logic. I'll try and draw up some sequence diagrams as an example for this and Li Jin's earlier question regarding ordering.
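The "callbacks run on the thread that calls MarkFinished" behavior described above can be demonstrated with a toy future. ToyFuture and RunCallbackOnMarkingThread are invented names for illustration; this is a sketch, not arrow::Future, which has a richer API but the same callback-on-marking-thread behavior.

```cpp
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Minimal future-like object: callbacks run synchronously inside
// MarkFinished, on whichever thread calls it.
class ToyFuture {
 public:
  void AddCallback(std::function<void()> cb) { callbacks_.push_back(std::move(cb)); }
  void MarkFinished() {
    for (auto& cb : callbacks_) cb();  // runs right now, on this thread
  }

 private:
  std::vector<std::function<void()>> callbacks_;
};

// Returns {thread the callback ran on, thread that called MarkFinished}.
std::pair<std::thread::id, std::thread::id> RunCallbackOnMarkingThread() {
  ToyFuture fut;
  std::thread::id callback_thread, marker_thread;
  fut.AddCallback([&] { callback_thread = std::this_thread::get_id(); });
  std::thread t([&] {
    marker_thread = std::this_thread::get_id();
    fut.MarkFinished();
  });
  t.join();
  return {callback_thread, marker_thread};
}
```

The two ids always match, which is why, without an executor, marking the backpressure future finished keeps iterating the source on the marking thread instead of returning control.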
> >
> > > Anyway, I wouldn't currently rely on PauseProducing/ResumeProducing
> > > unfortunately. It is currently not tested anywhere as far as I can tell
> > > and ignored by a lot of nodes (such as HashJoinNode).
> >
> > We do test, and rely on, the PauseProducing/ResumeProducing mechanics to implement back-pressure for the datasets API. This limits plans to scan->filter->project->sink, and all of these nodes have been tested to work accurately with backpressure. I think you're free to experiment with it. I agree, however, that backpressure could maybe be a more minor concern until some of the scheduler improvements are available.
> >
> >
> > On Wed, Jul 20, 2022 at 3:13 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> > >
> > > Hi,
> > > Futures run callbacks on the thread that marks them as finished. It seems that the Source node's generator loop does add a callback (https://github.com/iChauster/arrow/blob/asof_join2/cpp/src/arrow/compute/exec/source_node.cc#L130) which continues the loop. I'm not entirely sure myself how this code works (this generator + control flow thing is very opaque), but my guess is that's what's causing it.
> > >
> > > One further note: copying a Future<> actually maintains a reference to the same underlying future, which may also be unexpected at first. Specifically in your code, doing Future<> to_finish = backpressure_future_; to_finish.MarkFinished(); is equivalent to just backpressure_future_.MarkFinished().
> > >
> > > Anyway, I wouldn't currently rely on PauseProducing/ResumeProducing unfortunately. It is currently not tested anywhere as far as I can tell and ignored by a lot of nodes (such as HashJoinNode).
> > > Michal and I have some work in progress involving a new scheduler with first-class support for back pressure.
> > >
> > > Sasha
> > >
> > > > On Jul 20, 2022, at 1:49 PM, Ivan Chau <ivan.m.c...@gmail.com> wrote:
> > > >
> > > > backpressure_future_
> > >
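Sasha's point that copying a Future<> shares the underlying state can be illustrated with a toy shared-state future. ToySharedFuture is invented for illustration and is not Arrow's implementation; the essential idea is that copies hold a reference to one shared state object.

```cpp
#include <memory>

// Copies of ToySharedFuture point at the same heap-allocated state, so
// finishing one copy finishes them all.
class ToySharedFuture {
 public:
  ToySharedFuture() : state_(std::make_shared<State>()) {}
  void MarkFinished() { state_->finished = true; }
  bool is_finished() const { return state_->finished; }

 private:
  struct State { bool finished = false; };
  std::shared_ptr<State> state_;  // shared by all copies
};
```

In this model, `ToySharedFuture to_finish = backpressure_future; to_finish.MarkFinished();` is equivalent to marking backpressure_future finished directly, mirroring the Future<> behavior described in the thread.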