There doesn't seem to be a built-in way to apply multiple aggregations to a window.

You could use an aggregate function that combines other aggregate functions, but admittedly this will get unwieldy as the number of functions increase:

public static class MultiAggregateFunction<IN, ACC1, OUT1, F1 extends AggregateFunction<IN, 
ACC1, OUT1>, ACC2, OUT2, F2 extends AggregateFunction<IN, ACC2, OUT2>>
   implements AggregateFunction<IN, Tuple2<ACC1, ACC2>, Tuple2<OUT1, OUT2>> {

   private final F1 f1; private final F2 f2; public MultiAggregateFunction(F1 
f1, F2 f2) {
      this.f1 = f1; this.f2 = f2; }

   @Override public Tuple2<ACC1, ACC2> createAccumulator() {
      return Tuple2.of(f1.createAccumulator(), f2.createAccumulator()); }

   @Override public Tuple2<ACC1, ACC2> add(IN value, Tuple2<ACC1, ACC2> 
accumulator) {
      f1.add(value, accumulator.f0); f2.add(value, accumulator.f1); return 
accumulator; }

   @Override public Tuple2<OUT1, OUT2> getResult(Tuple2<ACC1, ACC2> 
accumulator) {
      return Tuple2.of(f1.getResult(accumulator.f0), 
f2.getResult(accumulator.f1)); }

   @Override public Tuple2<ACC1, ACC2> merge(Tuple2<ACC1, ACC2> a, Tuple2<ACC1, 
ACC2> b) {
      return Tuple2.of(f1.merge(a.f0, b.f0), f2.merge(a.f1, b.f1)); }
}


On 08/10/2019 12:09, Frank Wilson wrote:
Hi,

In the datastream api is there a way to take two aggregate functions and apply them to the same window? The output would be a stream of 2-tuples containing the result of each aggregate function.

I feel it should be possible to combine previously written functions rather than writing a bespoke ‘god’ aggregate function for each pipeline.

Thanks,

Frank


Reply via email to