Thanks both for the reply. To add a bit more context, I am trying to
implement an "asof join". Here I have one left table and n right table, and
all batches arrive in time order.

In order to produce a output for a left batch, I would need to wait until I
received enough batches from the right tables to cover all potential
matches (wait until I have seen right timestamps outside the matching range)

>From both replies it sounds like I should just do the check if I got enough
data in InputReceived function and do work there when I have enough data.

However, one thing that I am not sure about is how does the parallelization
comes into play - it sounds like InputReceived could be called by multiple
thread of the same input node for different batches? Currently I have just
trying to get a baseline implementation that has one thread doing the join
so if InputReceived is called by multiple thread I might ended up blocking
other threads unnecessarily.

If I have a dedicate thread/executor that does the join and InputReceived
just queue the batches and return immediately, I felt like it would be more
efficient.

Thoughts?

Thanks,
Li

On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <krassovskysa...@gmail.com>
wrote:

> If I understand correctly, on InputReceived you’ll be accumulating batches
> until you have enough to compute the next output? In that case, you have
> two options: you can either just immediately compute it using the same
> thread, or call the schedule_callback directly (not using the scheduler). I
> think your pseudocode is correct - since whether or not you can output the
> next batch can only change on InputReceived, that’s the only spot you need
> to check. I think an elaboration of your pseudocode could be something like:
>
> Status InputReceived(Batch b)
>     lock(accum_lock);
>     accum.push_back(b);
>     if(enough_inputs)
>         vector<Batch> batches = std::move(accum);
>         unlock(accum_lock);
>         compute_next_output(batches);
>     return Status::OK();
>
> Sasha
>
> > On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Thanks! That's super helpful.
> >
> > A follow up question on TaskScheduler - What's the correct way to define
> a
> > task that "do work if input batches are ready, otherwise try later"?
> >
> > Sth like
> >
> > Status try_process():
> > if enough_inputs_to _produce_next_output:
> > compute_and_produce_next_output();
> > return Status::OK()
> > else:
> > # Is this right?
> > # Exit and try later
> > return Status::OK();
> >
> > If I register this function with TaskScheduler, I think it only gets run
> > once, so I think I might need to schedule the next task when inputs are
> not
> > ready but I am not sure of the best way to do that. Any suggestions?
> >
> > Li
> >
> > On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <
> krassovskysa...@gmail.com <mailto:krassovskysa...@gmail.com>>
> > wrote:
> >
> >> Hi Li,
> >> I’ll answer the questions in order:
> >>
> >> 1. Your guess is correct! The Hash Join may be used standalone (mostly
> in
> >> testing or benchmarking for now) or as part of the ExecNode. The
> ExecNode
> >> will pass the task to the Executor to be scheduled, or will run it
> >> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
> >> benchmark uses OpenMP to schedule things, and passes a lambda that does
> >> OpenMP things to the HashJoin.
> >>
> >> 2. We might not have an executor if we want to execute synchronously.
> This
> >> is set during construction of the ExecContext, which is given to the
> >> ExecPlan during creation. If the ExecContext has a nullptr Executor,
> then
> >> we are in async mode, otherwise we use the Executor to schedule. One
> >> confusing thing is that we also have a SerialExecutor - I’m actually not
> >> quite sure what the difference between using that and setting the
> Executor
> >> to nullptr is (might have something to do with testing?). @Weston
> probably
> >> knows
> >>
> >> 3. You can think of the TaskGroup as a “parallel for loop”. TaskImpl is
> >> the function that implements the work that needs to be split up,
> >> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
> >> will receive the index of the task. If you’re familiar with OpenMP, it’s
> >> equivalent to this:
> >>
> >> #pragma omp parallel for
> >> for(int i = 0; i < 100; i++)
> >> TaskImpl(omp_get_thread_num(), i);
> >> TaskGroupContinuationImpl();
> >>
> >> Examples of the two are here:
> >>
> >>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> >> <
> >>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> <
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> >
> >>>
> >>
> >>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> <
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> >
> >> <
> >>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> <
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> >
> >>>
> >>
> >> Sasha
> >>
> >>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >>>
> >>> Hello!
> >>>
> >>> I am reading the use of TaskScheduler inside C++ compute code (reading
> >> hash
> >>> join) and have some questions about it, in particular:
> >>>
> >>> (1) What the purpose of SchedulerTaskCallback defined here:
> >>>
> >>
> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> >>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
> >>> provide an implementation of a task executor, and the implementation of
> >>> SchedulerTaskCallback inside hash_join_node.cc is just a vanillar
> >>> implementation)
> >>>
> >>> (2) When would this task context not have an executor?
> >>>
> >>
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >>>
> >>> (3) What's the difference between TaskImpl and
> TaskGroupContinuationImpl
> >> in
> >>> TaskScheduler::RegisterTaskGroup? And how would one normally define
> >>> TaskGroupContinuationImpl?
> >>>
> >>> Sorry I am still learning the Arrow compute internals and appreciate
> help
> >>> on understanding these.
> >>>
> >>> Li
>
>

Reply via email to