[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833675#comment-15833675 ]
ASF GitHub Bot commented on FLINK-5582:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3186

    @shaoxuan-wang I cannot reproduce the compile error you posted.

    The latest commit also gets a green light from Travis CI: https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397


> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports a single type, which is both the type
>     that is added and the type that is aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not
>     distributive, i.e. it cannot be used for hierarchical aggregation, for
>     example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing
>     operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> // (Datum is not defined here; it is assumed to expose integral getValue() and getWeight())
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         // 'count' holds the total weight, 'sum' the weighted sum of values
>         acc.count += value.getWeight();
>         acc.sum += value.getValue() * value.getWeight();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
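
The issue description says the proposed function is distributive, i.e. that partial aggregates can be merged in a hierarchical pre-/final-aggregation. A minimal standalone sketch of that idea follows. It is not Flink code: the interface is redeclared locally without `extends Function` so that it compiles without Flink on the classpath, and the class `HierarchicalAggregationSketch` and the helper `aggregateInTwoStages` are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class HierarchicalAggregationSketch {

    // Local copy of the interface proposed in the issue
    // (assumption: 'extends Function' is dropped to avoid a Flink dependency).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Same logic as the 'Average' example in the issue description.
    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }
        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }
        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Hypothetical helper: pre-aggregate every partition into its own
    // accumulator, then merge the partial accumulators into a final one.
    static <IN, ACC, OUT> OUT aggregateInTwoStages(
            AggregateFunction<IN, ACC, OUT> fn, List<List<IN>> partitions) {
        ACC total = fn.createAccumulator();
        for (List<IN> partition : partitions) {
            ACC partial = fn.createAccumulator();   // pre-aggregation
            for (IN value : partition) {
                fn.add(value, partial);
            }
            total = fn.merge(total, partial);       // final aggregation
        }
        return fn.getResult(total);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5, 6, 7));
        // Average of 1..7, computed from two partial aggregates.
        System.out.println(aggregateInTwoStages(new Average(), partitions));
    }
}
```

Because only accumulators cross the partition boundary, the result is the same as adding all values to a single accumulator. A {{FoldFunction}} has no counterpart to `merge`, which is why the issue says it cannot be split this way.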