Hi Vijay,

When using windows, you can call 'trigger' on the windowed stream to set a custom trigger, which determines when your *ProcessWindowFunction* is invoked.

In your case, you would probably use:

> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*

Thanks,
Rafi

On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> I am also implementing the ProcessWindowFunction and accessing the
> windowState to get data but how do i push data out every 5 mins during a 4
> hr time window ?? I am adding a globalState to handle the 4 hr window ???
> Or should I still use the context.windowState even for the 4 hr window ?
>
>> public class MGroupingAggregateClass extends ProcessWindowFunction<....> {
>>
>>     private MapState<String, Object> timedGroupKeyState;
>>     private MapState<String, Object> globalGroupKeyState;
>>     private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
>>         new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
>>     private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
>>         new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);
>>
>>     public void open(Configuration ..) {
>>         timedGroupKeyState = getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>         globalGroupKeyState = getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>     }
>>
>>     public void process(MonitoringTuple currKey, Context context,
>>                         Iterable<Map<String, Object>> elements,
>>                         Collector<Map<String, Object>> out) throws Exception {
>>         logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}",
>>             interval, currKey);
>>         timedGroupKeyState = context.windowState().getMapState(timedMapKeyStateDescriptor);
>>         globalGroupKeyState = context.globalState().getMapState(globalMapKeyStateDescriptor);
>>         ...
>>         // get data from state
>>         Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>
>>         // how do i push the data out every 5 mins to the sink during the 4 hr
>>         // window ??
>>     }
>> }
>
> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Hi,
>> Need to calculate a 4 hour time window for count, sum with current
>> calculated results being output every 5 mins.
>> How do i do that ?
>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>> the KeyedStream.
>>
>>> Time timeWindow = getTimeWindowFromInterval(interval); // eg: timeWindow = Time.seconds(timeIntervalL);
>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>>     kinesisStream.keyBy(...);
>>> final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
>>>     monitoringTupleKeyedStream.timeWindow(timeWindow);
>>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>>         new MGroupingWindowAggregateClass(...),
>>>         new MGroupingAggregateClass(....))
>>>     .map(new Monitoring...(...));
>>> enrichedMGStream.addSink(..);
>>
>> TIA,
>> Vijay
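
For anyone reading this thread later: a rough mental model of what a continuous trigger does on a long window is "fire without purging". The window keeps accumulating, and each firing emits the running result so far. The sketch below is plain Java with no Flink dependency, so it is only an illustration of that idea and not how Flink implements it. The class `ContinuousFiringSketch`, the `Snapshot` type and the `simulate` method are all hypothetical names, and it assumes a single key, a single window starting at time 0, and firings at exact multiples of the interval.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (hypothetical, no Flink) of fire-without-purge semantics. */
class ContinuousFiringSketch {

    /** One emitted result: the firing time plus the running count and sum so far. */
    static final class Snapshot {
        final long fireTimeMs;
        final long count;
        final long sum;

        Snapshot(long fireTimeMs, long count, long sum) {
            this.fireTimeMs = fireTimeMs;
            this.count = count;
            this.sum = sum;
        }
    }

    /**
     * Replays events given as {timestampMs, value} pairs, sorted by timestamp, into one
     * window [0, windowSizeMs) and emits a running snapshot every fireIntervalMs.
     */
    static List<Snapshot> simulate(long windowSizeMs, long fireIntervalMs, long[][] events) {
        if (fireIntervalMs <= 0) {
            throw new IllegalArgumentException("fireIntervalMs must be positive");
        }
        List<Snapshot> emitted = new ArrayList<>();
        long count = 0;
        long sum = 0;
        int next = 0;
        for (long fireTime = fireIntervalMs; fireTime <= windowSizeMs; fireTime += fireIntervalMs) {
            // Fold in every event that arrived before this firing time.
            while (next < events.length && events[next][0] < fireTime) {
                count++;
                sum += events[next][1];
                next++;
            }
            // Fire without purging: count and sum carry over to the next firing.
            emitted.add(new Snapshot(fireTime, count, sum));
        }
        return emitted;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[][] events = { {1 * minute, 10}, {4 * minute, 20}, {7 * minute, 5} };
        // A 15-minute window firing every 5 minutes keeps the output short;
        // the 4-hour case from the thread would use 240 * minute instead.
        for (Snapshot s : simulate(15 * minute, 5 * minute, events)) {
            System.out.println(s.fireTimeMs / minute + " min: count=" + s.count + " sum=" + s.sum);
        }
    }
}
```

With the three sample events, the first firing sees two events (count 2, sum 30) and the later firings see all three (count 3, sum 35). That is the behaviour the question asks for: results are pushed every 5 minutes while the 4-hour aggregate keeps growing.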