Hi Felipe,

Thanks for the example. I will try a variation of it for mine. Is there a specific reason to use the HyperLogLogState?
Vijay

On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> Hi Vijay,
>
> I managed it by calling
> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" in the
> processElement method and clearing the state in the onTimer method. This is
> my program [1].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>
> Kind Regards,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> When using windows, you can call 'trigger' to set a custom Trigger that
>> fires your *ProcessWindowFunction* accordingly.
>>
>> In your case, you would probably use:
>>
>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
>>
>> Thanks,
>> Rafi
>>
>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> I am also implementing the ProcessWindowFunction and accessing the
>>> windowState to get data, but how do I push data out every 5 mins during a
>>> 4 hr time window? Should I add a globalState to handle the 4 hr window,
>>> or should I still use context.windowState even for the 4 hr window?
>>>
>>> public class MGroupingAggregateClass extends ProcessWindowFunction<....> {
>>>
>>>     private MapState<String, Object> timedGroupKeyState;
>>>     private MapState<String, Object> globalGroupKeyState;
>>>     private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
>>>             new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
>>>     private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
>>>             new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);
>>>
>>>     @Override
>>>     public void open(Configuration ..) {
>>>         timedGroupKeyState = getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>>         globalGroupKeyState = getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>>     }
>>>
>>>     @Override
>>>     public void process(MonitoringTuple currKey, Context context,
>>>                         Iterable<Map<String, Object>> elements,
>>>                         Collector<Map<String, Object>> out) throws Exception {
>>>         logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}",
>>>                 interval, currKey);
>>>         // Per-window state: scoped to the current key and the current window.
>>>         timedGroupKeyState = context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>         // Global state: scoped to the current key only, shared by all of its windows.
>>>         globalGroupKeyState = context.globalState().getMapState(globalMapKeyStateDescriptor);
>>>         ...
>>>         // get data from state
>>>         Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>>
>>>         // how do I push the data out every 5 mins to the sink during the 4 hr window?
>>>     }
>>>
>>>     @Override
>>>     public void clear(Context context) throws Exception {
>>>         // Per-window state has to be cleaned up here when the window is purged.
>>>         context.windowState().getMapState(timedMapKeyStateDescriptor).clear();
>>>     }
>>> }
>>>
>>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I need to calculate count and sum over a 4 hour time window, with the
>>>> currently calculated results output every 5 mins.
>>>> How do I do that?
>>>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>>>> the KeyedStream.
>>>>
>>>>     Time timeWindow = getTimeWindowFromInterval(interval); // e.g. timeWindow = Time.seconds(timeIntervalL);
>>>>     KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>>>             kinesisStream.keyBy(...);
>>>>     final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
>>>>             monitoringTupleKeyedStream.timeWindow(timeWindow);
>>>>     DataStream<....> enrichedMGStream = windowStream.aggregate(
>>>>                     new MGroupingWindowAggregateClass(...), // AggregateFunction, applied incrementally
>>>>                     new MGroupingAggregateClass(....))      // ProcessWindowFunction, receives the single aggregated result
>>>>             .map(new Monitoring...(...));
>>>>     enrichedMGStream.addSink(..);
>>>>
>>>> TIA,
>>>> Vijay
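Rafi's suggestion can be illustrated without a Flink cluster. The sketch below is a plain-Java model, not Flink code: it simulates a 4-hour tumbling window whose trigger FIREs every 5 minutes without purging, which is roughly what .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5))) does on a processing-time window. The class, record and method names (ContinuousFiringModel, Event, Firing, run) are hypothetical, events are assumed to arrive sorted by processing time, and Flink's real timer and cleanup bookkeeping differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency, names hypothetical) of a 4-hour tumbling
// window whose trigger FIREs every 5 minutes without purging. Each firing sees
// everything the window holds so far, so count and sum are cumulative until the
// 4-hour window ends; the next window starts from zero.
public class ContinuousFiringModel {

    static final long MINUTE = 60_000L;

    // An element arriving at processing time `time` (ms) with a numeric `value`.
    record Event(long time, long value) {}

    // One partial result emitted when a window fires.
    record Firing(long windowStart, long fireTime, long count, long sum) {}

    // Assumes `events` is sorted by arrival time.
    static List<Firing> run(List<Event> events, long windowSize, long fireInterval) {
        List<Firing> out = new ArrayList<>();
        int i = 0;
        while (i < events.size()) {
            // Tumbling windows aligned to multiples of windowSize (offset 0).
            long t0 = events.get(i).time();
            long windowStart = t0 - t0 % windowSize;
            long windowEnd = windowStart + windowSize;
            int j = i;
            while (j < events.size() && events.get(j).time() < windowEnd) {
                j++;
            }
            List<Event> inWindow = events.subList(i, j);
            // First firing at the interval boundary after the window's first element.
            long fire = t0 - t0 % fireInterval + fireInterval;
            for (; fire <= windowEnd; fire += fireInterval) {
                long count = 0;
                long sum = 0;
                for (Event e : inWindow) {
                    if (e.time() < fire) {
                        count++;
                        sum += e.value();
                    }
                }
                out.add(new Firing(windowStart, fire, count, sum));
            }
            i = j; // the window's contents are dropped once the window ends
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1 * MINUTE, 10), new Event(3 * MINUTE, 5), new Event(7 * MINUTE, 1));
        List<Firing> fires = run(events, 240 * MINUTE, 5 * MINUTE);
        // First firing (minute 5): count=2, sum=15; second (minute 10): count=3, sum=16.
        System.out.println(fires.get(0));
        System.out.println(fires.get(1));
    }
}
```

In Vijay's pipeline the trigger would go between timeWindow(...) and aggregate(...), e.g. monitoringTupleKeyedStream.timeWindow(Time.hours(4)).trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5))).aggregate(...). Because the trigger fires without purging, each 5-minute result is the running total for the current 4-hour window, not a per-5-minute delta.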
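Felipe's approach (keyed state plus processing-time timers, with the state cleared in onTimer) can be modelled the same way. Again this is plain Java, not Flink: the TreeMap stands in for ctx.timerService().registerProcessingTimeTimer(...), and the 5-minute emit interval, the 4-hour reset and the (count, sum) state are assumptions taken from Vijay's requirement, not from Felipe's program [1]. All names are hypothetical. As in Flink, registering a timer twice for the same key and timestamp leaves a single timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java model (no Flink dependency, names hypothetical) of the KeyedProcessFunction
// pattern: keep a running aggregate per key, emit it from a processing-time timer
// every 5 minutes, and clear the state in onTimer at each 4-hour boundary.
public class TimerAggregationModel {

    static final long MINUTE = 60_000L;
    static final long EMIT_INTERVAL = 5 * MINUTE;  // assumed emit period
    static final long RESET_PERIOD = 240 * MINUTE; // assumed 4-hour reset period

    record Result(String key, long time, long count, long sum) {}

    // Stand-in for per-key ValueState: {count, sum} since the last reset.
    private final Map<String, long[]> state = new HashMap<>();
    // Stand-in for the timer service: timestamp -> keys with a timer at that time.
    // A Set per timestamp mimics one timer per key and timestamp.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    private final List<Result> out = new ArrayList<>();

    // Counterpart of processElement(): update state and register the next emit timer.
    void processElement(String key, long value, long now) {
        advanceTo(now); // fire any timers that are already due
        long[] acc = state.computeIfAbsent(key, k -> new long[2]);
        acc[0] += 1;
        acc[1] += value;
        registerTimer(key, now - now % EMIT_INTERVAL + EMIT_INTERVAL);
    }

    // Counterpart of onTimer(): emit the running aggregate, then re-arm or clear.
    private void onTimer(String key, long time) {
        long[] acc = state.get(key);
        if (acc == null) {
            return; // state already cleared for this key
        }
        out.add(new Result(key, time, acc[0], acc[1]));
        if (time % RESET_PERIOD == 0) {
            state.remove(key); // 4-hour boundary: the next element starts from zero
        } else {
            registerTimer(key, time + EMIT_INTERVAL);
        }
    }

    private void registerTimer(String key, long time) {
        timers.computeIfAbsent(time, t -> new HashSet<>()).add(key);
    }

    // Advances simulated processing time, firing due timers in timestamp order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    List<Result> results() {
        return out;
    }

    public static void main(String[] args) {
        TimerAggregationModel m = new TimerAggregationModel();
        m.processElement("a", 10, 1 * MINUTE);
        m.processElement("a", 5, 3 * MINUTE);
        m.processElement("a", 1, 7 * MINUTE);
        m.advanceTo(RESET_PERIOD);
        System.out.println(m.results().size() + " partial results");
    }
}
```

With the values in main the model emits 48 partial results (minute 5 through minute 240): the first with count=2, sum=15 and the rest with count=3, sum=16, after which the key's state is gone. The state per key is a single (count, sum) pair no matter how many elements arrive; the trade-off compared with a window and trigger is that the timer and cleanup bookkeeping become your own code.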