Hi,

I have implemented the below solution and its working fine but the biggest
problem with this is if no event coming for the user after 30 min then I am
not able to trigger because I am checking
time diff from upcoming events. So when the next event comes than only it
triggers but I want it to trigger just after 30 mins.
So please help me to improve this and how to solve the above problem.



public class DemandSessionFlatMap extends
RichFlatMapFunction<Tuple2<Long, GenericRecord>,
DemandSessionSummaryTuple> {

    private static final Logger LOGGER =
LoggerFactory.getLogger(DemandSessionFlatMap.class);

    private transient ValueState<Tuple3<String, Long, Long>>
timeState; // maintain session_id starttime and endtime
    private transient MapState<String, DemandSessionSummaryTuple>
sessionSummary; // map for hex9 and summarytuple

    @Override
    public void open(Configuration config) {

        ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
                new ValueStateDescriptor<>(
                        "time_state", // the state name
                        TypeInformation.of(new TypeHint<Tuple3<String,
Long, Long>>() {
                        }), // type information
                        Tuple3.of(null, 0L, 0L)); // default value of
the state, if nothing was set
        timeState = getRuntimeContext().getState(timeDescriptor);

        MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
                new MapStateDescriptor<String,
DemandSessionSummaryTuple>("demand_session",
                        TypeInformation.of(new TypeHint<String>() {
                        }), TypeInformation.of(new
TypeHint<DemandSessionSummaryTuple>() {
                }));
        sessionSummary = getRuntimeContext().getMapState(descriptor);

    }

    @Override
    public void flatMap(Tuple2<Long, GenericRecord> recordTuple2,
Collector<DemandSessionSummaryTuple> collector) throws Exception {
        GenericRecord record = recordTuple2.f1;
        String event_name = record.get("event_name").toString();
        long event_ts = (Long) record.get("event_ts");
        Tuple3<String, Long, Long> currentTimeState = timeState.value();

        if (event_name.equals("search_list_keyless") &&
currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }

        long timeDiff = event_ts - currentTimeState.f1;

        if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
            Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
            sessionSummary.entries().forEach( tuple ->{
                String key = tuple.getKey();
                DemandSessionSummaryTuple sessionSummaryTuple =
tuple.getValue();
                try {
                    sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
                    collector.collect(sessionSummaryTuple);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            });
            timeState.clear();
            sessionSummary.clear();
            currentTimeState = timeState.value();
        }

        if (event_name.equals("search_list_keyless") &&
currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }
        currentTimeState.f2 = event_ts;

        if (currentTimeState.f1 > 0) {
            String search_hex9 = record.get("search_hex9") != null ?
record.get("search_hex9").toString() : null;
            DemandSessionSummaryTuple currentTuple =
sessionSummary.get(search_hex9) != null ?
sessionSummary.get(search_hex9) : new DemandSessionSummaryTuple();

            if (sessionSummary.get(search_hex9) == null) {
                currentTuple.setSearchHex9(search_hex9);
                currentTuple.setUserId(recordTuple2.f0);
                currentTuple.setStartTime(currentTimeState.f1);
                currentTuple.setDemandSessionId(currentTimeState.f0);
            }

            if (event_name.equals("search_list_keyless")) {
                currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
                SearchSummaryCalculation(record, currentTuple);
            }
            sessionSummary.put(search_hex9, currentTuple);
        }
        timeState.update(currentTimeState);
    }






On Sun, May 24, 2020 at 10:57 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi,
>
>        First sorry that I'm not expert on Window and please correct me if
> I'm wrong, but from my side, it seems the assigner might also be a problem
> in addition to the trigger: currently Flink window assigner should be all
> based on time (processing time or event time), and it might be hard to
> implement an event-driven window assigner that start to assign elements to
> a window after received some elements.
>
>       What comes to me is that a possible alternative method is to use the
> low-level *KeyedProcessFunction* directly:  you may register a timer 30
> mins later when received the "*search*" event and write the time of
> search event into the state. Then for the following events, they will be
> saved to the state since the flag is set. After received the "*start*"
> event or the timer is triggered, you could load all the events from the
> states, do the aggregation and cancel the timer if it is triggered by "
> *start*" event. A simpler case is [1] and it does not consider stop the
> aggreation when received special event, but it seems that the logic could
> be added to the case.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>
> Best,
>  Yun
>
>
>
> ------------------Original Mail ------------------
> *Sender:*aj <ajainje...@gmail.com>
> *Send Date:*Sun May 24 01:10:55 2020
> *Recipients:*Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Flink Window with multiple trigger condition
>
>>
>> I am still not able to get much after reading the stuff. Please help with
>> some basic code to start to build this window and trigger.
>>
>> Another option I am thinking is I just use a Richflatmap function and use
>> the keyed state to build this logic. Is that the correct approach?
>>
>>
>>
>> On Fri, May 22, 2020 at 4:52 PM aj <ajainje...@gmail.com> wrote:
>>
>>>
>>>
>>> I was also thinking to have a processing time window but that will not
>>> work for me. I want to start the window when the user  "*search*" event
>>> arrives. So for each user window will start from the *search* event.
>>>  The Tumbling window has fixed start end time so that will not be
>>> suitable in my case.
>>>
>>>
>>>
>>>
>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> To achieve what you have in mind, I think what you have to do is to use
>>>> a
>>>> processing time window of 30 mins, and have a custom trigger that
>>>> matches
>>>> the "start" event in the `onElement` method and return
>>>> TriggerResult.FIRE_AND_PURGE.
>>>>
>>>> That way, the window fires either when the processing time has passed,
>>>> or
>>>> the start event was recieved.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to