Please help me to implement the above logic.

On Mon, Mar 2, 2020 at 4:47 PM aj <ajainje...@gmail.com> wrote:

> Hi,
> Is using a session window to implement the above logic a good idea, or
> should I use a process function?
>
> On Sun, Mar 1, 2020 at 11:39 AM aj <ajainje...@gmail.com> wrote:
>
>> Hi,
>>
>> I am working on a use case where I have a stream of events. I want to
>> attach a unique id to all the events that happen in a session.
>> Below is the logic that I am trying to implement:
>>
>> 1. session_started
>> 2. Whenever an event with event_name=search arrives, generate a unique
>> search_id and attach this id to all the following events in the session
>> until a new "search" event is encountered in the session.
>>
>> Example:
>> *user-1. session-1 event_name- search (generate searchid --1)*
>> user-1. session-1 event_name- x (attach above search id -1)
>> user-1. session-1 event_name- y (attach above search id -1)
>> user-1. session-1 event_name- y (attach above search id -1)
>> *user-1. session-1 event_name- search (generate searchid --2)*
>> user-1. session-1 event_name- x (attach above search id -2)
>> user-1. session-1 event_name- y (attach above search id -2)
>> user-1. session-1 event_name- y (attach above search id -2)
>>
>> As events can arrive out of order, I want to do this after the session
>> window has closed. So after the session window I do the following:
>>
>> 1. Sort all the events by time.
>> 2. Iterate over each event and attach the search_id.
>> 3. Collect all the events and generate another stream enriched with the
>> search_id.
>>
>> I am trying the code below, but it is not working as expected. I am not
>> able to understand what is happening.
>>
>> dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
>>     StringBuilder builder = new StringBuilder();
>>     builder.append(record.get("session_id"));
>>     builder.append(record.get("user_id"));
>>     return builder.toString();
>> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>>   .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
>>     @Override
>>     public void process(String key, Context context,
>>                         Iterable<GenericRecord> iterable,
>>                         Collector<GenericRecord> collector) throws Exception {
>>         Stream<GenericRecord> result = IterableUtils.toStream(iterable);
>>         List<GenericRecord> s = result.collect(Collectors.toList());
>>
>>         Map<Long,GenericRecord> recordMap = new HashMap<>();
>>         for(GenericRecord record : s) {
>>             recordMap.put((long)record.get("event_ts"),record);
>>         }
>>
>>         Map<Long,GenericRecord> sortedRecordMap = new LinkedHashMap<>();
>>         recordMap.entrySet().stream()
>>             .sorted(Map.Entry.comparingByKey())
>>             .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
>>
>>         String search_id = null;
>>         for(Map.Entry<Long,GenericRecord> element : sortedRecordMap.entrySet()) {
>>             GenericRecord record = element.getValue();
>>             if(record.get("event_name").equals("search")) {
>>                 search_id = UUID.randomUUID().toString();
>>             }
>>             record.put("search_id",search_id);
>>             collector.collect(record);
>>         }
>>     }
>> }).print();
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
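
To make the intended sort-and-attach step concrete, here is a minimal sketch in plain Java with no Flink or Avro dependency. The names Event, SearchIdTagger and tag are hypothetical and exist only for this illustration; Event stands in for the three GenericRecord fields used in the thread. The sketch differs from the quoted code in two ways. It sorts a list instead of building a map keyed by event_ts, because a map keeps only one record per timestamp. It also compares event_name by character content rather than with equals, on the assumption (not confirmed in the thread) that the Avro string fields arrive as org.apache.avro.util.Utf8 values, for which equals("search") against a java.lang.String is false.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

// Hypothetical helper, not part of Flink or of the original job.
class SearchIdTagger {

    // Hypothetical stand-in for the GenericRecord fields used in the thread.
    static class Event {
        final CharSequence eventName; // may be a non-String CharSequence (e.g. Avro Utf8)
        final long eventTs;
        String searchId;              // stays null until a search event has been seen

        Event(CharSequence eventName, long eventTs) {
            this.eventName = eventName;
            this.eventTs = eventTs;
        }
    }

    // Sorts one session's events by timestamp, then tags each event with the
    // id generated by the most recent "search" event at or before it.
    static List<Event> tag(Iterable<Event> sessionEvents) {
        List<Event> sorted = new ArrayList<>();
        sessionEvents.forEach(sorted::add);

        // List.sort is stable: events sharing a timestamp are all kept,
        // in their original relative order.
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTs));

        String searchId = null;
        for (Event e : sorted) {
            // contentEquals compares characters, so it works for any CharSequence.
            if ("search".contentEquals(e.eventName)) {
                searchId = UUID.randomUUID().toString();
            }
            e.searchId = searchId;
        }
        return sorted;
    }
}
```

Inside a ProcessWindowFunction, the same loop could run over the window's Iterable and emit each tagged record through the Collector. Events that arrive before the first search in a session keep a null search_id in this sketch.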