Please help me to implement the above logic.

On Mon, Mar 2, 2020 at 4:47 PM aj <ajainje...@gmail.com> wrote:

> Hi,
> Is using the session window to implement the above logic is good idea or i
> should use process function.
>
> On Sun, Mar 1, 2020 at 11:39 AM aj <ajainje...@gmail.com> wrote:
>
>> Hi ,
>>
>> I am working on a use case where i have a stream of events. I want to
>> attach a unique id to all the events happened in a session.
>> Below is the logis that i am trying to implement. -
>>
>> 1. session_started
>> 2 whenevr a event_name=search generate a unique search_id and attch this
>> id to all the following events in session until a new "search" event
>> encountered in session.
>>
>> Example :
>> *user-1.  session-1   event_name- search (generate searchid --1)*
>> user-1.  session-1   event_name- x  (attach above search id -1)
>> user-1.  session-1   event_name- y (attach above search id -1)
>> user-1.  session-1   event_name- y (attach above search id -1)
>> *user-1.  session-1   event_name- search (generate searchid --2)*
>> user-1.  session-1   event_name- x  (attach above search id -2)
>> user-1.  session-1   event_name- y (attach above search id -2)
>> user-1.  session-1   event_name- y (attach above search id -2)
>>
>> As events can come out of order so i want to do this after session window
>> got over. So after session window i am doing like this :
>>
>> 1. sort all the events by time.
>> 2. iterate ech event and attach the search_id
>> 3. collect all th events and generate another stream with enrich
>> search_id.
>>
>> I am trying with below code but its not working as expected . i am not
>> able to understand what is happening.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
>>         StringBuilder builder = new StringBuilder();
>> builder.append(record.get("session_id"));
>> builder.append(record.get("user_id"));                return
>> builder.toString();
>> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>>           .process(new ProcessWindowFunction<GenericRecord, GenericRecord,
>> String, TimeWindow>() {                        @Override
>>     public void process(String key, Context context,
>> Iterable<GenericRecord> iterable, Collector<GenericRecord> collector)
>> throws Exception {                            Stream<GenericRecord> result
>> = IterableUtils.toStream(iterable);
>> List<GenericRecord> s = result.collect(Collectors.toList());
>>             Map<Long,GenericRecord> recordMap = new HashMap<>();
>>                 for(GenericRecord record : s) {
>>         recordMap.put((long)record.get("event_ts"),record);
>>             }                            Map<Long,GenericRecord>
>> sortedRecordMap = new LinkedHashMap<>();
>> recordMap.entrySet().stream()
>> .sorted(Map.Entry.comparingByKey())
>> .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
>>                     String search_id = null;
>> for(Map.Entry<Long,GenericRecord> element :sortedRecordMap.entrySet()) {
>>                             GenericRecord record = element.getValue();
>>                           if(record.get("event_name").equals("search")) {
>>                                   search_id =
>> UUID.randomUUID().toString();                                }
>>                   record.put("search_id",search_id);
>>         collector.collect(record);                            }
>>             }                    }).print();*
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to