I see. Yeah, spill to disk seems to be a reasonable approach. Hard
backpressure does seem like it can lead to deadlocks.
On Wed, Apr 27, 2022 at 4:55 PM Weston Pace wrote:
> Our backpressure is best-effort. A push downstream will never
> fail/block. Eventually, when sinks (or pipeline breakers)
Our backpressure is best-effort. A push downstream will never
fail/block. Eventually, when sinks (or pipeline breakers) start to
fill up, a pause message is sent to the source nodes. However,
anything in progress will continue and should not be prevented from
completing and pushing results upwards.
Thanks both! The ExecPlan Sequencing doc is interesting and close to the
problem that we are trying to solve (ordered processing).
One thought is that I can see some cases for deadlock if we are not
careful, for example (Filter Node -> Asof Join Node, assuming the Asof Join
node requires ordered input).
There was an old design document I proposed on this ML a while back.
I never got around to implementing it and I think it has aged somewhat
but it covers some of the points I brought up and it might be worth
reviewing.
https://docs.google.com/document/d/1MfVE9td9D4n5y-PTn66kk4-9xG7feXs1zSFf-qxQgPs
An ExecPlan is composed of a bunch of implicit “pipelines”. Each node in a
pipeline (starting with a source node) implements `InputReceived` and
`InputFinished`. On `InputReceived`, it performs its computation and calls
`InputReceived` on its output. On `InputFinished`, it performs any cleanup and
calls `InputFinished` on its output.
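The pipeline shape described above can be sketched as follows. This is a simplified illustration with invented node classes; the method names are only loosely modeled on the real `ExecNode` interface:

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Simplified sketch of an implicit pipeline: each node computes on
// InputReceived and pushes the result to its output node; InputFinished
// propagates downstream after any cleanup.
struct Batch {
  std::vector<int> values;
};

class Node {
 public:
  explicit Node(Node* output) : output_(output) {}
  virtual ~Node() = default;
  virtual void InputReceived(Batch batch) {
    if (output_) output_->InputReceived(std::move(batch));
  }
  virtual void InputFinished() {
    if (output_) output_->InputFinished();
  }

 protected:
  Node* output_;
};

// A filter node: performs its computation, then calls InputReceived
// on its output.
class FilterNode : public Node {
 public:
  using Node::Node;
  void InputReceived(Batch batch) override {
    Batch out;
    for (int v : batch.values)
      if (v % 2 == 0) out.values.push_back(v);  // keep even values
    output_->InputReceived(std::move(out));
  }
};

// A sink (pipeline breaker) that just collects what it is given.
class SinkNode : public Node {
 public:
  SinkNode() : Node(nullptr) {}
  void InputReceived(Batch batch) override {
    collected.push_back(std::move(batch));
  }
  void InputFinished() override { finished = true; }

  std::vector<Batch> collected;
  bool finished = false;
};
```

A source node would simply call `InputReceived` on the first node of the chain for each batch it produces, then `InputFinished` once.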
I think this is doable. I think we want to introduce the concept of a
batch index. The scanner is then responsible for assigning a batch
index to each outgoing batch. Some ExecNode's would reset or destroy
the batch index (for example, you cannot generally do an asof join
after a hash join unles
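One way an ordered consumer could use such a batch index is a small reorder buffer that releases index-tagged batches strictly in order even when they arrive out of order. This is a hypothetical sketch, not Arrow's implementation:

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Hypothetical reorder buffer: batches arrive tagged with the index the
// scanner assigned, possibly out of order (e.g. due to multithreaded
// scheduling), and are released strictly in index order. An order-sensitive
// node such as an as-of join could sit behind this.
template <typename T>
class Sequencer {
 public:
  // Insert a batch with its index; returns whatever is now deliverable
  // in order (possibly empty, possibly several batches).
  std::vector<T> Push(std::size_t index, T batch) {
    pending_.emplace(index, std::move(batch));
    std::vector<T> ready;
    // std::map iterates in key order, so the smallest pending index
    // is always at begin().
    while (!pending_.empty() && pending_.begin()->first == next_) {
      ready.push_back(std::move(pending_.begin()->second));
      pending_.erase(pending_.begin());
      ++next_;
    }
    return ready;
  }

 private:
  std::map<std::size_t, T> pending_;
  std::size_t next_ = 0;
};
```

Note that memory in the buffer is bounded by how far ahead of the missing index the scheduler runs, which is why pairing this with backpressure matters.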
Hey thanks again for the reply!
> I would suggest accumulating all batches just like in Hash Join
This is something I intentionally try to avoid because asof join (and many
other time series operations) can be performed in a streaming fashion to
reduce memory footprint.
> When you want to scale up
I would advise against relying on any specific ordering of batches coming in.
When you want to scale up to multiple threads, you will no longer be able to
rely on any order because scheduling is generally pretty nondeterministic. I
would suggest accumulating all batches just like in Hash Join, a
> In order to produce an output for a left batch, I would need to wait until
I received enough batches from the right tables to cover all potential
matches (wait until I have seen right timestamps outside the matching range)
To add a bit more explanation: let's say the time range of the current left
batch
Thanks both for the reply. To add a bit more context, I am trying to
implement an "asof join". Here I have one left table and n right tables, and
all batches arrive in time order.
In order to produce an output for a left batch, I would need to wait until I
have received enough batches from the right tables to cover all potential
matches (wait until I have seen right timestamps outside the matching range).
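The "have I seen enough of the right side?" condition can be sketched as a small predicate. This is a hypothetical illustration (struct and function names invented): a left batch covering timestamps up to `left_max` with tolerance `t` is only safe to join once every right stream has produced a timestamp beyond `left_max + t`, or has finished, since earlier than that a matching right row could still arrive.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical per-right-table progress state for a streaming as-of join.
struct RightState {
  long max_time_seen;  // largest right timestamp received so far
  bool finished;       // no more batches will arrive from this stream
};

// True once every right stream has advanced past the matching range of
// the left batch (or ended), so the left batch can be joined and emitted
// without buffering the whole right side.
bool CanEmitLeftBatch(long left_max, long tolerance,
                      const std::vector<RightState>& rights) {
  return std::all_of(rights.begin(), rights.end(), [&](const RightState& r) {
    return r.finished || r.max_time_seen > left_max + tolerance;
  });
}
```

Once a left batch is emitted, right batches entirely before its time range can also be dropped, which is what keeps the memory footprint bounded compared to accumulating everything as a hash join does.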
If I understand correctly, on InputReceived you’ll be accumulating batches
until you have enough to compute the next output? In that case, you have two
options: you can either just immediately compute it using the same thread, or
call the schedule_callback directly (not using the scheduler). I t
Thanks Sasha, your intuition on the SerialExecutor is correct. One of
the changes I am working on[1] will make it so that an executor is
always present. The behavior when you do not have an executor is
rather strange (sometimes I/O threads are used and sometimes the
calling thread is used) and th
Thanks! That's super helpful.
A follow-up question on TaskScheduler: what's the correct way to define a
task that "does work if input batches are ready, otherwise tries again later"?
Something like:

Status try_process():
    if enough_inputs_to_produce_next_output:
        compute_and_produce_next_output();
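One way to avoid scheduling a task that may find nothing to do, along the lines suggested in the replies, is to drain whatever has become ready directly from `InputReceived` on the calling thread. A hypothetical sketch (class name and the "sum pairs" computation are invented placeholders):

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical "process when ready" pattern: each InputReceived appends
// its batch and then loops, producing every output that has become
// computable, so no separate "try later" task is needed.
class ReadyWhenEnough {
 public:
  explicit ReadyWhenEnough(std::size_t needed) : needed_(needed) {}

  // Returns the outputs produced as a consequence of this input.
  std::vector<int> InputReceived(int batch) {
    queue_.push_back(batch);
    std::vector<int> outputs;
    while (queue_.size() >= needed_) {  // enough inputs for next output?
      int sum = 0;
      for (std::size_t i = 0; i < needed_; ++i) {
        sum += queue_.front();
        queue_.pop_front();
      }
      outputs.push_back(sum);  // stand-in for compute_and_produce_next_output
    }
    return outputs;
  }

 private:
  std::size_t needed_;
  std::deque<int> queue_;
};
```

With this shape the "otherwise try later" branch disappears: an input that does not complete an output simply returns nothing, and the eventual input that does complete one triggers the computation.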
Hi Li,
I’ll answer the questions in order:
1. Your guess is correct! The Hash Join may be used standalone (mostly in
testing or benchmarking for now) or as part of the ExecNode. The ExecNode will
pass the task to the Executor to be scheduled, or will run it immediately if
it’s in sync mode (i.e
Hello!
I am reading the use of TaskScheduler inside C++ compute code (reading hash
join) and have some questions about it, in particular:
(1) What is the purpose of the SchedulerTaskCallback defined here:
https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute