twalthr commented on a change in pull request #7059: [FLINK-10689][table] Port UDFs in Table API extension points to flink-table-common
URL: https://github.com/apache/flink/pull/7059#discussion_r235888819
 
 

 ##########
 File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
 ##########
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Base class for User-Defined Aggregates.
+ *
+ * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
+ * methods. An {@link AggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - getValue.
+ *
+ * <p>There are a few other methods that can be optional to have:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator
+ *
+ * <p>All these methods must be declared publicly, not static and named exactly as the names
+ * mentioned above. The methods createAccumulator and getValue are defined in the
+ * {@link AggregateFunction} functions, while other methods are explained below.
+ *
+ * <p>Processes the input values and update the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
+ * requires at least one accumulate() method.
+ *
+ * {@code
+ * param accumulator           the accumulator which contains the current aggregated results
+ * param userDefinedInputs the input value (usually obtained from a new arrived data).
+ *
+ * def accumulate(accumulator: ACC, userDefinedInputs): Unit
+ * }
+ *
+ * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This function must be implemented for
+ * datastream bounded over aggregate.
+ *
+ * {@code
+ * param accumulator           the accumulator which contains the current aggregated results
+ * param userDefinedInputs the input value (usually obtained from a new arrived data).
+ *
+ * def retract(accumulator: ACC, userDefinedInputs): Unit
+ * }
+ *
+ * <p>Merges a group of accumulator instances into one accumulator instance. This function must be
+ * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+ *
+ * {@code
+ * param accumulator  the accumulator which will keep the merged aggregate results. It should
+ *                     be noted that the accumulator may contain the previous aggregated
+ *                     results. Therefore user should not replace or clean this instance in the
+ *                     custom merge method.
+ * param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be
+ *                     merged.
+ *
+ * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
+ * }
+ *
+ * <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
+ * dataset grouping aggregate.
+ *
+ * {@code
+ * param accumulator  the accumulator which needs to be reset
+ *
+ * def resetAccumulator(accumulator: ACC): Unit
+ * }
+ *
+ *
+ * @param T   the type of the aggregation result
+ * @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
+ *             aggregated values which are needed to compute an aggregation result.
+ *             AggregateFunction represents its state using accumulator, thereby the state of the
+ *             AggregateFunction must be put into the accumulator.
+ */
+public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 
 Review comment:
   Add proper annotations to all classes.
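
   For illustration only, a minimal sketch of what an annotated base class and
   a function following the contract in the Javadoc above might look like. The
   comment does not say which annotation is meant; this sketch assumes a
   stability annotation such as Flink's @PublicEvolving and declares a local
   stand-in for it so the code compiles without Flink on the classpath.
   AggregateFunctionSketch, WeightedAvg and WeightedAvgAccum are hypothetical
   names that are not part of this pull request.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Collections;

// Local stand-in (assumption) for org.apache.flink.annotation.PublicEvolving.
@Documented
@Target(ElementType.TYPE)
@interface PublicEvolving {}

// Hypothetical stand-in for the AggregateFunction base class in the diff.
@PublicEvolving
abstract class AggregateFunctionSketch<T, ACC> {
    public abstract ACC createAccumulator();
    public abstract T getValue(ACC accumulator);
}

// All state of the aggregate lives in the accumulator.
class WeightedAvgAccum {
    long sum = 0;
    long count = 0;
}

@PublicEvolving
class WeightedAvg extends AggregateFunctionSketch<Long, WeightedAvgAccum> {
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null; // nothing accumulated yet
        }
        return acc.sum / acc.count;
    }

    // Required: public, non-static, named exactly "accumulate".
    public void accumulate(WeightedAvgAccum acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // Optional: undoes a previously accumulated input.
    public void retract(WeightedAvgAccum acc, long value, int weight) {
        acc.sum -= value * weight;
        acc.count -= weight;
    }

    // Optional: folds other accumulators into acc without replacing acc.
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> its) {
        for (WeightedAvgAccum other : its) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    // Optional: clears the accumulator in place.
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.sum = 0;
        acc.count = 0;
    }
}

class WeightedAvgDemo {
    public static void main(String[] args) {
        WeightedAvg f = new WeightedAvg();
        WeightedAvgAccum acc = f.createAccumulator();
        f.accumulate(acc, 10L, 2);
        f.accumulate(acc, 40L, 1);
        System.out.println(f.getValue(acc));

        WeightedAvgAccum other = f.createAccumulator();
        f.accumulate(other, 30L, 2);
        f.merge(acc, Collections.singletonList(other));
        System.out.println(f.getValue(acc));
    }
}
```

   In the actual change the stand-in annotation would be replaced by whichever
   annotation the project settles on; the sketch only shows where it would sit
   and how the methods named in the Javadoc fit together.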
