> However, if I
> understand correctly, the UDF implemented in the PR above are still
> "composition of existing C++ kernels" instead of
> "arbitrary pandas/numpy" code, so it kind of resolves a
> different problem IMO.

That is not a correct understanding, though it is an understandable
one as we don't have any examples of anything else at the moment (the
tests use pyarrow compute simply to avoid requiring additional
dependencies).  The implementation we have today requires that you
start with pyarrow.Array and end with pyarrow.Array.  However, there
are no rules about what you can do in between.  So if you want to
convert the array to pandas or numpy that's perfectly fine (might be
good to have some cookbook examples of this).  This should work with
the latest master:

```
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

def pandas_rank(ctx, arr):
    series = arr.to_pandas()
    rank = series.rank()
    return pa.array(rank)

function_doc = {
      'summary': 'Compute rank of array using pandas',
      'description': "Compute rank of array using pandas (I'm a description)"
  }

pc.register_scalar_function(pandas_rank,
                            'pandas_rank',
                            function_doc,
                            {'vals': pa.int64()},
                            # pandas rank returns double?
                            pa.float64())

table = pa.Table.from_pydict({'values': [3, 1, 8, 5, 4, 9]})
dataset = ds.dataset(table)
# TODO (ARROW-16475) use something better than _call
projection_expr = pc.Expression._call('pandas_rank', [ds.field('values')])
print(dataset.to_table(columns={'rank': projection_expr}))
# pyarrow.Table
# rank: double
# ----
# rank: [[2,1,5,4,3,6]]
```

In fact, you could even implement the separate worker server
completely in python using the existing mechanism today (I don't know
if that would be a good idea or not).

> Sorry I am not sure what your question is - do you mean the
> proposal here doesn't address serialization of UDFs and you
> want to ask about that?

Sorry, I was just pointing out that the PR that has already been
implemented did not add any kind of support for this.

> I am not sure either. This can perhaps be a kernel instead of a node. The
> reason that I put this a node right now is because I think this would
> involves life cycle management for python workers so
> it doesn't feel like a "kernel" to me. But I am not strong on that.

The lifecycle management for the workers could probably extend beyond
any single plan.  I would imagine it is something you want to start at
the beginning of your application and then tear down only at the end
but I don't know your use case as well so I could be wrong.  The UDF
then would only be calling into some kind of service.

The challenge with a UDF node is that the main benefit of registering
UDFs is to enable them for use in existing nodes.  For example, the
scalar UDFs we have today can be used in the project node and the
filter node.  If you were to create a separate UDF node you would need
a "project UDF node" and a "filter UDF node".

On Wed, May 4, 2022 at 10:55 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> Weston - Yes I have seen the pull request above (very cool). However, if I
> understand correctly, the UDF implemented in the PR above are still
> "composition of existing C++ kernels" instead of "arbitrary pandas/numpy"
> code, so it kind of resolves a different problem IMO.
>
> For example, if I want to compute "normal distribution percentile rank", I
> cannot quite use the scalar UDF above to do that (we would need to add a
> kernel similar to scipy.stats.norm.ppf).
>
>  1. Why do you want to run these UDFs in a separate process?  Is this
> for robustness (if the UDF crashes the process recovery is easier) or
> performance (potentially working around the GIL if you have multiple
> python processes)?  Have you given much thought to how the data would
> travel back and forth between the processes?  For example, via sockets
> (maybe flight) or shared memory?
>
> The main reason that I want to run UDFs in a separate process is that this
> is the only way that I think can run any arbitrary pandas/numpy code
> (assuming these are stateless pure functions).
> It does have side benefits like it won't crash the compute engine if the
> UDF is buggy. With trade off being less performant and involves managing
> additional python worker process.
> I don't have much though on data communication channels, but I think sth
> like a local socket should work reasonably well, and bonus if we can have
> less copy / shared memory.
> From my experience the overhead that is OK considering these UDFs often
> involves more complicated math or relation computations (otherwise it
> doesn't need to be UDF at all).
>
> "2. The current implementation doesn't address serialization of UDFs."
> Sorry I am not sure what your question is - do you mean the proposal here
> doesn't address serialization of UDFs and you want to ask about that?
>
> "I'm not sure a separate ExecNode would be necessary.  So far we've
> implemented UDFs at the kernel function level and I think we can
> continue to do that, even if we are calling out of process workers."
>
> I am not sure either. This can perhaps be a kernel instead of a node. The
> reason that I put this a node right now is because I think this would
> involves life cycle management for python workers so
> it doesn't feel like a "kernel" to me. But I am not strong on that.
>
>
>
> On Wed, May 4, 2022 at 4:06 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> > Hi Li, have you seen the python UDF prototype that we just recently
> > merged into the execution engine at [1]?  It adds support for scalar
> > UDFs.
> >
> > Comparing your proposal to what we've done so far I would ask:
> >
> >  1. Why do you want to run these UDFs in a separate process?  Is this
> > for robustness (if the UDF crashes the process recovery is easier) or
> > performance (potentially working around the GIL if you have multiple
> > python processes)?  Have you given much thought to how the data would
> > travel back and forth between the processes?  For example, via sockets
> > (maybe flight) or shared memory?
> >  2. The current implementation doesn't address serialization of UDFs.
> >
> > I'm not sure a separate ExecNode would be necessary.  So far we've
> > implemented UDFs at the kernel function level and I think we can
> > continue to do that, even if we are calling out of process workers.
> >
> > [1] https://github.com/apache/arrow/pull/12590
> >
> > On Wed, May 4, 2022 at 6:12 AM Li Jin <ice.xell...@gmail.com> wrote:
> > >
> > > Hello,
> > >
> > > I have a somewhat controversial idea to introduce a "bridge" solution for
> > > Python UDFs in Arrow Compute and have write up my thoughts in this
> > proposal:
> > >
> > >
> > https://docs.google.com/document/d/1s7Gchq_LoNuiZO5bHq9PZx9RdoCWSavuS58KrTYXVMU/edit?usp=sharing
> > >
> > > I am curious to hear what the community thinks about this. (I am ready to
> > > take criticism :) )
> > >
> > > I wrote this in just 1-2 hours so I'm happy to explain anything that is
> > > unclear.
> > >
> > > Appreciate your feedback,
> > > Li
> >

Reply via email to