Yes, I think you have the right understanding. That is what I was originally trying to say by pointing out that our current solution does not solve the serialization problem.
I think there is probably room for multiple approaches to this problem. Each will have its own tradeoffs. For example, a recent email thread[1] also proposed or mentioned providing UDFs:

* Through LLVM
* Through WASM
* Through Gandiva

I think there is room for all of these things. From an execution engine perspective I think the main concern is making sure we have the abstractions and extension points in place to enable development of all of the above models.

Some of the concerns in your document are maybe beyond the jurisdiction of the execution engine (e.g. managing the lifecycle of UDF workers and processes). However, they could still be part of the Arrow-C++ library; I'm just thinking it makes sense to decouple them.

At the moment, I'm not aware of much that is needed in the execution engine beyond what we have. If you (or anyone) have things that we can do in the execution engine then it would be good to identify them early. This is the most important thing to figure out first, while the details of the actual UDF harnesses can be worked out as people have need of them. The "function pointer" extension point is very generic. However, just brainstorming, I can think of some possible extensions and utilities that might enable UDF harness development, although I think some of these are still Substrait concerns and not execution plan concerns (though they may need work in the Substrait consumer):

* The ability to register a UDF scoped to an execution plan, instead of globally
* The ability to serialize a UDF definition (and not just the declaration / invocation) in a Substrait plan
* A serialization format for UDF options (Substrait uses literal expressions for this, which are serializable, but the execution plan has POCOs, which are not)
* The ability to register a common function pointer for a whole class of functions (the Substrait function URI could be used for this). The "call" object would have to be sent to the handler in addition to the arguments.
* Add a stop token to the context passed to the UDF. A long-running UDF can use this to check whether the user has cancelled the execution plan, in which case it should abort and clean up its work.

If a UDF handler wants to delegate work to a different thread (or multiple threads, or even multiple processes) then I think that is fine. Just block the calling thread until all the work is done (periodically checking to make sure a cancellation hasn't been requested). One of the execution engine's CPU threads will be blocked for the duration of this function call. If the UDF is not the bottleneck for the entire plan then this multithreading may be ineffective or actively harmful (e.g. if the UDF is lightweight then maybe the majority of the time is spent in a downstream hash-join or aggregation; by creating multiple threads you are just increasing the chance that this downstream task will be forced to context switch).

The execution engine will already call the UDF from multiple threads. By default you can expect the UDF to be called up to N times simultaneously, where N is the number of cores on the machine (std::thread::hardware_concurrency). From Python there is some GIL concern here, but most numpy or pandas operations that do any substantial amount of work are going to be releasing the GIL anyway. However, there could certainly be cases where multithreading within the UDF might be helpful or multiprocessing makes sense. I don't know of any reason the execution engine itself would need to be made aware that this was happening, though.
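To make that delegation pattern concrete, here is a minimal sketch of a UDF handler that fans work out to a local thread pool and simply blocks the calling engine thread until it finishes. It assumes the same (ctx, arr) scalar UDF signature used in the example further down in this thread; the stop-token check is hypothetical, since that extension does not exist yet.

```
import concurrent.futures

import pyarrow as pa
import pyarrow.compute as pc


def heavy_udf(ctx, arr):
    # Split the input and fan the work out over a local thread pool.
    chunk_size = 4096
    chunks = [arr.slice(i, chunk_size) for i in range(0, len(arr), chunk_size)]
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Stand-in for real per-chunk work; anything that releases the GIL helps.
        results = list(pool.map(lambda c: pc.multiply(c, 2), chunks))
        # Hypothetical: with a stop token on the context, a long-running UDF
        # could poll for cancellation between chunks, e.g.
        #     if ctx.stop_token.is_stop_requested(): ...abort and clean up...
    # The engine thread that called us is blocked the whole time, which is
    # fine; the engine does not need to know about these extra threads.
    return pa.concat_arrays(results)
```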
One could argue that the execution engine should not block a CPU thread while it is invoking a long-running UDF. It could use that CPU thread to do other work in the plan that needs doing. However, I think this is unlikely to be useful. Whatever is running the UDF is presumably using at least one core, and so there is no reason for the execution plan thread to try and do any more work anyways.

[1] https://lists.apache.org/thread/qvy89hxtl1gp8n6l6hdb65cpjohpxpfx

On Thu, May 5, 2022 at 8:45 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> After reading the above PR, I think I understand the approach a bit more now.
>
> If I understand this correctly, the above UDF functionality is similar to what I have in mind. The main difference seems to be "where and how are the UDFs executed".
>
> (1) In the PR above, the UDF is passed to the Compute engine as a function pointer and therefore doesn't need any serialization of the user function. And the Compute engine calls back into the user Python interpreter with that function pointer, along with input data.
> (2) In the proposal, the UDF is passed to the Compute engine as serialized bytes. And the Compute engine starts the extra worker processes and passes the serialized bytes and data to them.
>
> I am thinking one limitation of (1) is that this is not very compatible with the ibis -> substrait -> compute engine approach. If we want to use substrait then I think we need to find a way to serialize and deserialize the UDF to pass it through substrait (because we cannot pass function pointers if substrait is involved). Prior art of this is how PySpark does it - it basically cloudpickles the function, plus some python includes and the python interpreter path. This works reasonably well for us even in a distributed environment - we do have to make sure the python interpreter exists on all workers and is the same. Another limitation that I can think of is how much data the user Python interpreter can process - I am not sure how parallel processing is currently handled in the above PR, but my hunch is there could be perf issues with just a single Python process running the UDF.
>
> I think that (2) can perhaps complement the two limitations above.
>
> Thoughts?
> Li
>
>
> On Thu, May 5, 2022 at 9:51 AM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > "def pandas_rank(ctx, arr):
> >     series = arr.to_pandas()
> >     rank = series.rank()
> >     return pa.array(rank)
> > "
> >
> > Oh nice! I didn't get that from the original PR, and it does look like this is closer to the problem I am trying to solve. At this point I will try to understand that PR more and see if what I proposed is even needed :)
> >
> > Are there any high level docs/writings about how this works? (If not, I can always read the PR, but I'm just hoping there is something quicker to understand.)
> >
> > Maybe a couple of high level questions that could help me understand:
> > (1) Where and how are the UDFs executed? (In a separate Python worker process? The user Python process? Or somewhere else?)
> > (2) How is the data passed back and forth between C++ and Python? (Flight? Something else?)
> > (3) Is there a way to use this without the pyarrow compute API? Currently we are planning to go from ibis -> substrait -> c++ compute and we don't really have the pyarrow compute API in the picture.
> >
> > Thanks!
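(For reference, the PySpark-style cloudpickle flow described above looks roughly like the sketch below. The normal_ppf_rank function is only an example, the worker-side plumbing is elided, and how the bytes would actually be attached to a Substrait plan is an open question.)

```
import cloudpickle
import pyarrow as pa
from scipy.stats import norm


# Client side: pickle the UDF by value so it can travel inside a plan,
# e.g. as an opaque payload attached to a Substrait extension function.
def normal_ppf_rank(ctx, arr):
    series = arr.to_pandas()
    # plotting-position percentile keeps ppf away from 0 and 1
    pct = (series.rank() - 0.5) / len(series)
    return pa.array(norm.ppf(pct))

payload = cloudpickle.dumps(normal_ppf_rank)

# Worker side: rebuild the callable from the bytes and invoke it on a batch.
# The worker's interpreter must have the same dependencies (scipy, pandas).
udf = cloudpickle.loads(payload)
print(udf(None, pa.array([3, 1, 8, 5, 4, 9])))
```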
> > Li
> > Li
> >
> > On Wed, May 4, 2022 at 10:59 PM Weston Pace <weston.p...@gmail.com> wrote:
> >
> >> > However, if I understand correctly, the UDFs implemented in the PR above are still "compositions of existing C++ kernels" instead of "arbitrary pandas/numpy" code, so it kind of resolves a different problem IMO.
> >>
> >> That is not a correct understanding, though it is an understandable one, as we don't have any examples of anything else at the moment (the tests use pyarrow compute simply to avoid requiring additional dependencies). The implementation we have today requires that you start with a pyarrow.Array and end with a pyarrow.Array. However, there are no rules about what you can do in between. So if you want to convert the array to pandas or numpy, that's perfectly fine (it might be good to have some cookbook examples of this). This should work with the latest master:
> >>
> >> ```
> >> import pandas as pd
> >> import pyarrow as pa
> >> import pyarrow.compute as pc
> >> import pyarrow.dataset as ds
> >>
> >> def pandas_rank(ctx, arr):
> >>     series = arr.to_pandas()
> >>     rank = series.rank()
> >>     return pa.array(rank)
> >>
> >> function_doc = {
> >>     'summary': 'Compute rank of array using pandas',
> >>     'description': "Compute rank of array using pandas (I'm a description)"
> >> }
> >>
> >> pc.register_scalar_function(pandas_rank,
> >>                             'pandas_rank',
> >>                             function_doc,
> >>                             {'vals': pa.int64()},
> >>                             # pandas rank returns double?
> >>                             pa.float64())
> >>
> >> table = pa.Table.from_pydict({'values': [3, 1, 8, 5, 4, 9]})
> >> dataset = ds.dataset(table)
> >> # TODO (ARROW-16475) use something better than _call
> >> projection_expr = pc.Expression._call('pandas_rank', [ds.field('values')])
> >> print(dataset.to_table(columns={'rank': projection_expr}))
> >> # pyarrow.Table
> >> # rank: double
> >> # ----
> >> # rank: [[2,1,5,4,3,6]]
> >> ```
> >>
> >> In fact, you could even implement the separate worker server completely in python using the existing mechanism today (I don't know if that would be a good idea or not).
> >>
> >> > Sorry I am not sure what your question is - do you mean the proposal here doesn't address serialization of UDFs and you want to ask about that?
> >>
> >> Sorry, I was just pointing out that the PR that has already been implemented did not add any kind of support for this.
> >>
> >> > I am not sure either. This can perhaps be a kernel instead of a node. The reason that I put this as a node right now is because I think this would involve life cycle management for python workers, so it doesn't feel like a "kernel" to me. But I am not strong on that.
> >>
> >> The lifecycle management for the workers could probably extend beyond any single plan. I would imagine it is something you want to start at the beginning of your application and then tear down only at the end, but I don't know your use case that well, so I could be wrong. The UDF then would only be calling into some kind of service.
> >>
> >> The challenge with a UDF node is that the main benefit of registering UDFs is to enable them for use in existing nodes. For example, the scalar UDFs we have today can be used in the project node and the filter node. If you were to create a separate UDF node you would need a "project UDF node" and a "filter UDF node".
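(A bare-bones version of the "worker implemented completely in Python" idea mentioned above could look like the following: the registered UDF forwards its input to a process pool, sidestepping the GIL, and blocks the engine thread until the result comes back. The names and pool setup are illustrative only, and whether this is a good idea is a separate question.)

```
import concurrent.futures

import pyarrow as pa


def _rank_in_worker(arr):
    # Runs in a separate Python process; pyarrow arrays pickle cleanly.
    return pa.array(arr.to_pandas().rank())

# One pool for the lifetime of the application, not per plan.
_pool = concurrent.futures.ProcessPoolExecutor(max_workers=2)


def pandas_rank_out_of_process(ctx, arr):
    # Block this execution-engine thread until the worker finishes.  This
    # function could be registered with pc.register_scalar_function just
    # like the in-process pandas_rank example above.
    return _pool.submit(_rank_in_worker, arr).result()
```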
> >>
> >> On Wed, May 4, 2022 at 10:55 AM Li Jin <ice.xell...@gmail.com> wrote:
> >> >
> >> > Weston - Yes I have seen the pull request above (very cool). However, if I understand correctly, the UDFs implemented in the PR above are still "compositions of existing C++ kernels" instead of "arbitrary pandas/numpy" code, so it kind of resolves a different problem IMO.
> >> >
> >> > For example, if I want to compute "normal distribution percentile rank", I cannot quite use the scalar UDF above to do that (we would need to add a kernel similar to scipy.stats.norm.ppf).
> >> >
> >> > 1. Why do you want to run these UDFs in a separate process? Is this for robustness (if the UDF crashes the process recovery is easier) or performance (potentially working around the GIL if you have multiple python processes)? Have you given much thought to how the data would travel back and forth between the processes? For example, via sockets (maybe flight) or shared memory?
> >> >
> >> > The main reason that I want to run UDFs in a separate process is that this is the only way I can think of to run any arbitrary pandas/numpy code (assuming these are stateless pure functions). It does have side benefits, like not crashing the compute engine if the UDF is buggy, with the tradeoff being that it is less performant and involves managing additional python worker processes. I don't have much thought on data communication channels yet, but I think something like a local socket should work reasonably well, and it's a bonus if we can have less copying / shared memory. From my experience the overhead is OK, considering these UDFs often involve more complicated math or relational computations (otherwise it doesn't need to be a UDF at all).
> >> >
> >> > "2. The current implementation doesn't address serialization of UDFs."
> >> > Sorry I am not sure what your question is - do you mean the proposal here doesn't address serialization of UDFs and you want to ask about that?
> >> >
> >> > "I'm not sure a separate ExecNode would be necessary. So far we've implemented UDFs at the kernel function level and I think we can continue to do that, even if we are calling out of process workers."
> >> >
> >> > I am not sure either. This can perhaps be a kernel instead of a node. The reason that I put this as a node right now is because I think this would involve life cycle management for python workers, so it doesn't feel like a "kernel" to me. But I am not strong on that.
> >> >
> >> >
> >> >
> >> > On Wed, May 4, 2022 at 4:06 PM Weston Pace <weston.p...@gmail.com> wrote:
> >> >
> >> > > Hi Li, have you seen the python UDF prototype that we just recently merged into the execution engine at [1]? It adds support for scalar UDFs.
> >> > >
> >> > > Comparing your proposal to what we've done so far I would ask:
> >> > >
> >> > > 1. Why do you want to run these UDFs in a separate process? Is this for robustness (if the UDF crashes the process recovery is easier) or performance (potentially working around the GIL if you have multiple python processes)? Have you given much thought to how the data would travel back and forth between the processes? For example, via sockets (maybe flight) or shared memory?
> >> > > 2. The current implementation doesn't address serialization of UDFs.
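(On the question above of how data would travel between processes: one option built from existing pieces would be Flight's DoExchange over a local socket. A rough client-side sketch follows; the port and the "pandas_rank" command are made up, and the worker-side Flight server is not shown.)

```
import pyarrow as pa
import pyarrow.flight as flight

# Connect to a hypothetical local UDF worker process serving Arrow Flight.
client = flight.connect("grpc://localhost:8815")
descriptor = flight.FlightDescriptor.for_command(b"pandas_rank")

batch = pa.RecordBatch.from_pydict({"vals": [3, 1, 8, 5, 4, 9]})

# DoExchange: stream the batch to the worker and read the result back,
# blocking this thread until the worker replies.
writer, reader = client.do_exchange(descriptor)
writer.begin(batch.schema)
writer.write_batch(batch)
writer.done_writing()
result = reader.read_chunk().data  # a RecordBatch holding the ranks
writer.close()
```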
> >> > >
> >> > > I'm not sure a separate ExecNode would be necessary. So far we've implemented UDFs at the kernel function level and I think we can continue to do that, even if we are calling out of process workers.
> >> > >
> >> > > [1] https://github.com/apache/arrow/pull/12590
> >> > >
> >> > > On Wed, May 4, 2022 at 6:12 AM Li Jin <ice.xell...@gmail.com> wrote:
> >> > > >
> >> > > > Hello,
> >> > > >
> >> > > > I have a somewhat controversial idea to introduce a "bridge" solution for Python UDFs in Arrow Compute and have written up my thoughts in this proposal:
> >> > > >
> >> > > > https://docs.google.com/document/d/1s7Gchq_LoNuiZO5bHq9PZx9RdoCWSavuS58KrTYXVMU/edit?usp=sharing
> >> > > >
> >> > > > I am curious to hear what the community thinks about this. (I am ready to take criticism :) )
> >> > > >
> >> > > > I wrote this in just 1-2 hours so I'm happy to explain anything that is unclear.
> >> > > >
> >> > > > Appreciate your feedback,
> >> > > > Li