Hi Suman,

Would you please share the code for `TaxiFareStream`? It seems `MapBundleOperator` could be used directly here.

BTW, I have some concerns about using this approach to do local aggregation ahead of a window aggregation. `MapBundleOperator` buffers its input in a bundle, which is a HashMap and therefore does not preserve the order in which records arrived. I'm afraid the records within a bundle (10 in your case) may be emitted out of order, and I'm not sure whether it is reasonable to assign watermarks based on unordered timestamps.
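To illustrate the ordering concern, here is a minimal plain-Java sketch with no Flink dependency. The class name `BundleOrderDemo`, the sample records, and the helper `emittedTimestamps` are all hypothetical; the sketch only assumes that a bundle keeps one entry per key and is flushed by iterating the map, as in the `finishBundle` loop quoted below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BundleOrderDemo {

    // Hypothetical input: {driverId, eventTimestamp}, arriving in timestamp order.
    static final long[][] INPUT = { {30L, 1000L}, {10L, 2000L}, {20L, 3000L} };

    // Buffers the input in the given map (one entry per key, first timestamp kept)
    // and returns the timestamps in the order the map iterates over them.
    static List<Long> emittedTimestamps(Map<Long, Long> bundle) {
        for (long[] record : INPUT) {
            bundle.putIfAbsent(record[0], record[1]);
        }
        return new ArrayList<>(bundle.values());
    }

    public static void main(String[] args) {
        // HashMap: iteration order depends on key hashes, not on arrival order.
        System.out.println("HashMap:       " + emittedTimestamps(new HashMap<>()));
        // LinkedHashMap: iteration order follows insertion order.
        System.out.println("LinkedHashMap: " + emittedTimestamps(new LinkedHashMap<>()));
    }
}
```

The `LinkedHashMap` line always prints the timestamps in arrival order. `HashMap` makes no such guarantee, so a timestamp extractor placed after the bundle may see timestamps that go backwards.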
Best,
JING ZHANG

suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:

> I am trying to do pre-shuffle aggregation in Flink. Following is the
> MapBundle implementation.
>
> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>     @Override
>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>         if (value == null) {
>             return input;
>         }
>         value.tip = value.tip + input.tip;
>         return value;
>     }
>
>     @Override
>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>             out.collect(entry.getValue());
>         }
>     }
> }
>
> I am using "CountBundleTrigger.java". But the pre-shuffle aggregation is
> not working as the "count" variable is always 0. Please let me know if
> I am missing something.
>
> @Override
> public void onElement(T element) throws Exception {
>     count++;
>     if (count >= maxCount) {
>         callback.finishBundle();
>         reset();
>     }
> }
>
> Here is the main code.
>
> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>     @Override
>     public Long getKey(TaxiFare value) throws Exception {
>         return value.driverId;
>     }
> };
>
> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
> //      fares.keyBy((TaxiFare fare) -> fare.driverId)
> //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>                 new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>             .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>                     @Override
>                     public long extractTimestamp(TaxiFare element) {
>                         return element.startTime.getEpochSecond();
>                     }
>                 })
>             .keyBy((TaxiFare fare) -> fare.driverId)
>             .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>             .process(new AddTips());
>
> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>
> Thanks
> Suman