dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000842
########## File path: docs/dev/table/tableApi.md ########## @@ -1889,6 +1889,57 @@ tableEnv.registerFunction("func", func); Table table = input .flatMap("func(c)").as("a, b") +{% endhighlight %} + </td> + </tr> + + <tr> + <td> + <strong>Aggregate</strong><br> + <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> + </td> + <td> + <p>Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.</p> +{% highlight java %} + public class MyMinMaxAcc { + public int min = 0; + public int max = 0; + } + + public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> { + + public void accumulate(MyMinMaxAcc acc, int value) { + if (value < acc.min) { + acc.min = value; + } + if (value > acc.max) { + acc.max = value; + } + } + + @Override + public MyMinMaxAcc createAccumulator() { + return new MyMinMaxAcc(); + } + + @Override + public Row getValue(MyMinMaxAcc acc) { + return Row.of(acc.min, acc.max); + } + + @Override + public TypeInformation<Row> getResultType() { + return new RowTypeInfo(Types.INT, Types.INT); + } + } + +AggregateFunction myAggFunc = new MyMinMax(); + +tableEnv.registerFunction("myAggFunc", myAggFunc); +Table table = input + .groupBy("key") + .aggregate("myAggFunc(a, b) as (x, y, z)") Review comment: as (x, y) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services