@Li, I appreciate your thoughts on these important points. Let me walk through
them one by one.

> (A) Numeric code written in numpy/pandas + some relational logic (e.g.,
> np.where to select rows). People like this type of UDFs because they are
> very familiar with pandas/numpy and can be immediately productive when
> these are exposed in a system.

Are these computations expensive? To quantify it: do these UDFs generally
account for the majority of the overall dataflow execution time?

In terms of computation, if the work can be done serially, the current UDFs
can support this kind of setting. The only requirement for now is that Arrow
arrays come in and Arrow arrays go out. We will be adding more features to
UDFs as this feature evolves.

> (B) Business logic related code written in numpy/pandas and/or invoke RPC.

I think this is one of the most common and vital use cases for UDFs. Are most
of the calls here blocking calls? Could you explain the general nature of the
usage?


> The inputs to these UDFs are things like "a str id indicating the data you
> are reading" and "two python timestamps indicating the begin/end of the data
> you read". And these UDFs output streams of Arrow Batches.

This is interesting. To make the current system more usable, I think we can
let users keep metadata in the `ctx` object. We could make it editable and
carry the metadata throughout the data pipeline. This has not been designed or
tested yet. Passing metadata through an Array doesn't sound like a sustainable
solution, but it is of course possible for now, until the system supports this
kind of use case.
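
To make the idea a bit more concrete, here is a minimal sketch of what an
editable, metadata-carrying context could look like. Everything in it is
hypothetical: `PipelineContext`, `read_source` and `add_offset` are names I
made up for illustration, they are not part of the current UDF API, and plain
Python lists stand in for Arrow arrays.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class PipelineContext:
    """Hypothetical stand-in for the `ctx` object handed to each UDF."""
    metadata: Dict[str, Any] = field(default_factory=dict)


def read_source(ctx: PipelineContext, source_id: str,
                begin: str, end: str) -> List[float]:
    # A data-source style UDF: record what was read in the context instead
    # of packing it into an extra array.
    ctx.metadata.update({"source_id": source_id, "begin": begin, "end": end})
    return [1.0, 2.0, 3.0]  # placeholder for the batches that would be read


def add_offset(ctx: PipelineContext, values: List[float]) -> List[float]:
    # A downstream UDF can read, and also extend, the same metadata.
    offset = 100.0 if ctx.metadata.get("source_id") == "prices" else 0.0
    ctx.metadata["offset_applied"] = offset
    return [v + offset for v in values]


ctx = PipelineContext()
result = add_offset(ctx, read_source(ctx, "prices", "2022-05-01", "2022-05-09"))
print(result)
print(ctx.metadata)
```

The point of the sketch is only that the metadata travels next to the data
rather than inside it; how such a context would be threaded through the
execution plan is still an open design question.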

> Mapping these back to the three categories you have, I believe these are
> mostly (1) and (3). Although, it is not uncommon to see type (A) use cases
> ends up being in category (2) because the numeric logic is complicated
> (still single thread numpy/pandas and not to the degree of multi-process).

I think the current form of UDFs can support this kind of programme. It would
be great to try this on a real-world application and evaluate the usage
limitations, performance bottlenecks, etc.

> First option is close to what we want, and I think the gap is not
> "performance" or "parallel execution", but the "interface", i.e., how do
> user define UDF in ibis
> (which, in my understanding, is what we decided as the primary interface
> for using Arrow compute in Python), serialize the UDF via substrait, and
> then execute it in Arrow compute (I mentioned above as the "3 steps",
> rephrasing here:)

Yes, this is a good point. We are working on Substrait-Python integration
[1], and later on we could support UDFs through Substrait as well. This would
allow us to develop applications in Python smoothly across different systems.

Regarding the suggested steps, I think you are correct that using Substrait
would enable seamless integration of custom logic into queries.
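
As a rough illustration of the three steps (serialize the function, carry it
in a plan, deserialize and execute it), here is a minimal sketch using only
the Python standard library. It rests on several assumptions: `marshal` is
just a stand-in for the cloudpickle-based approach in the proposal, the `plan`
dict and its keys are made up and stand in for a Substrait message, and plain
lists stand in for Arrow arrays. `marshal` only works for simple functions
without closures, and only when both sides run the same Python version.

```python
import base64
import builtins
import marshal
import types


def serialize_udf(func) -> str:
    # Step 1: serialize the function body by value (stand-in for cloudpickle).
    return base64.b64encode(marshal.dumps(func.__code__)).decode("ascii")


def make_plan(func, column: str) -> dict:
    # Step 2: embed the serialized UDF in a plan. A real plan would be a
    # Substrait relation/expression; this dict layout is hypothetical.
    return {"op": "project", "column": column, "udf": serialize_udf(func)}


def execute_plan(plan: dict, table: dict) -> list:
    # Step 3: rebuild the function on the consumer side and apply it to the
    # requested column.
    code = marshal.loads(base64.b64decode(plan["udf"]))
    func = types.FunctionType(code, {"__builtins__": builtins})
    return func(table[plan["column"]])


def add_one(values):
    return [v + 1 for v in values]


plan = make_plan(add_one, "x")
print(execute_plan(plan, {"x": [1, 2, 3]}))
```

The producer only needs steps 1 and 2 and the consumer only needs step 3,
which is why the plan format is the piece that has to be agreed on.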

I appreciate your thoughts on this. I think we will eventually converge
towards that.

[1] https://github.com/apache/arrow/pull/12672


On Mon, May 9, 2022 at 8:59 PM Li Jin <ice.xell...@gmail.com> wrote:

> @Vibhatha
>
> Wow appreciate the deep thoughts. One thing I'd like to clarify is that we
> are not trying to run a complicated program inside any kind of UDF. In
> fact, in our research environment we do not allow people to use
> MPI/Spark/Flint/Dask inside a UDF. Personally I think it is a bad idea and
> should never be a solution.
>
> Now to clarify a bit of the types of UDFs that we see in our current
> research system:
> (A) Numeric code written in numpy/pandas + some relational logic (e.g.,
> np.where to select rows). People like this type of UDFs because they are
> very familiar with pandas/numpy and can be immediately productive when
> these are exposed in a system.
> (B) Business logic related code written in numpy/pandas and/or invoke RPC.
> By business logic, I meant logic that has not much to do with "Numeric" but
> related to the specific domain logic. In our case, these business logics
> (in finance) are like "compute the number of trading days between date A
> and date B in London stock exchange", or "map the ticker AAPL to the
> internal identifier for Apple Inc". Historically these logic are written in
> Python. (In the RPC case, there is a Python API that wraps the RPC, the
> data is sent to the remote service and response back).
> (C) Data Source UDFs. This is a bridge solution to reading data into a
> non-Python execution engine (we use this for Spark). Most of our data
> sources are provided via a Python API (we do have non-standard storage and
> data sources in addition to Parquet, hdfs, s3, etc). In order to read these
> data source into a Java/C++ execution engine, we wrap these data sources
> into UDFs that doesn't take columnar inputs. The inputs to these UDFs are
> things like "a str id indicating the data you are reading" and "two python
> timestamps indicating the begin/end of the data you read". And these UDFs
> output streams of Arrow Batches. Under the hood these are RPCs and similar
> to the RPC use case in B but are just more I/O intensive. Again, these are
> bridge solutions because we don't have native Java/C++ API/readers for many
> of our storage systems.
>
> Mapping these back to the three categories you have, I believe these are
> mostly (1) and (3). Although, it is not uncommon to see type (A) use cases
> ends up being in category (2) because the numeric logic is complicated
> (still single thread numpy/pandas and not to the degree of multi-process).
>
> Now going back to the two options you recommended (I assumed that first
> option is what is currently UDF that is implemented and second option being
> workflow engine). Second option is not relevant to us because we don't try
> to do this kind of distribution computation in UDF. First option is close
> to what we want, and I think the gap is not "performance" or "parallel
> execution", but the "interface", i.e., how do user define UDF in ibis
> (which, in my understanding, is what we decided as the primary interface
> for using Arrow compute in Python), serialize the UDF via substrait, and
> then execute it in Arrow compute (I mentioned above as the "3 steps",
> rephrasing here:)
>
> (1) A way to serialize the user function (and function parameters) (in the
> doc I proposed to use cloudpickle based approach similar to how Spark does
> it)
> (2) Add a Substrait relation/expression that captures the UDF
> (3) Deserialize the Substrait relation/expression in Arrow compute and
> execute the UDF (either using the approach in the current Scalar UDF
> prototype or do sth else)
>
> Once we solved these, I think we are pretty close to be able to use UDFs
> with Arrow (at least for the majority use cases that we have).
>
>
> On Sat, May 7, 2022 at 12:27 AM Vibhatha Abeykoon <vibha...@gmail.com>
> wrote:
>
> > @Li
> >
> > Thank you for the proposal and this discussion is very interesting. And
> > also @Yaron, thanks for the note on the distributed execution and
> > hard-to-debug nature when threads and processes from two systems try to
> > perform a task seamlessly.
> >
> > There are very important points that are raised in this discussion.
> > I would like to present a few facts and elaborate on each of them.
> >
> > 1. The objective of the UDFs and the scope
> > 2. The parallel execution
> > 3. Ways to integrate the sequential libraries with a distributed
> >    framework.
> >
> >
> > I think in the very initial discussions, I also thought about the facts
> > like forking from the UDF process and running parallel tasks, resource
> > management, etc. This is a very important thing as far as we think about
> > what is executed within the UDF scope. But there is another aspect to
> > this. Does that execution need to take place in the same processing unit
> > itself or can it be executed remotely? So addressing the first point, I am
> > more inclined towards the idea that the tasks we should execute using a UDF
> > should be limited tasks which may require more or less the default
> > resources that can be allocated to it. But what if we need more resources
> > to execute a user-defined task which is required by the data pipeline to
> > process data in a way that no existing kernels in the Arrow library can
> > support it cleverly?
> >
> > I think this is your point of an advanced UDF.
> >
> > Looking into the limitations of UDFs, we need to determine how much freedom
> > should be given to the UDF?
> >
> > Looking into the proposed ideas, some of the key components are;
> >
> > - Allowing running separate programmes within the UDF
> > - These programs could be sequential or distributed in nature
> > - Handling resources and handling possible outcomes well.
> > - Gracefully shutdown/tear down resources when runtime errors occur, etc
> >
> > Is there a way to manage the UDF life-cycle better? This is a very good
> > point. Let me elaborate a bit on my thought process.
> >
> > I believe the reasoning for such requirements for UDF comes from the fact
> > that we need to run computationally intensive workloads which could consume
> > more resources and time. And basically, the default allocated resources
> > won't be enough and the user should be able to configure it as required.
> > For instance, let's assume we have a data pre-processing problem, where we
> > extract features and finally we have to calculate a matrix which is
> > generated by a computationally-intensive process (assume a numerical
> > computation). The feature extraction involves the usual data engineering
> > operators supported by Apache Arrow, but the numerical computation may
> > require an advanced Physics library written in MPI/PGAS/UPC/ or even let's
> > simply say Apache Spark/Apache Flink. Such a program is not a function, it
> > becomes a program itself which needs to manage resources well. Or in a
> > simple sense, it could be a simple program written using a Python parallel
> > execution framework (I doubt the performance in such a case).
> >
> > Consider the feature extraction time (T1) and Numerical computation time
> > (T2).
> >
> > 1. T1 >>> T2 (feature extraction dominant)
> > 2. T1 <<< T2 (numeric computation dominant)
> > 3. T1 ~= T2 (approximately equally dominant)
> >
> > I guess, for categories 1 and 3, we can more or less use the existing
> UDFs.
> > There could be limitations, but it would be feasible if we can allocate
> > more resources to the execution engine itself. But it won't be a perfect
> > optimization of resource allocation. That is noted and it may not be
> > feasible unless the life-cycle of a UDF needs to be handled separately.
> We
> > can discuss this further.
> >
> > For category 2, assuming the program is executed in multi-processes in
> one
> > machine or more machines, the following problems spring up.
> >
> > 1. Resource management
> > 2. Communication (MPI, PGAS, etc)
> > 3. Life-cycle management
> >
> > If it comes to this kind of situation, I think it is better to run a
> > Remote-Procedure-Call (RPC) rather than running the whole program within
> > the UDF. The UDF can wait for the RPC to return the values.
> >
> > The main issue with this approach is that we have to move the data. Yes,
> of
> > course, the data needs to be moved. But if we are to run complex
> programmes
> > within an existing process and ask it to spawn processes, it is going to
> be
> > very complex. In such a situation debugging, and performance optimization
> > is a very hard thing to do. I think if the goal is running a heavy
> program,
> > we should have started it with that intention or offloaded it to remote
> > execution.
> >
> > We could bear the cost of moving data and execute the program remotely
> > (Maybe Arrow Flight could help this - Need to verify) and bring back the
> > results to the UDF. If things are Arrow-friendly we can minimize the
> > serialization costs too. This is one plausible scenario. I will explain
> why
> > this could be favourable rather than managing the UDF life-cycle
> ourselves.
> >
> > The other aspect is to assume there is a distributed system written on
> top
> > of Apache Arrow which can run programmes in distributed memory or in a
> > cloud environment where it needs to auto-scale or manage the resources
> > considering the required resources. Apache Arrow can still provide the
> > usual UDFs it provides and let the distributed system itself handle the
> > communication, etc.
> >
> > Note: Naively in Python, we can use mpi4py for parallel execution (write
> > shuffle to support Arrow data, use MPI internals by converting Arrow
> arrays
> > to Numpy is feasible), Arrow to run local operators and use MPI
> collectives
> > for synchronization. This is not an optimized solution in any way, but it
> > is possible to do very quickly rather than waiting to build an external
> > system which supports Arrow.
> >
> > Now let's elaborate on the time-consuming application. Assume a program
> > like this, (I am going to write a BSP: Bulk Synchronous Parallel
> > Programme) Please note that this is not something we support or
> developed,
> > just an idea on how the UDFs can be executed with a distributed runtime.
> > Plus the code is just to showcase the possibility of using Arrow UDFs in
> a
> > distributed environment.
> >
> >
> > Run the programme:
> >
> > ```
> > mpirun -np 4 python3 arrow_dist_sample.py
> > ```
> >
> > Some Parallel code...
> >
> > ```
> > # not real just assume
> >
> > from pyarrow.compute import HashJoin, Sort
> > from distributed_arrow.comm import ArrowDistributedContext
> > from distributed_arrow.data import DistributedDataLoader
> >
> >
> > ctx = ArrowDistributedContext(...)
> >
> > rank = ctx.get_rank() # current process id
> > size = ctx.get_world_size() # number of processes
> >
> > # load data
> >
> > ds_loader = DistributedDataLoader(ctx, ...)
> > # returns data for the current process
> > ds_data = ds_loader.data(rank)
> >
> > # shuffle the data based on the key
> > ctx.Shuffle(ds_loader, keys={join_keys})
> > # do local join
> > final_table = HashJoin(ctx, {join_keys}, ds_loader, ...)
> >
> > # run some other programmes
> > ....
> >
> > # UDF
> >
> > def my_udf(ctx, array):
> >     # do some math computation on array
> >     col1 = ...
> >     # synchronize
> >     res_col = ctx.AllReduce(col1, ...)
> >
> >     return res_col
> >
> >
> > # applying udf on the 0-th column of the table
> > res = final_table.apply_udf(my_udf, ctx, [0])
> > ....
> >
> > ```
> >
> > In this approach, there is a distributed system designed to favour
> > digesting Arrow data that can serialize data, and run in parallel in
> > exascale.
> >
> > In such a scenario, I think the resources can be managed externally and
> > scale as needed by examining the bottlenecks. Also, note that when
> threads
> > are involved with an MPI-like programme it would be very hard to tune
> > performance. If we want to parallelize within the UDF, even in the
> > distributed setting is going to be a challenging task. For such a
> scenario,
> > I feel like the best thing is to execute an RPC call and execute the
> logic
> > remotely in a
> > cloud or supercomputer and bring the results back. But this is assuming
> we
> > are dealing with just
> > a dataflow system.
> >
> > But if we zoom out and think about the problem, what we have is a
> > particular data processing task
> > which has a few stages required to perform very specific tasks. In
> general,
> > we can deploy a set of
> > dataflow programmes by encapsulating this within a workflow system. The
> > workflow system can manage
> > to move the data from one workflow task to the other and automate the
> > process. That way we manage the
> > resource much better and only allocate resources as needed. But this is
> > open for discussion and just
> > my thought process on the usage of UDFs.
> >
> > These two options could be limited and not satisfy the answer you are
> > looking for. But there lies a slight implementation choice when deciding
> > what UDFs should do and shouldn't do. And it would be very hard to limit
> > users when using UDFs. One thing we are working on is to improve the
> > existing UDFs and provide a best practice guideline to help the users.
> > Again, this is a very valuable discussion and appreciate your time in
> > looking into this. We can discuss this further.
> >
> > On Sat, May 7, 2022 at 6:58 AM Weston Pace <weston.p...@gmail.com>
> wrote:
> >
> > > @Yaron
> > >
> > > > 1.  How to allocate compute threads between Arrow and locally executing
> > > > UDFs, avoiding unnecessarily competition over local compute resources?
> > >
> > > In general, there is no reason for UDFs to allocate threads.  The
> > > project & filter nodes are trivially parallelizable.  As long as there
> > > is sufficient data then almost any project/filter pipeline will either
> > > be I/O bound or accumulate enough batches to run at full parallelism.
> > >
> > > However, if for some reason you wanted to share, this could be done by
> > > using a custom executor.  Every arrow::compute::ExecPlan instance is
> > > created with an arrow::compute::ExecContext instance.  Part of
> > > arrow::compute::ExecContext is an arrow::internal::Executor*.  The
> > > Executor is an interface and users can supply their own custom
> > > implementation.  By default we use Arrow's CPU thread pool which is
> > > allocated with static storage duration.  However, a user that has
> > > their own thread pool that Arrow needs to share can supply their own
> > > implementation of this interface.
> > >
> > > > How does this look for CPU and GPU compute resources?
> > >
> > > That's a good question and one I don't have the expertise to answer
> > > though I am interested in learning more about it.  I think it is
> > > important to design the execution engine to allow for this.
> > >
> > > >  2.  How to schedule compute threads as a set/unit for a locally running
> > > > UDF that requires them?
> > >
> > > I'm not sure I understand this question.
> > >
> > > >  3.  How to express the number (and CPU/GPU kind) of local compute
> > > > threads that a UDF requires? Is this number static or dynamic?
> > >
> > > This seems like a UDF concern.
> > >
> > > >  4.  How to handle the case of a UDF that runs at a remote service and
> > > > requires no local compute threads?
> > >
> > > This is venturing into distributed execution territory.  For example,
> > > at this point you might want to implement this at the node level with
> > > some kind of shuffle-style operator instead of trying to implement
> > > this at the function level.  Maybe something that could help is a
> > > TableUdfNode which has its own registry of function pointers that take
> > > in ExecBatch and output Future<ExecBatch>.  The node could accumulate
> > > data until it has some meaningful amount of rows to justify the call.
> > > Then it could make the call and, since the call returns a future, the
> > > compute thread would not be blocked.  When the function finally
> > > returns we could slice up the received batch into L2 sized batches for
> > > downstream computation.  We would need to implement some kind of
> > > backpressure queue here, similar to what we have in the sink node (and
> > > will eventually need in hash/join and aggregate as well).  While this
> > > node would not be a pipeline breaker in the "doesn't emit until it
> > > receives all data" sense it would be a pipeline breaker in the "the
> > > thread task on the left side will be different than the thread task on
> > > the right side" sense.
> > >
> > > >  5.  How to allocate local CPU resources between Arrow and UDFs running
> > > > in local processes external to Arrow?
> > >
> > > I think the answer here is the same as #1 but I don't know how much
> > > you can really do here outside of maybe something like CPU affinity
> > > and shrinking the size of Arrow's CPU thread pool.
> > >
> > > @Li
> > >
> > > > I wonder if you also think these are the basic problems that need to be
> > > > solved in order to enable "define udf in ibis and execute with Arrow
> > > > compute"?
> > >
> > > Yes, I think you're right.  We're probably getting a little ahead of
> > > ourselves in terms of details.  Your MVP and steps listed sound like a
> > > good place to start.
> > >
> > > > I am open to other potential solutions to solve the three steps above,
> > > > but just want to get on the same page of what are the minimal issues we
> > > > need to solve in order to "define UDF with ibis and execute with Arrow"
> > > > and avoid getting into too detailed discussions.
> > >
> > > Sounds good.  I appreciate your proposal and input.  This has been a
> > > very fun discussion and it is very useful to get more input into how
> > > the system might be used as it helps with implementation.
> > >
> > > On Fri, May 6, 2022 at 9:02 AM Li Jin <ice.xell...@gmail.com> wrote:
> > > >
> > > > To add:
> > > > I am open to other potential solutions to solve the three steps
> above,
> > > but
> > > > just want to get on the same page of what are the minimal issues we
> > need
> > > to
> > > > solve in order to "define UDF with ibis and execute with Arrow" and
> > avoid
> > > > getting into too detailed discussions.
> > > >
> > > > On Fri, May 6, 2022 at 2:58 PM Li Jin <ice.xell...@gmail.com> wrote:
> > > >
> > > > > Weston - Thanks for the thoughtful reply. It was quite useful.
> > > > >
> > > > > RE:
> > > > > "
> > > > > At the moment, I'm not aware of much is needed in the execution
> > engine
> > > > > beyond what we have.  If you (or anyone) has things that we can do
> in
> > > > > the execution engine then it would be good to identify them early.
> > > > > This is the most important thing to figure out first while the
> > details
> > > > > of the actual UDF harnesses can be worked out as people have need
> of
> > > > > them.
> > > > > "
> > > > >
> > > > > From my perspective, the *minimal* functionality that we want to
> > > achieve
> > > > > is to define a Scalar UDF with the ibis API (Ibis already have an
> API
> > > to
> > > > > define Scalar UDFs), generate substrait query plan and execute it
> > with
> > > > > Arrow compute. Or in other words, this is a "test program" that I
> > want
> > > to
> > > > > run via Arrow compute:
> > > > >
> > > > > ```
> > > > > > @ibis.vectorized_udf.elementwise(input_type=[dt.double],
> > > > > >                                  output_type=[dt.double])
> > > > > > def add_one(s: pd.Series) -> pd.Series:
> > > > > >     return s + 1
> > > > > >
> > > > > > t = t.mutate(new_col=add_one(t[col]))
> > > > > ```
> > > > >
> > > > > Now I am not sure exactly what additional work needs to be done in
> > > order
> > > > > to enable this, but at the high level I think:
> > > > > (1) A way to serialize the user function (and function parameters)
> > (in
> > > the
> > > > > doc I proposed to use cloudpickle based approach similar to how
> Spark
> > > does
> > > > > it)
> > > > > > (2) Add a Substrait relation/expression that captures the UDF
> > > > > (3) Deserialize the Substrait relation/expression in Arrow compute
> > and
> > > > > execute the UDF (either using the approach in the current Scalar
> UDF
> > > > > prototype or do sth else)
> > > > >
> > > > > I think many of the concerns above (i.e. single thread vs multi
> > > threads)
> > > > > are valid but more of the next step IMO.
> > > > >
> > > > > I wonder if you also think these are the basic problems that need
> to
> > be
> > > > > solved in order to enable "define udf in ibis and execute with
> Arrow
> > > > > compute"?
> > > > >
> > > > >
> > > > >
> > > > > On Fri, May 6, 2022 at 4:28 AM Yaron Gvili <rt...@hotmail.com>
> > wrote:
> > > > >
> > > > >> The general design seems reasonable to me. However, I think the
> > > > >> multithreading issue warrants a (perhaps separate) discussion, in
> > > view of
> > > > >> the risk that Arrow's multithreading model would end up being hard
> > to
> > > > >> interoperate with that of other libraries used to implement UDFs.
> > Such
> > > > >> interoperability problem could lead to hard-to-debug programs and
> > > > >> performance degradation; for example, they are known between
> > > user-level
> > > > >> threads, MPI and OpenMP [1] [2]. Some questions to consider in
> this
> > > context:
> > > > >>
> > > > >>   1.  How to allocate compute threads between Arrow and locally
> > > executing
> > > > >> UDFs, avoiding unnecessarily competition over local compute
> > > resources? How
> > > > >> does this look for CPU and GPU compute resources?
> > > > >>   2.  How to schedule compute threads as a set/unit for a locally
> > > running
> > > > >> UDF that requires them?
> > > > >>   3.  How to express the number (and CPU/GPU kind) of local
> compute
> > > > >> threads that a UDF requires? Is this number static or dynamic?
> > > > >>   4.  How to handle the case of a UDF that runs at a remote
> service
> > > and
> > > > >> requires no local compute threads?
> > > > >>   5.  How to allocate local CPU resources between Arrow and UDFs
> > > running
> > > > >> in local processes external to Arrow?
> > > > >>
> > > > >> Since this is quite a complex topic, instead of trying to support
> > > > >> multiple UDF threading facilities within Arrow, I think it might
> be
> > > easier,
> > > > >> at least as a first approach, to expose in Arrow a threading-model
> > > control
> > > > >> API (I'm not aware of such an API, if one exists). An
> > Arrow-with-UDFs
> > > user,
> > > > >> who best knows the compute needs of the UDFs in the execution
> plan,
> > > would
> > > > >> then be able to allocate compute resources to Arrow through this
> API
> > > and
> > > > >> separately to these UDFs.
> > > > >>
> > > > >>
> > > > >> [1] https://www.bolt-omp.org/
> > > > >> [2]
> > > https://www.openmp.org/wp-content/uploads/SC19-Iwasaki-Threads.pdf
> > > > >>
> > > > >>
> > > > >> Yaron.
> > > > >>
> > > > >> ________________________________
> > > > >> From: Weston Pace <weston.p...@gmail.com>
> > > > >> Sent: Thursday, May 5, 2022 4:30 PM
> > > > >> To: dev@arrow.apache.org <dev@arrow.apache.org>
> > > > >> Subject: Re: RFC: Out of Process Python UDFs in Arrow Compute
> > > > >>
> > > > >> Yes, I think you have the right understanding.  That is what I was
> > > > >> originally trying to say by pointing out that our current solution
> > > > >> does not solve the serialization problem.
> > > > >>
> > > > >> I think there is probably room for multiple approaches to this
> > > > >> problem.  Each will have their own tradeoffs.  For example, a
> recent
> > > > >> email thread[1] also proposed or mentioned providing UDFs:
> > > > >>  * Through LLVM
> > > > >>  * Through WASM
> > > > >>  * Through Gandiva
> > > > >>
> > > > >> I think there is room for all of these things.  From an execution
> > > > >> engine perspective I think the main concern would be making sure
> we
> > > > >> have the abstractions and extension points in place to enable
> > > > >> development of all of the above models.  Some of the concerns in
> > your
> > > > >> document are maybe beyond the jurisdiction of the execution engine
> > > > >> (e.g. managing the lifecycle of UDF workers and processes).
> > However,
> > > > >> they could still be part of the Arrow-C++ library, I'm just
> thinking
> > > > >> it makes sense to decouple them.
> > > > >>
> > > > >> At the moment, I'm not aware of much is needed in the execution
> > engine
> > > > >> beyond what we have.  If you (or anyone) has things that we can do
> > in
> > > > >> the execution engine then it would be good to identify them early.
> > > > >> This is the most important thing to figure out first while the
> > details
> > > > >> of the actual UDF harnesses can be worked out as people have need
> of
> > > > >> them.  The "function pointer" extension point is very generic.
> > > > >> However, just brainstorming, I can think of some possible
> extensions
> > > > >> and utilities that might enable UDF harness development, although
> I
> > > > >> think some of these are still Substrait concerns and not execution
> > > > >> plan concerns (though they may need work in the Substrait
> consumer):
> > > > >>
> > > > >>  * The ability to register a UDF scoped to an execution
> > > > >>     plan, instead of globally
> > > > >>  * The ability to serialize a UDF definition (and not just the
> > > > >>     declaration / invocation) in a Substrait plan
> > > > >>  * A serialization format for UDF options (Substrait uses literal
> > > > >>     expressions for this, which are serializable, but the
> execution
> > > > >>     plan has POCOs which are not)
> > > > >>  * The ability to register a common function pointer for a whole
> > > > >>     class of functions (could use Substrait function URI for
> this).
> > > > >>     The "call" object would have to be sent to the handler in
> > addition
> > > > >>     to the arguments.
> > > > >>  * Add a stop token to the context passed to the UDF.  This
> > > > >>     can be used by a long running UDF to check and see if the user
> > > > >>     has cancelled the execution plan in which case the long
> running
> > > > >>     UDF should abort and clean up its work.
> > > > >>
> > > > >> If a UDF handler wants to delegate work to a different thread (or
> > > > >> multiple threads or even multiple processes) then I think that is
> > > > >> fine.  Just block the calling thread until all the work is done
> > > > >> (periodically checking to make sure a cancellation hasn't been
> > > > >> requested).  One of the execution engine's CPU threads will be
> > blocked
> > > > >> for the duration of this function call.
> > > > >>
> > > > >> If the UDF is not the bottleneck for the entire plan then this
> > > > >> multithreading may be ineffective or actively harmful (e.g. if the
> > UDF
> > > > >> is lightweight then maybe a majority of the time is spent in a
> > > > >> downstream hash-join or aggregation.  By creating multiple threads
> > you
> > > > >> are just increasing the chance this downstream task will be forced
> > to
> > > > >> context switch).  The execution engine will already call the UDF
> > from
> > > > >> multiple threads.  By default you can expect the UDF to be called
> up
> > > > >> to N times simultaneously where N is the # of cores on the
> machines
> > > > >> (std::hardware_concurrency).  From python there is some GIL
> concern
> > > > >> here but most numpy or pandas operations that do any substantial
> > > > >> amount of work are going to be releasing the GIL anyways.
> However,
> > > > >> there could certainly be cases where multithreading within the UDF
> > > > >> might be helpful or multiprocessing makes sense.  I don't know of
> > any
> > > > >> reason the execution engine itself would need to be made aware
> that
> > > > >> this was happening though.
> > > > >>
> > > > >> One could argue that the execution engine should not block a CPU
> > > > >> thread if it is invoking a long-running UDF.  It could use that
> CPU
> > > > >> thread to do other work in the plan that needs doing.  However, I
> > > > >> think this is unlikely to be useful.  Whatever is running the UDF
> is
> > > > >> presumably using at least one core and so there is no reason for
> the
> > > > >> execution plan thread to try and do any more work anyways.
> > > > >>
> > > > >> [1]
> > https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx
> > > > >>
> > > > >>
> > >
> >
>
