dianfu commented on a change in pull request #13421: URL: https://github.com/apache/flink/pull/13421#discussion_r492448420
########## File path: docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md ########## @@ -28,7 +28,7 @@ The performance of vectorized Python user-defined functions are usually much hig overhead and invocation overhead are much reduced. Besides, users could leverage the popular Python libraries such as Pandas, Numpy, etc for the vectorized Python user-defined functions implementation. These Python libraries are highly optimized and provide high-performance data structures and functions. It shares the similar way as the [non-vectorized user-defined functions]({% link dev/python/table-api-users-guide/udfs/python_udfs.md %}) on how to define vectorized user-defined functions. -Users only need to add an extra parameter `udf_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function. +Users only need to add an extra parameter `function_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function. Review comment: I'm rethinking the name. What about `func_type`? It's shorter. ########## File path: flink-python/pyflink/table/udf.py ########## @@ -312,6 +422,54 @@ def _create_judtf(self): _get_python_env()) return j_table_function + def _create_delegate_function(self) -> UserDefinedFunction: + return DelegationTableFunction(self._func) + + +class UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper): + """ + Wrapper for Python user-defined aggregate function. 
+ """ + def __init__(self, func, input_types, result_type, accumulator_type, function_type, + deterministic, name): + super(UserDefinedAggregateFunctionWrapper, self).__init__( + func, input_types, function_type, deterministic, name) + + if not isinstance(result_type, DataType): + raise TypeError( + "Invalid returnType: returnType should be DataType but is {}".format(result_type)) + if accumulator_type is not None and not isinstance(accumulator_type, DataType): + raise TypeError( + "Invalid accumulator_type: accumulator_type should be DataType but is {}".format( + accumulator_type)) + self._result_type = result_type + self._accumulator_type = accumulator_type + + def _create_judf(self, serialized_func, j_input_types, j_function_kind): + if self._function_type == "pandas" and not self._accumulator_type: + from pyflink.table.types import DataTypes + self._accumulator_type = DataTypes.ARRAY(self._result_type) + + j_result_type = _to_java_type(self._result_type) + j_accumulator_type = _to_java_type(self._accumulator_type) + + gateway = get_gateway() + PythonAggregateFunction = gateway.jvm \ + .org.apache.flink.table.functions.python.PythonAggregateFunction + j_aggregate_function = PythonAggregateFunction( + self._name, + bytearray(serialized_func), + j_input_types, + j_result_type, + j_accumulator_type, + j_function_kind, + self._deterministic, + _get_python_env()) + return j_aggregate_function + + def _create_delegate_function(self) -> UserDefinedFunction: Review comment: Add an assert that the function_type should be pandas? ########## File path: flink-python/pyflink/table/udf.py ########## @@ -361,23 +524,25 @@ def udf(f=None, input_types=None, result_type=None, deterministic=None, name=Non :type deterministic: bool :param name: the function name. 
:type name: str - :param udf_type: the type of the python function, available value: general, pandas, + :param function_type: the type of the python function, available value: general, pandas, (default: general) - :type udf_type: str Review comment: Could we still support the original udf_type to keep backward compatibility and add a warning? ########## File path: flink-python/pyflink/table/udf.py ########## @@ -312,6 +422,54 @@ def _create_judtf(self): _get_python_env()) return j_table_function + def _create_delegate_function(self) -> UserDefinedFunction: + return DelegationTableFunction(self._func) + + +class UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper): + """ + Wrapper for Python user-defined aggregate function. + """ + def __init__(self, func, input_types, result_type, accumulator_type, function_type, + deterministic, name): + super(UserDefinedAggregateFunctionWrapper, self).__init__( + func, input_types, function_type, deterministic, name) + + if not isinstance(result_type, DataType): + raise TypeError( + "Invalid returnType: returnType should be DataType but is {}".format(result_type)) + if accumulator_type is not None and not isinstance(accumulator_type, DataType): + raise TypeError( + "Invalid accumulator_type: accumulator_type should be DataType but is {}".format( + accumulator_type)) + self._result_type = result_type + self._accumulator_type = accumulator_type + + def _create_judf(self, serialized_func, j_input_types, j_function_kind): + if self._function_type == "pandas" and not self._accumulator_type: Review comment: ```suggestion if self._function_type == "pandas": ``` ########## File path: flink-python/pyflink/table/udf.py ########## @@ -421,3 +586,51 @@ def udtf(f=None, input_types=None, result_types=None, deterministic=None, name=N deterministic=deterministic, name=name) else: return _create_udtf(f, input_types, result_types, deterministic, name) + + +def udaf(f=None, input_types=None, result_type=None, accumulator_type=None, 
deterministic=None, + name=None, function_type="pandas"): + """ + Helper method for creating a user-defined aggregate function. + + Example: + :: + + >>> # The input_types is optional. + >>> @udaf(result_type=DataTypes.FLOAT(), function_type="pandas") + ... def mean_udaf(v): + ... return v.mean() + + :param f: user-defined aggregate function. + :type f: function or UserDefinedFunction or type + :param input_types: optional, the input data types. + :type input_types: list[DataType] or DataType + :param result_type: the result data type. + :type result_type: DataType + :param accumulator_type: the accumulator data type. + :type accumulator_type: DataType Review comment: Use type hints if possible? ########## File path: flink-python/pyflink/table/udf.py ########## @@ -361,23 +524,25 @@ def udf(f=None, input_types=None, result_type=None, deterministic=None, name=Non :type deterministic: bool :param name: the function name. :type name: str - :param udf_type: the type of the python function, available value: general, pandas, + :param function_type: the type of the python function, available value: general, pandas, (default: general) - :type udf_type: str Review comment: Could we still support the original udf_type to keep backward compatibility and add a warning if udf_type is used? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org