Hi Vijay,

I managed this by calling
"ctx.timerService().registerProcessingTimeTimer(timeoutTime);" in the
processElement method and clearing the state in the onTimer method. My
program is at [1].

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
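
As a rough illustration of the pattern described above (register a
processing-time timer in processElement, then emit and clear the state in
onTimer), here is a plain-Java model. It is not Flink code and not the linked
program: the class TimerFlushSketch, the method advanceTo, the 5-minute
TIMEOUT_MS, and the rounding in nextBoundary are all assumptions made for this
sketch. The TreeSet stands in for the timer service of a single key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of one key's state and timers; hypothetical, not Flink code.
class TimerFlushSketch {
    static final long TIMEOUT_MS = 5 * 60 * 1000L; // assumed 5-minute timeout

    private long count = 0; // stands in for keyed state
    private long sum = 0;
    private final TreeSet<Long> timers = new TreeSet<>(); // stands in for the timer service
    final List<long[]> emitted = new ArrayList<>();       // {timerTimestamp, count, sum}

    // Round 'now' up to the next multiple of TIMEOUT_MS, so that all elements
    // in the same 5-minute bucket register the same timer timestamp.
    static long nextBoundary(long now) {
        return now - (now % TIMEOUT_MS) + TIMEOUT_MS;
    }

    // Update the state and register a timer for the next boundary.
    void processElement(long value, long now) {
        count++;
        sum += value;
        timers.add(nextBoundary(now)); // a set, so duplicate timestamps collapse
    }

    // Simulate processing time advancing: fire every timer due at or before 'now'.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            onTimer(timers.pollFirst());
        }
    }

    // Emit the current aggregate, then clear the state.
    private void onTimer(long timestamp) {
        emitted.add(new long[] {timestamp, count, sum});
        count = 0;
        sum = 0;
    }

    public static void main(String[] args) {
        TimerFlushSketch s = new TimerFlushSketch();
        s.processElement(10, 1_000L);
        s.processElement(20, 120_000L);
        s.advanceTo(300_000L); // first timer fires: {300000, 2, 30}
        s.processElement(5, 310_000L);
        s.advanceTo(600_000L); // second timer fires: {600000, 1, 5}
        for (long[] e : s.emitted) {
            System.out.println(e[0] + " count=" + e[1] + " sum=" + e[2]);
        }
    }
}
```

Rounding to a boundary means elements that arrive in the same 5-minute bucket
share one timer. Flink keeps at most one timer per key and timestamp, so
repeated registrations with the same timestamp do not pile up. Because the
state is cleared in onTimer, each emission covers only the last interval;
keeping a running 4-hour total would need state that is not cleared there.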

Kind Regards,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*


On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Vijay,
>
> When using windows, you can call 'trigger' to set a custom Trigger,
> which fires your *ProcessWindowFunction* accordingly.
>
> In your case, you would probably use:
>
>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>>
>
> Thanks,
> Rafi
>
>
> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> I am also implementing the ProcessWindowFunction and accessing the
>> windowState to get data, but how do I push data out every 5 minutes during
>> a 4-hour time window? I am adding a globalState to handle the 4-hour window.
>> Is that the right approach, or should I still use context.windowState even
>> for the 4-hour window?
>>
>>> public class MGroupingAggregateClass extends ProcessWindowFunction<....> {
>>>
>>> private MapState<String, Object> timedGroupKeyState;
>>> private MapState<String, Object> globalGroupKeyState;
>>> private final MapStateDescriptor<String, Object>
>>> timedMapKeyStateDescriptor =
>>>        new MapStateDescriptor<>("timedGroupKeyState",
>>>                String.class, Object.class);
>>> private final MapStateDescriptor<String, Object>
>>> globalMapKeyStateDescriptor =
>>>            new MapStateDescriptor<>("globalGroupKeyState",
>>>                    String.class, Object.class);
>>>
>>>
>>> public void open(Configuration ..) {
>>> timedGroupKeyState =
>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>> globalGroupKeyState =
>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>> }
>>>
>>> public void process(MonitoringTuple currKey, Context context,
>>>        Iterable<Map<String, Object>> elements,
>>>        Collector<Map<String, Object>> out) throws Exception {
>>>        logger.info("Entered MGroupingAggregateWindowProcessing - "
>>>                + "process interval:{}, currKey:{}", interval, currKey);
>>>        timedGroupKeyState =
>>> context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>        globalGroupKeyState =
>>> context.globalState().getMapState(globalMapKeyStateDescriptor);
>>> ...
>>> // get data from state
>>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>>
>>> // how do I push the data out to the sink every 5 minutes during the
>>> // 4-hour window?
>>>
>>> }
>>>
>>
>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I need to calculate count and sum over a 4-hour time window, with the
>>> current results being output every 5 minutes.
>>> How do I do that?
>>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>>> the KeyedStream.
>>>
>>>> // e.g. timeWindow = Time.seconds(timeIntervalL);
>>>> Time timeWindow = getTimeWindowFromInterval(interval);
>>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>>> kinesisStream.keyBy(...);
>>>> final WindowedStream<Map<String, Object>, ...., TimeWindow>
>>>> windowStream =
>>>>         monitoringTupleKeyedStream
>>>>                 .timeWindow(timeWindow);
>>>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>>>         new MGroupingWindowAggregateClass(...),
>>>>         new MGroupingAggregateClass(....))
>>>>         .map(new Monitoring...(...));
>>>> enrichedMGStream.addSink(..);
>>>>
>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>
