@Li

Thank you for the proposal; this discussion is very interesting. And
@Yaron, thanks for the note on distributed execution and on how hard it is
to debug when threads and processes from two systems try to perform a task
together seamlessly.

Some very important points have been raised in this discussion. I would
like to present a few thoughts and elaborate on each of them.

1. The objective of UDFs and their scope
2. Parallel execution
3. Ways to integrate sequential libraries with a distributed framework


In the very early discussions, I also thought about things like forking
from the UDF process to run parallel tasks, resource management, etc. This
matters a great deal when we think about what is executed within the UDF
scope. But there is another aspect to this: does that execution need to
take place in the same processing unit, or can it be executed remotely?
Addressing the first point, I am more inclined towards the idea that the
tasks we execute using a UDF should be limited tasks that need roughly the
default resources allocated to them. But what if a user-defined task needs
more resources, because the data pipeline has to process data in a way that
no existing kernel in the Arrow library can support well?

I think this is your point about an advanced UDF.

Looking at the limitations of UDFs, we need to determine how much freedom
should be given to a UDF.

Looking at the proposed ideas, some of the key components are:

- Allowing separate programs to run within the UDF
- These programs could be sequential or distributed in nature
- Handling resources and possible outcomes well
- Gracefully shutting down / tearing down resources when runtime errors occur, etc.
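To make the last two points concrete, here is a minimal sketch in plain
Python (standard library only) of a UDF that runs a separate program and
always tears it down, even on a timeout or a runtime error. The child
program, `run_external_program`, `my_udf`, and the timeout value are all
hypothetical; they only stand in for a real external library or framework.

```python
import subprocess
import sys


def run_external_program(args, timeout_s=30.0):
    """Run a separate program from inside a UDF and always clean it up."""
    proc = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    try:
        out, err = proc.communicate(timeout=timeout_s)
        if proc.returncode != 0:
            raise RuntimeError(
                f"external program failed ({proc.returncode}): {err.strip()}"
            )
        return out
    finally:
        # Whatever happened above (timeout, error, ...), make sure the child
        # process does not outlive the UDF call.
        if proc.poll() is None:
            proc.kill()
            proc.communicate()  # reap the child and drain its pipes


def my_udf(values):
    # Hypothetical UDF: hand the values to a child Python process that
    # doubles them, then parse the child's output back into floats.
    script = "import sys; print(' '.join(str(2 * float(x)) for x in sys.argv[1:]))"
    out = run_external_program([sys.executable, "-c", script, *map(str, values)])
    return [float(x) for x in out.split()]
```

The point of the `try`/`finally` is that cleanup does not depend on the
external program behaving well; a timeout surfaces as
`subprocess.TimeoutExpired` only after the child has been killed.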

Is there a way to manage the UDF life-cycle better? This is a very good
point. Let me elaborate a bit on my thought process.

I believe such requirements for UDFs come from the need to run
computationally intensive workloads that consume more resources and time.
Basically, the default allocated resources won't be enough, and the user
should be able to configure them as required. For instance, assume we have
a data pre-processing problem where we extract features and finally have to
calculate a matrix generated by a computationally intensive process (say, a
numerical computation). The feature extraction involves the usual data
engineering operators supported by Apache Arrow, but the numerical
computation may require an advanced physics library written in
MPI/PGAS/UPC, or even, let's simply say, Apache Spark/Apache Flink. Such a
program is no longer a function; it becomes a program in its own right that
needs to manage resources well. In a simpler case, it could be a program
written using a Python parallel execution framework (though I doubt the
performance in such a case).

Consider the feature extraction time (T1) and the numerical computation
time (T2).

1. T1 >> T2 (feature extraction dominant)
2. T1 << T2 (numerical computation dominant)
3. T1 ~= T2 (approximately equally dominant)
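One rough way to decide which category a pipeline falls into is to time
both stages and compare them. This is only a sketch: the helper names and
the 10x threshold standing in for ">>" are my assumptions, not anything
Arrow provides.

```python
import time


def timed(fn, *args):
    """Run fn(*args) and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


def classify_workload(t1, t2, ratio=10.0):
    """Map feature-extraction time t1 and numerical-computation time t2
    to category 1, 2 or 3. `ratio` is an assumed threshold for ">>"."""
    if t1 <= 0 or t2 <= 0:
        raise ValueError("timings must be positive")
    if t1 >= ratio * t2:
        return 1  # feature extraction dominant
    if t2 >= ratio * t1:
        return 2  # numerical computation dominant
    return 3  # neither stage clearly dominates
```

For example, `features, t1 = timed(extract_features, table)` followed by
`matrix, t2 = timed(compute_matrix, features)`, where both functions are
placeholders for the real stages, gives the inputs for `classify_workload`.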

I guess for categories 1 and 3 we can more or less use the existing UDFs.
There could be limitations, but it would be feasible if we can allocate
more resources to the execution engine itself. It won't be an optimal
resource allocation, though. That is noted, and optimizing it may not be
feasible unless the life-cycle of a UDF is handled separately. We can
discuss this further.

For category 2, assuming the program is executed in multiple processes on
one or more machines, the following problems spring up:

1. Resource management
2. Communication (MPI, PGAS, etc)
3. Life-cycle management

If it comes to this kind of situation, I think it is better to make a
Remote Procedure Call (RPC) than to run the whole program within the UDF.
The UDF can wait for the RPC to return the values.
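A minimal sketch of the RPC idea, using Python's standard-library XML-RPC
purely as a stand-in for a real RPC layer (Arrow Flight or similar would be
the more natural choice and needs verifying). The "remote" service runs on
localhost here so the example is self-contained; `heavy_compute`,
`start_service`, and `remote_udf` are hypothetical names.

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


def heavy_compute(values):
    # Stand-in for the expensive numerical program on the remote side.
    return [v * v for v in values]


def start_service():
    # Port 0 lets the OS choose a free port; everything runs locally here.
    server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
    server.register_function(heavy_compute, "heavy_compute")
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def remote_udf(url, values):
    # The UDF only ships its input and blocks until the RPC returns.
    with ServerProxy(url) as proxy:
        return proxy.heavy_compute(list(values))


if __name__ == "__main__":
    server = start_service()
    host, port = server.server_address
    try:
        print(remote_udf(f"http://{host}:{port}", [1.0, 2.0, 3.0]))  # [1.0, 4.0, 9.0]
    finally:
        server.shutdown()
        server.server_close()
```

The UDF side stays trivial: resource management and the life-cycle of the
heavy program belong to whatever runs behind the RPC endpoint.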

The main issue with this approach is that we have to move the data. Yes, of
course, the data needs to be moved. But if we run complex programs within
an existing process and ask it to spawn processes, things become very
complex, and debugging and performance optimization get very hard. I think
if the goal is running a heavy program, we should either have designed for
that from the start or offload it to remote execution.

We could bear the cost of moving data, execute the program remotely (maybe
Arrow Flight could help here; this needs verifying), and bring the results
back to the UDF. If things are Arrow-friendly, we can minimize the
serialization costs too. This is one plausible scenario. I will explain why
this could be preferable to managing the UDF life-cycle ourselves.
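To illustrate why an Arrow-friendly layout keeps serialization cheap:
fixed-width values stored in one contiguous buffer can be shipped as raw
bytes and reconstructed without parsing each value, whereas a text format
has to format and parse every element. This sketch uses Python's `array`
module as a stand-in for an Arrow column (it is not Arrow IPC) and assumes
sender and receiver share the same byte order.

```python
import json
from array import array


def to_wire(values):
    # Fixed-width float64 values in one contiguous buffer (native byte order),
    # loosely analogous to the data buffer of an Arrow float64 column.
    return array("d", values).tobytes()


def from_wire(payload):
    col = array("d")
    col.frombytes(payload)  # a bulk copy, no per-value parsing
    return col


def text_size(values):
    # For comparison: a text encoding formats (and later parses) every value.
    return len(json.dumps(values).encode("utf-8"))
```

With `values = [i / 7 for i in range(1000)]`, `to_wire(values)` is exactly
8 bytes per value, while the JSON text is noticeably larger and must be
parsed element by element on the receiving side.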

The other option is to assume there is a distributed system written on top
of Apache Arrow that can run programs in distributed memory, or in a cloud
environment where it needs to auto-scale or manage resources based on what
is required. Apache Arrow can still provide its usual UDFs and let the
distributed system itself handle communication, etc.

Note: Naively, in Python we can use mpi4py for parallel execution (writing
a shuffle that supports Arrow data and using MPI internals by converting
Arrow arrays to NumPy is feasible), use Arrow to run local operators, and
use MPI collectives for synchronization. This is not an optimized solution
in any way, but it can be done very quickly instead of waiting for an
external system that supports Arrow to be built.
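A minimal sketch of the "local operator + collective" pattern. With mpi4py
the collective would be something like
`MPI.COMM_WORLD.allreduce(local, op=MPI.SUM)`; here threads and
`threading.Barrier` stand in for MPI ranks so the sketch runs without an
MPI installation. `ThreadComm` and `bsp_program` are hypothetical names,
and plain Python lists stand in for Arrow arrays.

```python
import threading


class ThreadComm:
    """Hypothetical stand-in for an MPI communicator; each rank is a thread."""

    def __init__(self, size):
        self.size = size
        self._barrier = threading.Barrier(size)
        self._slots = [0] * size

    def allreduce_sum(self, rank, value):
        self._slots[rank] = value
        self._barrier.wait()  # every rank has contributed its value
        total = sum(self._slots)
        self._barrier.wait()  # every rank has read before slots are reused
        return total


def bsp_program(comm, rank, data, results):
    # Superstep: local computation on this rank's partition...
    local = sum(x * x for x in data[rank::comm.size])
    # ...followed by a collective that synchronizes all ranks.
    results[rank] = comm.allreduce_sum(rank, local)


def run(size, data):
    comm = ThreadComm(size)
    results = [None] * size
    threads = [
        threading.Thread(target=bsp_program, args=(comm, r, data, results))
        for r in range(size)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

For example, `run(4, list(range(10)))` returns `[285, 285, 285, 285]`:
every rank ends the superstep holding the same global sum of squares.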

Now let's elaborate on the time-consuming application. Assume a program
like the one below (a BSP: Bulk Synchronous Parallel program). Please note
that this is not something we support or have developed; it is just an
idea of how UDFs could be executed with a distributed runtime, and the code
only showcases the possibility of using Arrow UDFs in a distributed
environment.


Run the program:

```
mpirun -np 4 python3 arrow_dist_sample.py
```

Some parallel code:

```
# Hypothetical API: not real, just assume these modules and names exist

from pyarrow.compute import HashJoin, Sort
from distributed_arrow.comm import ArrowDistributedContext
from distributed_arrow.data import DistributedDataLoader


ctx = ArrowDistributedContext(...)

rank = ctx.get_rank()  # current process id
size = ctx.get_world_size()  # number of processes

# load data

ds_loader = DistributedDataLoader(ctx, ...)
# returns the data partition for the current process
ds_data = ds_loader.data(rank)

join_keys = [...]  # column(s) to join on
# shuffle the data on the join keys so that equal keys end up on the same process
ds_data = ctx.Shuffle(ds_data, keys=join_keys)
# do a local join on the shuffled partition
final_table = HashJoin(ctx, join_keys, ds_data, ...)

# run some other programs
...

# UDF

def my_udf(ctx, array):
    # do some math computation on the array to get a partial result
    partial = ...
    # synchronize: combine the partial results of all processes
    res_col = ctx.AllReduce(partial, ...)
    return res_col


# apply the UDF on the 0-th column of the table
res = final_table.apply_udf(my_udf, ctx, [0])
...
```

In this approach, there is a distributed system designed to digest Arrow
data natively: it can serialize the data and run in parallel at exascale.
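To show what the shuffle and local join steps in the program above do,
here is a single-process sketch in plain Python: rows are hash-partitioned
by the join key across `size` simulated ranks, after which each rank can
join its own partitions without talking to the others. Lists of dicts stand
in for Arrow tables, and all function names are hypothetical.

```python
from collections import defaultdict


def hash_partition(rows, key, size):
    """Shuffle step: send each row to rank hash(row[key]) % size."""
    parts = [[] for _ in range(size)]
    for row in rows:
        parts[hash(row[key]) % size].append(row)
    return parts


def local_hash_join(left, right, key):
    """Local inner join on one rank: build on `right`, probe with `left`."""
    table = defaultdict(list)
    for rrow in right:
        table[rrow[key]].append(rrow)
    return [{**lrow, **rrow} for lrow in left for rrow in table.get(lrow[key], [])]


def distributed_join(left, right, key, size):
    left_parts = hash_partition(left, key, size)
    right_parts = hash_partition(right, key, size)
    # After the shuffle, equal keys sit on the same rank, so every join is local.
    return [local_hash_join(left_parts[r], right_parts[r], key) for r in range(size)]
```

In a real distributed run every process must use the same deterministic
hash (Python's string hash is randomized per process), which is one of the
things a shuffle that supports Arrow data would have to pin down.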

In such a scenario, I think resources can be managed externally and scaled
as needed by examining the bottlenecks. Also note that when threads are
mixed into an MPI-like program, performance becomes very hard to tune.
Parallelizing within the UDF is going to be challenging even in the
distributed setting. For such a scenario, I feel the best thing is to make
an RPC call, execute the logic remotely on a cloud or supercomputer, and
bring the results back. But this assumes we are dealing with just a
dataflow system.

But if we zoom out and think about the problem, what we have is a
particular data processing task with a few stages, each performing a very
specific task. In general, we can deploy a set of dataflow programs by
encapsulating them within a workflow system. The workflow system can move
the data from one workflow task to the next and automate the process. That
way we manage resources much better and only allocate them as needed. But
this is open for discussion and is just my thought process on the usage of
UDFs.
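A toy sketch of the workflow idea: each stage declares its upstream stages
and a resource request, and the workflow runner (not the UDF) passes data
between stages in dependency order. The resource dictionaries and stage
names are hypothetical and are only recorded here; a real workflow system
would use them to allocate CPUs, GPUs, or nodes per stage. This uses
`graphlib`, available in Python 3.9+.

```python
from graphlib import TopologicalSorter


def run_workflow(tasks, deps):
    """tasks: name -> (callable, resources); deps: name -> tuple of upstream names.

    Each task is called with the outputs of its upstream tasks, in the order
    listed in `deps`, so the workflow runner moves data between stages.
    """
    sorter = TopologicalSorter()
    for name in tasks:
        sorter.add(name, *deps.get(name, ()))
    outputs, log = {}, []
    for name in sorter.static_order():
        fn, resources = tasks[name]
        # A real workflow system would acquire `resources` here and release
        # them when the stage finishes; this sketch only records the request.
        log.append((name, resources))
        outputs[name] = fn(*(outputs[d] for d in deps.get(name, ())))
    return outputs, log


if __name__ == "__main__":
    tasks = {
        "extract": (lambda: [1.0, 2.0, 3.0], {"cpus": 2}),
        "features": (lambda xs: [10 * x for x in xs], {"cpus": 4}),
        "numeric": (lambda fs: sum(fs), {"nodes": 8}),
    }
    deps = {"features": ("extract",), "numeric": ("features",)}
    outputs, log = run_workflow(tasks, deps)
    print(outputs["numeric"])  # 60.0
```

Here the heavy "numeric" stage gets its own (hypothetical) allocation only
when it runs, instead of the UDF inside the dataflow stage having to hold
those resources.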

These two options may be limited and may not give you the answer you are
looking for. But there is a subtle implementation choice in deciding what
UDFs should and shouldn't do, and it would be very hard to restrict users
when they use UDFs. One thing we are working on is improving the existing
UDFs and providing best-practice guidelines to help users. Again, this is a
very valuable discussion, and I appreciate your time in looking into this.
We can discuss this further.

On Sat, May 7, 2022 at 6:58 AM Weston Pace <weston.p...@gmail.com> wrote:

> @Yaron
>
> > 1.  How to allocate compute threads between Arrow and locally executing
> > UDFs, avoiding unnecessary competition over local compute resources?
>
> In general, there is no reason for UDFs to allocate threads.  The
> project & filter nodes are trivially parallelizable.  As long as there
> is sufficient data then almost any project/filter pipeline will either
> be I/O bound or accumulate enough batches to run at full parallelism.
>
> However, if for some reason you wanted to share, this could be done by
> using a custom executor.  Every arrow::compute::ExecPlan instance is
> created with an arrow::compute::ExecContext instance.  Part of
> arrow::compute::ExecContext is an arrow::internal::Executor*.  The
> Executor is an interface and users can supply their own custom
> implementation.  By default we use Arrow's CPU thread pool which is
> allocated with static storage duration.  However, a user that has
> their own thread pool that Arrow needs to share can supply their own
> implementation of this interface.
>
> > How does this look for CPU and GPU compute resources?
>
> That's a good question and one I don't have the expertise to answer
> though I am interested in learning more about it.  I think it is
> important to design the execution engine to allow for this.
>
> >  2.  How to schedule compute threads as a set/unit for a locally running
> > UDF that requires them?
>
> I'm not sure I understand this question.
>
> >  3.  How to express the number (and CPU/GPU kind) of local compute
> > threads that a UDF requires? Is this number static or dynamic?
>
> This seems like a UDF concern.
>
> >  4.  How to handle the case of a UDF that runs at a remote service and
> > requires no local compute threads?
>
> This is venturing into distributed execution territory.  For example,
> at this point you might want to implement this at the node level with
> some kind of shuffle-style operator instead of trying to implement
> this at the function level.  Maybe something that could help is a
> TableUdfNode which has its own registry of function pointers that take
> in ExecBatch and output Future<ExecBatch>.  The node could accumulate
> data until it has some meaningful amount of rows to justify the call.
> Then it could make the call and, since the call returns a future, the
> compute thread would not be blocked.  When the function finally
> returns we could slice up the received batch into L2 sized batches for
> downstream computation.  We would need to implement some kind of
> backpressure queue here, similar to what we have in the sink node (and
> will eventually need in hash/join and aggregate as well).  While this
> node would not be a pipeline breaker in the "doesn't emit until it
> receives all data" sense it would be a pipeline breaker in the "the
> thread task on the left side will be different than the thread task on
> the right side" sense.
>
> >  5.  How to allocate local CPU resources between Arrow and UDFs running
> > in local processes external to Arrow?
>
> I think the answer here is the same as #1 but I don't know how much
> you can really do here outside of maybe something like CPU affinity
> and shrinking the size of Arrow's CPU thread pool.
>
> @Li
>
> > I wonder if you also think these are the basic problems that need to be
> > solved in order to enable "define udf in ibis and execute with Arrow
> > compute"?
>
> Yes, I think you're right.  We're probably getting a little ahead of
> ourselves in terms of details.  Your MVP and steps listed sound like a
> good place to start.
>
> > I am open to other potential solutions to solve the three steps above, but
> > just want to get on the same page of what are the minimal issues we need to
> > solve in order to "define UDF with ibis and execute with Arrow" and avoid
> > getting into too detailed discussions.
>
> Sounds good.  I appreciate your proposal and input.  This has been a
> very fun discussion and it is very useful to get more input into how
> the system might be used as it helps with implementation.
>
> On Fri, May 6, 2022 at 9:02 AM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > To add:
> > I am open to other potential solutions to solve the three steps above, but
> > just want to get on the same page of what are the minimal issues we need to
> > solve in order to "define UDF with ibis and execute with Arrow" and avoid
> > getting into too detailed discussions.
> >
> > On Fri, May 6, 2022 at 2:58 PM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > > Weston - Thanks for the thoughtful reply. It was quite useful.
> > >
> > > RE:
> > > "
> > > At the moment, I'm not aware of much that is needed in the execution engine
> > > beyond what we have.  If you (or anyone) has things that we can do in
> > > the execution engine then it would be good to identify them early.
> > > This is the most important thing to figure out first while the details
> > > of the actual UDF harnesses can be worked out as people have need of
> > > them.
> > > "
> > >
> > > From my perspective, the *minimal* functionality that we want to achieve
> > > is to define a Scalar UDF with the ibis API (Ibis already has an API to
> > > define Scalar UDFs), generate a Substrait query plan and execute it with
> > > Arrow compute. Or in other words, this is a "test program" that I want to
> > > run via Arrow compute:
> > >
> > > ```
> > > @ibis.vectorized_udf.elementwise(input_type=[dt.double], output_type=[dt.double])
> > > def add_one(s: pd.Series) -> pd.Series:
> > >     return s + 1
> > >
> > > t = t.mutate(new_col=add_one(t[col]))
> > > ```
> > >
> > > Now I am not sure exactly what additional work needs to be done in order
> > > to enable this, but at a high level I think:
> > > (1) A way to serialize the user function (and function parameters) (in the
> > > doc I proposed to use a cloudpickle-based approach similar to how Spark
> > > does it)
> > > (2) Add a Substrait relation/expression that captures the UDF
> > > (3) Deserialize the Substrait relation/expression in Arrow compute and
> > > execute the UDF (either using the approach in the current Scalar UDF
> > > prototype or do something else)
> > >
> > > I think many of the concerns above (e.g. single thread vs multi threads)
> > > are valid but more of the next step IMO.
> > >
> > > I wonder if you also think these are the basic problems that need to be
> > > solved in order to enable "define udf in ibis and execute with Arrow
> > > compute"?
> > >
> > >
> > >
> > > On Fri, May 6, 2022 at 4:28 AM Yaron Gvili <rt...@hotmail.com> wrote:
> > >
> > >> The general design seems reasonable to me. However, I think the
> > >> multithreading issue warrants a (perhaps separate) discussion, in view of
> > >> the risk that Arrow's multithreading model would end up being hard to
> > >> interoperate with that of other libraries used to implement UDFs. Such
> > >> interoperability problems could lead to hard-to-debug programs and
> > >> performance degradation; for example, they are known between user-level
> > >> threads, MPI and OpenMP [1] [2]. Some questions to consider in this
> > >> context:
> > >>
> > >>   1.  How to allocate compute threads between Arrow and locally executing
> > >> UDFs, avoiding unnecessary competition over local compute resources? How
> > >> does this look for CPU and GPU compute resources?
> > >>   2.  How to schedule compute threads as a set/unit for a locally running
> > >> UDF that requires them?
> > >>   3.  How to express the number (and CPU/GPU kind) of local compute
> > >> threads that a UDF requires? Is this number static or dynamic?
> > >>   4.  How to handle the case of a UDF that runs at a remote service and
> > >> requires no local compute threads?
> > >>   5.  How to allocate local CPU resources between Arrow and UDFs running
> > >> in local processes external to Arrow?
> > >>
> > >> Since this is quite a complex topic, instead of trying to support
> > >> multiple UDF threading facilities within Arrow, I think it might be easier,
> > >> at least as a first approach, to expose in Arrow a threading-model control
> > >> API (I'm not aware of such an API, if one exists). An Arrow-with-UDFs user,
> > >> who best knows the compute needs of the UDFs in the execution plan, would
> > >> then be able to allocate compute resources to Arrow through this API and
> > >> separately to these UDFs.
> > >>
> > >>
> > >> [1] https://www.bolt-omp.org/
> > >> [2] https://www.openmp.org/wp-content/uploads/SC19-Iwasaki-Threads.pdf
> > >>
> > >>
> > >> Yaron.
> > >>
> > >> ________________________________
> > >> From: Weston Pace <weston.p...@gmail.com>
> > >> Sent: Thursday, May 5, 2022 4:30 PM
> > >> To: dev@arrow.apache.org <dev@arrow.apache.org>
> > >> Subject: Re: RFC: Out of Process Python UDFs in Arrow Compute
> > >>
> > >> Yes, I think you have the right understanding.  That is what I was
> > >> originally trying to say by pointing out that our current solution
> > >> does not solve the serialization problem.
> > >>
> > >> I think there is probably room for multiple approaches to this
> > >> problem.  Each will have their own tradeoffs.  For example, a recent
> > >> email thread[1] also proposed or mentioned providing UDFs:
> > >>  * Through LLVM
> > >>  * Through WASM
> > >>  * Through Gandiva
> > >>
> > >> I think there is room for all of these things.  From an execution
> > >> engine perspective I think the main concern would be making sure we
> > >> have the abstractions and extension points in place to enable
> > >> development of all of the above models.  Some of the concerns in your
> > >> document are maybe beyond the jurisdiction of the execution engine
> > >> (e.g. managing the lifecycle of UDF workers and processes).  However,
> > >> they could still be part of the Arrow-C++ library, I'm just thinking
> > >> it makes sense to decouple them.
> > >>
> > >> At the moment, I'm not aware of much that is needed in the execution engine
> > >> beyond what we have.  If you (or anyone) has things that we can do in
> > >> the execution engine then it would be good to identify them early.
> > >> This is the most important thing to figure out first while the details
> > >> of the actual UDF harnesses can be worked out as people have need of
> > >> them.  The "function pointer" extension point is very generic.
> > >> However, just brainstorming, I can think of some possible extensions
> > >> and utilities that might enable UDF harness development, although I
> > >> think some of these are still Substrait concerns and not execution
> > >> plan concerns (though they may need work in the Substrait consumer):
> > >>
> > >>  * The ability to register a UDF scoped to an execution
> > >>     plan, instead of globally
> > >>  * The ability to serialize a UDF definition (and not just the
> > >>     declaration / invocation) in a Substrait plan
> > >>  * A serialization format for UDF options (Substrait uses literal
> > >>     expressions for this, which are serializable, but the execution
> > >>     plan has POCOs which are not)
> > >>  * The ability to register a common function pointer for a whole
> > >>     class of functions (could use Substrait function URI for this).
> > >>     The "call" object would have to be sent to the handler in addition
> > >>     to the arguments.
> > >>  * Add a stop token to the context passed to the UDF.  This
> > >>     can be used by a long running UDF to check and see if the user
> > >>     has cancelled the execution plan in which case the long running
> > >>     UDF should abort and clean up its work.
> > >>
> > >> If a UDF handler wants to delegate work to a different thread (or
> > >> multiple threads or even multiple processes) then I think that is
> > >> fine.  Just block the calling thread until all the work is done
> > >> (periodically checking to make sure a cancellation hasn't been
> > >> requested).  One of the execution engine's CPU threads will be blocked
> > >> for the duration of this function call.
> > >>
> > >> If the UDF is not the bottleneck for the entire plan then this
> > >> multithreading may be ineffective or actively harmful (e.g. if the UDF
> > >> is lightweight then maybe a majority of the time is spent in a
> > >> downstream hash-join or aggregation.  By creating multiple threads you
> > >> are just increasing the chance this downstream task will be forced to
> > >> context switch).  The execution engine will already call the UDF from
> > >> multiple threads.  By default you can expect the UDF to be called up
> > >> to N times simultaneously where N is the # of cores on the machine
> > >> (std::thread::hardware_concurrency).  From python there is some GIL concern
> > >> here but most numpy or pandas operations that do any substantial
> > >> amount of work are going to be releasing the GIL anyways.  However,
> > >> there could certainly be cases where multithreading within the UDF
> > >> might be helpful or multiprocessing makes sense.  I don't know of any
> > >> reason the execution engine itself would need to be made aware that
> > >> this was happening though.
> > >>
> > >> One could argue that the execution engine should not block a CPU
> > >> thread if it is invoking a long-running UDF.  It could use that CPU
> > >> thread to do other work in the plan that needs doing.  However, I
> > >> think this is unlikely to be useful.  Whatever is running the UDF is
> > >> presumably using at least one core and so there is no reason for the
> > >> execution plan thread to try and do any more work anyways.
> > >>
> > >> [1] https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx
> > >>
> > >>
>