Thanks Yun. I have converted the code to use a KeyedProcessFunction instead of a flatMap, and with a registered timer it worked.
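
In case it is useful to others on the list, the shape of the solution is roughly the following. This is only a simplified sketch, not my exact code: SessionEvent and SessionSummary stand in for my real types, and their accessors (getEventName, getEventTs, add) are illustrative. The session opens on the first "search_list_keyless" event and registers a 30-minute processing-time timer; the summary is emitted either from processElement when "keyless_start_trip" arrives, or from onTimer when the 30 minutes expire without it, so the session is flushed even if no further events come for that user.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DemandSessionProcessFunction
        extends KeyedProcessFunction<Long, SessionEvent, SessionSummary> {

    private static final long SESSION_DURATION_MS = 30 * 60 * 1000L;

    private transient ValueState<SessionSummary> sessionState; // running aggregate for the key
    private transient ValueState<Long> timerState;             // timestamp of the registered timer

    @Override
    public void open(Configuration config) {
        sessionState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session", SessionSummary.class));
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session_timer", Long.class));
    }

    @Override
    public void processElement(SessionEvent event, Context ctx, Collector<SessionSummary> out)
            throws Exception {
        SessionSummary session = sessionState.value();

        if (session == null && "search_list_keyless".equals(event.getEventName())) {
            // First search event: open the session and schedule the 30-minute timeout.
            session = new SessionSummary(ctx.getCurrentKey(), event.getEventTs());
            long timeout = ctx.timerService().currentProcessingTime() + SESSION_DURATION_MS;
            ctx.timerService().registerProcessingTimeTimer(timeout);
            timerState.update(timeout);
        }

        if (session == null) {
            return; // no open session for this user, ignore the event
        }

        session.add(event); // update the running aggregate

        if ("keyless_start_trip".equals(event.getEventName())) {
            // Trip started: emit the summary now and cancel the pending timer.
            out.collect(session);
            Long timeout = timerState.value();
            if (timeout != null) {
                ctx.timerService().deleteProcessingTimeTimer(timeout);
            }
            sessionState.clear();
            timerState.clear();
        } else {
            sessionState.update(session);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionSummary> out)
            throws Exception {
        // 30 minutes passed without a closing event: flush whatever has been aggregated.
        SessionSummary session = sessionState.value();
        if (session != null) {
            out.collect(session);
        }
        sessionState.clear();
        timerState.clear();
    }
}

The key difference from the flatMap version quoted below is the onTimer callback, which fires even when no further event arrives for the user.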
On Fri, May 29, 2020 at 11:13 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi,
>
> I think you could use a *timer* to achieve that. In a *ProcessFunction*
> you can register a timer for a specific time (event time or processing
> time) and get called back at that point. It can be registered like
>
> ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
>
> More details on timers can be found in [1] and an example is in [2].
> In that example, a timer is registered in the last line of the
> *processElement* method, and the callback is implemented by overriding
> the *onTimer* method.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
>
> ------------------Original Mail ------------------
> *Sender:* aj <ajainje...@gmail.com>
> *Send Date:* Fri May 29 02:07:33 2020
> *Recipients:* Yun Gao <yungao...@aliyun.com>
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Re: Flink Window with multiple trigger condition
>
>> Hi,
>>
>> I have implemented the solution below and it is working fine, but the
>> biggest problem is that if no event comes for the user within 30 minutes,
>> I am not able to trigger, because I am checking the time difference
>> against incoming events. So it only triggers when the next event arrives,
>> but I want it to trigger right after 30 minutes.
>>
>> Please help me improve this and solve the above problem.
>>
>> public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, GenericRecord>, DemandSessionSummaryTuple> {
>>
>>     private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);
>>
>>     private transient ValueState<Tuple3<String, Long, Long>> timeState; // maintains session_id, start time and end time
>>     private transient MapState<String, DemandSessionSummaryTuple> sessionSummary; // map of search_hex9 to summary tuple
>>
>>     @Override
>>     public void open(Configuration config) {
>>
>>         ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
>>                 new ValueStateDescriptor<>(
>>                         "time_state", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {
>>                         }), // type information
>>                         Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
>>         timeState = getRuntimeContext().getState(timeDescriptor);
>>
>>         MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
>>                 new MapStateDescriptor<String, DemandSessionSummaryTuple>("demand_session",
>>                         TypeInformation.of(new TypeHint<String>() {
>>                         }),
>>                         TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() {
>>                         }));
>>         sessionSummary = getRuntimeContext().getMapState(descriptor);
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<Long, GenericRecord> recordTuple2, Collector<DemandSessionSummaryTuple> collector) throws Exception {
>>         GenericRecord record = recordTuple2.f1;
>>         String event_name = record.get("event_name").toString();
>>         long event_ts = (Long) record.get("event_ts");
>>         Tuple3<String, Long, Long> currentTimeState = timeState.value();
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>
>>         long timeDiff = event_ts - currentTimeState.f1;
>>
>>         if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
>>             Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
>>             sessionSummary.entries().forEach(tuple -> {
>>                 String key = tuple.getKey();
>>                 DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
>>                 try {
>>                     sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
>>                     collector.collect(sessionSummaryTuple);
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>             });
>>             timeState.clear();
>>             sessionSummary.clear();
>>             currentTimeState = timeState.value();
>>         }
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>         currentTimeState.f2 = event_ts;
>>
>>         if (currentTimeState.f1 > 0) {
>>             String search_hex9 = record.get("search_hex9") != null ? record.get("search_hex9").toString() : null;
>>             DemandSessionSummaryTuple currentTuple = sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : new DemandSessionSummaryTuple();
>>
>>             if (sessionSummary.get(search_hex9) == null) {
>>                 currentTuple.setSearchHex9(search_hex9);
>>                 currentTuple.setUserId(recordTuple2.f0);
>>                 currentTuple.setStartTime(currentTimeState.f1);
>>                 currentTuple.setDemandSessionId(currentTimeState.f0);
>>             }
>>
>>             if (event_name.equals("search_list_keyless")) {
>>                 currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
>>                 SearchSummaryCalculation(record, currentTuple);
>>             }
>>             sessionSummary.put(search_hex9, currentTuple);
>>         }
>>         timeState.update(currentTimeState);
>>     }
>> }
>>
>> On Sun, May 24, 2020 at 10:57 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi,
>>>
>>> First, sorry that I'm not an expert on windows, and please correct me
>>> if I'm wrong, but from my side it seems the assigner might also be a
>>> problem in addition to the trigger: currently Flink window assigners are
>>> all based on time (processing time or event time), and it might be hard
>>> to implement an event-driven window assigner that starts assigning
>>> elements to a window only after certain elements have been received.
>>>
>>> What comes to mind is that a possible alternative is to use the
>>> low-level *KeyedProcessFunction* directly: you could register a timer for
>>> 30 minutes later when the "*search*" event is received and write the time
>>> of the search event into state. The following events are then saved into
>>> state while that flag is set. Once the "*start*" event is received or the
>>> timer fires, you could load all the events from state, do the
>>> aggregation, and cancel the timer if the aggregation was triggered by the
>>> "*start*" event. A simpler case is [1]; it does not cover stopping the
>>> aggregation when a special event is received, but it seems that logic
>>> could be added.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>>>
>>> Best,
>>> Yun
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:* aj <ajainje...@gmail.com>
>>> *Send Date:* Sun May 24 01:10:55 2020
>>> *Recipients:* Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> *CC:* user <user@flink.apache.org>
>>> *Subject:* Re: Flink Window with multiple trigger condition
>>>
>>>> I am still not able to get very far after reading the material. Please
>>>> help with some basic code to start building this window and trigger.
>>>>
>>>> Another option I am considering is to just use a RichFlatMapFunction
>>>> and keyed state to build this logic. Is that the correct approach?
>>>>
>>>> On Fri, May 22, 2020 at 4:52 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> I was also thinking of using a processing-time window, but that will
>>>>> not work for me. I want to start the window when the user's "*search*"
>>>>> event arrives, so for each user the window starts from the *search*
>>>>> event. A tumbling window has a fixed start and end time, so it is not
>>>>> suitable in my case.
>>>>>
>>>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> To achieve what you have in mind, I think what you have to do is use
>>>>>> a processing-time window of 30 minutes and a custom trigger that
>>>>>> matches the "start" event in the `onElement` method and returns
>>>>>> TriggerResult.FIRE_AND_PURGE.
>>>>>>
>>>>>> That way, the window fires either when the processing time has
>>>>>> passed or when the start event was received.
>>>>>>
>>>>>> Cheers,
>>>>>> Gordon

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>
<http://www.cse.iitm.ac.in/%7Eanujjain/>
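
P.S. For anyone who finds this thread later and whose sessions do line up with fixed processing-time windows, the custom trigger Gordon described would look roughly like the sketch below. Again, this is illustrative only: the trigger class name and the SessionEvent type are placeholders, not code from this thread. It did not fit my case because the window start is fixed rather than anchored to the first search event, which is why I went with the KeyedProcessFunction instead.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StartTripOrTimeoutTrigger extends Trigger<SessionEvent, TimeWindow> {

    @Override
    public TriggerResult onElement(SessionEvent element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        if ("keyless_start_trip".equals(element.getEventName())) {
            // The "start" event closes the window early: fire and purge its contents.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Otherwise make sure a timer fires at the end of the processing-time window.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // The 30-minute window elapsed without a "start" event.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

It would be attached with something like keyedStream.window(TumblingProcessingTimeWindows.of(Time.minutes(30))).trigger(new StartTripOrTimeoutTrigger()).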