Hi Jingsong,

Thanks a lot for the valuable feedback.

1. The configurations "python.fn-execution.bundle.size" and 
"python.fn-execution.arrow.batch.size" serve separate purposes, and I 
think both are needed. If they were unified, the Python operator would have to 
wait for the execution results of the previous batch of elements before 
processing the next batch, so Python UDF execution could not be pipelined 
between batches. With separate configurations, there is no such problem.
2. It means that the Java operator will convert input elements to the Arrow 
memory format and then send them to the Python worker, and vice versa. 
Regarding the zero-copy benefits provided by Arrow, we gain them 
automatically by using Arrow.
3. Good point! All the classes of the Python module are written in Java, and 
introducing new Scala classes is discouraged, so I guess it's not easy to do 
right now. But I think this is definitely a good improvement we can make in 
the future.
4. You're right, and we will add a series of Arrow ColumnVectors, one for each 
supported type.
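
To make point 1 a bit more concrete, here is a rough sketch in plain Python 
(not the actual operator code). It assumes that one bundle is cut into several 
smaller batches, and that each batch is pivoted into columns before being 
sent, so that batches can be handed over one after another without waiting at 
every batch boundary. The constants and the helper names (split_into_batches, 
to_columns, process_bundle) are hypothetical, and the plain dicts only stand 
in for Arrow record batches.

```python
from typing import Dict, Iterator, List

# Hypothetical values for illustration only; these are not Flink defaults.
BUNDLE_SIZE = 8        # stands in for python.fn-execution.bundle.size
ARROW_BATCH_SIZE = 3   # stands in for python.fn-execution.arrow.batch.size

Row = Dict[str, int]
Columns = Dict[str, List[int]]


def split_into_batches(bundle: List[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(bundle), batch_size):
        yield bundle[start:start + batch_size]


def to_columns(batch: List[Row]) -> Columns:
    """Pivot a list of rows into one list per column.

    This is only a stand-in for building an Arrow record batch.
    """
    columns: Columns = {}
    for row in batch:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


def process_bundle(rows: List[Row], bundle_size: int, batch_size: int) -> List[Columns]:
    """Take one bundle of rows and return its columnar batches in order."""
    bundle = rows[:bundle_size]
    return [to_columns(batch) for batch in split_into_batches(bundle, batch_size)]


rows = [{"a": i, "b": i * 10} for i in range(BUNDLE_SIZE)]
batches = process_bundle(rows, BUNDLE_SIZE, ARROW_BATCH_SIZE)
```

With these example values, the bundle of 8 rows is split into batches of 3, 3 
and 2 rows. If the two sizes were forced to be equal, every bundle would 
contain exactly one batch.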

Thanks,
Dian

> On Feb 5, 2020, at 4:57 PM, Jingsong Li <jingsongl...@gmail.com> wrote:
> 
> Hi Dian,
> 
> +1 for this, thanks for driving it.
> Documentation looks very good. I can imagine a huge performance improvement
> and better integration to other Python libraries.
> 
> A few thoughts:
> - About data split: "python.fn-execution.arrow.batch.size", can we unify it
> with "python.fn-execution.bundle.size"?
> - Use of Apache Arrow as the exchange format: Do you mean Arrow supports
> zero-copy between Java and Python?
> - ArrowFieldWriter: it seems we can implement it by code generation. But it
> is OK for the initial version to use virtual function calls.
> - ColumnarRow for vectorized reading: it seems that we need to implement
> ArrowColumnVectors.
> 
> Best,
> Jingsong Lee
> 
> On Wed, Feb 5, 2020 at 12:45 PM dianfu <dia...@apache.org> wrote:
> 
>> Hi all,
>> 
>> Scalar Python UDFs are already supported in the upcoming 1.10 release
>> (FLIP-58[1]). They operate on one row at a time: the Java operator
>> serializes one input row to bytes and sends it to the Python worker; the
>> Python worker deserializes the input row and evaluates the Python UDF with
>> it; the result row is then serialized and sent back to the Java operator.
>> 
>> It suffers from the following problems:
>> 1) High serialization/deserialization overhead
>> 2) It’s difficult to leverage the popular Python libraries used by data
>> scientists, such as Pandas, NumPy, etc., which provide high-performance
>> data structures and functions.
>> 
>> Jincheng and I have discussed this offline and we want to introduce
>> vectorized Python UDFs to address the above problems. This feature has also
>> been mentioned in the discussion thread about the Python API plan[2]. For
>> vectorized Python UDFs, a batch of rows is transferred between the JVM and
>> the Python VM in columnar format. The batch of rows will be converted to a
>> collection of Pandas.Series and given to the vectorized Python UDF, which
>> can then leverage popular Python libraries such as Pandas, NumPy, etc. in
>> its implementation.
>> 
>> Please refer to the design doc[3] for more details. Any feedback is welcome.
>> 
>> Regards,
>> Dian
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
>> [3]
>> https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd
>> 
>> 
> 
> -- 
> Best, Jingsong Lee
