Hi Dian,

Thanks a lot for bringing up the discussion!
It is great to see that the Pandas UDFs feature is going to be introduced. I think this would improve both the performance and the usability of user-defined functions (UDFs) in Python.

One little suggestion: maybe it would be nice if we could add some performance explanation to the document? (I'm just very curious :))

+1 to creating a FLIP for this big enhancement.

Best, Hequn

On Mon, Feb 10, 2020 at 11:15 AM jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi Dian,
>
> Thanks for bringing up this discussion. This is very important for the
> ecosystem of PyFlink. Adding support for Pandas greatly enriches the
> available UDF libraries of PyFlink and greatly improves its usability!
>
> +1 for supporting scalar vectorized Python UDFs.
>
> I think we should create a FLIP for this big enhancement. :)
>
> What do you think?
>
> Best,
> Jincheng
>
>
>
> dianfu <dia...@apache.org> wrote on Wed, Feb 5, 2020 at 6:01 PM:
>
> > Hi Jingsong,
> >
> > Thanks a lot for the valuable feedback.
> >
> > 1. The configurations "python.fn-execution.bundle.size" and
> > "python.fn-execution.arrow.batch.size" are used for separate purposes and
> > I think they are both needed. If they were unified, the Python operator
> > would have to wait for the execution results of the previous batch of
> > elements before processing the next batch. This means that the Python UDF
> > execution could not be pipelined between batches. With separate
> > configurations, there is no such problem.
> > 2. It means that the Java operator will convert the input elements to the
> > Arrow memory format and then send them to the Python worker, and vice
> > versa. Regarding the zero-copy benefits provided by Arrow, we gain them
> > automatically by using Arrow.
> > 3. Good point! As all the classes of the Python module are written in Java
> > and it's not suggested to introduce new Scala classes, I guess it's not
> > easy to do so right now. But I think this is definitely a good improvement
> > we can make in the future.
> > 4. You're right, and we will add a series of Arrow ColumnVectors, one for
> > each supported type.
> >
> > Thanks,
> > Dian
> >
> > > On Feb 5, 2020, at 4:57 PM, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > Hi Dian,
> > >
> > > +1 for this, thanks for driving it.
> > > The documentation looks very good. I can imagine a huge performance
> > > improvement and better integration with other Python libraries.
> > >
> > > A few thoughts:
> > > - About data splitting: can we unify "python.fn-execution.arrow.batch.size"
> > > with "python.fn-execution.bundle.size"?
> > > - Use of Apache Arrow as the exchange format: do you mean that Arrow
> > > supports zero-copy between Java and Python?
> > > - ArrowFieldWriter: it seems we could implement it with code generation,
> > > but it is OK for the initial version to use virtual function calls.
> > > - ColumnarRow for vectorized reading: it seems that we need to implement
> > > ArrowColumnVectors.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Feb 5, 2020 at 12:45 PM dianfu <dia...@apache.org> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Scalar Python UDFs are already supported in the coming 1.10 release
> > >> (FLIP-58[1]). A scalar Python UDF operates one row at a time: the Java
> > >> operator serializes one input row to bytes and sends it to the Python
> > >> worker; the Python worker deserializes the input row and evaluates the
> > >> Python UDF with it; the result row is then serialized and sent back to
> > >> the Java operator.
> > >>
> > >> It suffers from the following problems:
> > >> 1) High serialization/deserialization overhead
> > >> 2) It's difficult to leverage the popular Python libraries used by data
> > >> scientists, such as Pandas, NumPy, etc., which provide high-performance
> > >> data structures and functions.
> > >>
> > >> Jincheng and I have discussed this offline and we want to introduce
> > >> vectorized Python UDFs to address the above problems. This feature has
> > >> also been mentioned in the discussion thread about the Python API
> > >> plan[2]. For vectorized Python UDFs, a batch of rows is transferred
> > >> between the JVM and the Python VM in columnar format. The batch of rows
> > >> will be converted to a collection of Pandas.Series and given to the
> > >> vectorized Python UDF, which can then leverage popular Python libraries
> > >> such as Pandas, NumPy, etc. for the Python UDF implementation.
> > >>
> > >> Please refer to the design doc[3] for more details; any feedback is
> > >> welcome.
> > >>
> > >> Regards,
> > >> Dian
> > >>
> > >> [1]
> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
> > >> [2]
> > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
> > >> [3]
> > >> https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd
> > >>
> > >>
> > >
> > > --
> > > Best, Jingsong Lee
> >
> >
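To make the row-at-a-time versus vectorized distinction discussed in the thread concrete, here is a minimal sketch of what the two styles of UDF bodies could look like on the user side. It is only an illustration under the assumption that a vectorized UDF receives whole columns as pandas.Series; the function names are made up, and the registration/decorator API from the design doc is deliberately left out.

```python
import pandas as pd

# Row-at-a-time style: called once per input row, paying per-row
# serialization and Python call overhead (the status quo described above).
def scalar_add_one(x: int) -> int:
    return x + 1

# Vectorized style: called once per batch with whole columns as
# pandas.Series, so the work happens inside Pandas/NumPy native code.
# Hypothetical example only; not the API proposed in the design doc.
def vectorized_add_one(x: pd.Series) -> pd.Series:
    return x + 1

if __name__ == "__main__":
    batch = pd.Series([1, 2, 3, 4])
    print([scalar_add_one(v) for v in batch])   # four Python-level calls
    print(vectorized_add_one(batch).tolist())   # one call for the whole batch
```

The second function also makes it easy to call into Pandas or NumPy directly inside the UDF body, which is one of the usability benefits mentioned in the proposal.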
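The point about keeping "python.fn-execution.bundle.size" and "python.fn-execution.arrow.batch.size" as separate options can be shown with a small configuration sketch. This assumes both options end up as ordinary configuration keys settable through the table environment; the setter used here and the values are illustrative, and the Arrow option does not exist yet at the time of this discussion.

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

config = table_env.get_config().get_configuration()

# Upper bound on the number of elements the Java operator hands to the
# Python worker before it waits for their results (the bundle boundary).
config.set_string("python.fn-execution.bundle.size", "10000")

# Proposed option: how many rows are packed into one Arrow record batch.
# Keeping it separate from the bundle size lets several Arrow batches be
# in flight inside a single bundle, so UDF execution can be pipelined
# between batches instead of stalling after each one.
config.set_string("python.fn-execution.arrow.batch.size", "1000")
```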