Thanks Yun.

I have converted the code to use a keyed-processed function rather than a
flatMap and using register timer it worked.

On Fri, May 29, 2020 at 11:13 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi,
>
>      I think you could use *timer* to achieve that. In *processFunction*
> you could register a timer at specific time (event time or processing time)
> and get callbacked at that point. It could be registered like
>
>
> ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
>
>
>     More details on timer could be found in [1] and an example is in [2].
> In this example, a timer is registered in the last line of the
> *processElement* method, and the callback is implemented by override the
> *onTimer* method.
>
>    [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
>    [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
>
>
> ------------------Original Mail ------------------
> *Sender:*aj <ajainje...@gmail.com>
> *Send Date:*Fri May 29 02:07:33 2020
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Flink Window with multiple trigger condition
>
>> Hi,
>>
>> I have implemented the below solution and its working fine but the
>> biggest problem with this is if no event coming for the user after 30 min
>> then I am not able to trigger because I am checking
>> time diff from upcoming events. So when the next event comes than only it
>> triggers but I want it to trigger just after 30 mins.
>>
>> So please help me to improve this and how to solve the above problem.
>>
>>
>>
>> public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, 
>> GenericRecord>, DemandSessionSummaryTuple> {
>>
>>     private static final Logger LOGGER = 
>> LoggerFactory.getLogger(DemandSessionFlatMap.class);
>>
>>     private transient ValueState<Tuple3<String, Long, Long>> timeState; // 
>> maintain session_id starttime and endtime
>>     private transient MapState<String, DemandSessionSummaryTuple> 
>> sessionSummary; // map for hex9 and summarytuple
>>
>>     @Override
>>     public void open(Configuration config) {
>>
>>         ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
>>                 new ValueStateDescriptor<>(
>>                         "time_state", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple3<String, Long, 
>> Long>>() {
>>                         }), // type information
>>                         Tuple3.of(null, 0L, 0L)); // default value of the 
>> state, if nothing was set
>>         timeState = getRuntimeContext().getState(timeDescriptor);
>>
>>         MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
>>                 new MapStateDescriptor<String, 
>> DemandSessionSummaryTuple>("demand_session",
>>                         TypeInformation.of(new TypeHint<String>() {
>>                         }), TypeInformation.of(new 
>> TypeHint<DemandSessionSummaryTuple>() {
>>                 }));
>>         sessionSummary = getRuntimeContext().getMapState(descriptor);
>>
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<Long, GenericRecord> recordTuple2, 
>> Collector<DemandSessionSummaryTuple> collector) throws Exception {
>>         GenericRecord record = recordTuple2.f1;
>>         String event_name = record.get("event_name").toString();
>>         long event_ts = (Long) record.get("event_ts");
>>         Tuple3<String, Long, Long> currentTimeState = timeState.value();
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 
>> == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>
>>         long timeDiff = event_ts - currentTimeState.f1;
>>
>>         if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
>>             Tuple3<String, Long, Long> finalCurrentTimeState = 
>> currentTimeState;
>>             sessionSummary.entries().forEach( tuple ->{
>>                 String key = tuple.getKey();
>>                 DemandSessionSummaryTuple sessionSummaryTuple = 
>> tuple.getValue();
>>                 try {
>>                     sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
>>                     collector.collect(sessionSummaryTuple);
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>
>>             });
>>             timeState.clear();
>>             sessionSummary.clear();
>>             currentTimeState = timeState.value();
>>         }
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 
>> == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>         currentTimeState.f2 = event_ts;
>>
>>         if (currentTimeState.f1 > 0) {
>>             String search_hex9 = record.get("search_hex9") != null ? 
>> record.get("search_hex9").toString() : null;
>>             DemandSessionSummaryTuple currentTuple = 
>> sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : 
>> new DemandSessionSummaryTuple();
>>
>>             if (sessionSummary.get(search_hex9) == null) {
>>                 currentTuple.setSearchHex9(search_hex9);
>>                 currentTuple.setUserId(recordTuple2.f0);
>>                 currentTuple.setStartTime(currentTimeState.f1);
>>                 currentTuple.setDemandSessionId(currentTimeState.f0);
>>             }
>>
>>             if (event_name.equals("search_list_keyless")) {
>>                 currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 
>> 1);
>>                 SearchSummaryCalculation(record, currentTuple);
>>             }
>>             sessionSummary.put(search_hex9, currentTuple);
>>         }
>>         timeState.update(currentTimeState);
>>     }
>>
>>
>>
>>
>>
>>
>> On Sun, May 24, 2020 at 10:57 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi,
>>>
>>>        First sorry that I'm not expert on Window and please correct me
>>> if I'm wrong, but from my side, it seems the assigner might also be a
>>> problem in addition to the trigger: currently Flink window assigner should
>>> be all based on time (processing time or event time), and it might be hard
>>> to implement an event-driven window assigner that start to assign elements
>>> to a window after received some elements.
>>>
>>>       What comes to me is that a possible alternative method is to use
>>> the low-level *KeyedProcessFunction* directly:  you may register a
>>> timer 30 mins later when received the "*search*" event and write the
>>> time of search event into the state. Then for the following events, they
>>> will be saved to the state since the flag is set. After received the "
>>> *start*" event or the timer is triggered, you could load all the events
>>> from the states, do the aggregation and cancel the timer if it is triggered
>>> by "*start*" event. A simpler case is [1] and it does not consider stop
>>> the aggreation when received special event, but it seems that the logic
>>> could be added to the case.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*aj <ajainje...@gmail.com>
>>> *Send Date:*Sun May 24 01:10:55 2020
>>> *Recipients:*Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> *CC:*user <user@flink.apache.org>
>>> *Subject:*Re: Flink Window with multiple trigger condition
>>>
>>>>
>>>> I am still not able to get much after reading the stuff. Please help
>>>> with some basic code to start to build this window and trigger.
>>>>
>>>> Another option I am thinking is I just use a Richflatmap function and
>>>> use the keyed state to build this logic. Is that the correct approach?
>>>>
>>>>
>>>>
>>>> On Fri, May 22, 2020 at 4:52 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> I was also thinking to have a processing time window but that will not
>>>>> work for me. I want to start the window when the user  "*search*"
>>>>> event arrives. So for each user window will start from the *search*
>>>>> event.
>>>>>  The Tumbling window has fixed start end time so that will not be
>>>>> suitable in my case.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <
>>>>> tzuli...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> To achieve what you have in mind, I think what you have to do is to
>>>>>> use a
>>>>>> processing time window of 30 mins, and have a custom trigger that
>>>>>> matches
>>>>>> the "start" event in the `onElement` method and return
>>>>>> TriggerResult.FIRE_AND_PURGE.
>>>>>>
>>>>>> That way, the window fires either when the processing time has
>>>>>> passed, or
>>>>>> the start event was recieved.
>>>>>>
>>>>>> Cheers,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>> Anuj Jain
>>>>> Mob. : +91- 8588817877
>>>>> Skype : anuj.jain07
>>>>> <http://www.oracle.com/>
>>>>>
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>> Mob. : +91- 8588817877
>>>> Skype : anuj.jain07
>>>> <http://www.oracle.com/>
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to