Hi Jing,

Thanks for looking into this. Here is the code for `TaxiFareStream`. I was following this link: http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html . Please let me know if there is any other way of aggregating elements locally.
    public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {

        private KeySelector<TaxiFare, Long> keySelector;

        public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                              BundleTrigger<TaxiFare> bundleTrigger,
                              KeySelector<TaxiFare, Long> keySelector) {
            super(userFunction, bundleTrigger, keySelector);
            this.keySelector = keySelector;
        }

        @Override
        protected Long getKey(TaxiFare input) throws Exception {
            return keySelector.getKey(input);
        }
    }

Thanks

On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Suman,
> Would you please provide the code for `TaxiFareStream`? It seems we
> could use `MapBundleOperator` directly here.
> BTW, I have some concerns about using this solution to do local aggregation
> ahead of a window aggregation, because `MapBundleOperator`
> saves the input data in a bundle, which is a HashMap object that does
> not preserve the input order. I'm afraid the elements within
> a bundle (10 elements in your case) may come out unordered. I'm not sure
> whether it is reasonable to assign a watermark based on unordered
> timestamps.
>
> Best,
> JING ZHANG
>
> suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:
>
>> I am trying to do pre-shuffle aggregation in Flink. Following is the
>> MapBundle implementation.
>>
>>     public class TaxiFareMapBundleFunction
>>             extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>
>>         @Override
>>         public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>             if (value == null) {
>>                 return input;
>>             }
>>             value.tip = value.tip + input.tip;
>>             return value;
>>         }
>>
>>         @Override
>>         public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out)
>>                 throws Exception {
>>             for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>                 out.collect(entry.getValue());
>>             }
>>         }
>>     }
>>
>> I am using "CountBundleTrigger.java". But the pre-shuffle aggregation is
>> not working, as the "count" variable is always 0. Please let me know if
>> I am missing something.
>>
>>     @Override
>>     public void onElement(T element) throws Exception {
>>         count++;
>>         if (count >= maxCount) {
>>             callback.finishBundle();
>>             reset();
>>         }
>>     }
>>
>> Here is the main code.
>>
>>     MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>             new TaxiFareMapBundleFunction();
>>     BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>     KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
>>             new KeySelector<TaxiFare, Long>() {
>>                 @Override
>>                 public Long getKey(TaxiFare value) throws Exception {
>>                     return value.driverId;
>>                 }
>>             };
>>
>>     DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>     //      fares.keyBy((TaxiFare fare) -> fare.driverId)
>>     //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>             fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>                         new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>                 .assignTimestampsAndWatermarks(
>>                         new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>                             @Override
>>                             public long extractTimestamp(TaxiFare element) {
>>                                 return element.startTime.getEpochSecond();
>>                             }
>>                         })
>>                 .keyBy((TaxiFare fare) -> fare.driverId)
>>                 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>                 .process(new AddTips());
>>
>>     DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>             hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>
>> Thanks
>> Suman
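
For illustration, the count-triggered local aggregation discussed in this thread can be sketched in plain Java without any Flink classes. This is only a rough model of the idea, not the `MapBundleOperator` implementation: `LocalBundleSketch` and `Fare` are hypothetical names, and `Fare` is a stand-in for `TaxiFare` reduced to the `driverId` and `tip` fields used above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of count-triggered local aggregation (no Flink involved). */
final class LocalBundleSketch {

    /** Hypothetical stand-in for TaxiFare with only the fields used in the thread. */
    static final class Fare {
        final long driverId;
        final float tip;

        Fare(long driverId, float tip) {
            this.driverId = driverId;
            this.tip = tip;
        }
    }

    private final int maxCount;
    // The bundle: one running tip total per driver. A HashMap does not keep arrival order.
    private final Map<Long, Float> bundle = new HashMap<>();
    private final List<Fare> emitted = new ArrayList<>();
    private int count = 0;

    LocalBundleSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Folds one element into the bundle and flushes once maxCount elements have arrived. */
    void onElement(Fare fare) {
        bundle.merge(fare.driverId, fare.tip, Float::sum);
        count++;
        if (count >= maxCount) {
            finishBundle();
        }
    }

    /** Emits one pre-aggregated record per key, then resets the bundle and the counter. */
    void finishBundle() {
        for (Map.Entry<Long, Float> entry : bundle.entrySet()) {
            emitted.add(new Fare(entry.getKey(), entry.getValue()));
        }
        bundle.clear();
        count = 0;
    }

    List<Fare> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LocalBundleSketch op = new LocalBundleSketch(3);
        op.onElement(new Fare(1L, 1.0f));
        op.onElement(new Fare(2L, 2.0f));
        op.onElement(new Fare(1L, 3.0f)); // third element triggers the flush
        for (Fare f : op.emitted()) {
            System.out.println("driver " + f.driverId + " tip total " + f.tip);
        }
    }
}
```

In this sketch the records leave the bundle in the map's iteration order, not in arrival order, which is the ordering concern raised in the quoted reply about assigning watermarks after the bundle step.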