I am trying to do pre-shuffle aggregation in Flink. The following is my MapBundleFunction implementation:

    public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

        @Override
        public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
            if (value == null) {
                return input;
            }
            value.tip = value.tip + input.tip;
            return value;
        }

        @Override
        public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
            for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
                out.collect(entry.getValue());
            }
        }
    }

I am using CountBundleTrigger.java, but the pre-shuffle aggregation is not working, as the "count" variable is always 0. Please let me know if I am missing something.

    @Override
    public void onElement(T element) throws Exception {
        count++;
        if (count >= maxCount) {
            callback.finishBundle();
            reset();
        }
    }

Here is the main code:

    MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
    BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);

    KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
        @Override
        public Long getKey(TaxiFare value) throws Exception {
            return value.driverId;
        }
    };

    DataStream<Tuple3<Long, Long, Float>> hourlyTips =
    //      fares.keyBy((TaxiFare fare) -> fare.driverId)
    //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
            fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
                    new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                        @Override
                        public long extractTimestamp(TaxiFare element) {
                            return element.startTime.getEpochSecond();
                        }
                    })
                .keyBy((TaxiFare fare) -> fare.driverId)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new AddTips());

    DataStream<Tuple3<Long, Long, Float>> hourlyMax =
            hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

Thanks,
Suman