I am also implementing the ProcessWindowFunction and accessing the windowState to get data, but how do I push data out every 5 mins during a 4 hr time window? Should I add a globalState to handle the 4 hr window, or should I still use context.windowState() even for the 4 hr window?

    public class MGroupingAggregateClass extends ProcessWindowFunction<....> {

        private MapState<String, Object> timedGroupKeyState;
        private MapState<String, Object> globalGroupKeyState;
        private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
            new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
        private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
            new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);

        public void open(Configuration ..) {
            timedGroupKeyState = getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
            globalGroupKeyState = getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
        }

        public void process(MonitoringTuple currKey, Context context,
                            Iterable<Map<String, Object>> elements,
                            Collector<Map<String, Object>> out) throws Exception {
            logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}", interval, currKey);
            timedGroupKeyState = context.windowState().getMapState(timedMapKeyStateDescriptor);
            globalGroupKeyState = context.globalState().getMapState(globalMapKeyStateDescriptor);
            ...
            // get data from state
            Object timedGroupStateObj = timedGroupKeyState.get(groupKey);

            // how do I push the data out every 5 mins to the sink during the 4 hr window ??
        }
    }

On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> Need to calculate a 4 hour time window for count, sum with current
> calculated results being output every 5 mins.
> How do I do that?
> Currently, I calculate results for 5 sec and 5 min time windows fine on
> the KeyedStream.
>
>> Time timeWindow = getTimeWindowFromInterval(interval); // eg: timeWindow = Time.seconds(timeIntervalL);
>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>     kinesisStream.keyBy(...);
>> final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
>>     monitoringTupleKeyedStream.timeWindow(timeWindow);
>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>         new MGroupingWindowAggregateClass(...),
>>         new MGroupingAggregateClass(....))
>>     .map(new Monitoring...(...));
>> enrichedMGStream.addSink(..);
>
> TIA,
> Vijay
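
To make the requirement concrete, here is a minimal plain-Java sketch (no Flink involved) of one reading of it: a tumbling 4 hr window, aligned to the epoch, that emits its running count and sum at every 5 min boundary and resets when the window ends. The names `PeriodicWindowEmitSketch`, `Snapshot`, `add`, `advanceTo` and `emitted` are hypothetical and exist only for this illustration. The sketch also assumes non-negative, non-decreasing timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these names are Flink APIs.
class PeriodicWindowEmitSketch {
    static final long WINDOW_MS = 4 * 60 * 60 * 1000L; // 4 hr window
    static final long EMIT_MS = 5 * 60 * 1000L;        // emit every 5 min

    /** A partial result: the count and sum accumulated in the window so far. */
    static final class Snapshot {
        final long windowStart;
        final long emitTime;
        final long count;
        final double sum;

        Snapshot(long windowStart, long emitTime, long count, double sum) {
            this.windowStart = windowStart;
            this.emitTime = emitTime;
            this.count = count;
            this.sum = sum;
        }
    }

    private boolean started = false;
    private long windowStart;
    private long nextEmit;
    private long count;
    private double sum;
    private final List<Snapshot> emitted = new ArrayList<>();

    /** Adds one element; timestamps are assumed non-negative and non-decreasing. */
    void add(long timestampMs, double value) {
        advanceTo(timestampMs);
        count++;
        sum += value;
    }

    /** Moves time forward, emitting a snapshot at every 5 min boundary reached. */
    void advanceTo(long timestampMs) {
        if (!started) {
            // Align the window and the first emission to multiples of their sizes.
            windowStart = timestampMs - (timestampMs % WINDOW_MS);
            nextEmit = timestampMs - (timestampMs % EMIT_MS) + EMIT_MS;
            started = true;
        }
        while (nextEmit <= timestampMs) {
            emitted.add(new Snapshot(windowStart, nextEmit, count, sum));
            if (nextEmit == windowStart + WINDOW_MS) {
                // The 4 hr window is complete: start the next one from zero.
                windowStart += WINDOW_MS;
                count = 0;
                sum = 0.0;
            }
            nextEmit += EMIT_MS;
        }
    }

    List<Snapshot> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicWindowEmitSketch sketch = new PeriodicWindowEmitSketch();
        sketch.add(0L, 2.0);
        sketch.add(60_000L, 3.0);
        sketch.advanceTo(EMIT_MS); // reach the first 5 min boundary
        for (Snapshot s : sketch.emitted()) {
            System.out.println(s.emitTime + " count=" + s.count + " sum=" + s.sum);
        }
    }
}
```

In Flink itself, a comparable effect is often obtained by attaching a periodic trigger such as `ContinuousProcessingTimeTrigger.of(Time.minutes(5))` to the 4 hr window, so that the window function fires with the partial contents every 5 mins. Whether that fits this job (processing time vs. event time, state size) is an assumption that would need checking.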