Hi Vijay, I managed by using "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" on the processElement method and clearing the state on the onTimer method. This is my program [1].
[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java Kind Regards, Felipe *--* *-- Felipe Gutierrez* *-- skype: felipe.o.gutierrez* *--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>* On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <rafi.ar...@gmail.com> wrote: > Hi Vijay, > > When using windows, you may use the 'trigger' to set a Custom Trigger > which would trigger your *ProcessWindowFunction* accordingly. > > In your case, you would probably use: > >> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))* >> > > Thanks, > Rafi > > > On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com> > wrote: > >> I am also implementing the ProcessWindowFunction and accessing the >> windowState to get data but how do i push data out every 5 mins during a 4 >> hr time window ?? I am adding a globalState to handle the 4 hr window ??? >> Or should I still use the context.windowState even for the 4 hr window ? >> >> public class MGroupingAggregateClass extends ProcessWindowFunction<....> >>> { >>> >>> private MapState<String, Object> timedGroupKeyState; >>> private MapState<String, Object> globalGroupKeyState; >>> private final MapStateDescriptor<String, Object> >>> timedMapKeyStateDescriptor = >>> new MapStateDescriptor<>("timedGroupKeyState", >>> String.class, Object.class); >>> private final MapStateDescriptor<String, Object> >>> globalMapKeyStateDescriptor = >>> new MapStateDescriptor<>("globalGroupKeyState", >>> String.class, Object.class); >>> >>> >>> public void open(Configuration ..) { >>> timedGroupKeyState = >>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor); >>> globalGroupKeyState = >>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor); >>> } >>> >>> public void process(MonitoringTuple currKey, Context context, >>> Iterable<Map<String, Object>> elements, >>> Collector<Map<String, Object>> out) throws >>> Exception { >>> logger.info("Entered MGroupingAggregateWindowProcessing - >>> process interval:{}, currKey:{}", interval, currKey); >>> timedGroupKeyState = >>> context.windowState().getMapState(timedMapKeyStateDescriptor); >>> globalGroupKeyState = >>> context.globalState().getMapState(globalMapKeyStateDescriptor); >>> ... >>> //get data fromm state >>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey); >>> >>> //how do i push the data out every 5 mins to the sink during the 4 hr >>> window ?? >>> >>> } >>> >> >> >> >> >> >> >> >> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com> >> wrote: >> >>> Hi, >>> Need to calculate a 4 hour time window for count, sum with current >>> calculated results being output every 5 mins. >>> How do i do that ? >>> Currently, I calculate results for 5 sec and 5 min time windows fine on >>> the KeyedStream. >>> >>> Time timeWindow = getTimeWindowFromInterval(interval);//eg: timeWindow = >>>> Time.seconds(timeIntervalL); >>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream = >>>> kinesisStream.keyBy(...); >>>> final WindowedStream<Map<String, Object>, ...., TimeWindow> >>>> windowStream = >>>> monitoringTupleKeyedStream >>>> .timeWindow(timeWindow); >>>> DataStream<....> enrichedMGStream = windowStream.aggregate( >>>> new MGroupingWindowAggregateClass(...), >>>> new MGroupingAggregateClass(....)) >>>> .map(new Monitoring...(...)); >>>> enrichedMGStream.addSink(..); >>>> >>> >>> >>> TIA, >>> Vijay >>> >>