Hi,

I have implemented the solution below and it works, but it has one big problem: if no further event arrives for the user after 30 minutes, the session is never emitted, because I only compute the time difference when the next event comes in. The session fires only once a later event arrives, but I want it to fire right after the 30 minutes have passed.
Please help me improve this and solve the problem above.

public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, GenericRecord>, DemandSessionSummaryTuple> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);

    // maintain session_id, start time and end time
    private transient ValueState<Tuple3<String, Long, Long>> timeState;
    // map of hex9 to summary tuple
    private transient MapState<String, DemandSessionSummaryTuple> sessionSummary;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
                new ValueStateDescriptor<>(
                        "time_state", // the state name
                        TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() { }), // type information
                        Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
        timeState = getRuntimeContext().getState(timeDescriptor);

        MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
                new MapStateDescriptor<String, DemandSessionSummaryTuple>("demand_session",
                        TypeInformation.of(new TypeHint<String>() { }),
                        TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() { }));
        sessionSummary = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, GenericRecord> recordTuple2,
                        Collector<DemandSessionSummaryTuple> collector) throws Exception {
        GenericRecord record = recordTuple2.f1;
        String event_name = record.get("event_name").toString();
        long event_ts = (Long) record.get("event_ts");
        Tuple3<String, Long, Long> currentTimeState = timeState.value();

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }

        long timeDiff = event_ts - currentTimeState.f1;

        // The session is only closed here, i.e. when another event arrives.
        if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
            Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
            sessionSummary.entries().forEach(
                    tuple -> {
                        String key = tuple.getKey();
                        DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
                        try {
                            sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
                            collector.collect(sessionSummaryTuple);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
            timeState.clear();
            sessionSummary.clear();
            currentTimeState = timeState.value();
        }

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }
        currentTimeState.f2 = event_ts;

        if (currentTimeState.f1 > 0) {
            String search_hex9 = record.get("search_hex9") != null ? record.get("search_hex9").toString() : null;
            DemandSessionSummaryTuple currentTuple = sessionSummary.get(search_hex9) != null
                    ? sessionSummary.get(search_hex9)
                    : new DemandSessionSummaryTuple();

            if (sessionSummary.get(search_hex9) == null) {
                currentTuple.setSearchHex9(search_hex9);
                currentTuple.setUserId(recordTuple2.f0);
                currentTuple.setStartTime(currentTimeState.f1);
                currentTuple.setDemandSessionId(currentTimeState.f0);
            }

            if (event_name.equals("search_list_keyless")) {
                currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
                SearchSummaryCalculation(record, currentTuple);
            }
            sessionSummary.put(search_hex9, currentTuple);
        }
        timeState.update(currentTimeState);
    }
}

On Sun, May 24, 2020 at 10:57 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi,
>
> First, sorry that I'm not an expert on windows, so please correct me if
> I'm wrong, but from my side it seems the assigner might also be a problem
> in addition to the trigger: currently Flink window assigners are all
> based on time (processing time or event time), and it might be hard to
> implement an event-driven window assigner that starts assigning elements
> to a window only after certain elements have been received.
>
> A possible alternative that comes to mind is to use the low-level
> *KeyedProcessFunction* directly: when you receive the "*search*" event,
> register a timer for 30 minutes later and write the time of the search
> event into state. Subsequent events are then saved to state, since the
> flag is set. When the "*start*" event arrives or the timer fires, you can
> load all the events from state, do the aggregation, and cancel the timer
> if the aggregation was triggered by the "*start*" event. A simpler case is
> [1]; it does not cover stopping the aggregation when a special event is
> received, but it seems that logic could be added to it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:* aj <ajainje...@gmail.com>
> *Send Date:* Sun May 24 01:10:55 2020
> *Recipients:* Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Flink Window with multiple trigger condition
>
>> I am still not able to get much out of the material I have read. Please
>> help with some basic code to get started on this window and trigger.
>>
>> Another option I am considering is to just use a RichFlatMapFunction with
>> keyed state to build this logic. Is that the correct approach?
>>
>> On Fri, May 22, 2020 at 4:52 PM aj <ajainje...@gmail.com> wrote:
>>
>>> I was also thinking of using a processing-time window, but that will not
>>> work for me. I want to start the window when the user's "*search*" event
>>> arrives, so each user's window starts from the *search* event.
>>> A tumbling window has a fixed start and end time, so it is not
>>> suitable in my case.
>>>
>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> To achieve what you have in mind, I think you have to use a processing
>>>> time window of 30 mins with a custom trigger that matches the "start"
>>>> event in the `onElement` method and returns
>>>> TriggerResult.FIRE_AND_PURGE.
>>>>
>>>> That way, the window fires either when the processing time has passed
>>>> or when the start event is received.
>>>>
>>>> Cheers,
>>>> Gordon

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>
<http://www.cse.iitm.ac.in/%7Eanujjain/>
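
P.S. To make the timer idea from Yun's reply concrete, here is a minimal plain-Java model of the per-user logic. It deliberately has no Flink dependency, so it is only a sketch of the control flow, not a working operator. The names `SessionTimerSketch`, `onEvent`, `advanceTime` and `emitted`, and the output string format, are made up for illustration. In a real `KeyedProcessFunction`, `onEvent` would correspond to `processElement`, `advanceTime` to the `onTimer` callback, and the `timerAt` field to a timer registered through `ctx.timerService()` plus a `ValueState<Long>` remembering its timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of one user's session state (hypothetical, no Flink dependency). */
class SessionTimerSketch {
    static final long SESSION_TIMEOUT_MS = 30 * 60 * 1000L;

    private long sessionStart = -1L; // -1 means no session is open
    private long timerAt = -1L;      // timestamp of the pending timer, -1 if none
    private int searchCount = 0;

    /** Collected output; stands in for Collector.collect(...) in Flink. */
    final List<String> emitted = new ArrayList<>();

    /** Called once per incoming event (role of processElement). */
    void onEvent(String eventName, long eventTs) {
        if (sessionStart < 0 && eventName.equals("search_list_keyless")) {
            sessionStart = eventTs;
            // In Flink: ctx.timerService().registerProcessingTimeTimer(...)
            // (or registerEventTimeTimer), with the timestamp kept in state.
            timerAt = eventTs + SESSION_TIMEOUT_MS;
        }
        if (sessionStart < 0) {
            return; // events outside an open session are ignored
        }
        if (eventName.equals("search_list_keyless")) {
            searchCount++;
        } else if (eventName.equals("keyless_start_trip")) {
            // In Flink: ctx.timerService().deleteProcessingTimeTimer(timerAt)
            timerAt = -1L;
            flush("start_trip");
        }
    }

    /** Called as time advances, even when no event arrives (role of onTimer). */
    void advanceTime(long now) {
        if (timerAt >= 0 && now >= timerAt) {
            timerAt = -1L;
            flush("timeout");
        }
    }

    /** Emits the session summary and resets the state for the next session. */
    private void flush(String reason) {
        emitted.add(reason + " searches=" + searchCount);
        sessionStart = -1L;
        searchCount = 0;
    }

    public static void main(String[] args) {
        SessionTimerSketch user = new SessionTimerSketch();
        user.onEvent("search_list_keyless", 1_000L);
        user.onEvent("search_list_keyless", 2_000L);
        // No further events: the session still closes once the timer is due.
        user.advanceTime(1_000L + SESSION_TIMEOUT_MS);
        System.out.println(user.emitted);
    }
}
```

The point of the sketch is that the timeout is driven by `advanceTime` rather than by the next event, which is exactly what the flatMap version above cannot do. The timer is anchored at the first search event, matching the 30-minute check against the session start time in my current code.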