The general design seems reasonable to me. However, I think the multithreading issue warrants a (perhaps separate) discussion, in view of the risk that Arrow's multithreading model would end up being hard to interoperate with those of other libraries used to implement UDFs. Such interoperability problems could lead to hard-to-debug programs and performance degradation; similar problems are known to occur between user-level threads, MPI, and OpenMP [1] [2]. Some questions to consider in this context:
1. How to allocate compute threads between Arrow and locally executing UDFs, avoiding unnecessary competition over local compute resources? How does this look for CPU and GPU compute resources?
2. How to schedule compute threads as a set/unit for a locally running UDF that requires them?
3. How to express the number (and CPU/GPU kind) of local compute threads that a UDF requires? Is this number static or dynamic?
4. How to handle the case of a UDF that runs at a remote service and requires no local compute threads?
5. How to allocate local CPU resources between Arrow and UDFs running in local processes external to Arrow?

Since this is quite a complex topic, instead of trying to support multiple UDF threading facilities within Arrow, I think it might be easier, at least as a first approach, to expose in Arrow a threading-model control API (I'm not aware of such an API, if one exists). An Arrow-with-UDFs user, who best knows the compute needs of the UDFs in the execution plan, would then be able to allocate compute resources to Arrow through this API and, separately, to these UDFs.
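For illustration, here is a minimal sketch of what a static CPU allocation could look like from Python today, using pyarrow's existing set_cpu_count together with the standard OMP_NUM_THREADS variable for an OpenMP-based UDF library; the core counts and the static split are assumptions made up for the example:

    import os
    import pyarrow as pa

    total_cores = os.cpu_count() or 1
    udf_cores = 4                      # what the plan's UDFs need (assumed)
    arrow_cores = max(1, total_cores - udf_cores)

    # Cap Arrow's internal CPU thread pool.
    pa.set_cpu_count(arrow_cores)

    # Cap the UDF-side OpenMP runtime; this must be set before the UDF
    # library creates its thread pool.
    os.environ["OMP_NUM_THREADS"] = str(udf_cores)

A real threading-model control API would also need to cover the dynamic, gang-scheduled, and GPU cases raised in the questions above.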
[1] https://www.bolt-omp.org/
[2] https://www.openmp.org/wp-content/uploads/SC19-Iwasaki-Threads.pdf

Yaron.

________________________________
From: Weston Pace <weston.p...@gmail.com>
Sent: Thursday, May 5, 2022 4:30 PM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: RFC: Out of Process Python UDFs in Arrow Compute

Yes, I think you have the right understanding. That is what I was originally trying to say by pointing out that our current solution does not solve the serialization problem.

I think there is probably room for multiple approaches to this problem. Each will have its own tradeoffs. For example, a recent email thread [1] also proposed or mentioned providing UDFs:

 * Through LLVM
 * Through WASM
 * Through Gandiva

I think there is room for all of these things. From an execution engine perspective, I think the main concern is making sure we have the abstractions and extension points in place to enable development of all of the above models. Some of the concerns in your document may be beyond the jurisdiction of the execution engine (e.g. managing the lifecycle of UDF workers and processes). They could still be part of the Arrow-C++ library; I'm just thinking it makes sense to decouple them.

At the moment, I'm not aware of much that is needed in the execution engine beyond what we have. If you (or anyone) can think of things we should do in the execution engine, it would be good to identify them early. This is the most important thing to figure out first, while the details of the actual UDF harnesses can be worked out as people have need of them.

The "function pointer" extension point is very generic. However, just brainstorming, I can think of some possible extensions and utilities that might enable UDF harness development, although I think some of these are still Substrait concerns and not execution plan concerns (though they may need work in the Substrait consumer):

 * The ability to register a UDF scoped to an execution plan, instead of globally.
 * The ability to serialize a UDF definition (and not just the declaration / invocation) in a Substrait plan.
 * A serialization format for UDF options (Substrait uses literal expressions for this, which are serializable, but the execution plan has POCOs, which are not).
 * The ability to register a common function pointer for a whole class of functions (could use the Substrait function URI for this). The "call" object would have to be sent to the handler in addition to the arguments.
 * Add a stop token to the context passed to the UDF. This could be used by a long-running UDF to check whether the user has cancelled the execution plan, in which case the UDF should abort and clean up its work.

If a UDF handler wants to delegate work to a different thread (or multiple threads, or even multiple processes), then I think that is fine. Just block the calling thread until all the work is done (periodically checking that cancellation hasn't been requested). One of the execution engine's CPU threads will be blocked for the duration of this function call.

If the UDF is not the bottleneck for the entire plan, then this multithreading may be ineffective or actively harmful (e.g. if the UDF is lightweight, then the majority of the time may be spent in a downstream hash-join or aggregation; by creating multiple threads you are just increasing the chance that this downstream task will be forced to context switch).

The execution engine will already call the UDF from multiple threads. By default, you can expect the UDF to be called up to N times simultaneously, where N is the number of cores on the machine (std::thread::hardware_concurrency). From Python there is some GIL concern here, but most numpy or pandas operations that do any substantial amount of work are going to release the GIL anyway. However, there could certainly be cases where multithreading within the UDF might be helpful or where multiprocessing makes sense. I don't know of any reason the execution engine itself would need to be made aware that this was happening, though.

One could argue that the execution engine should not block a CPU thread while invoking a long-running UDF; it could use that CPU thread to do other work in the plan that needs doing. However, I think this is unlikely to be useful. Whatever is running the UDF is presumably using at least one core, so there is no reason for the execution plan thread to try to do any more work anyway.

[1] https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx
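To make the delegation pattern discussed above concrete, here is a minimal sketch of a UDF body that fans a batch out to its own thread pool and blocks the engine's calling thread until the work is done. The ctx argument and the stop-token check are assumptions: a stop token on the UDF context is only proposed above, not an existing API.

    from concurrent.futures import ThreadPoolExecutor

    import numpy as np
    import pyarrow as pa

    pool = ThreadPoolExecutor(max_workers=4)  # sized by the UDF author

    def heavy_udf(ctx, arr: pa.Array) -> pa.Array:
        # Fan the batch out to worker threads...
        chunks = np.array_split(arr.to_numpy(zero_copy_only=False), 4)
        futures = [pool.submit(np.sqrt, chunk) for chunk in chunks]
        # ...and block the engine's calling thread until all work is done.
        # Hypothetical: while waiting, periodically check a stop token,
        # e.g. `if ctx.stop_requested: ...` to abort and clean up early.
        results = [f.result() for f in futures]
        return pa.array(np.concatenate(results))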