Hi all,

Thank you all for participating in this discussion and sharing your thoughts. It seems that we have now reached consensus on the design. I will start a VOTE thread if there is no other feedback.
Thanks,
Dian

On Tue, Feb 11, 2020 at 10:23 AM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Jingsong,
>
> You're right. I have updated the FLIP to reflect this.
>
> Thanks,
> Dian
>
> > On Feb 11, 2020, at 10:03 AM, Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > Hi Dian and Jincheng,
> >
> > Thanks for your explanation. Thinking about it again, maybe most users don't want to modify these parameters.
> > We all realize that "batch.size" should be a larger value, so "bundle.size" must also be increased. Now the default value of "bundle.size" is only 1000.
> > I think you can update the design to provide meaningful default values for "batch.size" and "bundle.size".
> >
> > Best,
> > Jingsong Lee
> >
> > On Mon, Feb 10, 2020 at 4:36 PM Dian Fu <dian0511...@gmail.com> wrote:
> >
> >> Hi Jincheng, Hequn & Jingsong,
> >>
> >> Thanks a lot for your suggestions. I have created FLIP-97[1] for this feature.
> >>
> >>> One little suggestion: maybe it would be nice if we can add some performance explanation in the document? (I'm just very curious :))
> >> Thanks for the suggestion. I have updated the "BackGround" section of the design doc to explain where the performance gains come from.
> >>
> >>> It seems that a batch should always be within a bundle. The bundle size should always be bigger than the batch size (if a batch cannot cross bundles).
> >>> Can you explain this relationship in the document?
> >> I have updated the design doc to explain more about these two configurations.
> >>
> >>> In the batch world, the vectorization batch size is about 1024+. What do you think about the default value of "batch"?
> >> Is there any link explaining where this value comes from? I have performed a simple test with a Pandas UDF which performs a simple +1 operation. The performance is best when the batch size is set to 5000. I think it depends on the data type of each column, what the Pandas UDF does, etc.
> >> However, I agree with you that we could give a meaningful default value for the "batch" size which works in most scenarios.
> >>
> >>> Can we only configure one parameter and calculate the other automatically? For example, if we just want to "pipeline", "bundle.size" is twice as much as "batch.size". Would this work?
> >> I agree with Jincheng that this is not feasible. I think that giving a meaningful default value for "batch.size" which works in most scenarios is enough. What do you think?
> >>
> >> Thanks,
> >> Dian
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink
> >>
> >>
> >> On Mon, Feb 10, 2020 at 4:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> >>
> >>> Hi Jingsong,
> >>>
> >>> Thanks for your feedback! I would like to share my thoughts regarding the following question:
> >>>
> >>>>> - Can we only configure one parameter and calculate the other automatically? For example, if we just want to "pipeline", "bundle.size" is twice as much as "batch.size". Would this work?
> >>>
> >>> I don't think this works. These two configurations are used for different purposes and there is no direct relationship between them, so I guess we cannot infer one configuration from the other.
> >>>
> >>> Best,
> >>> Jincheng
> >>>
> >>>
> >>> Jingsong Li <jingsongl...@gmail.com> wrote on Mon, Feb 10, 2020 at 1:53 PM:
> >>>
> >>>> Thanks Dian for your reply.
> >>>>
> >>>> +1 to create a FLIP too.
> >>>>
> >>>> About "python.fn-execution.bundle.size" and "python.fn-execution.arrow.batch.size", I get what you mean about "pipeline". I agree.
> >>>> It seems that a batch should always be within a bundle. The bundle size should always be bigger than the batch size (if a batch cannot cross bundles).
> >>>> Can you explain this relationship in the document?
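The relationship raised here (a batch always sits inside a bundle) and Dian's argument that separate settings allow pipelining can be illustrated with a small model. This is a hypothetical simplification, not the FLIP-97 implementation: `plan_bundles` is an illustrative helper, and it assumes that a bundle is flushed after `bundle_size` elements (or at end of input) and that an Arrow batch never crosses a bundle boundary.

```python
from typing import List


def plan_bundles(num_elements: int, bundle_size: int, batch_size: int) -> List[List[int]]:
    """Return, for each bundle, the sizes of the Arrow batches it contains.

    Hypothetical model (not the FLIP-97 code): a bundle is flushed after
    `bundle_size` elements (or at end of input), and a batch never crosses
    a bundle boundary, so the last batch of a bundle may be short.
    """
    if bundle_size <= 0 or batch_size <= 0:
        raise ValueError("bundle_size and batch_size must be positive")
    bundles: List[List[int]] = []
    remaining = num_elements
    while remaining > 0:
        bundle = min(bundle_size, remaining)
        batches: List[int] = []
        left = bundle
        while left > 0:
            # A batch larger than the bundle is effectively capped at the bundle size.
            size = min(batch_size, left)
            batches.append(size)
            left -= size
        bundles.append(batches)
        remaining -= bundle
    return bundles


# bundle_size == batch_size: one batch per bundle, so the operator must wait
# for that batch's results (end of bundle) before the next batch is sent.
print(plan_bundles(10, 4, 4))  # [[4], [4], [2]]
# bundle_size > batch_size: several batches per bundle can be in flight.
print(plan_bundles(10, 8, 3))  # [[3, 3, 2], [2]]
```

Under this model, up to ceil(bundle_size / batch_size) batches can be in flight per bundle, which is consistent with the point made in this thread that raising the batch size also calls for raising the bundle size.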
> >>>>
> >>>> I think the default value is a very important thing. We can discuss:
> >>>> - In the batch world, the vectorization batch size is about 1024+. What do you think about the default value of "batch"?
> >>>> - Can we only configure one parameter and calculate the other automatically? For example, if we just want to "pipeline", "bundle.size" is twice as much as "batch.size". Would this work?
> >>>>
> >>>> Best,
> >>>> Jingsong Lee
> >>>>
> >>>> On Mon, Feb 10, 2020 at 11:55 AM Hequn Cheng <he...@apache.org> wrote:
> >>>>
> >>>>> Hi Dian,
> >>>>>
> >>>>> Thanks a lot for bringing up the discussion!
> >>>>>
> >>>>> It is great to see that the Pandas UDF feature is going to be introduced. I think this would improve both the performance and the usability of user-defined functions (UDFs) in Python.
> >>>>> One little suggestion: maybe it would be nice if we can add some performance explanation in the document? (I'm just very curious :))
> >>>>>
> >>>>> +1 to create a FLIP for this big enhancement.
> >>>>>
> >>>>> Best,
> >>>>> Hequn
> >>>>>
> >>>>> On Mon, Feb 10, 2020 at 11:15 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Dian,
> >>>>>>
> >>>>>> Thanks for bringing up this discussion. This is very important for the PyFlink ecosystem. Adding Pandas support greatly enriches the available UDF library of PyFlink and greatly improves the usability of PyFlink!
> >>>>>>
> >>>>>> +1 for supporting scalar vectorized Python UDF.
> >>>>>>
> >>>>>> I think we should create a FLIP for this big enhancement. :)
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Jincheng
> >>>>>>
> >>>>>>
> >>>>>> dianfu <dia...@apache.org> wrote on Wed, Feb 5, 2020 at 6:01 PM:
> >>>>>>
> >>>>>>> Hi Jingsong,
> >>>>>>>
> >>>>>>> Thanks a lot for the valuable feedback.
> >>>>>>>
> >>>>>>> 1. The configurations "python.fn-execution.bundle.size" and "python.fn-execution.arrow.batch.size" are used for separate purposes and I think they are both needed. If they are unified, the Python operator has to wait for the execution results of the previous batch of elements before processing the next batch. This means that the Python UDF execution cannot be pipelined between batches. With separate configurations, there is no such problem.
> >>>>>>> 2. It means that the Java operator will convert input elements to the Arrow memory format and then send them to the Python worker, and vice versa. Regarding the zero-copy benefits provided by Arrow, we gain them automatically by using Arrow.
> >>>>>>> 3. Good point! As all the classes of the Python module are written in Java and it's not suggested to introduce new Scala classes, I guess it's not easy to do so right now. But I think this is definitely a good improvement we can make in the future.
> >>>>>>> 4. You're right, and we will add a series of Arrow ColumnVectors, one for each supported type.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dian
> >>>>>>>
> >>>>>>>> On Feb 5, 2020, at 4:57 PM, Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi Dian,
> >>>>>>>>
> >>>>>>>> +1 for this, thanks for driving it.
> >>>>>>>> The documentation looks very good. I can imagine a huge performance improvement and better integration with other Python libraries.
> >>>>>>>>
> >>>>>>>> A few thoughts:
> >>>>>>>> - About data split: "python.fn-execution.arrow.batch.size", can we unify it with "python.fn-execution.bundle.size"?
> >>>>>>>> - Use of Apache Arrow as the exchange format: do you mean that Arrow supports zero-copy between Java and Python?
> >>>>>>>> - It seems we could implement ArrowFieldWriter with code generation, but it is OK for the initial version to use virtual function calls.
> >>>>>>>> - For vectorized reading with ColumnarRow, it seems that we need to implement ArrowColumnVectors.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Jingsong Lee
> >>>>>>>>
> >>>>>>>> On Wed, Feb 5, 2020 at 12:45 PM dianfu <dia...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> Scalar Python UDF is already supported in the upcoming release 1.10 (FLIP-58[1]). It operates on one row at a time: the Java operator serializes one input row to bytes and sends them to the Python worker; the Python worker deserializes the input row and evaluates the Python UDF with it; the result row is serialized and sent back to the Java operator.
> >>>>>>>>>
> >>>>>>>>> It suffers from the following problems:
> >>>>>>>>> 1) High serialization/deserialization overhead
> >>>>>>>>> 2) It's difficult to leverage the popular Python libraries used by data scientists, such as Pandas, NumPy, etc., which provide high-performance data structures and functions.
> >>>>>>>>>
> >>>>>>>>> Jincheng and I have discussed this offline, and we want to introduce vectorized Python UDF to address the above problems. This feature has also been mentioned in the discussion thread about the Python API plan[2]. For vectorized Python UDF, a batch of rows is transferred between the JVM and the Python VM in columnar format.
> >>>>>>>>> The batch of rows will be converted to a collection of pandas.Series and given to the vectorized Python UDF, which can then leverage popular Python libraries such as Pandas, NumPy, etc. for the Python UDF implementation.
> >>>>>>>>>
> >>>>>>>>> Please refer to the design doc[3] for more details. Any feedback is welcome.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Dian
> >>>>>>>>>
> >>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
> >>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
> >>>>>>>>> [3] https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Best, Jingsong Lee
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >
> > --
> > Best, Jingsong Lee
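The contrast in the original proposal between row-at-a-time scalar UDFs and vectorized UDFs that receive whole columns can be sketched in plain Python. This is a dependency-free illustration under stated assumptions: plain lists stand in for both the Arrow columnar batch and pandas.Series, and `run_scalar`, `run_vectorized`, and `to_columns` are hypothetical helpers, not PyFlink APIs. It shows only the calling pattern (one UDF call per row versus one call per batch), not the serialization savings.

```python
from typing import Callable, List, Sequence, Tuple

Row = Tuple[int, int]


def run_scalar(udf: Callable[[int, int], int], rows: Sequence[Row]) -> List[int]:
    # Row-at-a-time (FLIP-58 style): the UDF is called once per row.
    return [udf(*row) for row in rows]


def to_columns(rows: Sequence[Row]) -> List[List[int]]:
    # Transpose a batch of rows into one list per column, standing in for an
    # Arrow record batch (assumption: all rows have the same arity).
    return [list(col) for col in zip(*rows)]


def run_vectorized(
    udf: Callable[[List[int], List[int]], List[int]],
    rows: Sequence[Row],
    batch_size: int,
) -> List[int]:
    # Vectorized: the UDF is called once per batch and sees whole columns.
    results: List[int] = []
    for start in range(0, len(rows), batch_size):
        columns = to_columns(rows[start:start + batch_size])
        results.extend(udf(*columns))
    return results


def add_scalar(a: int, b: int) -> int:
    return a + b


def add_vectorized(a: List[int], b: List[int]) -> List[int]:
    # With pandas.Series inputs this body would simply be `return a + b`.
    return [x + y for x, y in zip(a, b)]


rows = [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)]
print(run_scalar(add_scalar, rows))             # [11, 22, 33, 44, 55]
print(run_vectorized(add_vectorized, rows, 2))  # [11, 22, 33, 44, 55]
```

Both paths produce the same results; the difference is that the vectorized path makes three UDF calls for five rows here instead of five, and in the proposal each call would operate on whole pandas.Series, which is where libraries like Pandas and NumPy are expected to help.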