Attachments probably don't work with the mailing list.  Here are links to my Drive:

Current Serial:
https://drive.google.com/file/d/1URbDAqXVSYfHJixzHA00CI1f0aedms8W/view?usp=sharing
Proposed Serial:
https://drive.google.com/file/d/1JpQiIVaGAL9mrderkid5uf888zGx21fO/view?usp=sharing

On Fri, Jul 22, 2022 at 12:32 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
>
> Hi Weston,
>
> Not sure if the diagrams came through here -- is there some other place I 
> need to view them?
>
> Ivan
>
> -----Original Message-----
> From: Weston Pace <weston.p...@gmail.com>
> Sent: Thursday, July 21, 2022 10:59 PM
> To: dev@arrow.apache.org
> Subject: Re: [C++] ResumeProducing Future Causing Blocking
>
> > Do you have any suggestions for a temporary workaround?
>
> I don't have any great ideas at the moment but I will continue to think it 
> over.  Unfortunately, this is somewhat tied up with the ordering issue.  
> Details...
>
> Our current implementation of "serial execution" is "executor==nullptr", and 
> that makes a lot of things really hard, for example asynchronous I/O (in that 
> case we just cheat by running the I/O on the I/O thread pool anyway, so it's 
> not truly serial).  When we want to "resume producing on source X", what we 
> really want to do is "add a task to the scheduler to process the next batch 
> on source X".  However, since we don't have an executor, we can't do that.  
> Our only choice is to hijack the calling thread and resume immediately.  
> However, this makes for very confusing scheduling.  It works OK for the very 
> simple plans we process today but, for more complicated things like 
> backpressure, it gets a bit silly, as you've observed.
>
> A better way to do "serial execution" is to use an event loop.  When you want 
> to "resume producing on source X" you "add a task tot he scheduler to process 
> the next batch on source X".  This task goes at the end of the event loop and 
> then you return control to the caller of resume producing.  Once that caller 
> is done you can go back to the event queue.  We have an event loop, it is 
> called the SerialExecutor, and I came very close to switching the exec plan 
> over to it in [1] but failed because of a limitation in the scanner [2].  
> Fixing this mess is my personal top priority but I haven't been able to get 
> enough time to sit down and do it for a few months.  I'm hoping I will have 
> this time after this week when 9.0.0 wraps up.
>
> > I tried a solution by making the ResumeProducing call with a
> > fire-and-forget std::thread, but this doesn't seem like a great idea;
> > It gets pretty hairy when things are getting deconstructed at the end
> > of execution (it seems like ResumeProducing still has some invalid
> > reads despite checking finished() on the source nodes), and I'm
> > assuming we don't want to spin up more threads anyway. I also looked
> > into the plan's executor (ScheduleTask, etc.), but I believe this
> > waits for the task to complete, so it causes blocking in the processing.
>
> This isn't all that bad of an idea.  The plan's executor doesn't exist
> (executor==nullptr) and so ScheduleTask does indeed just run the task 
> immediately.  A slightly more "integrated" way of doing this would be to spin 
> up a task to call resume producing on the I/O thread pool.
> This will help limit the total number of threads which is important for 
> things like thread local state.  Maybe something like...
>
> ```
> ARROW_ASSIGN_OR_RAISE(Future<> task_lifetime, plan()->BeginExternalTask());
> if (task_lifetime.is_valid()) {
>   ::arrow::io::default_io_context().executor()->Spawn(
>       [this, task_lifetime]() mutable {
>         inputs()[0]->ResumeProducing(this, counter_);
>       });
> } else {
>   // Plan has been aborted; just return and don't worry about resuming
>   // producing
> }
> ```
>
> The main downside here is that this is probably going to introduce 
> parallelism back into the plan and mess up your ordering.  The thread that 
> submits this task is going to return, and then it is going to grab the next 
> batch from whatever unpaused source you have, and this will run in parallel 
> with the resume task, which continues.
>
> Another workaround is to just enable parallel execution.  Backpressure is a 
> lot more manageable then.  You can submit your resume producing task using 
> ScheduleTask.  Or, even better, we should probably change the source node so that 
> it schedules a new task for the backpressure resumption automatically.  I've 
> created [3] to track this.  Of course, parallel execution introduces out of 
> order processing, which if I understand, is a problem.
>
> As promised, I'm attaching some example diagrams.  The first shows the status 
> quo, it's pretty messy.  The second shows how things should work in the 
> solutions proposed in
> https://github.com/apache/arrow/pull/12468
>
> I will try and make an example diagram for threaded execution (executor != 
> nullptr) tomorrow and also make some diagrams on how sequencing might be 
> tackled.
>
> [1] https://github.com/apache/arrow/pull/12468
> [2] https://issues.apache.org/jira/browse/ARROW-16072
> [3] https://issues.apache.org/jira/browse/ARROW-17180
>
> On Thu, Jul 21, 2022 at 2:31 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
> >
> > It seems like for this to work currently, we would want to call 
> > ResumeProducing on the source nodes, but outside of the processing thread.
> >
> > I tried a solution by making the ResumeProducing call with a 
> > fire-and-forget std::thread, but this doesn't seem like a great idea; It 
> > gets pretty hairy when things are getting deconstructed at the end of 
> > execution (it seems like ResumeProducing still has some invalid reads 
> > despite checking finished() on the source nodes), and I'm assuming we don't 
> > want to spin up more threads anyway. I also looked into the plan's executor 
> > (ScheduleTask, etc.), but I believe this waits for the task to complete, so 
> > it causes blocking in the processing.
> >
> > Do you have any suggestions for a temporary workaround?
> >
> > Ivan
> >
> > -----Original Message-----
> > From: Ivan Chau <ivan.c...@twosigma.com>
> > Sent: Thursday, July 21, 2022 9:28 AM
> > To: dev@arrow.apache.org
> > Subject: RE: [C++] ResumeProducing Future Causing Blocking
> >
> > Thanks Sasha and Weston! The diagrams would be helpful!
> >
> > Would the new first-class support in the scheduler be something similar to 
> > what's available currently in BackpressureMonitor? We are looking to 
> > implement some more custom backpressure schemes that depend on batch 
> > ordering/completion rather than memory size.
> >
> > Ivan
> >
> > -----Original Message-----
> > From: Weston Pace <weston.p...@gmail.com>
> > Sent: Wednesday, July 20, 2022 8:31 PM
> > To: dev@arrow.apache.org
> > Subject: Re: [C++] ResumeProducing Future Causing Blocking
> >
> > > 4) control is not returned to the processing thread
> >
> > Yes, it looks like the current implementation does not return control to 
> > the processing thread, but I think this is correct, or at least "as 
> > designed".  The thread will be used to continue iterating the source.
> >
> > > control is not returned to the processing thread, and instead blocks
> > > when marking the backpressure_future_ as finished.
> >
> > As Sasha said, the call to "MarkFinished" will then run callbacks.
> > One of those callbacks (the only one in this case) then continues to 
> > iterate from the source, doing the work that was originally started by the 
> > call to StartProducing.
> >
> > Generally, the code will only go so far before it creates a new thread task 
> > and then control will eventually return.  However, if you are running 
> > without an executor, then there are no thread tasks, all callbacks run 
> > immediately in the thread calling mark finished, and it can be rather hard 
> > to understand the logic.  I'll try and draw up some sequence diagrams as an 
> > example for this and Li Jin's earlier question regarding ordering.
> >
> > > Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing 
> > > unfortunately. It is currently not tested anywhere as far as I can tell 
> > > and ignored by a lot of nodes (such as HashJoinNode).
> >
> > We do test, and rely, on the PauseProducing/ResumeProducing mechanics
> > to implement back-pressure for the datasets API.  This limits plans to
> > scan->filter->project->sink and all of these nodes have been tested to
> > accurately work with backpressure.  I think you're free to experiment with 
> > it.  I agree however, that backpressure could maybe be a more minor concern 
> > until some of the scheduler improvements are available.
> >
> >
> > On Wed, Jul 20, 2022 at 3:13 PM Sasha Krassovsky 
> > <krassovskysa...@gmail.com> wrote:
> > >
> > > Hi,
> > > Futures run callbacks on the thread that marks them as finished. It seems 
> > > that the Source node's generator loop does add a callback 
> > > (https://github.com/iChauster/arrow/blob/asof_join2/cpp/src/arrow/compute/exec/source_node.cc#L130) 
> > > which continues the loop. I'm not entirely sure myself how this code 
> > > works (this generator + control flow thing is very opaque), but my guess 
> > > is that's what's causing it.
> > >
> > > One further note, copying a Future<> actually maintains a reference to 
> > > the same underlying future, which may also be unexpected at first. 
> > > Specifically in your code, doing Future<> to_finish = 
> > > backpressure_future_; to_finish.MarkFinished(); is equivalent to just 
> > > backpressure_future_.MarkFinished().
> > >
> > > Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing 
> > > unfortunately. It is currently not tested anywhere as far as I can tell 
> > > and ignored by a lot of nodes (such as HashJoinNode). Michal and I have 
> > > some work in progress involving a new scheduler with first-class support 
> > > for back pressure.
> > >
> > > Sasha
> > >
> > > > On Jul 20, 2022, at 1:49 PM, Ivan Chau <ivan.m.c...@gmail.com> wrote:
> > > >
> > > > backpressure_future_
> > >
