Hi Suman,
I've learned the providing code, and have some questions,
1. Why we do a
WindowAggregate window(TumblingProcessingTimeWindows.of(Time.minutes(1))),
then do a windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
One uses `ProcessingTimeWindow`, the other uses `EventTimeWindow`.
Hi JING,
Thanks for the pointers.
1) I am able to debug why the variable `*numOfElements` *was getting reset
to 0. The following method of* AbstarctMapBundleOperator.java *was getting
called which was resetting the variable to 0 before it could reach max
count.
*@Overridepublic v
Hi Suman,
> But I am always seeing the following code of `
*AbstractMapBundleOperator.java*` `*numOfElements` *is always 0.
It is weird, please set a breakpoint at line `
*bundleTrigger.onElement(input);*` in `*processElement*` method to see
what happens when a record is processed by `*processEle
Hi Jing,
I tried using `*MapBundleOperator*` also (I am yet to test with
LinkedHashMap) . But I am always seeing that the following code of `
*AbstractMapBundleOperator.java*` `*numOfElements` *is always 0. It is
never getting incremented. I replaced `*TaxiFareStream*` with `
*MapBundleOperator*`
Hi Suman,
Please try copy `*MapBundleOperator*`, update the `HashMap` to
`LinkedHashMap` to keep the output sequence consistent with input sequence.
Best,
JING ZHANG
suman shil 于2021年8月20日周五 上午2:23写道:
> Hi Jing,
> Thanks for looking into this. Here is the code of `TaxiFareStream'. I was
> follo
Hi Jing,
Thanks for looking into this. Here is the code of `TaxiFareStream'. I was
following this link
http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
. Please let me know if there is any other way of aggregating elements
locally.
*public class
Hi Suman,
Would you please provide the code about `*TaxiFareStream*`? It seems we
could use `MapBundleOperator` directly here.
BTW, I have some concerns about using the solution to do local-aggregation
for window aggregation because `MapBundleOperator`
would save input data in a bundle which is a H
I am trying to do pre shuffle aggregation in flink. Following is the
MapBundle implementation.
*public class TaxiFareMapBundleFunction extends MapBundleFunction {@Overridepublic TaxiFare
addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
if (value ==