To add:
I am open to other ways of solving the three steps in my previous message
(quoted below). I just want us to agree on the minimal set of problems we
need to solve in order to "define UDF with ibis and execute with Arrow",
without getting into overly detailed discussions yet.
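
To make the three steps concrete, here is a rough, dependency-free sketch of
the round trip. Everything in it is an assumption for illustration only:
marshal stands in for cloudpickle (the proposal would use cloudpickle.dumps /
cloudpickle.loads instead), a JSON document stands in for the Substrait
relation/expression, and plain Python lists stand in for Arrow arrays. The
helper names (serialize_udf, make_plan, execute_plan) are hypothetical.

```python
import base64
import builtins
import json
import marshal
import types


# Step (1), stand-in: serialize the user function. marshal only handles
# simple functions without closures and is Python-version specific; the
# proposal suggests cloudpickle for the real thing.
def serialize_udf(func):
    return base64.b64encode(marshal.dumps(func.__code__)).decode("ascii")


def deserialize_udf(payload, name="udf"):
    code = marshal.loads(base64.b64decode(payload))
    return types.FunctionType(code, {"__builtins__": builtins}, name)


# Step (2), stand-in: a JSON document plays the role of the Substrait
# relation/expression that carries the UDF definition.
def make_plan(func, input_column, output_column):
    return json.dumps({
        "udf": serialize_udf(func),
        "input": input_column,
        "output": output_column,
    })


# Step (3), stand-in: the consumer decodes the plan and applies the UDF to
# one batch, represented here as a dict of column name -> list of values.
def execute_plan(plan_json, batch):
    plan = json.loads(plan_json)
    udf = deserialize_udf(plan["udf"])
    result = dict(batch)
    result[plan["output"]] = udf(batch[plan["input"]])
    return result


def add_one(values):
    return [v + 1 for v in values]


plan = make_plan(add_one, "col", "new_col")
out = execute_plan(plan, {"col": [1.0, 2.0, 3.0]})
print(out)
```

The point of the sketch is only that the plan must carry the function body
itself, not just its name, so that the consumer can rebuild and call it.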

On Fri, May 6, 2022 at 2:58 PM Li Jin <ice.xell...@gmail.com> wrote:

> Weston - Thanks for the thoughtful reply. It was quite useful.
>
> RE:
> "
> At the moment, I'm not aware of much that is needed in the execution engine
> beyond what we have.  If you (or anyone) have things that we can do in
> the execution engine then it would be good to identify them early.
> This is the most important thing to figure out first while the details
> of the actual UDF harnesses can be worked out as people have need of
> them.
> "
>
> From my perspective, the *minimal* functionality that we want to achieve
> is to define a Scalar UDF with the ibis API (Ibis already has an API to
> define Scalar UDFs), generate a Substrait query plan and execute it with
> Arrow compute. In other words, this is a "test program" that I want to
> run via Arrow compute:
>
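> ```
> import ibis.expr.datatypes as dt
> import pandas as pd
> from ibis.udf.vectorized import elementwise
>
> @elementwise(input_type=[dt.double], output_type=dt.double)
> def add_one(s: pd.Series) -> pd.Series:
>     return s + 1
>
> t = t.mutate(new_col=add_one(t[col]))  # t: an ibis table, col: a column name
> ```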
>
> Now I am not sure exactly what additional work needs to be done in order
> to enable this, but at a high level I think we need:
> (1) A way to serialize the user function (and function parameters) (in the
> doc I proposed to use a cloudpickle-based approach similar to how Spark does
> it)
> (2) A Substrait relation/expression that captures the UDF
> (3) A way to deserialize the Substrait relation/expression in Arrow compute
> and execute the UDF (either using the approach in the current Scalar UDF
> prototype or something else)
>
> I think many of the concerns above (e.g. single-threaded vs multithreaded
> execution) are valid, but they are more of a next step IMO.
>
> I wonder if you also think these are the basic problems that need to be
> solved in order to enable "define udf in ibis and execute with Arrow
> compute"?
>
>
>
> On Fri, May 6, 2022 at 4:28 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
>> The general design seems reasonable to me. However, I think the
>> multithreading issue warrants a (perhaps separate) discussion, in view of
>> the risk that Arrow's multithreading model would end up being hard to
>> interoperate with that of other libraries used to implement UDFs. Such
>> interoperability problems could lead to hard-to-debug programs and
>> performance degradation; for example, they are known to arise between
>> user-level threads, MPI and OpenMP [1] [2]. Some questions to consider in
>> this context:
>>
>>   1.  How to allocate compute threads between Arrow and locally executing
>> UDFs, avoiding unnecessary competition over local compute resources? How
>> does this look for CPU and GPU compute resources?
>>   2.  How to schedule compute threads as a set/unit for a locally running
>> UDF that requires them?
>>   3.  How to express the number (and CPU/GPU kind) of local compute
>> threads that a UDF requires? Is this number static or dynamic?
>>   4.  How to handle the case of a UDF that runs at a remote service and
>> requires no local compute threads?
>>   5.  How to allocate local CPU resources between Arrow and UDFs running
>> in local processes external to Arrow?
>>
>> Since this is quite a complex topic, instead of trying to support
>> multiple UDF threading facilities within Arrow, I think it might be easier,
>> at least as a first approach, to expose in Arrow a threading-model control
>> API (I'm not aware of such an API, if one exists). An Arrow-with-UDFs user,
>> who best knows the compute needs of the UDFs in the execution plan, would
>> then be able to allocate compute resources to Arrow through this API and
>> separately to these UDFs.
>>
>>
>> [1] https://www.bolt-omp.org/
>> [2] https://www.openmp.org/wp-content/uploads/SC19-Iwasaki-Threads.pdf
>>
>>
>> Yaron.
>>
>> ________________________________
>> From: Weston Pace <weston.p...@gmail.com>
>> Sent: Thursday, May 5, 2022 4:30 PM
>> To: dev@arrow.apache.org <dev@arrow.apache.org>
>> Subject: Re: RFC: Out of Process Python UDFs in Arrow Compute
>>
>> Yes, I think you have the right understanding.  That is what I was
>> originally trying to say by pointing out that our current solution
>> does not solve the serialization problem.
>>
>> I think there is probably room for multiple approaches to this
>> problem.  Each will have its own tradeoffs.  For example, a recent
>> email thread[1] also proposed or mentioned providing UDFs:
>>  * Through LLVM
>>  * Through WASM
>>  * Through Gandiva
>>
>> I think there is room for all of these things.  From an execution
>> engine perspective I think the main concern would be making sure we
>> have the abstractions and extension points in place to enable
>> development of all of the above models.  Some of the concerns in your
>> document are maybe beyond the jurisdiction of the execution engine
>> (e.g. managing the lifecycle of UDF workers and processes).  However,
>> they could still be part of the Arrow-C++ library; I'm just thinking
>> it makes sense to decouple them.
>>
>> At the moment, I'm not aware of much that is needed in the execution engine
>> beyond what we have.  If you (or anyone) have things that we can do in
>> the execution engine then it would be good to identify them early.
>> This is the most important thing to figure out first while the details
>> of the actual UDF harnesses can be worked out as people have need of
>> them.  The "function pointer" extension point is very generic.
>> However, just brainstorming, I can think of some possible extensions
>> and utilities that might enable UDF harness development, although I
>> think some of these are still Substrait concerns and not execution
>> plan concerns (though they may need work in the Substrait consumer):
>>
>>  * The ability to register a UDF scoped to an execution
>>     plan, instead of globally
>>  * The ability to serialize a UDF definition (and not just the
>>     declaration / invocation) in a Substrait plan
>>  * A serialization format for UDF options (Substrait uses literal
>>     expressions for this, which are serializable, but the execution
>>     plan has POCOs which are not)
>>  * The ability to register a common function pointer for a whole
>>     class of functions (could use Substrait function URI for this).
>>     The "call" object would have to be sent to the handler in addition
>>     to the arguments.
>>  * Add a stop token to the context passed to the UDF.  This
>>     can be used by a long running UDF to check and see if the user
>>     has cancelled the execution plan in which case the long running
>>     UDF should abort and clean up its work.
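
As a rough illustration of two of the ideas in this list (a handler
registered once for a whole class of functions, keyed by a function URI, and a
registry scoped to a single plan), here is a minimal pure-Python sketch. All
names in it (Call, HandlerRegistry, the "urn:example:python-udfs" URI) are
hypothetical and are not Arrow or Substrait APIs.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Call:
    """Hypothetical 'call' object: which function was invoked, plus options."""
    uri: str
    name: str
    options: Dict[str, Any] = field(default_factory=dict)


Handler = Callable[[Call, List[Any]], Any]


class HandlerRegistry:
    """Hypothetical registry; a child registry can be scoped to one plan."""

    def __init__(self, parent: Optional["HandlerRegistry"] = None):
        self._parent = parent
        self._handlers: Dict[str, Handler] = {}

    def register(self, uri: str, handler: Handler) -> None:
        self._handlers[uri] = handler

    def dispatch(self, call: Call, args: List[Any]) -> Any:
        # Look in the plan-scoped registry first, then walk up to the parents.
        registry: Optional[HandlerRegistry] = self
        while registry is not None:
            handler = registry._handlers.get(call.uri)
            if handler is not None:
                # The handler receives the call object as well as the arguments.
                return handler(call, args)
            registry = registry._parent
        raise KeyError(f"no handler registered for {call.uri}")


def python_udf_handler(call: Call, args: List[Any]) -> Any:
    # One handler serves every function under the URI and picks the
    # implementation from the call object.
    factor = call.options.get("factor", 1)
    table = {
        "add_one": lambda xs: [x + 1 for x in xs],
        "scale": lambda xs: [x * factor for x in xs],
    }
    return table[call.name](*args)


global_registry = HandlerRegistry()
plan_registry = HandlerRegistry(parent=global_registry)
plan_registry.register("urn:example:python-udfs", python_udf_handler)

added = plan_registry.dispatch(
    Call("urn:example:python-udfs", "add_one"), [[1, 2, 3]])
scaled = plan_registry.dispatch(
    Call("urn:example:python-udfs", "scale", {"factor": 10}), [[1, 2, 3]])
print(added, scaled)
```

Because the handler is registered on the plan-scoped registry only, the global
registry never sees it, which is the scoping behaviour the first bullet asks
for.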
>>
>> If a UDF handler wants to delegate work to a different thread (or
>> multiple threads or even multiple processes) then I think that is
>> fine.  Just block the calling thread until all the work is done
>> (periodically checking to make sure a cancellation hasn't been
>> requested).  One of the execution engine's CPU threads will be blocked
>> for the duration of this function call.
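
The blocking pattern described above can be sketched with the Python standard
library. This is only an illustration under assumptions: a threading.Event
stands in for the plan's stop token, and run_udf_blocking and slow_add_one are
hypothetical names, not part of Arrow.

```python
import concurrent.futures
import threading
import time


def run_udf_blocking(work, batch, plan_stop_token, poll_seconds=0.01):
    """Run work(batch, stop) on a helper thread while blocking the caller.

    plan_stop_token is a threading.Event standing in for the stop token of
    the execution plan. The calling thread polls it and forwards a
    cancellation request to the worker, which is expected to return early.
    """
    worker_stop = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(work, batch, worker_stop)
        while True:
            if plan_stop_token.is_set():
                worker_stop.set()  # ask the worker to abort and clean up
            try:
                return future.result(timeout=poll_seconds)
            except concurrent.futures.TimeoutError:
                continue  # still running; poll the stop token again


def slow_add_one(batch, stop):
    out = []
    for value in batch:
        if stop.is_set():
            return None  # cancelled: drop partial work
        time.sleep(0.001)  # pretend each element is expensive
        out.append(value + 1)
    return out


# Normal run: the caller blocks until the worker finishes.
completed = run_udf_blocking(slow_add_one, [1, 2, 3], threading.Event())

# Cancelled run: the stop token is already set, so the worker aborts early.
cancelled_token = threading.Event()
cancelled_token.set()
cancelled = run_udf_blocking(slow_add_one, list(range(200)), cancelled_token)
print(completed, cancelled)
```

The calling thread stays blocked for the whole call in both cases, which
matches the point that one of the engine's CPU threads is occupied while the
UDF runs.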
>>
>> If the UDF is not the bottleneck for the entire plan then this
>> multithreading may be ineffective or actively harmful (e.g. if the UDF
>> is lightweight then maybe a majority of the time is spent in a
>> downstream hash-join or aggregation.  By creating multiple threads you
>> are just increasing the chance this downstream task will be forced to
>> context switch).  The execution engine will already call the UDF from
>> multiple threads.  By default you can expect the UDF to be called up
>> to N times simultaneously, where N is the number of cores on the machine
>> (std::thread::hardware_concurrency).  From Python there is some GIL concern
>> here, but most numpy or pandas operations that do any substantial
>> amount of work are going to be releasing the GIL anyway.  However,
>> there could certainly be cases where multithreading within the UDF
>> might be helpful or multiprocessing makes sense.  I don't know of any
>> reason the execution engine itself would need to be made aware that
>> this was happening though.
>>
>> One could argue that the execution engine should not block a CPU
>> thread if it is invoking a long-running UDF.  It could use that CPU
>> thread to do other work in the plan that needs doing.  However, I
>> think this is unlikely to be useful.  Whatever is running the UDF is
>> presumably using at least one core, so there is no reason for the
>> execution plan thread to try to do any more work anyway.
>>
>> [1] https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx
>>
>>
