Thanks both for the reply. To add a bit more context, I am trying to implement an "asof join". Here I have one left table and n right tables, and all batches arrive in time order.
In order to produce an output for a left batch, I would need to wait until I have received enough batches from the right tables to cover all potential matches (i.e., until I have seen right timestamps outside the matching range).

From both replies it sounds like I should do the check for whether I have enough data in InputReceived and do the work there when I do. However, one thing I am not sure about is how parallelization comes into play - it sounds like InputReceived could be called by multiple threads of the same input node for different batches? Currently I am just trying to get a baseline implementation that has one thread doing the join, so if InputReceived is called by multiple threads I might end up blocking other threads unnecessarily. If I had a dedicated thread/executor that does the join, with InputReceived just queueing the batches and returning immediately, I feel like it would be more efficient. Thoughts?

Thanks,
Li

On Mon, Apr 25, 2022 at 6:41 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:

> If I understand correctly, on InputReceived you’ll be accumulating batches
> until you have enough to compute the next output? In that case, you have
> two options: you can either just immediately compute it using the same
> thread, or call the schedule_callback directly (not using the scheduler). I
> think your pseudocode is correct - since whether or not you can output the
> next batch can only change on InputReceived, that’s the only spot you need
> to check. I think an elaboration of your pseudocode could be something like:
>
> Status InputReceived(Batch b)
>   lock(accum_lock);
>   accum.push_back(b);
>   if (!enough_inputs) {
>     unlock(accum_lock);
>     return Status::OK();
>   }
>   vector<Batch> batches = std::move(accum);
>   unlock(accum_lock);  // release before doing the (possibly long) compute
>   compute_next_output(batches);
>   return Status::OK();
>
> Sasha
>
> > On Apr 25, 2022, at 3:29 PM, Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Thanks! That's super helpful.
> >
> > A follow-up question on TaskScheduler - what's the correct way to define a
> > task that "does work if input batches are ready, otherwise tries later"?
> >
> > Something like:
> >
> > Status try_process():
> >   if enough_inputs_to_produce_next_output:
> >     compute_and_produce_next_output();
> >     return Status::OK();
> >   else:
> >     # Is this right?
> >     # Exit and try later
> >     return Status::OK();
> >
> > If I register this function with TaskScheduler, I think it only gets run
> > once, so I think I might need to schedule the next task when inputs are not
> > ready, but I am not sure of the best way to do that. Any suggestions?
> >
> > Li
> >
> > On Mon, Apr 25, 2022 at 6:18 PM Sasha Krassovsky <krassovskysa...@gmail.com>
> > wrote:
> >
> >> Hi Li,
> >> I’ll answer the questions in order:
> >>
> >> 1. Your guess is correct! The Hash Join may be used standalone (mostly in
> >> testing or benchmarking for now) or as part of the ExecNode. The ExecNode
> >> will pass the task to the Executor to be scheduled, or will run it
> >> immediately if it’s in sync mode (i.e. no executor). Our Hash Join
> >> benchmark uses OpenMP to schedule things, and passes a lambda that does
> >> OpenMP things to the HashJoin.
> >>
> >> 2. We might not have an executor if we want to execute synchronously. This
> >> is set during construction of the ExecContext, which is given to the
> >> ExecPlan during creation. If the ExecContext has a nullptr Executor, then
> >> we are in sync mode; otherwise we use the Executor to schedule. One
> >> confusing thing is that we also have a SerialExecutor - I’m actually not
> >> quite sure what the difference between using that and setting the Executor
> >> to nullptr is (might have something to do with testing?). @Weston probably
> >> knows.
> >>
> >> 3. You can think of the TaskGroup as a “parallel for loop”.
> >> TaskImpl is
> >> the function that implements the work that needs to be split up, and
> >> TaskGroupContinuationImpl is what gets run after the for loop. TaskImpl
> >> will receive the index of the task. If you’re familiar with OpenMP, it’s
> >> equivalent to this:
> >>
> >> #pragma omp parallel for
> >> for (int i = 0; i < 100; i++)
> >>   TaskImpl(omp_get_thread_num(), i);
> >> TaskGroupContinuationImpl();
> >>
> >> Examples of the two are here:
> >>
> >> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L416
> >> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join.cc#L458
> >>
> >> Sasha
> >>
> >>> On Apr 25, 2022, at 8:35 AM, Li Jin <ice.xell...@gmail.com> wrote:
> >>>
> >>> Hello!
> >>>
> >>> I am reading the use of TaskScheduler inside the C++ compute code (reading
> >>> hash join) and have some questions about it, in particular:
> >>>
> >>> (1) What is the purpose of the SchedulerTaskCallback defined here:
> >>> https://github.com/apache/arrow/blob/5a5d92928ccd438edf7ced8eae449fad05a7e71f/cpp/src/arrow/compute/exec/hash_join_node.cc#L428
> >>> (My guess is that the caller of TaskScheduler::StartTaskGroup needs to
> >>> provide an implementation of a task executor, and the implementation of
> >>> SchedulerTaskCallback inside hash_join_node.cc is just a vanilla
> >>> implementation.)
> >>>
> >>> (2) When would this task context not have an executor?
> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/hash_join_node.cc#L581
> >>>
> >>> (3) What's the difference between TaskImpl and TaskGroupContinuationImpl
> >>> in TaskScheduler::RegisterTaskGroup? And how would one normally define
> >>> TaskGroupContinuationImpl?
> >>>
> >>> Sorry, I am still learning the Arrow compute internals and appreciate help
> >>> on understanding these.
> >>>
> >>> Li
> >
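The buffering condition described at the top of the thread (wait until right-side timestamps have been seen outside the matching range) could be sketched as a readiness check. This is a hypothetical formulation, not Arrow code: `RightState`, `RightSideReady`, and the forward tolerance window are all assumptions for illustration, since the thread does not pin down the exact asof-join semantics.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical sketch (not Arrow API). Because batches arrive in time
// order, a right input is "safe to join" against a left batch once a
// right-side timestamp strictly beyond the end of the left batch's
// match window has been observed: no future right batch can match.
struct RightState {
  int64_t max_seen_ts;  // largest right timestamp received so far
};

bool RightSideReady(int64_t left_max_ts, int64_t tolerance,
                    const RightState& right) {
  // Assumed window: matches for a left row at t lie in (t, t + tolerance].
  return right.max_seen_ts > left_max_ts + tolerance;
}

bool AllRightSidesReady(int64_t left_max_ts, int64_t tolerance,
                        const std::vector<RightState>& rights) {
  // With n right tables, every one must have advanced past the window.
  for (const auto& r : rights) {
    if (!RightSideReady(left_max_ts, tolerance, r)) return false;
  }
  return true;
}
```

The same check works for backward-looking or symmetric windows by adjusting the comparison; the key invariant is only that each right stream's watermark has passed the end of the left batch's window.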
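The "dedicated thread/executor" idea floated in the thread, where InputReceived only enqueues and returns immediately, is essentially a single-consumer work queue. A minimal sketch using standard C++ threading primitives (`SerialJoinWorker` is a made-up name; none of Arrow's ExecNode or Executor machinery is used here):

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Single worker thread drains a queue of batches; callers never block on
// join work, only on the brief queue lock. `Batch` is a stand-in type.
template <typename Batch>
class SerialJoinWorker {
 public:
  explicit SerialJoinWorker(std::function<void(Batch)> process)
      : process_(std::move(process)), worker_([this] { Run(); }) {}

  ~SerialJoinWorker() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();  // worker drains remaining batches before exiting
  }

  // Called from any input thread; returns immediately after enqueueing.
  void Enqueue(Batch b) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push_back(std::move(b));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    for (;;) {
      std::unique_lock<std::mutex> lock(mu_);
      cv_.wait(lock, [this] { return done_ || !queue_.empty(); });
      if (queue_.empty()) return;  // only reachable once done_ is set
      Batch b = std::move(queue_.front());
      queue_.pop_front();
      lock.unlock();
      process_(std::move(b));  // join work runs off the caller's thread
    }
  }

  std::function<void(Batch)> process_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Batch> queue_;
  bool done_ = false;
  std::thread worker_;  // declared last so Run() sees initialized members
};
```

The trade-off versus computing in InputReceived directly is an extra hand-off per batch in exchange for never stalling the upstream threads on join work.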
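Sasha's observation that readiness can only change on InputReceived suggests an input-driven loop rather than a re-scheduled "try later" task. A toy sketch of that shape (integers stand in for batches, and `Pipeline` with a fixed `needed` count is an illustrative assumption, not the asof-join condition itself):

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

// Check-and-drain on every input: no polling or re-scheduling needed,
// because arrival of input is the only event that can enable output.
struct Pipeline {
  std::mutex mu;
  std::vector<int> accum;    // stand-in for buffered batches
  std::vector<int> outputs;  // stand-in for produced output batches
  std::size_t needed = 3;    // inputs required per output (toy condition)

  void InputReceived(int batch) {
    std::lock_guard<std::mutex> lock(mu);
    accum.push_back(batch);
    // Loop, not a single if: one new input may enable several outputs.
    while (accum.size() >= needed) {
      int sum = 0;
      for (std::size_t i = 0; i < needed; ++i) sum += accum[i];
      accum.erase(accum.begin(), accum.begin() + needed);
      outputs.push_back(sum);
    }
  }
};
```

In a real node the `while` condition would be the "have I seen right timestamps past the match window" check, and the body would run the join for the oldest eligible left batch.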
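The TaskImpl / TaskGroupContinuationImpl relationship from the OpenMP analogy above can also be mimicked with plain `std::thread`: run the task body once per index in parallel, barrier, then run the continuation exactly once. This is a sketch of the pattern only, not Arrow's TaskScheduler (which hands out indices from a thread pool rather than spawning a thread per task):

```cpp
#include <functional>
#include <thread>
#include <vector>

// "Parallel for loop" with a continuation: task_impl(i) runs for each
// index concurrently; continuation() runs once after all of them finish.
void RunTaskGroup(int num_tasks,
                  const std::function<void(int)>& task_impl,
                  const std::function<void()>& continuation) {
  std::vector<std::thread> threads;
  threads.reserve(num_tasks);
  for (int i = 0; i < num_tasks; ++i) {
    threads.emplace_back(task_impl, i);  // TaskImpl receives its task index
  }
  for (auto& t : threads) t.join();  // barrier: the whole group is done
  continuation();                    // TaskGroupContinuationImpl
}
```

The continuation is typically where one would publish results or kick off the next task group, since it is the only point with a happens-after guarantee over every task in the group.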