@Yaron > 1. How to allocate compute threads between Arrow and locally executing UDFs, > avoiding unnecessarily competition over local compute resources?
In general, there is no reason for UDFs to allocate threads. The project & filter nodes are trivially parallelizable. As long as there is sufficient data then almost any project/filter pipeline will either be I/O bound or accumulate enough batches to run at full parallelism. However, if for some reason you wanted to share, this could be done by using a custom executor. Every arrow::compute::ExecPlan instance is created with an arrow::compute::ExecContext instance. Part of arrow::compute::ExecContext is an arrow::internal::Executor*. The Executor is an interface and users can supply their own custom implementation. By default we use Arrow's CPU thread pool which is allocated with static storage duration. However, a user that has their own thread pool that Arrow needs to share can supply their own implementation of this interface. > How does this look for CPU and GPU compute resources? That's a good question and one I don't have the expertise to answer though I am interested in learning more about it. I think it is important to design the execution engine to allow for this. > 2. How to schedule compute threads as a set/unit for a locally running UDF > that requires them? I'm not sure I understand this question. > 3. How to express the number (and CPU/GPU kind) of local compute threads > that a UDF requires? Is this number static or dynamic? This seems like a UDF concern. > 4. How to handle the case of a UDF that runs at a remote service and > requires no local compute threads? This is venturing into distributed execution territory. For example, at this point you might want to implement this at the node level with some kind of shuffle-style operator instead of trying to implement this at the function level. Maybe something that could help is a TableUdfNode which has its own registry of function pointers that take in ExecBatch and output Future<ExecBatch>. The node could accumulate data until it has some meaningful amount of rows to justify the call. Then it could make the call and, since the call returns a future, the compute thread would not be blocked. When the function finally returns we could slice up the received batch into L2 sized batches for downstream computation. We would need to implement some kind of backpressure queue here, similar to what we have in the sink node (and will eventually need in hash/join and aggregate as well). While this node would not be a pipeline breaker in the "doesn't emit until it receives all data" sense it would be a pipeline breaker in the "the thread task on the left side will be different than the thread task on the right side" sense. > 5. How to allocate local CPU resources between Arrow and UDFs running in > local processes external to Arrow? I think the answer here is the same as #1 but I don't know how much you can really do here outside of maybe something like CPU affinity and shrinking the size of Arrow's CPU thread pool. @Li > I wonder if you also think these are the basic problems that need to be > solved in order to enable "define udf in ibis and execute with Arrow > compute"? Yes, I think you're right. We're probably getting a little ahead of ourselves in terms of details. Your MVP and steps listed sound like a good place to start. > I am open to other potential solutions to solve the three steps above, but > just want to get on the same page of what are the minimal issues we need to > solve in order to "define UDF with ibis and execute with Arrow" and avoid > getting into too detailed discussions. Sounds good. I appreciate your proposal and input. This has been a very fun discussion and it is very useful to get more input into how the system might be used as it helps with implementation. On Fri, May 6, 2022 at 9:02 AM Li Jin <ice.xell...@gmail.com> wrote: > > To add: > I am open to other potential solutions to solve the three steps above, but > just want to get on the same page of what are the minimal issues we need to > solve in order to "define UDF with ibis and execute with Arrow" and avoid > getting into too detailed discussions. > > On Fri, May 6, 2022 at 2:58 PM Li Jin <ice.xell...@gmail.com> wrote: > > > Weston - Thanks for the thoughtful reply. It was quite useful. > > > > RE: > > " > > At the moment, I'm not aware of much is needed in the execution engine > > beyond what we have. If you (or anyone) has things that we can do in > > the execution engine then it would be good to identify them early. > > This is the most important thing to figure out first while the details > > of the actual UDF harnesses can be worked out as people have need of > > them. > > " > > > > From my perspective, the *minimal* functionality that we want to achieve > > is to define a Scalar UDF with the ibis API (Ibis already have an API to > > define Scalar UDFs), generate substrait query plan and execute it with > > Arrow compute. Or in other words, this is a "test program" that I want to > > run via Arrow compute: > > > > ``` > > @ibis.vectorized_udf.elementwise(input_type=[dt.double], output_type=[dt. > > double]) > > def add_one(s: pd.Series) -> pd.Series: > > return s + 1 > > t = t.mutate(new_col=add_one(t[col])) > > ``` > > > > Now I am not sure exactly what additional work needs to be done in order > > to enable this, but at the high level I think: > > (1) A way to serialize the user function (and function parameters) (in the > > doc I proposed to use cloudpickle based approach similar to how Spark does > > it) > > (2) Add a Substrait relation/expression that captures the the UDF > > (3) Deserialize the Substrait relation/expression in Arrow compute and > > execute the UDF (either using the approach in the current Scalar UDF > > prototype or do sth else) > > > > I think many of the concerns above (i.e. single thread vs multi threads) > > are valid but more of the next step IMO. > > > > I wonder if you also think these are the basic problems that need to be > > solved in order to enable "define udf in ibis and execute with Arrow > > compute"? > > > > > > > > On Fri, May 6, 2022 at 4:28 AM Yaron Gvili <rt...@hotmail.com> wrote: > > > >> The general design seems reasonable to me. However, I think the > >> multithreading issue warrants a (perhaps separate) discussion, in view of > >> the risk that Arrow's multithreading model would end up being hard to > >> interoperate with that of other libraries used to implement UDFs. Such > >> interoperability problem could lead to hard-to-debug programs and > >> performance degradation; for example, they are known between user-level > >> threads, MPI and OpenMP [1] [2]. Some questions to consider in this > >> context: > >> > >> 1. How to allocate compute threads between Arrow and locally executing > >> UDFs, avoiding unnecessarily competition over local compute resources? How > >> does this look for CPU and GPU compute resources? > >> 2. How to schedule compute threads as a set/unit for a locally running > >> UDF that requires them? > >> 3. How to express the number (and CPU/GPU kind) of local compute > >> threads that a UDF requires? Is this number static or dynamic? > >> 4. How to handle the case of a UDF that runs at a remote service and > >> requires no local compute threads? > >> 5. How to allocate local CPU resources between Arrow and UDFs running > >> in local processes external to Arrow? > >> > >> Since this is quite a complex topic, instead of trying to support > >> multiple UDF threading facilities within Arrow, I think it might be easier, > >> at least as a first approach, to expose in Arrow a threading-model control > >> API (I'm not aware of such an API, if one exists). An Arrow-with-UDFs user, > >> who best knows the compute needs of the UDFs in the execution plan, would > >> then be able to allocate compute resources to Arrow through this API and > >> separately to these UDFs. > >> > >> > >> [1] https://www.bolt-omp.org/ > >> [2] https://www.openmp.org/wp-content/uploads/SC19-Iwasaki-Threads.pdf > >> > >> > >> Yaron. > >> > >> ________________________________ > >> From: Weston Pace <weston.p...@gmail.com> > >> Sent: Thursday, May 5, 2022 4:30 PM > >> To: dev@arrow.apache.org <dev@arrow.apache.org> > >> Subject: Re: RFC: Out of Process Python UDFs in Arrow Compute > >> > >> Yes, I think you have the right understanding. That is what I was > >> originally trying to say by pointing out that our current solution > >> does not solve the serialization problem. > >> > >> I think there is probably room for multiple approaches to this > >> problem. Each will have their own tradeoffs. For example, a recent > >> email thread[1] also proposed or mentioned providing UDFs: > >> * Through LLVM > >> * Through WASM > >> * Through Gandiva > >> > >> I think there is room for all of these things. From an execution > >> engine perspective I think the main concern would be making sure we > >> have the abstractions and extension points in place to enable > >> development of all of the above models. Some of the concerns in your > >> document are maybe beyond the jurisdiction of the execution engine > >> (e.g. managing the lifecycle of UDF workers and processes). However, > >> they could still be part of the Arrow-C++ library, I'm just thinking > >> it makes sense to decouple them. > >> > >> At the moment, I'm not aware of much is needed in the execution engine > >> beyond what we have. If you (or anyone) has things that we can do in > >> the execution engine then it would be good to identify them early. > >> This is the most important thing to figure out first while the details > >> of the actual UDF harnesses can be worked out as people have need of > >> them. The "function pointer" extension point is very generic. > >> However, just brainstorming, I can think of some possible extensions > >> and utilities that might enable UDF harness development, although I > >> think some of these are still Substrait concerns and not execution > >> plan concerns (though they may need work in the Substrait consumer): > >> > >> * The ability to register a UDF scoped to an execution > >> plan, instead of globally > >> * The ability to serialize a UDF definition (and not just the > >> declaration / invocation) in a Substrait plan > >> * A serialization format for UDF options (Substrait uses literal > >> expressions for this, which are serializable, but the execution > >> plan has POCOs which are not) > >> * The ability to register a common function pointer for a whole > >> class of functions (could use Substrait function URI for this). > >> The "call" object would have to be sent to the handler in addition > >> to the arguments. > >> * Add a stop token to the context passed to the UDF. This > >> can be used by a long running UDF to check and see if the user > >> has cancelled the execution plan in which case the long running > >> UDF should abort and clean up its work. > >> > >> If a UDF handler wants to delegate work to a different thread (or > >> multiple threads or even multiple processes) then I think that is > >> fine. Just block the calling thread until all the work is done > >> (periodically checking to make sure a cancellation hasn't been > >> requested). One of the execution engine's CPU threads will be blocked > >> for the duration of this function call. > >> > >> If the UDF is not the bottleneck for the entire plan then this > >> multithreading may be ineffective or actively harmful (e.g. if the UDF > >> is lightweight then maybe a majority of the time is spent in a > >> downstream hash-join or aggregation. By creating multiple threads you > >> are just increasing the chance this downstream task will be forced to > >> context switch). The execution engine will already call the UDF from > >> multiple threads. By default you can expect the UDF to be called up > >> to N times simultaneously where N is the # of cores on the machines > >> (std::hardware_concurrency). From python there is some GIL concern > >> here but most numpy or pandas operations that do any substantial > >> amount of work are going to be releasing the GIL anyways. However, > >> there could certainly be cases where multithreading within the UDF > >> might be helpful or multiprocessing makes sense. I don't know of any > >> reason the execution engine itself would need to be made aware that > >> this was happening though. > >> > >> One could argue that the execution engine should not block a CPU > >> thread if it is invoking a long-running UDF. It could use that CPU > >> thread to do other work in the plan that needs doing. However, I > >> think this is unlikely to be useful. Whatever is running the UDF is > >> presumably using at least one core and so there is no reason for the > >> execution plan thread to try and do any more work anyways. > >> > >> [1] https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx > >> > >>