Hi JING,

Thanks for the pointers.

1) I was able to debug why the variable `numOfElements` was getting reset to 0. The following method of `AbstractMapBundleOperator.java` was getting called, which resets the variable to 0 before it can reach the max count.

    @Override
    public void finishBundle() throws Exception {
        if (!bundle.isEmpty()) {
            numOfElements = 0;
            function.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }
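For context, if I am reading the Flink source correctly, `finishBundle()` is also invoked from places other than the count trigger, roughly as sketched below (paraphrased from `AbstractMapBundleOperator`, so please correct me if I have misread it). That would explain why the bundle gets flushed, and the counter reset, before `maxCount` is ever reached.

    // Rough sketch of the other call sites of finishBundle() in AbstractMapBundleOperator,
    // paraphrased from the Flink source; method bodies abridged.
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        finishBundle();                  // every incoming watermark flushes the bundle first
        super.processWatermark(mark);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        finishBundle();                  // checkpoint barriers flush the bundle as well
    }

So any watermark or checkpoint reaching the operator resets `numOfElements`, no matter how many elements have been bundled so far.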
2) To avoid this problem I created a simple aggregator which accumulates the elements in a LinkedHashMap and outputs them when it reaches a max count. I can see now that the bundle size is reaching the max and `output.collect` is getting called, but I still don't see any output. Here is the new aggregator code.

    public class MyAggregator<K, V, IN, OUT> extends AbstractStreamOperator<OUT>
            implements OneInputStreamOperator<IN, OUT> {

        private int count;
        private Map<K, OUT> bundle = new LinkedHashMap<>();
        private MapBundleFunction<K, OUT, IN, OUT> bundleFunction;
        private KeySelector<IN, K> keySelector;

        public MyAggregator(int count,
                            MapBundleFunction<K, OUT, IN, OUT> bundleFunction,
                            KeySelector<IN, K> keySelector) {
            this.count = count;
            this.bundleFunction = bundleFunction;
            this.keySelector = keySelector;
        }

        @Override
        public void open() throws Exception {
            bundle = new LinkedHashMap<>();
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            K key = getKey(element);
            OUT value = bundle.get(key);
            OUT newValue = bundleFunction.addInput(value, element.getValue());
            bundle.put(key, newValue);
            if (bundle.size() > count) {
                for (Map.Entry<K, OUT> entry : bundle.entrySet()) {
                    output.collect(new StreamRecord<>(entry.getValue()));
                }
                bundle.clear();
            }
        }

        private K getKey(StreamRecord<IN> element) throws Exception {
            return keySelector.getKey(element.getValue());
        }
    }
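One thing worth noting about this version: nothing flushes whatever is left in the bundle when the input ends, so up to `count` buffered elements would simply never be emitted. A possible guard, just as a sketch and assuming that implementing `BoundedOneInput` is the right hook for this, would be:

    // Sketch only: flush the remaining bundle when the (bounded) input finishes.
    // Assumes MyAggregator additionally implements
    // org.apache.flink.streaming.api.operators.BoundedOneInput.
    @Override
    public void endInput() throws Exception {
        for (Map.Entry<K, OUT> entry : bundle.entrySet()) {
            output.collect(new StreamRecord<>(entry.getValue()));
        }
        bundle.clear();
    }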
Following is the driver code.

    public class HourlyTipsSolution extends ExerciseBase {

        /**
         * Main method.
         *
         * @throws Exception which occurs during job execution.
         */
        public static void main(String[] args) throws Exception {

            // set up streaming execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(ExerciseBase.parallelism);

            // start the data generator
            DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));

            // compute tips per hour for each driver
            MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                    new TaxiFareMapBundleFunction();
            BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
            KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
                    new KeySelector<TaxiFare, Long>() {
                        @Override
                        public Long getKey(TaxiFare value) throws Exception {
                            return value.driverId;
                        }
                    };
            MyAggregator<Long, TaxiFare, TaxiFare, TaxiFare> aggregator =
                    new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);

            DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                    fares.transform("preshuffle", TypeInformation.of(TaxiFare.class), aggregator)
                            .keyBy((TaxiFare fare) -> fare.driverId)
                            .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                            .process(new AddTips());

            DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                    hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

            printOrTest(hourlyMax);

            // execute the transformation pipeline
            env.execute("Hourly Tips (java)");
        }

        /*
         * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
         */
        public static class AddTips
                extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

            @Override
            public void process(
                    Long key,
                    Context context,
                    Iterable<TaxiFare> fares,
                    Collector<Tuple3<Long, Long, Float>> out) {
                float sumOfTips = 0F;
                for (TaxiFare f : fares) {
                    sumOfTips += f.tip;
                }
                out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
            }
        }
    }

But the following driver code works when I remove the aggregator.

    public class HourlyTipsSolution extends ExerciseBase {

        /**
         * Main method.
         *
         * @throws Exception which occurs during job execution.
         */
        public static void main(String[] args) throws Exception {

            // set up streaming execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(ExerciseBase.parallelism);

            // start the data generator
            DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));

            // compute tips per hour for each driver
            MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
                    new TaxiFareMapBundleFunction();
            BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(2);
            KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
                    new KeySelector<TaxiFare, Long>() {
                        @Override
                        public Long getKey(TaxiFare value) throws Exception {
                            return value.driverId;
                        }
                    };
            MyAggregator<Long, TaxiFare, TaxiFare, TaxiFare> aggregator =
                    new MyAggregator<>(10, mapBundleFunction, taxiFareLongKeySelector);

            DataStream<Tuple3<Long, Long, Float>> hourlyTips =
                    fares.keyBy((TaxiFare fare) -> fare.driverId)
                            .window(TumblingEventTimeWindows.of(Time.hours(1)))
                            .process(new AddTips());

            DataStream<Tuple3<Long, Long, Float>> hourlyMax =
                    hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.minutes(1))).maxBy(2);

            printOrTest(hourlyMax);

            // execute the transformation pipeline
            env.execute("Hourly Tips (java)");
        }

        /*
         * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
         */
        public static class AddTips
                extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

            @Override
            public void process(
                    Long key,
                    Context context,
                    Iterable<TaxiFare> fares,
                    Collector<Tuple3<Long, Long, Float>> out) {
                float sumOfTips = 0F;
                for (TaxiFare f : fares) {
                    sumOfTips += f.tip;
                }
                out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
            }
        }
    }
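To make the comparison explicit, the difference between the failing and the working version is essentially the `hourlyTips` path (the `CountBundleTrigger` argument and the `windowAll` size differ as well); side by side it is roughly:

    // with the aggregator (no output observed): pre-shuffle transform, then a processing-time window
    DataStream<Tuple3<Long, Long, Float>> hourlyTips =
            fares.transform("preshuffle", TypeInformation.of(TaxiFare.class), aggregator)
                    .keyBy((TaxiFare fare) -> fare.driverId)
                    .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                    .process(new AddTips());

    // without the aggregator (works): keyBy straight on the source, event-time window
    DataStream<Tuple3<Long, Long, Float>> hourlyTips =
            fares.keyBy((TaxiFare fare) -> fare.driverId)
                    .window(TumblingEventTimeWindows.of(Time.hours(1)))
                    .process(new AddTips());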
On Fri, Aug 20, 2021 at 4:18 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Suman,
>
> But I am always seeing the following code of `AbstractMapBundleOperator.java`: `numOfElements` is always 0.
> It is weird. Please set a breakpoint at the line `bundleTrigger.onElement(input);` in the `processElement` method to see what happens when a record is processed by `processElement`.
>
> One more question, you mentioned that I need to test with `LinkedHashMap` instead of `HashMap`. Where should I make this change?
> You could copy the class `AbstractMapBundleOperator` and update the bundle initialization code in the `open` method.
> Besides, MapBundleFunction, MapBundleOperator, and CountBundleTrigger are not marked as @Public, so they have no guarantee of compatibility. You'd better copy them for your own use.
>
> Best,
> JING ZHANG
>
> suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:18 PM:
>
>> Hi Jing,
>> I tried using `MapBundleOperator` also (I am yet to test with `LinkedHashMap`). But I am always seeing that in the following code of `AbstractMapBundleOperator.java`, `numOfElements` is always 0. It is never getting incremented.
>> I replaced `TaxiFareStream` with `MapBundleOperator` in the above code. It should increment by 1 each time an element is processed, but that is not happening.
>>
>>     public void processElement(StreamRecord<IN> element) throws Exception {
>>         // get the key and value for the map bundle
>>         final IN input = element.getValue();
>>         final K bundleKey = getKey(input);
>>         final V bundleValue = bundle.get(bundleKey);
>>
>>         // get a new value after adding this element to bundle
>>         final V newBundleValue = function.addInput(bundleValue, input);
>>
>>         // update to map bundle
>>         bundle.put(bundleKey, newBundleValue);
>>
>>         numOfElements++;
>>         bundleTrigger.onElement(input);
>>     }
>>
>> One more question, you mentioned that I need to test with `LinkedHashMap` instead of `HashMap`. Where should I make this change? Do I need to create a class which extends from `MapBundleOperator` and add it there?
>>
>> Thanks
>>
>> On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Suman,
>>> Please try copying `MapBundleOperator` and updating the `HashMap` to a `LinkedHashMap` to keep the output sequence consistent with the input sequence.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:23 AM:
>>>
>>>> Hi Jing,
>>>> Thanks for looking into this. Here is the code of `TaxiFareStream`. I was following this link:
>>>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>>>> Please let me know if there is any other way of aggregating elements locally.
>>>>
>>>>     public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>         private KeySelector<TaxiFare, Long> keySelector;
>>>>
>>>>         public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>>>                               BundleTrigger<TaxiFare> bundleTrigger,
>>>>                               KeySelector<TaxiFare, Long> keySelector) {
>>>>             super(userFunction, bundleTrigger, keySelector);
>>>>             this.keySelector = keySelector;
>>>>         }
>>>>
>>>>         @Override
>>>>         protected Long getKey(TaxiFare input) throws Exception {
>>>>             return keySelector.getKey(input);
>>>>         }
>>>>     }
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>>>
>>>>> Hi Suman,
>>>>> Would you please provide the code of `TaxiFareStream`? It seems we could use `MapBundleOperator` directly here.
>>>>> BTW, I have some concerns about using this solution to do local aggregation for a window aggregation, because `MapBundleOperator` saves the input data in a bundle, which is a HashMap object that cannot keep the input order of the data. I'm afraid there is an out-of-order problem within a bundle (in your case 10 elements). I'm not sure whether it is reasonable to assign a watermark based on an unordered timestamp.
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>> suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:
>>>>>
>>>>>> I am trying to do pre-shuffle aggregation in Flink. Following is the MapBundle implementation.
>>>>>>
>>>>>>     public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>>>
>>>>>>         @Override
>>>>>>         public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>>>>             if (value == null) {
>>>>>>                 return input;
>>>>>>             }
>>>>>>             value.tip = value.tip + input.tip;
>>>>>>             return value;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>>>>             for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>>>>                 out.collect(entry.getValue());
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> I am using "CountBundleTrigger.java", but the pre-shuffle aggregation is not working as the "count" variable is always 0. Please let me know if I am missing something.
>>>>>>
>>>>>>     @Override
>>>>>>     public void onElement(T element) throws Exception {
>>>>>>         count++;
>>>>>>         if (count >= maxCount) {
>>>>>>             callback.finishBundle();
>>>>>>             reset();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> Here is the main code.
>>>>>>
>>>>>>     MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>>>>>             new TaxiFareMapBundleFunction();
>>>>>>     BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>>>>     KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
>>>>>>             new KeySelector<TaxiFare, Long>() {
>>>>>>                 @Override
>>>>>>                 public Long getKey(TaxiFare value) throws Exception {
>>>>>>                     return value.driverId;
>>>>>>                 }
>>>>>>             };
>>>>>>     DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>>>>             // fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>>>>             //         .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>>>>>>             fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>>>>                             new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>>>>                     .assignTimestampsAndWatermarks(
>>>>>>                             new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>>>>                                 @Override
>>>>>>                                 public long extractTimestamp(TaxiFare element) {
>>>>>>                                     return element.startTime.getEpochSecond();
>>>>>>                                 }
>>>>>>                             })
>>>>>>                     .keyBy((TaxiFare fare) -> fare.driverId)
>>>>>>                     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>>                     .process(new AddTips());
>>>>>>     DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>>>>             hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>>>>
>>>>>> Thanks
>>>>>> Suman