There doesn't seem to be a built-in way to apply multiple aggregations
to a window.

You could use an aggregate function that combines other aggregate
functions, but admittedly this will get unwieldy as the number of
functions increases:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public static class MultiAggregateFunction<
            IN,
            ACC1, OUT1, F1 extends AggregateFunction<IN, ACC1, OUT1>,
            ACC2, OUT2, F2 extends AggregateFunction<IN, ACC2, OUT2>>
            implements AggregateFunction<IN, Tuple2<ACC1, ACC2>, Tuple2<OUT1, OUT2>> {

        private final F1 f1;
        private final F2 f2;

        public MultiAggregateFunction(F1 f1, F2 f2) {
            this.f1 = f1;
            this.f2 = f2;
        }

        @Override
        public Tuple2<ACC1, ACC2> createAccumulator() {
            return Tuple2.of(f1.createAccumulator(), f2.createAccumulator());
        }

        @Override
        public Tuple2<ACC1, ACC2> add(IN value, Tuple2<ACC1, ACC2> accumulator) {
            // add() returns the updated accumulator, which may be a new object
            // (e.g. for Long accumulators), so keep the returned values.
            return Tuple2.of(f1.add(value, accumulator.f0), f2.add(value, accumulator.f1));
        }

        @Override
        public Tuple2<OUT1, OUT2> getResult(Tuple2<ACC1, ACC2> accumulator) {
            return Tuple2.of(f1.getResult(accumulator.f0), f2.getResult(accumulator.f1));
        }

        @Override
        public Tuple2<ACC1, ACC2> merge(Tuple2<ACC1, ACC2> a, Tuple2<ACC1, ACC2> b) {
            return Tuple2.of(f1.merge(a.f0, b.f0), f2.merge(a.f1, b.f1));
        }
    }

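
To try the idea without a Flink dependency, here is a minimal sketch of the same combinator. Everything in it is a stand-in made up for illustration: `Agg` mirrors the four methods of Flink's `AggregateFunction`, `Pair` plays the role of `Tuple2`, and `Sum`, `Count`, `both` and `aggregate` are hypothetical helpers. It uses the accumulator returned by `add`, so immutable accumulators such as `Long` work.

```java
import java.util.List;

public class MultiAggregateSketch {

    /** Stand-in with the same four methods as Flink's AggregateFunction. */
    public interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Minimal immutable pair, playing the role of Tuple2. */
    public static final class Pair<A, B> {
        public final A first;
        public final B second;
        public Pair(A first, B second) { this.first = first; this.second = second; }
    }

    /** Combines two aggregates over the same input into a single aggregate. */
    public static <IN, A1, O1, A2, O2> Agg<IN, Pair<A1, A2>, Pair<O1, O2>> both(
            Agg<IN, A1, O1> f1, Agg<IN, A2, O2> f2) {
        return new Agg<IN, Pair<A1, A2>, Pair<O1, O2>>() {
            @Override public Pair<A1, A2> createAccumulator() {
                return new Pair<>(f1.createAccumulator(), f2.createAccumulator());
            }
            @Override public Pair<A1, A2> add(IN value, Pair<A1, A2> acc) {
                // Keep the accumulators returned by add(); they may be new objects.
                return new Pair<>(f1.add(value, acc.first), f2.add(value, acc.second));
            }
            @Override public Pair<O1, O2> getResult(Pair<A1, A2> acc) {
                return new Pair<>(f1.getResult(acc.first), f2.getResult(acc.second));
            }
            @Override public Pair<A1, A2> merge(Pair<A1, A2> a, Pair<A1, A2> b) {
                return new Pair<>(f1.merge(a.first, b.first), f2.merge(a.second, b.second));
            }
        };
    }

    /** Sums Long values using an immutable Long accumulator. */
    public static final class Sum implements Agg<Long, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long acc) { return acc + value; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    /** Counts elements using an immutable Long accumulator. */
    public static final class Count implements Agg<Long, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    /** Folds a list through an aggregate, roughly as a window would per key. */
    public static <IN, ACC, OUT> OUT aggregate(Agg<IN, ACC, OUT> f, List<IN> values) {
        ACC acc = f.createAccumulator();
        for (IN v : values) {
            acc = f.add(v, acc);
        }
        return f.getResult(acc);
    }

    public static void main(String[] args) {
        Pair<Long, Long> r = aggregate(both(new Sum(), new Count()), List.of(3L, 4L, 5L));
        System.out.println("sum=" + r.first + ", count=" + r.second); // prints "sum=12, count=3"
    }
}
```

In an actual job, the Flink-based class would be passed to aggregate(...) on the windowed stream like any other AggregateFunction. For a third function you can nest one combined aggregate inside another, which is where the generics start to get unwieldy.
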
On 08/10/2019 12:09, Frank Wilson wrote:
Hi,
In the DataStream API, is there a way to take two aggregate functions
and apply them to the same window? The output would be a stream of
2-tuples containing the result of each aggregate function.
I feel it should be possible to combine previously written functions
rather than writing a bespoke ‘god’ aggregate function for each pipeline.
Thanks,
Frank