The general design seems reasonable to me. However, I think the multithreading 
issue warrants a (perhaps separate) discussion, given the risk that Arrow's 
multithreading model could turn out to be hard to interoperate with those of 
other libraries used to implement UDFs. Such interoperability problems can 
lead to hard-to-debug programs and performance degradation; they are known to 
arise, for example, between user-level threads, MPI, and OpenMP [1] [2]. Some 
questions to consider in this context:

  1.  How to allocate compute threads between Arrow and locally executing UDFs, 
avoiding unnecessary competition over local compute resources? How does this 
look for CPU and GPU compute resources?
  2.  How to schedule compute threads as a set/unit for a locally running UDF 
that requires them?
  3.  How to express the number (and CPU/GPU kind) of local compute threads 
that a UDF requires? Is this number static or dynamic?
  4.  How to handle the case of a UDF that runs at a remote service and 
requires no local compute threads?
  5.  How to allocate local CPU resources between Arrow and UDFs running in 
local processes external to Arrow?

Since this is quite a complex topic, instead of trying to support multiple UDF 
threading facilities within Arrow, I think it might be easier, at least as a 
first approach, to expose a threading-model control API in Arrow (I'm not aware 
of such an API, if one exists). An Arrow-with-UDFs user, who knows best the 
compute needs of the UDFs in the execution plan, would then be able to use this 
API to allocate compute resources to Arrow and, separately, to these UDFs.
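
To make this concrete, here is a rough sketch of the kind of control I have 
in mind, using pyarrow's existing pool-capacity knobs as a starting point; 
the UDF-side reservation call is hypothetical and shown only to illustrate 
the direction:

    import pyarrow as pa

    # Existing coarse-grained knobs: cap Arrow's own thread pools so
    # that some cores remain free for UDF execution.
    pa.set_cpu_count(6)        # Arrow compute (CPU) threads
    pa.set_io_thread_count(2)  # Arrow I/O threads

    # Hypothetical extension (does not exist today): declare the
    # compute resources reserved for the UDFs in the plan.
    # pa.set_udf_thread_reservation(cpu_threads=2, gpu_devices=0)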


[1] https://www.bolt-omp.org/
[2] https://www.openmp.org/wp-content/uploads/SC19-Iwasaki-Threads.pdf


Yaron.

________________________________
From: Weston Pace <weston.p...@gmail.com>
Sent: Thursday, May 5, 2022 4:30 PM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: RFC: Out of Process Python UDFs in Arrow Compute

Yes, I think you have the right understanding.  That is what I was
originally trying to say by pointing out that our current solution
does not solve the serialization problem.

I think there is probably room for multiple approaches to this
problem.  Each will have its own tradeoffs.  For example, a recent
email thread[1] also proposed or mentioned providing UDFs:
 * Through LLVM
 * Through WASM
 * Through Gandiva

I think there is room for all of these things.  From an execution
engine perspective, I think the main concern would be making sure we
have the abstractions and extension points in place to enable
development of all of the above models.  Some of the concerns in your
document may be beyond the jurisdiction of the execution engine
(e.g. managing the lifecycle of UDF workers and processes).  However,
they could still be part of the Arrow-C++ library; I'm just thinking
it makes sense to decouple them.

At the moment, I'm not aware of much that is needed in the execution
engine beyond what we have.  If you (or anyone) can identify things we
should do in the execution engine, it would be good to surface them
early.  This is the most important thing to figure out first; the
details of the actual UDF harnesses can be worked out as people need
them.  The "function pointer" extension point is very generic.
However, just brainstorming, I can think of some possible extensions
and utilities that might enable UDF harness development, although I
think some of these are still Substrait concerns and not execution
plan concerns (though they may need work in the Substrait consumer):

 * The ability to register a UDF scoped to an execution
    plan, instead of globally
 * The ability to serialize a UDF definition (and not just the
    declaration / invocation) in a Substrait plan
 * A serialization format for UDF options (Substrait uses literal
    expressions for this, which are serializable, but the execution
    plan has POCOs, i.e. plain old C++ objects, which are not)
 * The ability to register a common function pointer for a whole
    class of functions (could use Substrait function URI for this).
    The "call" object would have to be sent to the handler in addition
    to the arguments.
 * Add a stop token to the context passed to the UDF.  This
    can be used by a long-running UDF to check whether the user
    has cancelled the execution plan, in which case the UDF
    should abort and clean up its work (see the sketch after
    this list).
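
To illustrate the stop token idea, here is a minimal sketch in Python; the
stop_token attribute on the UDF context is hypothetical (it is the proposed
extension above, not an existing API):

    import pyarrow as pa

    def long_running_udf(ctx, arr):
        out = []
        for value in arr.to_pylist():
            # Hypothetical API: poll the plan's stop token so a
            # cancelled plan doesn't keep burning CPU.
            if ctx.stop_token.is_stop_requested():
                raise RuntimeError("execution plan was cancelled")
            out.append(value * 2)  # stand-in for expensive per-row work
        return pa.array(out)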

If a UDF handler wants to delegate work to a different thread (or
multiple threads or even multiple processes) then I think that is
fine.  Just block the calling thread until all the work is done
(periodically checking to make sure a cancellation hasn't been
requested).  One of the execution engine's CPU threads will be blocked
for the duration of this function call.
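
For example, a UDF that fans its input out to a small thread pool and
blocks until every piece is done might look roughly like this (a sketch;
the chunking and pool size are arbitrary):

    import concurrent.futures
    import numpy as np
    import pyarrow as pa

    _pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def heavy_udf(ctx, arr):
        data = arr.to_numpy(zero_copy_only=False)
        chunks = np.array_split(data, 4)
        futures = [_pool.submit(np.sqrt, chunk) for chunk in chunks]
        # Block the calling (engine) thread until all workers finish;
        # a real implementation would also poll for cancellation here.
        result = np.concatenate([f.result() for f in futures])
        return pa.array(result)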

If the UDF is not the bottleneck for the entire plan then this
multithreading may be ineffective or actively harmful.  For example, if
the UDF is lightweight then the majority of the time may be spent in a
downstream hash-join or aggregation; by creating multiple threads you
are just increasing the chance that this downstream task will be forced
to context switch.  The execution engine will already call the UDF from
multiple threads.  By default you can expect the UDF to be called up
to N times simultaneously, where N is the number of cores on the
machine (std::thread::hardware_concurrency).  From Python there is some
GIL concern here, but most NumPy or pandas operations that do any
substantial amount of work release the GIL anyway.  However, there
could certainly be cases where multithreading within the UDF is
helpful or multiprocessing makes sense.  I don't know of any reason
the execution engine itself would need to be made aware that this is
happening, though.
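
For reference, this is roughly what the existing in-process registration
path looks like from Python (assuming a recent pyarrow that provides
pyarrow.compute.register_scalar_function); the engine may invoke the
registered function from several threads at once:

    import pyarrow as pa
    import pyarrow.compute as pc

    def double_it(ctx, x):
        # May be called concurrently from multiple engine threads;
        # the pyarrow compute kernel below does its work outside
        # the GIL.
        return pc.multiply(x, pa.scalar(2, type=pa.int64()))

    pc.register_scalar_function(
        double_it,
        "double_it",
        {"summary": "double", "description": "Doubles each element."},
        {"x": pa.int64()},
        pa.int64(),
    )

    assert pc.call_function(
        "double_it", [pa.array([1, 2, 3])]).to_pylist() == [2, 4, 6]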

One could argue that the execution engine should not block a CPU
thread while invoking a long-running UDF; it could use that CPU
thread to do other work in the plan that needs doing.  However, I
think this is unlikely to be useful.  Whatever is running the UDF is
presumably using at least one core, so there is no reason for the
execution plan thread to try to take on any more work anyway.

[1] https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx
