"def pandas_rank(ctx, arr):
    series = arr.to_pandas()
    rank = series.rank()
    return pa.array(rank)
"

Oh nice! I didn't get that from the original PR, and it does look like this is
closer to the problem I am trying to solve. At this point I will try to
understand more about that PR and see if what I proposed is even needed :)

Are there any high-level docs/writings about how this works? (If not, I can
always read the PR, but I'm hoping there is something quicker to
understand.)

Maybe a couple of high-level questions that could help me understand:
(1) Where and how are the UDFs executed? (A separate Python worker
process? The user's Python process? Somewhere else?)
(2) How is the data passed between C++ and Python? (Flight? Something
else?)
(3) Is there a way to use this without the pyarrow compute API? Currently we
are planning to go from ibis -> Substrait -> C++ compute, and we don't really
have the pyarrow compute API in the picture.

Thanks!
Li

On Wed, May 4, 2022 at 10:59 PM Weston Pace <weston.p...@gmail.com> wrote:

> > However, if I
> > understand correctly, the UDF implemented in the PR above are still
> > "composition of existing C++ kernels" instead of
> > "arbitrary pandas/numpy" code, so it kind of resolves a
> > different problem IMO.
>
> That is not a correct understanding, though it is an understandable
> one as we don't have any examples of anything else at the moment (the
> tests use pyarrow compute simply to avoid requiring additional
> dependencies).  The implementation we have today requires that you
> start with pyarrow.Array and end with pyarrow.Array.  However, there
> are no rules about what you can do in between.  So if you want to
> convert the array to pandas or numpy that's perfectly fine (might be
> good to have some cookbook examples of this).  This should work with
> the latest master:
>
> ```
> import pandas as pd
> import pyarrow as pa
> import pyarrow.compute as pc
> import pyarrow.dataset as ds
>
> def pandas_rank(ctx, arr):
>     series = arr.to_pandas()
>     rank = series.rank()
>     return pa.array(rank)
>
> function_doc = {
>       'summary': 'Compute rank of array using pandas',
>       'description': "Compute rank of array using pandas (I'm a description)"
>   }
>
> pc.register_scalar_function(pandas_rank,
>                             'pandas_rank',
>                             function_doc,
>                             {'vals': pa.int64()},
>                             # pandas rank returns double?
>                             pa.float64())
>
> table = pa.Table.from_pydict({'values': [3, 1, 8, 5, 4, 9]})
> dataset = ds.dataset(table)
> # TODO (ARROW-16475) use something better than _call
> projection_expr = pc.Expression._call('pandas_rank', [ds.field('values')])
> print(dataset.to_table(columns={'rank': projection_expr}))
> # pyarrow.Table
> # rank: double
> # ----
> # rank: [[2,1,5,4,3,6]]
> ```
>
> In fact, you could even implement the separate worker server
> completely in python using the existing mechanism today (I don't know
> if that would be a good idea or not).
>
> > Sorry I am not sure what your question is - do you mean the
> > proposal here doesn't address serialization of UDFs and you
> > want to ask about that?
>
> Sorry, I was just pointing out that the PR that has already been
> implemented did not add any kind of support for this.
>
> > I am not sure either. This can perhaps be a kernel instead of a node. The
> > reason that I put this a node right now is because I think this would
> > involves life cycle management for python workers so
> > it doesn't feel like a "kernel" to me. But I am not strong on that.
>
> The lifecycle management for the workers could probably extend beyond
> any single plan.  I would imagine it is something you want to start at
> the beginning of your application and then tear down only at the end
> but I don't know your use case as well so I could be wrong.  The UDF
> then would only be calling into some kind of service.
>
> The challenge with a UDF node is that the main benefit of registering
> UDFs is to enable them for use in existing nodes.  For example, the
> scalar UDFs we have today can be used in the project node and the
> filter node.  If you were to create a separate UDF node you would need
> a "project UDF node" and a "filter UDF node".
>
> On Wed, May 4, 2022 at 10:55 AM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > Weston - Yes I have seen the pull request above (very cool). However, if I
> > understand correctly, the UDF implemented in the PR above are still
> > "composition of existing C++ kernels" instead of "arbitrary pandas/numpy"
> > code, so it kind of resolves a different problem IMO.
> >
> > For example, if I want to compute "normal distribution percentile rank", I
> > cannot quite use the scalar UDF above to do that (we would need to add a
> > kernel similar to scipy.stats.norm.ppf).
> >
> >  1. Why do you want to run these UDFs in a separate process?  Is this
> > for robustness (if the UDF crashes the process recovery is easier) or
> > performance (potentially working around the GIL if you have multiple
> > python processes)?  Have you given much thought to how the data would
> > travel back and forth between the processes?  For example, via sockets
> > (maybe flight) or shared memory?
> >
> > The main reason that I want to run UDFs in a separate process is that this
> > is the only way that I think can run any arbitrary pandas/numpy code
> > (assuming these are stateless pure functions).
> > It does have side benefits like it won't crash the compute engine if the
> > UDF is buggy. With trade off being less performant and involves managing
> > additional python worker process.
> > I don't have much though on data communication channels, but I think sth
> > like a local socket should work reasonably well, and bonus if we can have
> > less copy / shared memory.
> > From my experience the overhead that is OK considering these UDFs often
> > involves more complicated math or relation computations (otherwise it
> > doesn't need to be UDF at all).
> >
> > "2. The current implementation doesn't address serialization of UDFs."
> > Sorry I am not sure what your question is - do you mean the proposal here
> > doesn't address serialization of UDFs and you want to ask about that?
> >
> > "I'm not sure a separate ExecNode would be necessary.  So far we've
> > implemented UDFs at the kernel function level and I think we can
> > continue to do that, even if we are calling out of process workers."
> >
> > I am not sure either. This can perhaps be a kernel instead of a node. The
> > reason that I put this a node right now is because I think this would
> > involves life cycle management for python workers so
> > it doesn't feel like a "kernel" to me. But I am not strong on that.
> >
> >
> >
> > On Wed, May 4, 2022 at 4:06 PM Weston Pace <weston.p...@gmail.com> wrote:
> >
> > > Hi Li, have you seen the python UDF prototype that we just recently
> > > merged into the execution engine at [1]?  It adds support for scalar
> > > UDFs.
> > >
> > > Comparing your proposal to what we've done so far I would ask:
> > >
> > >  1. Why do you want to run these UDFs in a separate process?  Is this
> > > for robustness (if the UDF crashes the process recovery is easier) or
> > > performance (potentially working around the GIL if you have multiple
> > > python processes)?  Have you given much thought to how the data would
> > > travel back and forth between the processes?  For example, via sockets
> > > (maybe flight) or shared memory?
> > >  2. The current implementation doesn't address serialization of UDFs.
> > >
> > > I'm not sure a separate ExecNode would be necessary.  So far we've
> > > implemented UDFs at the kernel function level and I think we can
> > > continue to do that, even if we are calling out of process workers.
> > >
> > > [1] https://github.com/apache/arrow/pull/12590
> > >
> > > On Wed, May 4, 2022 at 6:12 AM Li Jin <ice.xell...@gmail.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > > I have a somewhat controversial idea to introduce a "bridge" solution for
> > > > Python UDFs in Arrow Compute and have write up my thoughts in this
> > > > proposal:
> > > >
> > > > https://docs.google.com/document/d/1s7Gchq_LoNuiZO5bHq9PZx9RdoCWSavuS58KrTYXVMU/edit?usp=sharing
> > > >
> > > > I am curious to hear what the community thinks about this. (I am ready to
> > > > take criticism :) )
> > > >
> > > > I wrote this in just 1-2 hours so I'm happy to explain anything that is
> > > > unclear.
> > > >
> > > > Appreciate your feedback,
> > > > Li
> > >
>
