Re: [Compute][C++] Question on compute scheduler

2022-04-27, Li Jin
I see. Yeah, spill to disk seems to be a reasonable approach. Hard back pressure does seem like it can lead to deadlocks.

On Wed, Apr 27, 2022 at 4:55 PM Weston Pace wrote:
> Our backpressure is best-effort. A push downstream will never
> fail/block. Eventually, when sinks (or pipeline breakers)…

Re: [Compute][C++] Question on compute scheduler

2022-04-27, Weston Pace
Our backpressure is best-effort. A push downstream will never fail/block. Eventually, when sinks (or pipeline breakers) start to fill up, a pause message is sent to the source nodes. However, anything in progress will continue and should not be prevented from completing and pushing results upwards.
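A minimal sketch of the best-effort scheme described above, assuming a sink that counts queued batches and signals the source through pause/resume callbacks with a high and a low water mark. The class name, the water-mark parameters, and the callbacks are hypothetical illustrations, not Arrow's API, and the sketch is single-threaded for brevity (a real node would need synchronization).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical sink-side counter that never blocks or rejects a push.
// Crossing `high_water` asks the source to pause; draining back down to
// `low_water` asks it to resume.
class BestEffortBackpressure {
 public:
  BestEffortBackpressure(std::size_t low_water, std::size_t high_water,
                         std::function<void()> pause,
                         std::function<void()> resume)
      : low_(low_water), high_(high_water),
        pause_(std::move(pause)), resume_(std::move(resume)) {}

  // A batch arrived at the sink. It is always accepted, so work that was
  // already in flight can still complete and push its results.
  void OnBatchQueued() {
    ++queued_;
    if (!paused_ && queued_ >= high_) {
      paused_ = true;
      pause_();  // only a request; the source stops producing new work
    }
  }

  // The consumer removed a batch from the sink.
  void OnBatchConsumed() {
    if (queued_ > 0) --queued_;
    if (paused_ && queued_ <= low_) {
      paused_ = false;
      resume_();
    }
  }

  std::size_t queued() const { return queued_; }
  bool paused() const { return paused_; }

 private:
  std::size_t low_, high_;
  std::function<void()> pause_, resume_;
  std::size_t queued_ = 0;
  bool paused_ = false;
};
```

Batches that arrive after the pause request are still accepted, matching the point that in-progress work is never blocked; keeping the low water mark below the high one stops the source from toggling on every batch.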

Re: [Compute][C++] Question on compute scheduler

2022-04-27, Li Jin
Thanks both! The ExecPlan Sequencing doc is interesting and close to the problem that we are trying to solve (ordered processing). One thought is that I can see some cases of deadlock if we are not careful, for example (Filter Node -> Asof Join Node, assuming the Asof Join node requires ordered input…

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Weston Pace
There was an old design document I proposed on this mailing list a while back. I never got around to implementing it, and I think it has aged somewhat, but it covers some of the points I brought up and might be worth reviewing. https://docs.google.com/document/d/1MfVE9td9D4n5y-PTn66kk4-9xG7feXs1zSFf-qxQgPs

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Sasha Krassovsky
An ExecPlan is composed of a bunch of implicit “pipelines”. Each node in a pipeline (starting with a source node) implements `InputReceived` and `InputFinished`. On `InputReceived`, it performs its computation and calls `InputReceived` on its output. On `InputFinished`, it performs any cleanup a…
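To make the push model concrete, here is a toy pipeline, assuming a "batch" is just a `std::vector<int>`. `Node`, `FilterNode`, `SinkNode`, and `RunSource` are hypothetical simplifications; Arrow's actual `ExecNode` signatures differ. Each `InputReceived` does its work and immediately calls `InputReceived` on its output, all on the thread that delivered the batch.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not Arrow's ExecNode/ExecBatch.
using Batch = std::vector<int>;

struct Node {
  virtual ~Node() = default;
  virtual void InputReceived(Batch batch) = 0;      // process and push on
  virtual void InputFinished(int total_batches) = 0;  // upstream is done
};

// Keeps rows matching a predicate and pushes the result downstream
// on the calling thread.
class FilterNode : public Node {
 public:
  FilterNode(std::function<bool(int)> pred, Node* output)
      : pred_(std::move(pred)), output_(output) {}

  void InputReceived(Batch batch) override {
    Batch out;
    for (int v : batch) {
      if (pred_(v)) out.push_back(v);
    }
    output_->InputReceived(std::move(out));  // forwarded even if empty
  }
  void InputFinished(int total_batches) override {
    output_->InputFinished(total_batches);
  }

 private:
  std::function<bool(int)> pred_;
  Node* output_;
};

// Collects everything it receives.
class SinkNode : public Node {
 public:
  void InputReceived(Batch batch) override {
    rows.insert(rows.end(), batch.begin(), batch.end());
    ++batches_seen;
  }
  void InputFinished(int total_batches) override {
    finished = true;
    expected_batches = total_batches;
  }

  Batch rows;
  int batches_seen = 0;
  int expected_batches = -1;
  bool finished = false;
};

// The "source" pushes each batch into the head of the pipeline.
inline void RunSource(const std::vector<Batch>& batches, Node* head) {
  for (const Batch& b : batches) head->InputReceived(b);
  head->InputFinished(static_cast<int>(batches.size()));
}
```

The filter forwards a batch even when every row is filtered out, so the sink's batch count still matches the `total_batches` reported by `InputFinished`.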

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Weston Pace
I think this is doable. I think we want to introduce the concept of a batch index. The scanner is then responsible for assigning a batch index to each outgoing batch. Some ExecNodes would reset or destroy the batch index (for example, you cannot generally do an asof join after a hash join unless…
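One way a batch index could be consumed downstream is a resequencer that buffers early arrivals and releases batches strictly in index order. This is a sketch under assumptions: `IndexedBatch` and `Sequencer` are hypothetical names, and indices are assumed to start at 0 and be contiguous.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical batch carrying a scanner-assigned index.
struct IndexedBatch {
  int64_t index;
  std::vector<int> rows;
};

// Buffers out-of-order batches and emits them in index order.
class Sequencer {
 public:
  explicit Sequencer(std::function<void(IndexedBatch)> emit)
      : emit_(std::move(emit)) {}

  void InputReceived(IndexedBatch batch) {
    const int64_t idx = batch.index;
    pending_.emplace(idx, std::move(batch));
    // Release every batch that is now contiguous with what was emitted.
    auto it = pending_.find(next_);
    while (it != pending_.end()) {
      emit_(std::move(it->second));
      pending_.erase(it);
      ++next_;
      it = pending_.find(next_);
    }
  }

  std::size_t buffered() const { return pending_.size(); }

 private:
  std::function<void(IndexedBatch)> emit_;
  std::map<int64_t, IndexedBatch> pending_;
  int64_t next_ = 0;  // index of the next batch allowed out
};
```

The contiguity assumption is also where a stall can appear: if an upstream node (a filter, say) silently drops a batch instead of forwarding an empty one, the index it carried never arrives and everything after it stays buffered.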

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Li Jin
Hey, thanks again for the reply!

> I would suggest accumulating all batches just like in Hash Join

This is something I intentionally try to avoid because asof join (and many other time series operations) can be performed in a streaming fashion to reduce memory footprint.

> When you want to scale up…

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Sasha Krassovsky
I would advise against relying on any specific ordering of batches coming in. When you want to scale up to multiple threads, you will no longer be able to rely on any order because scheduling is generally pretty nondeterministic. I would suggest accumulating all batches just like in Hash Join, a…
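A minimal sketch of the "accumulate all batches" suggestion, assuming rows carry an integer timestamp. `Row` and `AccumulateThenSort` are hypothetical. `InputReceived` only appends under a mutex, so neither arrival order nor the calling thread matters; ordering is restored once, when the input is finished.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical row type with a timestamp.
struct Row {
  int64_t time;
  int value;
};

class AccumulateThenSort {
 public:
  // Safe to call from several threads in any order: it only appends.
  void InputReceived(std::vector<Row> batch) {
    std::lock_guard<std::mutex> lock(mutex_);
    rows_.insert(rows_.end(), batch.begin(), batch.end());
  }

  // Called once all input is done; returns every row in time order.
  std::vector<Row> InputFinished() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::stable_sort(rows_.begin(), rows_.end(),
                     [](const Row& a, const Row& b) { return a.time < b.time; });
    return std::move(rows_);
  }

 private:
  std::mutex mutex_;
  std::vector<Row> rows_;
};
```

The cost is that memory grows with the entire input, which is exactly what streaming time series operators try to avoid.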

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Li Jin
> In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables to cover all potential matches (wait until I have seen right timestamps outside the matching range)

To add a bit more explanation, let's say the time range of the current left batch…
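The waiting condition described above can be sketched as a readiness check, assuming every right input is sorted by time and records the largest timestamp it has delivered. `RightInputState` and `CanEmitLeftBatch` are hypothetical; a left batch is treated as ready once each right input has either finished or produced a timestamp strictly greater than the left batch's maximum time.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical per-right-input bookkeeping.
struct RightInputState {
  std::optional<int64_t> latest_time;  // largest timestamp seen so far
  bool finished = false;               // no more batches will arrive
};

// Inputs are time-ordered, so once a right input has shown a timestamp
// strictly greater than `left_max_time`, no later row from it can match.
// An equal timestamp is not enough: more rows with that time may follow.
inline bool CanEmitLeftBatch(int64_t left_max_time,
                             const std::vector<RightInputState>& rights) {
  for (const RightInputState& r : rights) {
    if (r.finished) continue;
    if (!r.latest_time || *r.latest_time <= left_max_time) return false;
  }
  return true;
}
```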

Re: [Compute][C++] Question on compute scheduler

2022-04-26, Li Jin
Thanks both for the reply. To add a bit more context, I am trying to implement an "asof join". Here I have one left table and n right tables, and all batches arrive in time order. In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables…
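For the matching itself, a common asof-join rule (used here as an assumption, since the exact semantics are not spelled out in the thread) pairs each left row with the latest right row whose time is at or before the left time, within a tolerance. With time-ordered inputs this is a single forward pass; `AsofMatch` is a hypothetical helper for one right table.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// For each left timestamp, return the index of the latest right row with
// time <= left time and (left time - right time) <= tolerance, or nullopt.
// Both inputs are assumed sorted by time, so two forward-only cursors suffice.
inline std::vector<std::optional<std::size_t>> AsofMatch(
    const std::vector<int64_t>& left_times,
    const std::vector<int64_t>& right_times, int64_t tolerance) {
  std::vector<std::optional<std::size_t>> matches;
  matches.reserve(left_times.size());
  std::size_t r = 0;  // first right row with time > current left time
  for (int64_t t : left_times) {
    while (r < right_times.size() && right_times[r] <= t) ++r;
    if (r > 0 && t - right_times[r - 1] <= tolerance) {
      matches.push_back(r - 1);  // latest right row at or before t
    } else {
      matches.push_back(std::nullopt);
    }
  }
  return matches;
}
```

With n right tables the same pass would run once per table. Because both cursors only move forward, right rows older than the current left time minus the tolerance could be discarded, which is what lets a streaming implementation keep memory bounded.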

Re: [Compute][C++] Question on compute scheduler

2022-04-25, Sasha Krassovsky
If I understand correctly, on `InputReceived` you’ll be accumulating batches until you have enough to compute the next output? In that case, you have two options: you can either just immediately compute it using the same thread, or call the `schedule_callback` directly (not using the scheduler). I t…
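The two options can be sketched with a node that accumulates batches and, once it has enough, either computes inline or hands the work to a callback. `AccumulatingNode`, `ScheduleFn`, and the "sum of buffered values" computation are hypothetical placeholders, not Arrow's `schedule_callback` signature. The "otherwise try later" case needs no polling task in this sketch: the next `InputReceived` simply checks again.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

using Batch = std::vector<int>;
// Hypothetical callback that takes ownership of a task and runs it later.
using ScheduleFn = std::function<void(std::function<void()>)>;

class AccumulatingNode {
 public:
  AccumulatingNode(std::size_t batches_per_output,
                   std::function<void(int)> output,
                   ScheduleFn schedule = nullptr)
      : n_(batches_per_output), output_(std::move(output)),
        schedule_(std::move(schedule)) {}

  void InputReceived(Batch batch) {
    std::vector<Batch> ready;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      buffer_.push_back(std::move(batch));
      if (buffer_.size() < n_) return;  // not enough yet; next input retries
      ready.swap(buffer_);
    }
    // The task owns the batches it needs, so it can run anywhere.
    auto task = [this, ready = std::move(ready)] { output_(Sum(ready)); };
    if (schedule_) {
      schedule_(std::move(task));  // option 2: hand off via the callback
    } else {
      task();                      // option 1: compute on this thread
    }
  }

 private:
  // Placeholder computation standing in for the real join logic.
  static int Sum(const std::vector<Batch>& batches) {
    int total = 0;
    for (const Batch& b : batches) {
      for (int v : b) total += v;
    }
    return total;
  }

  std::size_t n_;
  std::function<void(int)> output_;
  ScheduleFn schedule_;
  std::mutex mutex_;
  std::vector<Batch> buffer_;
};
```

The task captures `this`, so in the scheduled case the node must outlive any queued tasks.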

Re: [Compute][C++] Question on compute scheduler

2022-04-25, Weston Pace
Thanks Sasha, your intuition on the SerialExecutor is correct. One of the changes I am working on [1] will make it so that an executor is always present. The behavior when you do not have an executor is rather strange (sometimes I/O threads are used and sometimes the calling thread is used) and th…
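A toy model of a serial executor, assuming the only requirements are that spawning never runs work immediately and that the caller drives execution on its own thread. `ToySerialExecutor` is a hypothetical illustration and not Arrow's `SerialExecutor` interface; the point is that with such an executor always present, where a task runs no longer depends on which thread happens to be available.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical serial executor: Spawn() only queues; RunUntilIdle() runs
// queued tasks one at a time on the calling thread, including tasks that
// are spawned while it is running.
class ToySerialExecutor {
 public:
  void Spawn(std::function<void()> task) { tasks_.push_back(std::move(task)); }

  void RunUntilIdle() {
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      task();  // may call Spawn(); the loop picks new tasks up
    }
  }

  bool idle() const { return tasks_.empty(); }

 private:
  std::deque<std::function<void()>> tasks_;
};
```

Tasks spawned by a running task go to the back of the queue, so execution order stays deterministic.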

Re: [Compute][C++] Question on compute scheduler

2022-04-25, Li Jin
Thanks! That's super helpful. A follow-up question on TaskScheduler: what's the correct way to define a task with the semantics "do work if input batches are ready, otherwise try later"? Something like: `Status try_process(): if enough_inputs_to_produce_next_output: compute_and_produce_next_output();` …

Re: [Compute][C++] Question on compute scheduler

2022-04-25, Sasha Krassovsky
Hi Li, I’ll answer the questions in order:

1. Your guess is correct! The Hash Join may be used standalone (mostly in testing or benchmarking for now) or as part of the ExecNode. The ExecNode will pass the task to the Executor to be scheduled, or will run it immediately if it’s in sync mode (i.e…

[Compute][C++] Question on compute scheduler

2022-04-25, Li Jin
Hello! I am reading how TaskScheduler is used inside the C++ compute code (specifically the hash join) and have some questions about it, in particular:

(1) What is the purpose of SchedulerTaskCallback defined here: https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute