Re: Pre shuffle aggregation in flink is not working

2021-08-22 Thread JING ZHANG
Hi Suman, I've studied the code you provided and have some questions: 1. Why do we do a window aggregate with `window(TumblingProcessingTimeWindows.of(Time.minutes(1)))` and then a `windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2)`? One uses a processing-time window, the other an event-time window.
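A tumbling window's boundaries can be computed from a single timestamp. The sketch below assumes epoch-aligned windows with zero offset (which I believe is the default for Flink's tumbling assigners) and uses hypothetical timestamps. It only illustrates that a processing-time window and an event-time window are computed from different clocks, so the boundaries a record sees can differ between the two steps. `windowStart` is an illustrative helper, not a Flink API.

```java
public class TumblingWindowSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    // Start of the epoch-aligned (zero-offset) tumbling window of `size` ms containing `timestamp`.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        // Hypothetical millisecond timestamps on day 0: event at 10:59:30, processed at 11:00:10.
        long eventTime = 10 * HOUR + 59 * MINUTE + 30_000L;
        long processingTime = 11 * HOUR + 10_000L;

        // 1-minute window on the processing-time clock: starts at minute 660 (11:00:00).
        System.out.println(windowStart(processingTime, MINUTE) / MINUTE + " min");
        // 1-hour window on the event-time clock: starts at minute 600 (10:00:00).
        System.out.println(windowStart(eventTime, HOUR) / MINUTE + " min");
    }
}
```

With these sample values, the record's processing time falls in the 11:00 one-minute window, while its raw event time falls in the 10:00 one-hour window.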

Re: Pre shuffle aggregation in flink is not working

2021-08-21 Thread suman shil
Hi JING, Thanks for the pointers. 1) I was able to debug why the variable `numOfElements` was getting reset to 0. The following method of `AbstractMapBundleOperator.java` was being called, which reset the variable to 0 before it could reach the max count. @Override public v
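A small, Flink-free simulation of this symptom: if something other than the count threshold flushes the bundle (and thereby resets the counter) more often than every `maxCount` elements, the counter never reaches `maxCount`. What actually triggers the early flush in the real operator is not shown in the truncated message, so the periodic "other flush" here is an assumption; `simulate` and its parameters are hypothetical names.

```java
import java.util.Arrays;

public class PrematureFlushSketch {
    // Returns {flushesByCount, flushesByOtherEvent, highestCounterValueSeen}.
    static int[] simulate(int elements, int maxCount, int otherFlushEvery) {
        int numOfElements = 0;
        int byCount = 0, byOther = 0, peak = 0;
        for (int i = 1; i <= elements; i++) {
            numOfElements++;                        // element added to the bundle
            peak = Math.max(peak, numOfElements);
            if (numOfElements >= maxCount) {        // count threshold reached: flush and reset
                byCount++;
                numOfElements = 0;
            } else if (i % otherFlushEvery == 0) {  // hypothetical other flush source also resets
                byOther++;
                numOfElements = 0;
            }
        }
        return new int[] {byCount, byOther, peak};
    }

    public static void main(String[] args) {
        // Other flush every 10 elements: the counter tops out at 10 and never reaches 100.
        System.out.println(Arrays.toString(simulate(1000, 100, 10)));
        // No early flushes within 1000 elements: the count threshold fires 10 times.
        System.out.println(Arrays.toString(simulate(1000, 100, 5000)));
    }
}
```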

Re: Pre shuffle aggregation in flink is not working

2021-08-20 Thread JING ZHANG
Hi Suman, > But I am always seeing that in the following code of `AbstractMapBundleOperator.java`, `numOfElements` is always 0. That is weird. Please set a breakpoint at the line `bundleTrigger.onElement(input);` in the `processElement` method to see what happens when a record is processed by `processEle

Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread suman shil
Hi Jing, I also tried using `MapBundleOperator` (I have yet to test with `LinkedHashMap`). But I always see that in the following code of `AbstractMapBundleOperator.java`, `numOfElements` is always 0; it never gets incremented. I replaced `TaxiFareStream` with `MapBundleOperator`

Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread JING ZHANG
Hi Suman, Please try copying `MapBundleOperator` and changing the `HashMap` to a `LinkedHashMap` to keep the output order consistent with the input order. Best, JING ZHANG suman shil wrote on Fri, Aug 20, 2021 at 2:23 AM: > Hi Jing, > Thanks for looking into this. Here is the code of `TaxiFareStream`. I was > follo
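The ordering difference is easy to see in plain Java: `LinkedHashMap` iterates keys in the order they were first inserted, while `HashMap` makes no ordering guarantee. The sketch below uses hypothetical driver keys and a per-key count as a stand-in for the bundle's aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BundleOrderSketch {
    // Combines inputs per key into `bundle`, then returns the keys in the map's iteration order.
    static List<String> keysInIterationOrder(Map<String, Integer> bundle, List<String> inputKeys) {
        for (String k : inputKeys) {
            bundle.merge(k, 1, Integer::sum); // count per key, a tiny pre-aggregation
        }
        return new ArrayList<>(bundle.keySet());
    }

    public static void main(String[] args) {
        List<String> input = List.of("driver-42", "driver-7", "driver-42", "driver-1000");
        // Insertion order of first arrival: [driver-42, driver-7, driver-1000]
        System.out.println(keysInIterationOrder(new LinkedHashMap<>(), input));
        // Order depends on hashing and is not guaranteed.
        System.out.println(keysInIterationOrder(new HashMap<>(), input));
    }
}
```

Note that a `LinkedHashMap` preserves the order in which each key first arrived, not the order of every individual record, because updating an existing key does not move it.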

Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread suman shil
Hi Jing, Thanks for looking into this. Here is the code of `TaxiFareStream`. I was following this post (http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html). Please let me know if there is any other way of aggregating elements locally. public class

Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread JING ZHANG
Hi Suman, Would you please provide the code for `TaxiFareStream`? It seems we could use `MapBundleOperator` directly here. BTW, I have some concerns about using this solution to do local aggregation for window aggregation, because `MapBundleOperator` saves input data in a bundle, which is a H
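To make the bundle idea concrete without any Flink dependency, here is a minimal model of a map-bundle operator: records are combined per key in an in-memory map, and the whole map is emitted when a count threshold is reached or on an explicit flush. `MiniMapBundle` and all of its members are hypothetical; the structure is only loosely modeled on the `MapBundleOperator` pattern discussed in this thread, not copied from Flink's classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model: buffer records, combine per key, emit the bundle on a count threshold.
public class MiniMapBundle<K, V, IN> {
    private final Function<IN, K> keySelector;
    private final BiFunction<V, IN, V> addInput;    // receives null for a key new to this bundle
    private final Consumer<Map<K, V>> finishBundle;
    private final int maxCount;
    private final Map<K, V> bundle = new LinkedHashMap<>();
    private int numOfElements = 0;

    MiniMapBundle(Function<IN, K> keySelector, BiFunction<V, IN, V> addInput,
                  Consumer<Map<K, V>> finishBundle, int maxCount) {
        this.keySelector = keySelector;
        this.addInput = addInput;
        this.finishBundle = finishBundle;
        this.maxCount = maxCount;
    }

    void processElement(IN input) {
        K key = keySelector.apply(input);
        bundle.put(key, addInput.apply(bundle.get(key), input));
        numOfElements++;
        if (numOfElements >= maxCount) {
            flush();
        }
    }

    // Emits a copy of the buffered map downstream, then clears the bundle and the counter.
    void flush() {
        if (!bundle.isEmpty()) {
            finishBundle.accept(new LinkedHashMap<>(bundle));
            bundle.clear();
        }
        numOfElements = 0;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> emitted = new ArrayList<>();
        MiniMapBundle<String, Integer, String> op = new MiniMapBundle<>(
                s -> s, (v, in) -> v == null ? 1 : v + 1, emitted::add, 4);
        for (String w : List.of("a", "b", "a", "a", "c", "b")) {
            op.processElement(w);
        }
        op.flush(); // e.g. end of input
        System.out.println(emitted); // [{a=3, b=1}, {c=1, b=1}]
    }
}
```

Everything buffered in the map stays in memory until the next flush, so the threshold trades memory and latency for fewer records sent downstream.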

Pre shuffle aggregation in flink is not working

2021-08-18 Thread suman shil
I am trying to do pre-shuffle aggregation in Flink. Following is the MapBundle implementation: public class TaxiFareMapBundleFunction extends MapBundleFunction { @Override public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception { if (value ==
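The `addInput(value, input)` shape in the snippet, where `value` is `@Nullable` and is presumably `null` for the first record of a key in the current bundle, can be exercised outside Flink. This sketch assumes a hypothetical `TaxiFare` with only `driverId` and `tip` fields and a hypothetical combine step that sums tips per driver; the real `TaxiFare` class and the aggregation used in this thread may differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaxiFareBundleSketch {
    // Hypothetical stand-in for the TaxiFare POJO, reduced to the fields used here.
    static final class TaxiFare {
        final long driverId;
        final double tip;

        TaxiFare(long driverId, double tip) {
            this.driverId = driverId;
            this.tip = tip;
        }
    }

    // Same shape as addInput(value, input): value is null for a key not yet in the bundle.
    static TaxiFare addInput(TaxiFare value, TaxiFare input) {
        if (value == null) {
            return input;
        }
        // Hypothetical combine step: sum the tips for the same driver.
        return new TaxiFare(input.driverId, value.tip + input.tip);
    }

    // Folds inputs into a per-driver bundle, as a bundle operator would before flushing.
    static Map<Long, TaxiFare> bundle(List<TaxiFare> inputs) {
        Map<Long, TaxiFare> bundle = new LinkedHashMap<>();
        for (TaxiFare f : inputs) {
            bundle.put(f.driverId, addInput(bundle.get(f.driverId), f));
        }
        return bundle;
    }

    public static void main(String[] args) {
        Map<Long, TaxiFare> b = bundle(List.of(
                new TaxiFare(1L, 2.0), new TaxiFare(2L, 1.5), new TaxiFare(1L, 3.0)));
        b.forEach((id, f) -> System.out.println(id + " -> " + f.tip)); // 1 -> 5.0, then 2 -> 1.5
    }
}
```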