Hi Felipe,
Thanks for the example. I will try a variation of that for mine. Is there a
specific reason to use the HyperLogLogState?

Vijay

On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Vijay,
>
> I managed this by calling
> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" in the
> processElement method and clearing the state in the onTimer method. This is
> my program [1].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
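
The timer pattern described above can be sketched without Flink on the classpath. What follows is a minimal, framework-free simulation, not the linked program [1], which may differ in its details. The class name TimerFlushSketch, the advanceTo helper, and the count/sum fields are hypothetical stand-ins for keyed state and for Flink's timer service. processElement plays the role of the method of the same name, and the private onTimer mirrors the callback where the state is emitted and cleared.

```java
import java.util.ArrayList;
import java.util.List;

/** Framework-free simulation (hypothetical names): register a timer on arrival, emit and clear on fire. */
final class TimerFlushSketch {
    private final long timeoutMs;
    private long count;            // stands in for keyed state
    private long sum;              // stands in for keyed state
    private Long pendingTimer;     // null when no timer is registered
    private final List<String> emitted = new ArrayList<>();

    TimerFlushSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Updates the state and registers a timer if none is pending. */
    void processElement(long value, long nowMs) {
        advanceTo(nowMs);
        count++;
        sum += value;
        if (pendingTimer == null) {
            pendingTimer = nowMs + timeoutMs;
        }
    }

    /** Moves the simulated clock forward and fires the pending timer if it is due. */
    void advanceTo(long nowMs) {
        if (pendingTimer != null && pendingTimer <= nowMs) {
            long firedAt = pendingTimer;
            pendingTimer = null;
            onTimer(firedAt);
        }
    }

    /** Emits the current aggregate, then clears the state. */
    private void onTimer(long timestamp) {
        emitted.add(timestamp + ":count=" + count + ",sum=" + sum);
        count = 0;
        sum = 0;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TimerFlushSketch sketch = new TimerFlushSketch(300_000L); // 5 minutes
        sketch.processElement(2, 0L);
        sketch.processElement(3, 1_000L);
        sketch.advanceTo(300_000L);
        System.out.println(sketch.emitted()); // prints [300000:count=2,sum=5]
    }
}
```

Because the state is cleared on every firing, each emitted record in this sketch covers only the elements seen since the previous firing, not a running total.
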
>
> Kind Regards,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> When using windows, you can use 'trigger' to set a custom Trigger,
>> which fires your *ProcessWindowFunction* on the schedule you need.
>>
>> In your case, you would probably use:
>>
>>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>>>
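
What a continuous processing-time trigger does to a long window can be illustrated with a small framework-free simulation. This is a sketch under assumptions, not Flink's implementation. ContinuousFireSketch, onElement, and advanceTo are hypothetical names, and time is in abstract units. Windows are assumed to be tumbling and aligned to multiples of the window size, and the final firing at the window end is a simplification of this sketch. The point it shows is that each firing emits the running count and sum for the window so far, and the state is only dropped when the window closes.

```java
import java.util.ArrayList;
import java.util.List;

/** Framework-free simulation (hypothetical names): one long window, periodic firings without purging. */
final class ContinuousFireSketch {
    private final long windowSize;
    private final long fireEvery;
    private boolean open;          // true while a window is active
    private long windowEnd;
    private long nextFire;
    private long count;            // running aggregate for the active window
    private long sum;              // running aggregate for the active window
    private final List<String> emitted = new ArrayList<>();

    ContinuousFireSketch(long windowSize, long fireEvery) {
        this.windowSize = windowSize;
        this.fireEvery = fireEvery;
    }

    /** Adds an element, opening an aligned window if none is active. */
    void onElement(long value, long now) {
        advanceTo(now);
        if (!open) {
            open = true;
            windowEnd = now - (now % windowSize) + windowSize;
            nextFire = Math.min(now - (now % fireEvery) + fireEvery, windowEnd);
            count = 0;
            sum = 0;
        }
        count++;
        sum += value;
    }

    /** Moves the simulated clock forward, emitting the running aggregate at every due firing. */
    void advanceTo(long now) {
        while (open && nextFire <= now) {
            emitted.add(nextFire + ":count=" + count + ",sum=" + sum);
            if (nextFire >= windowEnd) {
                open = false; // window closed; state is reset when the next window opens
            } else {
                nextFire = Math.min(nextFire + fireEvery, windowEnd);
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ContinuousFireSketch sketch = new ContinuousFireSketch(20, 5);
        sketch.onElement(10, 1);
        sketch.onElement(4, 7);
        sketch.advanceTo(12);
        System.out.println(sketch.emitted()); // prints [5:count=1,sum=10, 10:count=2,sum=14]
    }
}
```

With the sizes from this thread, the constructor arguments would be 14,400,000 ms (4 hours) and 300,000 ms (5 minutes).
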
>>
>> Thanks,
>> Rafi
>>
>>
>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> I am also implementing the ProcessWindowFunction and accessing the
>>> windowState to get data, but how do I push data out every 5 mins during a
>>> 4 hr time window? I am adding a globalState to handle the 4 hr window. Or
>>> should I still use context.windowState even for the 4 hr window?
>>>
>>>> public class MGroupingAggregateClass extends
>>>>         ProcessWindowFunction<....> {
>>>>
>>>>     private MapState<String, Object> timedGroupKeyState;
>>>>     private MapState<String, Object> globalGroupKeyState;
>>>>     private final MapStateDescriptor<String, Object>
>>>>             timedMapKeyStateDescriptor =
>>>>                     new MapStateDescriptor<>("timedGroupKeyState",
>>>>                             String.class, Object.class);
>>>>     private final MapStateDescriptor<String, Object>
>>>>             globalMapKeyStateDescriptor =
>>>>                     new MapStateDescriptor<>("globalGroupKeyState",
>>>>                             String.class, Object.class);
>>>>
>>>>     public void open(Configuration ..) {
>>>>         timedGroupKeyState =
>>>>                 getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>>>         globalGroupKeyState =
>>>>                 getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>>>>     }
>>>>
>>>>     public void process(MonitoringTuple currKey, Context context,
>>>>             Iterable<Map<String, Object>> elements,
>>>>             Collector<Map<String, Object>> out) throws Exception {
>>>>         logger.info("Entered MGroupingAggregateWindowProcessing - "
>>>>                 + "process interval:{}, currKey:{}", interval, currKey);
>>>>         timedGroupKeyState =
>>>>                 context.windowState().getMapState(timedMapKeyStateDescriptor);
>>>>         globalGroupKeyState =
>>>>                 context.globalState().getMapState(globalMapKeyStateDescriptor);
>>>>         ...
>>>>         // get data from state
>>>>         Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>>>
>>>>         // how do I push the data out every 5 mins to the sink during the
>>>>         // 4 hr window?
>>>>     }
>>>> }
>>>>
>>>
>>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I need to calculate count and sum over a 4 hour time window, with the
>>>> current results being output every 5 mins.
>>>> How do I do that?
>>>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>>>> the KeyedStream.
>>>>
>>>>> // e.g. timeWindow = Time.seconds(timeIntervalL);
>>>>> Time timeWindow = getTimeWindowFromInterval(interval);
>>>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>>>>         kinesisStream.keyBy(...);
>>>>> final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
>>>>>         monitoringTupleKeyedStream
>>>>>                 .timeWindow(timeWindow);
>>>>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>>>>         new MGroupingWindowAggregateClass(...),
>>>>>         new MGroupingAggregateClass(....))
>>>>>         .map(new Monitoring...(...));
>>>>> enrichedMGStream.addSink(..);
>>>>>
>>>>
>>>>
>>>> TIA,
>>>> Vijay
>>>>
>>>
