I am also implementing the ProcessWindowFunction and accessing the
windowState to get data, but how do I push data out every 5 minutes during a
4-hour time window? I am adding a globalState to handle the 4-hour window; is
that right, or should I still use context.windowState() even for the 4-hour
window?

public  class MGroupingAggregateClass extends ProcessWindowFunction<....> {
>
> private MapState<String, Object> timedGroupKeyState;
> private MapState<String, Object> globalGroupKeyState;
> private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
>         new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
> private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
>         new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);
>
> public void open(Configuration ..) {
>     timedGroupKeyState = getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>     globalGroupKeyState = getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
> }
>
> public void process(MonitoringTuple currKey, Context context,
>                     Iterable<Map<String, Object>> elements,
>                     Collector<Map<String, Object>> out) throws Exception {
>     logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}",
>             interval, currKey);
>     // per-window state, scoped to the current key and the current window
>     timedGroupKeyState = context.windowState().getMapState(timedMapKeyStateDescriptor);
>     // per-key state, shared across all windows of that key
>     globalGroupKeyState = context.globalState().getMapState(globalMapKeyStateDescriptor);
>     ...
>     // get data from state
>     Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>
>     // how do I push the data out every 5 mins to the sink during the 4 hr window?
>
> }
>
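
To make the behaviour I am after concrete, here is a minimal plain-Java sketch of it. It uses no Flink API at all: the class name PeriodicWindowEmitter and its methods are hypothetical, and it only models the emission pattern (a 4-hour tumbling window whose running count and sum are emitted on every 5-minute boundary and reset when the window ends). As far as I understand, in Flink this pattern is usually obtained by attaching one of the built-in continuous triggers (ContinuousProcessingTimeTrigger or ContinuousEventTimeTrigger) to the 4-hour window, but I have not verified that against this job.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, NOT Flink code: one 4-hour tumbling window whose running
// count/sum is emitted every 5 minutes and cleared when the window ends.
class PeriodicWindowEmitter {
    static final long WINDOW_MS = 4 * 60 * 60 * 1000L; // 4 hours
    static final long EMIT_MS = 5 * 60 * 1000L;        // 5 minutes

    private long windowStart = -1; // start of the current window, -1 = not started
    private long nextEmit;         // next 5-minute boundary at which to emit
    private long count;
    private long sum;

    // Each entry is {emitTimeMs, count, sum}; stands in for the sink.
    final List<long[]> emitted = new ArrayList<>();

    // Feed one event; timestamps are assumed to be non-decreasing.
    void onEvent(long timestampMs, long value) {
        advanceTo(timestampMs);
        count++;
        sum += value;
    }

    // Move the clock forward: emit on every 5-minute boundary that has been
    // reached, and reset the aggregates when the 4-hour window closes.
    void advanceTo(long nowMs) {
        if (windowStart < 0) {
            windowStart = nowMs - (nowMs % WINDOW_MS);
            nextEmit = nowMs - (nowMs % EMIT_MS) + EMIT_MS;
        }
        while (nextEmit <= nowMs) {
            emitted.add(new long[] {nextEmit, count, sum});
            if (nextEmit - windowStart >= WINDOW_MS) { // window end reached
                windowStart += WINDOW_MS;
                count = 0;
                sum = 0;
            }
            nextEmit += EMIT_MS;
        }
    }
}
```

Feeding events through onEvent(...) and then calling advanceTo(...) with a later time fills the emitted list with one entry per 5-minute boundary. The entries carry the running totals of the current 4-hour window, so the totals keep growing until the window rolls over.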

On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> I need to compute count and sum over a 4-hour time window, with the
> current running results being output every 5 minutes.
> How do I do that?
> Currently, I calculate results for 5-second and 5-minute time windows fine
> on the KeyedStream.
>
>> // e.g. timeWindow = Time.seconds(timeIntervalL);
>> Time timeWindow = getTimeWindowFromInterval(interval);
>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy(...);
>> final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
>>         monitoringTupleKeyedStream
>>                 .timeWindow(timeWindow);
>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>         new MGroupingWindowAggregateClass(...),
>>         new MGroupingAggregateClass(....))
>>         .map(new Monitoring...(...));
>> enrichedMGStream.addSink(..);
>>
>
>
> TIA,
> Vijay
>
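
For reference, the count and sum in the quoted pipeline are computed incrementally by the first argument to aggregate(...). Below is a rough plain-Java sketch of that accumulator pattern. It mirrors the method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge) but deliberately does not implement the Flink interface, so it compiles without Flink on the classpath. The class name CountSumSketch and the "value" field are assumptions for illustration only, not what MGroupingWindowAggregateClass actually does.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for an incremental count/sum aggregate (NOT Flink code).
class CountSumSketch {
    // Accumulator holding the running aggregates.
    static final class Acc {
        long count;
        double sum;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    // Fold one element into the accumulator; "value" is a hypothetical field.
    static Acc add(Map<String, Object> element, Acc acc) {
        Object v = element.get("value");
        if (v instanceof Number) { // elements without a numeric value are skipped
            acc.count++;
            acc.sum += ((Number) v).doubleValue();
        }
        return acc;
    }

    // Combine two partial accumulators into a new one.
    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.count = a.count + b.count;
        merged.sum = a.sum + b.sum;
        return merged;
    }

    // Turn the accumulator into the output record.
    static Map<String, Object> getResult(Acc acc) {
        Map<String, Object> result = new HashMap<>();
        result.put("count", acc.count);
        result.put("sum", acc.sum);
        return result;
    }
}
```

Because the accumulator only holds a count and a sum, reading it out every 5 minutes is cheap no matter how many elements the 4-hour window has seen.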
