[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833605#comment-15833605 ]
ASF GitHub Bot commented on FLINK-5582:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3186#discussion_r97227451

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.functions;
    +
    +import java.io.Serializable;
    +
    +/**
    + *
    + * <p>Aggregation functions must be {@link Serializable} because they are sent around
    + * between distributed processes during distributed execution.
    + *
    + * <p>An example how to use this interface is below:
    + *
    + * <pre>{@code
    + * // the accumulator, which holds the state of the in-flight aggregate
    + * public class AverageAccumulator {
    + *     long count;
    + *     long sum;
    + * }
    + *
    + * // implementation of an aggregation function for an 'average'
    + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    + *
    + *     public AverageAccumulator createAccumulator() {
    + *         return new AverageAccumulator();
    + *     }
    + *
    + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
    + *         a.count += b.count;
    + *         a.sum += b.sum;
    + *         return a;
    + *     }
    + *
    + *     public void add(Integer value, AverageAccumulator acc) {
    + *         acc.sum += value;
    + *         acc.count++;
    + *     }
    + *
    + *     public Double getResult(AverageAccumulator acc) {
    + *         return acc.sum / (double) acc.count;
    + *     }
    + * }
    + *
    + * // implementation of a weighted average
    + * // this reuses the same accumulator type as the aggregate function for 'average'
    + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
    + *
    + *     public AverageAccumulator createAccumulator() {
    + *         return new AverageAccumulator();
    + *     }
    + *
    + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
    + *         a.count += b.count;
    + *         a.sum += b.sum;
    + *         return a;
    + *     }
    + *
    + *     public void add(Datum value, AverageAccumulator acc) {
    + *         acc.count += value.getWeight();
    + *         acc.sum += value.getValue();
    + *     }
    + *
    + *     public Double getResult(AverageAccumulator acc) {
    + *         return acc.sum / (double) acc.count;
    + *     }
    + * }
    + * }</pre>
    + */
    +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    +
    +    ACC createAccumulator();
    +
    +    void add(IN value, ACC accumulator);
    --- End diff --

    My first feeling is to keep the name `add()` because it fits better together with the term `Accumulator`. One can view retractions as adding negative values. What do you think about that?

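To make the "retractions as adding negative values" remark concrete, here is a minimal sketch. It assumes a local copy of the interface quoted in the diff (without the Flink supertypes, and with `getResult` and `merge` taken from the issue description), so it compiles on its own. `Sum`, `SumAccumulator`, and `RetractionSketch` are hypothetical names used only for illustration and are not part of the pull request.

```java
// Local copy of the proposed interface so that the sketch is self-contained.
// This is an assumption for illustration, not the class shipped in Flink.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical accumulator holding a running sum.
class SumAccumulator {
    long sum;
}

// Hypothetical sum aggregate; add() accepts negative values as well.
class Sum implements AggregateFunction<Long, SumAccumulator, Long> {

    public SumAccumulator createAccumulator() {
        return new SumAccumulator();
    }

    public void add(Long value, SumAccumulator acc) {
        acc.sum += value;
    }

    public Long getResult(SumAccumulator acc) {
        return acc.sum;
    }

    public SumAccumulator merge(SumAccumulator a, SumAccumulator b) {
        a.sum += b.sum;
        return a;
    }
}

class RetractionSketch {

    // Adds 5 and 7, then retracts the 5 by adding its negation.
    static long run() {
        Sum sum = new Sum();
        SumAccumulator acc = sum.createAccumulator();
        sum.add(5L, acc);
        sum.add(7L, acc);
        sum.add(-5L, acc); // retraction expressed as add() of a negative value
        return sum.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This works for the sum because every input has an additive inverse. For the `Average` example in the Javadoc, adding a negated value would correct `sum` but still increment `count`, so a retraction there would presumably need more than a negated input.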
> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not
> distributive, i.e. it cannot be used for hierarchical aggregation, for
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
> 	ACC createAccumulator();
> 	void add(IN value, ACC accumulator);
> 	OUT getResult(ACC accumulator);
> 	ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
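The split into pre- and final-aggregation described in the issue can be sketched as follows. This is only an illustration under stated assumptions: the interface is a local copy of the proposed one (without the Flink `Function` supertype), `Average` and `AverageAccumulator` follow the issue's example, and `PreAggregationSketch` with its `preAggregate` helper is a hypothetical name, not something Flink provides.

```java
import java.util.Arrays;
import java.util.List;

// Local copy of the proposed interface so that the sketch is self-contained.
// This is an assumption for illustration, not the class shipped in Flink.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator from the issue's example: state of the running average.
class AverageAccumulator {
    long count;
    long sum;
}

// Average aggregate from the issue's example.
class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public void add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

class PreAggregationSketch {

    // Hypothetical helper: pre-aggregates one partition into a partial accumulator.
    static <IN, ACC, OUT> ACC preAggregate(AggregateFunction<IN, ACC, OUT> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN value : partition) {
            fn.add(value, acc);
        }
        return acc;
    }

    // Pre-aggregates two partitions independently, then merges the partial
    // accumulators and finalizes the result.
    static double run() {
        Average avg = new Average();
        AverageAccumulator left = preAggregate(avg, Arrays.asList(1, 2, 3));
        AverageAccumulator right = preAggregate(avg, Arrays.asList(4, 5));
        return avg.getResult(avg.merge(left, right));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the partial state is the accumulator rather than the final `Double`, merging the two partitions gives the same average (sum 15 over count 5) as a single pass over all five values, which is what makes the function distributive.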