dianfu commented on a change in pull request #13421:
URL: https://github.com/apache/flink/pull/13421#discussion_r492448420



##########
File path: docs/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
##########
@@ -28,7 +28,7 @@ The performance of vectorized Python user-defined functions are usually much hig
 overhead and invocation overhead are much reduced. Besides, users could leverage the popular Python libraries such as Pandas, Numpy, etc for the vectorized Python user-defined functions implementation.
 These Python libraries are highly optimized and provide high-performance data structures and functions. It shares the similar way as the
 [non-vectorized user-defined functions]({% link dev/python/table-api-users-guide/udfs/python_udfs.md %}) on how to define vectorized user-defined functions.
-Users only need to add an extra parameter `udf_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function.
+Users only need to add an extra parameter `function_type="pandas"` in the decorator `udf` to mark it as a vectorized user-defined function.

Review comment:
       I'm rethinking the name. What about `func_type`? It's shorter.

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -312,6 +422,54 @@ def _create_judtf(self):
             _get_python_env())
         return j_table_function
 
+    def _create_delegate_function(self) -> UserDefinedFunction:
+        return DelegationTableFunction(self._func)
+
+
+class UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper):
+    """
+    Wrapper for Python user-defined aggregate function.
+    """
+    def __init__(self, func, input_types, result_type, accumulator_type, function_type,
+                 deterministic, name):
+        super(UserDefinedAggregateFunctionWrapper, self).__init__(
+            func, input_types, function_type, deterministic, name)
+
+        if not isinstance(result_type, DataType):
+            raise TypeError(
+                "Invalid returnType: returnType should be DataType but is {}".format(result_type))
+        if accumulator_type is not None and not isinstance(accumulator_type, DataType):
+            raise TypeError(
+                "Invalid accumulator_type: accumulator_type should be DataType but is {}".format(
+                    accumulator_type))
+        self._result_type = result_type
+        self._accumulator_type = accumulator_type
+
+    def _create_judf(self, serialized_func, j_input_types, j_function_kind):
+        if self._function_type == "pandas" and not self._accumulator_type:
+            from pyflink.table.types import DataTypes
+            self._accumulator_type = DataTypes.ARRAY(self._result_type)
+
+        j_result_type = _to_java_type(self._result_type)
+        j_accumulator_type = _to_java_type(self._accumulator_type)
+
+        gateway = get_gateway()
+        PythonAggregateFunction = gateway.jvm \
+            .org.apache.flink.table.functions.python.PythonAggregateFunction
+        j_aggregate_function = PythonAggregateFunction(
+            self._name,
+            bytearray(serialized_func),
+            j_input_types,
+            j_result_type,
+            j_accumulator_type,
+            j_function_kind,
+            self._deterministic,
+            _get_python_env())
+        return j_aggregate_function
+
+    def _create_delegate_function(self) -> UserDefinedFunction:

Review comment:
       Should we add an assert that the function_type is "pandas"?
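
A standalone sketch of what such a guard could look like (plain Python, independent of the PyFlink classes; the class below is a trimmed-down stand-in for the wrapper in the diff above, keeping only the pieces needed to illustrate the assert):

```python
class UserDefinedAggregateFunctionWrapper:
    """Trimmed-down stand-in for the wrapper in the diff above."""

    def __init__(self, func, function_type):
        self._func = func
        self._function_type = function_type

    def _create_delegate_function(self):
        # Fail fast on unsupported function types instead of producing
        # a delegate that misbehaves later.
        assert self._function_type == "pandas", (
            "Invalid function_type: only 'pandas' is supported for "
            "aggregate functions, got '{}'".format(self._function_type))
        return self._func
```

With this in place, constructing a delegate for any non-pandas aggregate fails immediately with a clear message.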

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -361,23 +524,25 @@ def udf(f=None, input_types=None, result_type=None, deterministic=None, name=Non
     :type deterministic: bool
     :param name: the function name.
     :type name: str
-    :param udf_type: the type of the python function, available value: general, pandas,
+    :param function_type: the type of the python function, available value: general, pandas,
                      (default: general)
-    :type udf_type: str

Review comment:
       Could we still support the original `udf_type` for backward compatibility and emit a deprecation warning when it is used?
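
One hypothetical way to keep the old keyword alive with a warning (a sketch only; the real `udf()` in this PR takes more parameters, such as `input_types` and `result_type`, than are shown here):

```python
import warnings


def udf(f=None, function_type="general", udf_type=None):
    # Hypothetical backward-compatibility shim: accept the old keyword,
    # warn, and fold it into the new one.
    if udf_type is not None:
        warnings.warn(
            "The parameter 'udf_type' is deprecated; use 'function_type' instead.",
            DeprecationWarning)
        function_type = udf_type
    # The rest of the factory would then only ever look at function_type.
    return f, function_type
```

Callers using `udf_type="pandas"` would keep working unchanged while seeing a `DeprecationWarning`, and the parameter could be dropped in a later release.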

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -312,6 +422,54 @@ def _create_judtf(self):
             _get_python_env())
         return j_table_function
 
+    def _create_delegate_function(self) -> UserDefinedFunction:
+        return DelegationTableFunction(self._func)
+
+
+class UserDefinedAggregateFunctionWrapper(UserDefinedFunctionWrapper):
+    """
+    Wrapper for Python user-defined aggregate function.
+    """
+    def __init__(self, func, input_types, result_type, accumulator_type, function_type,
+                 deterministic, name):
+        super(UserDefinedAggregateFunctionWrapper, self).__init__(
+            func, input_types, function_type, deterministic, name)
+
+        if not isinstance(result_type, DataType):
+            raise TypeError(
+                "Invalid returnType: returnType should be DataType but is {}".format(result_type))
+        if accumulator_type is not None and not isinstance(accumulator_type, DataType):
+            raise TypeError(
+                "Invalid accumulator_type: accumulator_type should be DataType but is {}".format(
+                    accumulator_type))
+        self._result_type = result_type
+        self._accumulator_type = accumulator_type
+
+    def _create_judf(self, serialized_func, j_input_types, j_function_kind):
+        if self._function_type == "pandas" and not self._accumulator_type:

Review comment:
       ```suggestion
           if self._function_type == "pandas":
       ```
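
For context, the suggested change makes the pandas branch derive the accumulator type from the result type unconditionally, rather than only when none was supplied. A standalone sketch of the two behaviours (plain Python stand-ins, no Flink types; the helper name is made up for this sketch):

```python
def resolve_accumulator(function_type, result_type, accumulator_type,
                        only_when_missing):
    # only_when_missing=True models the current condition
    # (``== "pandas" and not self._accumulator_type``);
    # False models the suggested unconditional override.
    if function_type == "pandas" and (not only_when_missing or not accumulator_type):
        # Stand-in for DataTypes.ARRAY(result_type).
        return ("ARRAY", result_type)
    return accumulator_type
```

The difference only shows when a user passes an explicit accumulator type together with `function_type="pandas"`: the current code keeps it, the suggestion replaces it.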

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -421,3 +586,51 @@ def udtf(f=None, input_types=None, result_types=None, deterministic=None, name=N
                                  deterministic=deterministic, name=name)
     else:
         return _create_udtf(f, input_types, result_types, deterministic, name)
+
+
+def udaf(f=None, input_types=None, result_type=None, accumulator_type=None, deterministic=None,
+         name=None, function_type="pandas"):
+    """
+    Helper method for creating a user-defined aggregate function.
+
+    Example:
+        ::
+
+            >>> # The input_types is optional.
+            >>> @udaf(result_type=DataTypes.FLOAT(), function_type="pandas")
+            ... def mean_udaf(v):
+            ...     return v.mean()
+
+    :param f: user-defined aggregate function.
+    :type f: function or UserDefinedFunction or type
+    :param input_types: optional, the input data types.
+    :type input_types: list[DataType] or DataType
+    :param result_type: the result data type.
+    :type result_type: DataType
+    :param accumulator_type: the accumulator data type.
+    :type accumulator_type: DataType

Review comment:
       Use type hints if possible?
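
A hypothetical type-hinted signature, translating the `:type` lines of the docstring above into annotations (the `DataType` class here is only a stand-in for `pyflink.table.types.DataType`):

```python
from typing import Callable, List, Optional, Union


class DataType:
    """Stand-in for pyflink.table.types.DataType, just for this sketch."""


def udaf(f: Optional[Union[Callable, type]] = None,
         input_types: Optional[Union[List[DataType], DataType]] = None,
         result_type: Optional[DataType] = None,
         accumulator_type: Optional[DataType] = None,
         deterministic: Optional[bool] = None,
         name: Optional[str] = None,
         function_type: str = "pandas"):
    # Body elided; only the annotated signature matters for this comment.
    pass
```

With annotations in place, the Sphinx `:type ...:` lines become redundant and IDEs can check call sites.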

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -361,23 +524,25 @@ def udf(f=None, input_types=None, result_type=None, deterministic=None, name=Non
     :type deterministic: bool
     :param name: the function name.
     :type name: str
-    :param udf_type: the type of the python function, available value: general, pandas,
+    :param function_type: the type of the python function, available value: general, pandas,
                      (default: general)
-    :type udf_type: str

Review comment:
       Could we still support the original `udf_type` for backward compatibility and add a warning if `udf_type` is used?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

