> Do you have any suggestions for a temporary workaround?

I don't have any great ideas at the moment but I will continue to think it over. Unfortunately, this is somewhat tied up with the ordering issue. Details...
Our current implementation of "serial execution" is "executor == nullptr" and that makes it really hard to do a lot of things, for example asynchronous I/O (in that case we just cheat by running the I/O in the I/O thread pool anyway, so it's not truly serial). When we want to "resume producing on source X" what we really would like to do is "add a task to the scheduler to process the next batch on source X". However, since we don't have an executor, we can't do that. Our only choice is to hijack the calling thread and resume immediately. However, this makes for very confusing scheduling. It works ok for the very simple plans we process today but, for more complicated things like backpressure, it gets a bit silly, as you've observed.

A better way to do "serial execution" is to use an event loop. When you want to "resume producing on source X" you "add a task to the scheduler to process the next batch on source X". This task goes at the end of the event loop and then you return control to the caller of resume producing. Once that caller is done you can go back to the event queue. We have an event loop, it is called the SerialExecutor, and I came very close to switching the exec plan over to it in [1] but failed because of a limitation in the scanner [2]. Fixing this mess is my personal top priority but I haven't been able to get enough time to sit down and do it for a few months. I'm hoping I will have this time after this week when 9.0.0 wraps up.

> I tried a solution by making the ResumeProducing call with a fire-and-forget
> std::thread, but this doesn't seem like a great idea; It gets pretty hairy
> when things are getting deconstructed at the end of execution (it seems like
> ResumeProducing still has some invalid reads despite checking finished()
> on the source nodes), and I'm assuming we don't want to spin up more
> threads anyway.
> I also looked into the plan's executor (ScheduleTask, etc.), but I believe
> this waits for the task to complete, so it causes blocking in the
> processing.

This isn't all that bad of an idea. The plan's executor doesn't exist (executor == nullptr) and so ScheduleTask does indeed just run the task immediately. A slightly more "integrated" way of doing this would be to spin up a task to call resume producing on the I/O thread pool. This will help limit the total number of threads, which is important for things like thread-local state. Maybe something like...

```
ARROW_ASSIGN_OR_RAISE(Future<> task_lifetime, plan()->BeginExternalTask());
if (task_lifetime.is_valid()) {
  ::arrow::io::default_io_context().executor()->Spawn(
      [this, task_lifetime]() mutable {
        inputs()[0]->ResumeProducing(this, counter_);
        // Presumably the external task should also be marked finished once
        // the resume completes, so the plan does not wait on it forever.
        task_lifetime.MarkFinished();
      });
} else {
  // Plan has been aborted, just return and don't worry about resuming producing
}
```

The main downside here is that this is probably going to introduce parallelism back into the plan and mess up your ordering. The thread that submits this task is going to return, and then it is going to grab the next batch from whatever unpaused source you have, and this will run in parallel with the resume task.

Another workaround is to just enable parallel execution. Backpressure is a lot more manageable then. You can submit your resume producing task using ScheduleTask. Or, even better, we should probably change the source node so that it schedules a new task for the backpressure resumption automatically. I've created [3] to track this. Of course, parallel execution introduces out-of-order processing, which, if I understand correctly, is a problem for you.

As promised, I'm attaching some example diagrams. The first shows the status quo; it's pretty messy.
The second shows how things should work in the solution proposed in https://github.com/apache/arrow/pull/12468

I will try and make an example diagram for threaded execution (executor != nullptr) tomorrow and also make some diagrams on how sequencing might be tackled.

[1] https://github.com/apache/arrow/pull/12468
[2] https://issues.apache.org/jira/browse/ARROW-16072
[3] https://issues.apache.org/jira/browse/ARROW-17180

On Thu, Jul 21, 2022 at 2:31 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
>
> It seems like for this to work currently, we would want to call
> ResumeProducing on the source nodes, but outside of the processing thread.
>
> I tried a solution by making the ResumeProducing call with a fire-and-forget
> std::thread, but this doesn't seem like a great idea; It gets pretty hairy
> when things are getting deconstructed at the end of execution (it seems like
> ResumeProducing still has some invalid reads despite checking finished() on
> the source nodes), and I'm assuming we don't want to spin up more threads
> anyway. I also looked into the plan's executor (ScheduleTask, etc.), but I
> believe this waits for the task to complete, so it causes blocking in the
> processing.
>
> Do you have any suggestions for a temporary workaround?
>
> Ivan
>
> -----Original Message-----
> From: Ivan Chau <ivan.c...@twosigma.com>
> Sent: Thursday, July 21, 2022 9:28 AM
> To: dev@arrow.apache.org
> Subject: RE: [C++] ResumeProducing Future Causing Blocking
>
> Thanks Sasha and Weston! The diagrams would be helpful!
>
> Would the new first-class support in the scheduler be something similar to
> what's available currently in BackpressureMonitor? We are looking to
> implement some more custom backpressure schemes that depend on batch
> ordering/completion rather than memory size.
>
> Ivan
>
> -----Original Message-----
> From: Weston Pace <weston.p...@gmail.com>
> Sent: Wednesday, July 20, 2022 8:31 PM
> To: dev@arrow.apache.org
> Subject: Re: [C++] ResumeProducing Future Causing Blocking
>
> > 4) control is not returned to the processing thread
>
> Yes, it looks like the current implementation does not return control to the
> processing thread, but I think this is correct, or at least "as designed".
> The thread will be used to continue iterating the source.
>
> > control is not returned to the processing thread, and instead blocks
> > when marking the backpressure_future_ as finished.
>
> As Sasha said, the call to "MarkFinished" will then run callbacks.
> One of those callbacks (the only one in this case) then continues to iterate
> from the source, doing the work that was originally started by the call to
> StartProducing.
>
> Generally, the code will only go so far before it creates a new thread task
> and then control will eventually return. However, if you are running without
> an executor, then there are no thread tasks, all callbacks run immediately in
> the thread calling mark finished, and it can be rather hard to understand the
> logic. I'll try and draw up some sequence diagrams as an example for this
> and Li Jin's earlier question regarding ordering.
>
> > Anyway, I wouldn't currently rely on PauseProducing/ResumeProducing
> > unfortunately. It is currently not tested anywhere as far as I can tell and
> > ignored by a lot of nodes (such as HashJoinNode).
>
> We do test, and rely, on the PauseProducing/ResumeProducing mechanics to
> implement back-pressure for the datasets API. This limits plans to
> scan->filter->project->sink and all of these nodes have been tested to
> accurately work with backpressure. I think you're free to experiment with
> it. I agree, however, that backpressure could maybe be a more minor concern
> until some of the scheduler improvements are available.
>
> On Wed, Jul 20, 2022 at 3:13 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> >
> > Hi,
> > Futures run callbacks on the thread that marks them as finished. It seems
> > that the Source node's generator loop does add a callback
> > (https://github.com/iChauster/arrow/blob/asof_join2/cpp/src/arrow/compute/exec/source_node.cc#L130)
> > which continues the loop. I'm not entirely sure myself how this code works
> > (this generator + control flow thing is very opaque), but my guess is
> > that's what's causing it.
> >
> > One further note, copying a Future<> actually maintains a reference to the
> > same underlying future, which may also be unexpected at first. Specifically
> > in your code, doing Future<> to_finish = backpressure_future_;
> > to_finish.MarkFinished(); is equivalent to just
> > backpressure_future_.MarkFinished().
> >
> > Anyway, I wouldn't currently rely on PauseProducing/ResumeProducing
> > unfortunately. It is currently not tested anywhere as far as I can tell and
> > ignored by a lot of nodes (such as HashJoinNode). Michal and I have some
> > work in progress involving a new scheduler with first-class support for
> > back pressure.
> >
> > Sasha
> >
> > > On Jul 20, 2022, at 1:49 PM, Ivan Chau <ivan.m.c...@gmail.com> wrote:
> > >
> > > backpressure_future_