Hi,
     I think you could use timer to achieve that. In processFunction you could 
register a timer at specific time (event time or processing time) and get 
callbacked at that point. It could be registered like 
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    More details on timer could be found in [1] and an example is in [2]. In 
this example, a timer is registered in the last line of the processElement 
method, and the callback is implemented by override the onTimer method.

   [1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
   [2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example



 ------------------Original Mail ------------------
Sender:aj <ajainje...@gmail.com>
Send Date:Fri May 29 02:07:33 2020
Recipients:Yun Gao <yungao...@aliyun.com>
CC:user <user@flink.apache.org>
Subject:Re: Re: Flink Window with multiple trigger condition

Hi,

I have implemented the below solution and its working fine but the biggest 
problem with this is if no event coming for the user after 30 min then I am not 
able to trigger because I am checking
time diff from upcoming events. So when the next event comes than only it 
triggers but I want it to trigger just after 30 mins. 

So please help me to improve this and how to solve the above problem.



public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, 
GenericRecord>, DemandSessionSummaryTuple> {

    private static final Logger LOGGER = 
LoggerFactory.getLogger(DemandSessionFlatMap.class);

    private transient ValueState<Tuple3<String, Long, Long>> timeState; // 
maintain session_id starttime and endtime 
    private transient MapState<String, DemandSessionSummaryTuple> 
sessionSummary; // map for hex9 and summarytuple

    @Override
    public void open(Configuration config) {

        ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
                new ValueStateDescriptor<>(
                        "time_state", // the state name
                        TypeInformation.of(new TypeHint<Tuple3<String, Long, 
Long>>() {
                        }), // type information
                        Tuple3.of(null, 0L, 0L)); // default value of the 
state, if nothing was set
        timeState = getRuntimeContext().getState(timeDescriptor);

        MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
                new MapStateDescriptor<String, 
DemandSessionSummaryTuple>("demand_session",
                        TypeInformation.of(new TypeHint<String>() {
                        }), TypeInformation.of(new 
TypeHint<DemandSessionSummaryTuple>() {
                }));
        sessionSummary = getRuntimeContext().getMapState(descriptor);

    }

    @Override
    public void flatMap(Tuple2<Long, GenericRecord> recordTuple2, 
Collector<DemandSessionSummaryTuple> collector) throws Exception {
        GenericRecord record = recordTuple2.f1;
        String event_name = record.get("event_name").toString();
        long event_ts = (Long) record.get("event_ts");
        Tuple3<String, Long, Long> currentTimeState = timeState.value();

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 
0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }

        long timeDiff = event_ts - currentTimeState.f1;

        if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
            Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
            sessionSummary.entries().forEach( tuple ->{
                String key = tuple.getKey();
                DemandSessionSummaryTuple sessionSummaryTuple = 
tuple.getValue();
                try {
                    sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
                    collector.collect(sessionSummaryTuple);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            });
            timeState.clear();
            sessionSummary.clear();
            currentTimeState = timeState.value();
        }

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 
0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }
        currentTimeState.f2 = event_ts;

        if (currentTimeState.f1 > 0) {
            String search_hex9 = record.get("search_hex9") != null ? 
record.get("search_hex9").toString() : null;
            DemandSessionSummaryTuple currentTuple = 
sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : new 
DemandSessionSummaryTuple();

            if (sessionSummary.get(search_hex9) == null) {
                currentTuple.setSearchHex9(search_hex9);
                currentTuple.setUserId(recordTuple2.f0);
                currentTuple.setStartTime(currentTimeState.f1);
                currentTuple.setDemandSessionId(currentTimeState.f0);
            }

            if (event_name.equals("search_list_keyless")) {
                currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
                SearchSummaryCalculation(record, currentTuple);
            }
            sessionSummary.put(search_hex9, currentTuple);
        }
        timeState.update(currentTimeState);
    }





On Sun, May 24, 2020 at 10:57 PM Yun Gao <yungao...@aliyun.com> wrote:

Hi,

       First sorry that I'm not expert on Window and please correct me if I'm 
wrong, but from my side, it seems the assigner might also be a problem in 
addition to the trigger: currently Flink window assigner should be all based on 
time (processing time or event time), and it might be hard to implement an 
event-driven window assigner that start to assign elements to a window after 
received some elements. 
      What comes to me is that a possible alternative method is to use the 
low-level KeyedProcessFunction directly:  you may register a timer 30 mins 
later when received the "search" event and write the time of search event into 
the state. Then for the following events, they will be saved to the state since 
the flag is set. After received the "start" event or the timer is triggered, 
you could load all the events from the states, do the aggregation and cancel 
the timer if it is triggered by "start" event. A simpler case is [1] and it 
does not consider stop the aggreation when received special event, but it seems 
that the logic could be added to the case.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example

Best,
 Yun




 ------------------Original Mail ------------------
Sender:aj <ajainje...@gmail.com>
Send Date:Sun May 24 01:10:55 2020
Recipients:Tzu-Li (Gordon) Tai <tzuli...@apache.org>
CC:user <user@flink.apache.org>
Subject:Re: Flink Window with multiple trigger condition


I am still not able to get much after reading the stuff. Please help with some 
basic code to start to build this window and trigger. 

Another option I am thinking is I just use a Richflatmap function and use the 
keyed state to build this logic. Is that the correct approach? 



On Fri, May 22, 2020 at 4:52 PM aj <ajainje...@gmail.com> wrote:


I was also thinking to have a processing time window but that will not work for 
me. I want to start the window when the user  "search" event arrives. So for 
each user window will start from the search event. 
 The Tumbling window has fixed start end time so that will not be suitable in 
my case. 




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and return
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was recieved.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07



Reply via email to