Hi Suman,

Please try making a copy of `MapBundleOperator` and replacing the `HashMap` with a `LinkedHashMap`, so that the output order stays consistent with the input order.
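To illustrate the difference, here is a small plain-Java sketch with no Flink dependency. `BundleOrderDemo` and `emitOrder` are hypothetical names. It feeds the same keys into both map types and returns the order in which a `finishBundle`-style loop would visit them. A `LinkedHashMap` iterates in the order keys were first inserted, while a `HashMap` makes no ordering guarantee. In the copied operator, the change would go wherever the bundle map is allocated. In the Flink sources this appears to be `AbstractMapBundleOperator#open()`, which `MapBundleOperator` extends, but please verify this against your Flink version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo: the bundle maps a key (e.g. driverId) to an aggregate,
// here just a per-key element count. finishBundle would iterate over it to emit.
public class BundleOrderDemo {

    // Adds each arriving key to the bundle, merging repeats into the existing
    // entry, and returns the key order a finishBundle-style loop would see.
    public static List<Long> emitOrder(Map<Long, Integer> bundle, long... arrivals) {
        for (long key : arrivals) {
            bundle.merge(key, 1, Integer::sum);
        }
        List<Long> order = new ArrayList<>();
        for (Map.Entry<Long, Integer> entry : bundle.entrySet()) {
            order.add(entry.getKey());
        }
        return order;
    }

    public static void main(String[] args) {
        long[] arrivals = {42L, 7L, 19L, 7L};
        // HashMap: order is determined by hash buckets, not by arrival.
        System.out.println("HashMap:       " + emitOrder(new HashMap<>(), arrivals));
        // LinkedHashMap: order follows the first insertion of each key.
        System.out.println("LinkedHashMap: " + emitOrder(new LinkedHashMap<>(), arrivals));
    }
}
```

The `LinkedHashMap` line prints `[42, 7, 19]`. Because the bundle aggregates per key, the emitted order follows the first arrival of each key, not the order of every individual element.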
Best,
JING ZHANG

suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:23 AM:
> Hi Jing,
>
> Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
> following this link:
> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
> Please let me know if there is any other way of aggregating elements
> locally.
>
>     public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>
>         private KeySelector<TaxiFare, Long> keySelector;
>
>         public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>                               BundleTrigger<TaxiFare> bundleTrigger,
>                               KeySelector<TaxiFare, Long> keySelector) {
>             super(userFunction, bundleTrigger, keySelector);
>             this.keySelector = keySelector;
>         }
>
>         @Override
>         protected Long getKey(TaxiFare input) throws Exception {
>             return keySelector.getKey(input);
>         }
>     }
>
> Thanks
>
> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Suman,
>>
>> Would you please share the code of `TaxiFareStream`? It seems we
>> could use `MapBundleOperator` directly here.
>>
>> BTW, I have some concerns about using this approach for local
>> aggregation ahead of a window aggregation, because `MapBundleOperator`
>> saves input data in a bundle that is a `HashMap`, which does not
>> preserve the input order. I'm afraid elements may come out of order
>> within a bundle (in your case, a bundle of 10 elements). I'm not sure
>> whether it is reasonable to assign watermarks based on timestamps that
>> arrive out of order.
>>
>> Best,
>> JING ZHANG
>>
>> suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:
>>
>>> I am trying to do pre-shuffle aggregation in Flink. Following is the
>>> MapBundle implementation.
>>>
>>>     public class TaxiFareMapBundleFunction
>>>             extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>
>>>         @Override
>>>         public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>             if (value == null) {
>>>                 return input;
>>>             }
>>>             value.tip = value.tip + input.tip;
>>>             return value;
>>>         }
>>>
>>>         @Override
>>>         public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out)
>>>                 throws Exception {
>>>             for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>                 out.collect(entry.getValue());
>>>             }
>>>         }
>>>     }
>>>
>>> I am using `CountBundleTrigger.java`, but the pre-shuffle aggregation
>>> is not working because the `count` variable is always 0. Please let me
>>> know if I am missing something.
>>>
>>>     @Override
>>>     public void onElement(T element) throws Exception {
>>>         count++;
>>>         if (count >= maxCount) {
>>>             callback.finishBundle();
>>>             reset();
>>>         }
>>>     }
>>>
>>> Here is the main code.
>>>
>>>     MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>>             new TaxiFareMapBundleFunction();
>>>     BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>     KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>         @Override
>>>         public Long getKey(TaxiFare value) throws Exception {
>>>             return value.driverId;
>>>         }
>>>     };
>>>
>>>     DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>     //      fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>     //              .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>>>             fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>                             new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>                  .assignTimestampsAndWatermarks(
>>>                          new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>                              @Override
>>>                              public long extractTimestamp(TaxiFare element) {
>>>                                  // Flink timestamps are epoch milliseconds, not seconds
>>>                                  return element.startTime.toEpochMilli();
>>>                              }
>>>                          })
>>>                  .keyBy((TaxiFare fare) -> fare.driverId)
>>>                  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>                  .process(new AddTips());
>>>
>>>     DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>             hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>
>>> Thanks
>>> Suman
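For reference, here is a plain-Java simulation of what count-triggered local aggregation is expected to do. It uses no Flink classes: `Fare` and `LocalTipAggregator` are hypothetical stand-ins for `TaxiFare` and the bundle operator. Each element is merged into a per-driver entry, as in `TaxiFareMapBundleFunction.addInput`. A counter, like the one in `CountBundleTrigger.onElement`, grows by one per element until it reaches `maxCount`. At that point the bundle is emitted and the counter resets. The bundle is a `LinkedHashMap`, following the suggestion at the top of the thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of count-triggered local (pre-shuffle) aggregation.
// Fare and LocalTipAggregator are hypothetical stand-ins, not Flink classes.
public class LocalPreAggregationDemo {

    // Minimal stand-in for TaxiFare with only the fields used here.
    public static final class Fare {
        public final long driverId;
        public float tip;

        public Fare(long driverId, float tip) {
            this.driverId = driverId;
            this.tip = tip;
        }

        @Override
        public String toString() {
            return "Fare(driver=" + driverId + ", tip=" + tip + ")";
        }
    }

    public static final class LocalTipAggregator {
        private final int maxCount;                                   // like CountBundleTrigger's maxCount
        private final Map<Long, Fare> bundle = new LinkedHashMap<>(); // keyed by driverId, arrival order kept
        private final List<Fare> emitted = new ArrayList<>();         // stands in for the downstream Collector
        private int count;                                            // elements seen since the last flush

        public LocalTipAggregator(int maxCount) {
            this.maxCount = maxCount;
        }

        // Merge the element into its driver's entry (like addInput), then
        // bump the counter and flush when it reaches maxCount (like onElement).
        public void processElement(Fare input) {
            Fare acc = bundle.get(input.driverId);
            if (acc == null) {
                bundle.put(input.driverId, input);
            } else {
                acc.tip += input.tip;
            }
            count++;
            if (count >= maxCount) {
                finishBundle();
            }
        }

        // Emit one aggregated record per driver, then clear the bundle and reset the counter.
        public void finishBundle() {
            emitted.addAll(bundle.values());
            bundle.clear();
            count = 0;
        }

        public int pendingCount() {
            return count;
        }

        public List<Fare> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        LocalTipAggregator agg = new LocalTipAggregator(3);
        agg.processElement(new Fare(1L, 1.0f));
        agg.processElement(new Fare(2L, 2.0f));
        System.out.println("count after 2 elements: " + agg.pendingCount()); // 2
        agg.processElement(new Fare(1L, 0.5f)); // third element triggers a flush
        System.out.println("count after flush: " + agg.pendingCount());      // 0
        // [Fare(driver=1, tip=1.5), Fare(driver=2, tip=2.0)]
        System.out.println("emitted: " + agg.emitted());
    }
}
```

In this sketch, elements in a partially filled bundle stay buffered until the count is reached or `finishBundle()` is called explicitly.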