Hi Jing,
I also tried using `MapBundleOperator` directly (I have yet to test with `LinkedHashMap`): I replaced `TaxiFareStream` with `MapBundleOperator` in the code from my earlier message. But in the following code from `AbstractMapBundleOperator.java`, I always see `numOfElements` at 0. It should increase by 1 each time an element is processed, but it is never incremented.

    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = bundle.get(bundleKey);

        // get a new value after adding this element to bundle
        final V newBundleValue = function.addInput(bundleValue, input);

        // update to map bundle
        bundle.put(bundleKey, newBundleValue);

        numOfElements++;
        bundleTrigger.onElement(input);
    }

One more question: you mentioned that I need to test with `LinkedHashMap` instead of `HashMap`. Where should I make this change? Do I need to create a class that extends `MapBundleOperator` and make the change there?

Thanks

On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Suman,
> Please try copying `MapBundleOperator` and changing the `HashMap` to
> `LinkedHashMap` to keep the output sequence consistent with the input sequence.
>
> Best,
> JING ZHANG
>
> On Fri, Aug 20, 2021 at 2:23 AM suman shil <cncf.s...@gmail.com> wrote:
>
>> Hi Jing,
>> Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
>> following this link:
>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>> Please let me know if there is any other way of aggregating elements
>> locally.
>>
>> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>     private KeySelector<TaxiFare, Long> keySelector;
>>
>>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>                           BundleTrigger<TaxiFare> bundleTrigger,
>>                           KeySelector<TaxiFare, Long> keySelector) {
>>         super(userFunction, bundleTrigger, keySelector);
>>         this.keySelector = keySelector;
>>     }
>>
>>     @Override
>>     protected Long getKey(TaxiFare input) throws Exception {
>>         return keySelector.getKey(input);
>>     }
>> }
>>
>> Thanks
>>
>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Suman,
>>> Would you please provide the code for `TaxiFareStream`? It seems we
>>> could use `MapBundleOperator` directly here.
>>> BTW, I have some concerns about using this solution to do
>>> local aggregation for window aggregation, because `MapBundleOperator`
>>> saves input data in a bundle, which is a HashMap object and therefore
>>> does not preserve the input order. I'm afraid the elements within a
>>> bundle (10 in your case) could come out unordered. I'm not sure whether
>>> it is reasonable to assign a watermark based on unordered timestamps.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Thu, Aug 19, 2021 at 12:43 PM suman shil <cncf.s...@gmail.com> wrote:
>>>
>>>> I am trying to do pre-shuffle aggregation in Flink. Following is the
>>>> MapBundle implementation.
>>>>
>>>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>     @Override
>>>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>>         if (value == null) {
>>>>             return input;
>>>>         }
>>>>         value.tip = value.tip + input.tip;
>>>>         return value;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>>             out.collect(entry.getValue());
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> I am using "CountBundleTrigger.java", but the pre-shuffle aggregation
>>>> is not working because the "count" variable is always 0. Please let me
>>>> know if I am missing something.
>>>>
>>>>     @Override
>>>>     public void onElement(T element) throws Exception {
>>>>         count++;
>>>>         if (count >= maxCount) {
>>>>             callback.finishBundle();
>>>>             reset();
>>>>         }
>>>>     }
>>>>
>>>> Here is the main code.
>>>>
>>>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>>>         new TaxiFareMapBundleFunction();
>>>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>>     @Override
>>>>     public Long getKey(TaxiFare value) throws Exception {
>>>>         return value.driverId;
>>>>     }
>>>> };
>>>>
>>>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>> //      fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>> //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>>                 new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>>             .assignTimestampsAndWatermarks(
>>>>                 new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>>                     @Override
>>>>                     public long extractTimestamp(TaxiFare element) {
>>>>                         return element.startTime.getEpochSecond();
>>>>                     }
>>>>                 })
>>>>             .keyBy((TaxiFare fare) -> fare.driverId)
>>>>             .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>             .process(new AddTips());
>>>>
>>>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>>
>>>> Thanks
>>>> Suman
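
For readers following the thread, the bundling behaviour under discussion (count each element, merge it into a per-key map, flush when the count reaches the limit) can be tried without Flink. The following is only a minimal sketch under stated assumptions: `BundleSketch`, `pendingCount()` and `emitted()` are hypothetical names invented here, tips are reduced to a plain `float` per driver id, and none of this is Flink's actual `AbstractMapBundleOperator` or `CountBundleTrigger` code. It uses a `LinkedHashMap`, as Jing suggests, so that flushed entries come out in the order their keys were first seen.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Flink-free sketch of count-triggered local aggregation. All names are hypothetical. */
public class BundleSketch {
    // LinkedHashMap iterates in first-insertion order of keys; a HashMap gives no such guarantee.
    private final Map<Long, Float> bundle = new LinkedHashMap<>();
    private final List<Map.Entry<Long, Float>> emitted = new ArrayList<>();
    private final int maxCount;
    private int numOfElements = 0;

    public BundleSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Merge one element into the bundle, count it, and flush once maxCount is reached. */
    public void processElement(long driverId, float tip) {
        bundle.merge(driverId, tip, Float::sum); // sum tips per driver id
        numOfElements++;
        if (numOfElements >= maxCount) {
            finishBundle();
        }
    }

    /** Emit every aggregated entry in map iteration order, then reset the bundle and the counter. */
    private void finishBundle() {
        for (Map.Entry<Long, Float> e : bundle.entrySet()) {
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        bundle.clear();
        numOfElements = 0;
    }

    /** Number of elements counted since the last flush. */
    public int pendingCount() {
        return numOfElements;
    }

    /** Everything flushed so far, in emission order. */
    public List<Map.Entry<Long, Float>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        BundleSketch op = new BundleSketch(3);
        op.processElement(42L, 1.0f);
        op.processElement(7L, 2.0f);
        System.out.println("pending after two elements: " + op.pendingCount());
        op.processElement(42L, 0.5f); // third element reaches maxCount and triggers the flush
        System.out.println("emitted after flush: " + op.emitted());
    }
}
```

Note that in this sketch the counter returns to 0 after every flush, so a value read right after a flush is 0 even though elements are being counted. Whether that has anything to do with the behaviour reported in this thread is not established here.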