After reading the above PR, I think I understand the approach a bit more now.
If I understand this correctly, the above UDF functionality is similar to what I have in mind. The main difference seems to be "where and how are the UDFs executed":

(1) In the PR above, the UDF is passed to the compute engine as a function pointer, and therefore doesn't need any serialization of the user function. The compute engine calls back into the user's Python interpreter with that function pointer, along with the input data.

(2) In the proposal, the UDF is passed to the compute engine as serialized bytes. The compute engine starts extra worker processes and passes the serialized bytes and the data to them.

I am thinking one limitation of (1) is that it is not very compatible with the ibis -> substrait -> compute engine approach. If we want to use substrait then I think we need a way to serialize the UDF, pass it through substrait, and deserialize it on the other side (because we cannot pass function pointers if substrait is involved). Prior art for this is how PySpark does it - it basically cloudpickles the function, plus some Python includes and the Python interpreter path. This works reasonably well for us even in a distributed environment - we do have to make sure the same Python interpreter exists on all workers.
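To sketch that round trip (purely illustrative - my_udf is a made-up placeholder, and this assumes cloudpickle is installed, which is what PySpark bundles):

```
import cloudpickle

# Driver side: turn the user function into bytes. These bytes are what
# would be carried through substrait, e.g. as an opaque payload.
def my_udf(x):
    return x * 2

udf_bytes = cloudpickle.dumps(my_udf)

# Worker side: deserialize the bytes and call the function. The worker's
# interpreter and library versions need to be compatible with the
# driver's.
fn = cloudpickle.loads(udf_bytes)
assert fn(21) == 42
```

That is the core of it; PySpark layers worker management and interpreter version checks on top.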
Another limitation that I can think of is how much data the user's Python interpreter can process - I am not sure how parallel processing is currently handled in the above PR, but my hunch is there could be perf issues with just a single Python process running the UDF.

I think that (2) can perhaps complement the two limitations above. Thoughts?

Li

On Thu, May 5, 2022 at 9:51 AM Li Jin <ice.xell...@gmail.com> wrote:

> "def pandas_rank(ctx, arr):
>     series = arr.to_pandas()
>     rank = series.rank()
>     return pa.array(rank)
> "
>
> Oh nice! I didn't get that from the original PR, and it does look like this is
> closer to the problem I am trying to solve. At this point I will try to understand
> more about that PR and see if what I proposed is even needed :)
>
> Are there any high-level docs/writings about how this works? (If not, I
> can always read the PR, but I'm hoping there is something quicker to
> understand.)
>
> Maybe a couple of high-level questions that could help me understand:
> (1) Where and how are the UDFs executed? (Separate Python worker
> process? The user's Python process? Or somewhere else?)
> (2) How is the data passed back and forth between C++ and Python? (flight?
> something else?)
> (3) Is there a way to use this without the pyarrow compute API? Currently we
> are planning to go from ibis -> substrait -> c++ compute, and we don't really
> have the pyarrow compute API in the picture.
>
> Thanks!
> Li
>
> On Wed, May 4, 2022 at 10:59 PM Weston Pace <weston.p...@gmail.com> wrote:
>
>> > However, if I
>> > understand correctly, the UDFs implemented in the PR above are still
>> > "compositions of existing C++ kernels" instead of
>> > "arbitrary pandas/numpy" code, so it kind of solves a
>> > different problem IMO.
>>
>> That is not a correct understanding, though it is an understandable
>> one, as we don't have any examples of anything else at the moment (the
>> tests use pyarrow compute simply to avoid requiring additional
>> dependencies). The implementation we have today requires that you
>> start with pyarrow.Array and end with pyarrow.Array. However, there
>> are no rules about what you can do in between. So if you want to
>> convert the array to pandas or numpy, that's perfectly fine (might be
>> good to have some cookbook examples of this). This should work with
>> the latest master:
>>
>> ```
>> import pandas as pd
>> import pyarrow as pa
>> import pyarrow.compute as pc
>> import pyarrow.dataset as ds
>>
>> def pandas_rank(ctx, arr):
>>     series = arr.to_pandas()
>>     rank = series.rank()
>>     return pa.array(rank)
>>
>> function_doc = {
>>     'summary': 'Compute rank of array using pandas',
>>     'description': "Compute rank of array using pandas (I'm a description)"
>> }
>>
>> pc.register_scalar_function(pandas_rank,
>>                             'pandas_rank',
>>                             function_doc,
>>                             {'vals': pa.int64()},
>>                             # pandas rank returns double?
>>                             pa.float64())
>>
>> table = pa.Table.from_pydict({'values': [3, 1, 8, 5, 4, 9]})
>> dataset = ds.dataset(table)
>> # TODO (ARROW-16475) use something better than _call
>> projection_expr = pc.Expression._call('pandas_rank', [ds.field('values')])
>> print(dataset.to_table(columns={'rank': projection_expr}))
>> # pyarrow.Table
>> # rank: double
>> # ----
>> # rank: [[2,1,5,4,3,6]]
>> ```
>>
>> In fact, you could even implement the separate worker server
>> completely in python using the existing mechanism today (I don't know
>> if that would be a good idea or not).
>>
>> > Sorry I am not sure what your question is - do you mean the
>> > proposal here doesn't address serialization of UDFs and you
>> > want to ask about that?
>>
>> Sorry, I was just pointing out that the PR that has already been
>> implemented did not add any kind of support for this.
>>
>> > I am not sure either. This can perhaps be a kernel instead of a node. The
>> > reason that I put this as a node right now is because I think this would
>> > involve lifecycle management for python workers, so
>> > it doesn't feel like a "kernel" to me. But I am not strong on that.
>>
>> The lifecycle management for the workers could probably extend beyond
>> any single plan. I would imagine it is something you want to start at
>> the beginning of your application and tear down only at the end, but I
>> don't know your use case as well, so I could be wrong. The UDF
>> would then only be calling into some kind of service.
>>
>> The challenge with a UDF node is that the main benefit of registering
>> UDFs is to enable them for use in existing nodes. For example, the
>> scalar UDFs we have today can be used in the project node and the
>> filter node. If you were to create a separate UDF node you would need
>> a "project UDF node" and a "filter UDF node".
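To make the "worker server completely in python" idea above concrete, here is roughly the data path I have in mind for (2): a record batch shipped to the worker as Arrow IPC stream bytes over a local socket. This is purely an illustrative sketch - a thread stands in for the separate worker process, and none of these helpers are real Arrow APIs:

```
import socket
import struct
import threading

import pyarrow as pa
import pyarrow.ipc as ipc

def batch_to_bytes(batch):
    # Serialize a single record batch with the Arrow IPC stream format.
    sink = pa.BufferOutputStream()
    with ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue().to_pybytes()

def batch_from_bytes(data):
    return ipc.open_stream(data).read_next_batch()

def send_msg(sock, data):
    # Length-prefix the payload so the receiver knows how much to read.
    sock.sendall(struct.pack('<Q', len(data)) + data)

def recv_msg(sock):
    def recv_exact(n):
        buf = b''
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError('socket closed')
            buf += chunk
        return buf
    (length,) = struct.unpack('<Q', recv_exact(8))
    return recv_exact(length)

def worker(sock):
    # The worker: receive a batch, run arbitrary pandas code on it, and
    # send the result back. In the proposal this would be a long-lived
    # separate process that also received the cloudpickled UDF.
    batch = batch_from_bytes(recv_msg(sock))
    ranks = batch.column(0).to_pandas().rank()
    send_msg(sock, batch_to_bytes(
        pa.record_batch([pa.array(ranks)], names=['rank'])))
    sock.close()

client, server = socket.socketpair()
threading.Thread(target=worker, args=(server,)).start()

batch = pa.record_batch([pa.array([3, 1, 8, 5, 4, 9])], names=['values'])
send_msg(client, batch_to_bytes(batch))
print(batch_from_bytes(recv_msg(client)).column(0))
client.close()
```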
>> On Wed, May 4, 2022 at 10:55 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>> > Weston - Yes, I have seen the pull request above (very cool). However, if I
>> > understand correctly, the UDFs implemented in the PR above are still
>> > "compositions of existing C++ kernels" instead of "arbitrary pandas/numpy"
>> > code, so it kind of solves a different problem IMO.
>> >
>> > For example, if I want to compute "normal distribution percentile rank", I
>> > cannot quite use the scalar UDF above to do that (we would need to add a
>> > kernel similar to scipy.stats.norm.ppf).
>> >
>> > 1. Why do you want to run these UDFs in a separate process? Is this
>> > for robustness (if the UDF crashes, process recovery is easier) or
>> > performance (potentially working around the GIL if you have multiple
>> > python processes)? Have you given much thought to how the data would
>> > travel back and forth between the processes? For example, via sockets
>> > (maybe flight) or shared memory?
>> >
>> > The main reason that I want to run UDFs in a separate process is that this
>> > is the only way I can think of to run arbitrary pandas/numpy code
>> > (assuming these are stateless pure functions).
>> > It does have side benefits, like not crashing the compute engine if the
>> > UDF is buggy, with the trade-off being lower performance and having to
>> > manage additional python worker processes.
>> > I don't have much thought on data communication channels, but I think
>> > something like a local socket should work reasonably well, and bonus if we
>> > can have less copying / shared memory.
>> > From my experience the overhead is OK, considering these UDFs often
>> > involve more complicated math or relational computations (otherwise they
>> > wouldn't need to be UDFs at all).
>> >
>> > "2. The current implementation doesn't address serialization of UDFs."
>> > Sorry, I am not sure what your question is - do you mean the proposal here
>> > doesn't address serialization of UDFs and you want to ask about that?
>> >
>> > "I'm not sure a separate ExecNode would be necessary. So far we've
>> > implemented UDFs at the kernel function level and I think we can
>> > continue to do that, even if we are calling out of process workers."
>> >
>> > I am not sure either. This can perhaps be a kernel instead of a node. The
>> > reason that I put this as a node right now is because I think this would
>> > involve lifecycle management for python workers, so
>> > it doesn't feel like a "kernel" to me. But I am not strong on that.
>> >
>> > On Wed, May 4, 2022 at 4:06 PM Weston Pace <weston.p...@gmail.com> wrote:
>> >
>> > > Hi Li, have you seen the python UDF prototype that we just recently
>> > > merged into the execution engine at [1]? It adds support for scalar
>> > > UDFs.
>> > >
>> > > Comparing your proposal to what we've done so far, I would ask:
>> > >
>> > > 1. Why do you want to run these UDFs in a separate process? Is this
>> > > for robustness (if the UDF crashes, process recovery is easier) or
>> > > performance (potentially working around the GIL if you have multiple
>> > > python processes)? Have you given much thought to how the data would
>> > > travel back and forth between the processes? For example, via sockets
>> > > (maybe flight) or shared memory?
>> > > 2. The current implementation doesn't address serialization of UDFs.
>> > >
>> > > I'm not sure a separate ExecNode would be necessary. So far we've
>> > > implemented UDFs at the kernel function level and I think we can
>> > > continue to do that, even if we are calling out-of-process workers.
>> > >
>> > > [1] https://github.com/apache/arrow/pull/12590
>> > >
>> > > On Wed, May 4, 2022 at 6:12 AM Li Jin <ice.xell...@gmail.com> wrote:
>> > > >
>> > > > Hello,
>> > > >
>> > > > I have a somewhat controversial idea to introduce a "bridge" solution for
>> > > > Python UDFs in Arrow Compute, and I have written up my thoughts in this
>> > > > proposal:
>> > > >
>> > > > https://docs.google.com/document/d/1s7Gchq_LoNuiZO5bHq9PZx9RdoCWSavuS58KrTYXVMU/edit?usp=sharing
>> > > >
>> > > > I am curious to hear what the community thinks about this. (I am ready to
>> > > > take criticism :) )
>> > > >
>> > > > I wrote this in just 1-2 hours, so I'm happy to explain anything that is
>> > > > unclear.
>> > > >
>> > > > Appreciate your feedback,
>> > > > Li
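P.S. Rereading Weston's pandas_rank example above, the "normal distribution percentile rank" I mentioned can probably be expressed with the merged scalar UDF mechanism after all. A sketch (untested against master; assumes scipy is installed, and invocation would look just like the pandas_rank example):

```
import pyarrow as pa
import pyarrow.compute as pc
from scipy.stats import norm

def normal_percentile_rank(ctx, arr):
    # Arbitrary pandas/scipy code inside the UDF body: percentile-rank
    # the values, then map them through the normal inverse CDF. Note the
    # top-ranked value maps to +inf with pct=True; a real implementation
    # would adjust the ranks.
    ranks = arr.to_pandas().rank(pct=True)
    return pa.array(norm.ppf(ranks))

function_doc = {
    'summary': 'Normal distribution percentile rank',
    'description': 'rank(pct=True) mapped through scipy.stats.norm.ppf'
}

pc.register_scalar_function(normal_percentile_rank,
                            'normal_percentile_rank',
                            function_doc,
                            {'vals': pa.float64()},
                            pa.float64())
```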