No, there is no specific reason. I am using it because I am computing the HyperLogLog over a window. *--* *-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez* *--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>* On Mon, Jul 1, 2019 at 12:34 AM Vijay Balakrishnan <bvija...@gmail.com> wrote: > Hi Felipe, > Thanks for the example. I will try a variation of that for mine. Is there > a specific reason to use the HyperLogLogState ? > > Vijay > > On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez < > felipe.o.gutier...@gmail.com> wrote: > >> Hi Vijay, >> >> I managed by using >> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" on the >> processElement method and clearing the state on the onTimer method. This is >> my program [1]. >> >> [1] >> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java >> >> Kind Regards, >> Felipe >> *--* >> *-- Felipe Gutierrez* >> >> *-- skype: felipe.o.gutierrez* >> *--* *https://felipeogutierrez.blogspot.com >> <https://felipeogutierrez.blogspot.com>* >> >> >> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <rafi.ar...@gmail.com> wrote: >> >>> Hi Vijay, >>> >>> When using windows, you may use the 'trigger' to set a Custom Trigger >>> which would trigger your *ProcessWindowFunction* accordingly. >>> >>> In your case, you would probably use: >>> >>>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))* >>>> >>> >>> Thanks, >>> Rafi >>> >>> >>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com> >>> wrote: >>> >>>> I am also implementing the ProcessWindowFunction and accessing the >>>> windowState to get data but how do i push data out every 5 mins during a 4 >>>> hr time window ?? I am adding a globalState to handle the 4 hr window ??? >>>> Or should I still use the context.windowState even for the 4 hr window ? >>>> >>>> public class MGroupingAggregateClass extends >>>>> ProcessWindowFunction<....> { >>>>> >>>>> private MapState<String, Object> timedGroupKeyState; >>>>> private MapState<String, Object> globalGroupKeyState; >>>>> private final MapStateDescriptor<String, Object> >>>>> timedMapKeyStateDescriptor = >>>>> new MapStateDescriptor<>("timedGroupKeyState", >>>>> String.class, Object.class); >>>>> private final MapStateDescriptor<String, Object> >>>>> globalMapKeyStateDescriptor = >>>>> new MapStateDescriptor<>("globalGroupKeyState", >>>>> String.class, Object.class); >>>>> >>>>> >>>>> public void open(Configuration ..) { >>>>> timedGroupKeyState = >>>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor); >>>>> globalGroupKeyState = >>>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor); >>>>> } >>>>> >>>>> public void process(MonitoringTuple currKey, Context context, >>>>> Iterable<Map<String, Object>> elements, >>>>> Collector<Map<String, Object>> out) throws >>>>> Exception { >>>>> logger.info("Entered MGroupingAggregateWindowProcessing - >>>>> process interval:{}, currKey:{}", interval, currKey); >>>>> timedGroupKeyState = >>>>> context.windowState().getMapState(timedMapKeyStateDescriptor); >>>>> globalGroupKeyState = >>>>> context.globalState().getMapState(globalMapKeyStateDescriptor); >>>>> ... >>>>> //get data fromm state >>>>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey); >>>>> >>>>> //how do i push the data out every 5 mins to the sink during the 4 hr >>>>> window ?? >>>>> >>>>> } >>>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com> >>>> wrote: >>>> >>>>> Hi, >>>>> Need to calculate a 4 hour time window for count, sum with current >>>>> calculated results being output every 5 mins. >>>>> How do i do that ? >>>>> Currently, I calculate results for 5 sec and 5 min time windows fine >>>>> on the KeyedStream. >>>>> >>>>> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow >>>>>> = Time.seconds(timeIntervalL); >>>>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream = >>>>>> kinesisStream.keyBy(...); >>>>>> final WindowedStream<Map<String, Object>, ...., TimeWindow> >>>>>> windowStream = >>>>>> monitoringTupleKeyedStream >>>>>> .timeWindow(timeWindow); >>>>>> DataStream<....> enrichedMGStream = windowStream.aggregate( >>>>>> new MGroupingWindowAggregateClass(...), >>>>>> new MGroupingAggregateClass(....)) >>>>>> .map(new Monitoring...(...)); >>>>>> enrichedMGStream.addSink(..); >>>>>> >>>>> >>>>> >>>>> TIA, >>>>> Vijay >>>>> >>>>