Hi,

I am working on a use case where I have a stream of events. I want to
attach a unique id to all the events that happen in a session.
Below is the logic that I am trying to implement:

1. A session starts (session_started).
2. Whenever an event with event_name=search arrives, generate a unique
search_id and attach this id to all the following events in the session,
until a new "search" event is encountered in the session.

Example:
user-1  session-1  event_name=search  (generate search_id 1)
user-1  session-1  event_name=x       (attach search_id 1)
user-1  session-1  event_name=y       (attach search_id 1)
user-1  session-1  event_name=y       (attach search_id 1)
user-1  session-1  event_name=search  (generate search_id 2)
user-1  session-1  event_name=x       (attach search_id 2)
user-1  session-1  event_name=y       (attach search_id 2)
user-1  session-1  event_name=y       (attach search_id 2)

As events can arrive out of order, I want to do this after the session
window has closed. So once the session window fires, I do the following:

1. Sort all the events by time.
2. Iterate over each event and attach the search_id.
3. Collect all the events and emit another stream enriched with search_id.
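
The three steps above can be sketched in plain Java, independent of Flink.
This is only a minimal sketch under assumptions: the Event class, the tag
method and the idSupplier parameter are hypothetical names used for
illustration and are not part of my job or of any library. The supplier
stands in for UUID.randomUUID() so the ids are predictable in the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;

public class SearchIdTagger {

    /** Hypothetical stand-in for one event of a single session. */
    public static final class Event {
        public final String eventName;
        public final long eventTs;
        public String searchId; // stays null until a "search" event has been seen

        public Event(String eventName, long eventTs) {
            this.eventName = eventName;
            this.eventTs = eventTs;
        }
    }

    /**
     * Sorts the events of one session by timestamp and attaches the id of the
     * most recent "search" event to every event that follows it.
     */
    public static List<Event> tag(List<Event> sessionEvents, Supplier<String> idSupplier) {
        // Step 1: sort by time. List.sort is stable, so events that share a
        // timestamp keep their arrival order and none of them is dropped.
        List<Event> sorted = new ArrayList<>(sessionEvents);
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTs));

        // Step 2: walk the sorted events and carry the current search id forward.
        String current = null;
        for (Event e : sorted) {
            if ("search".equals(e.eventName)) {
                current = idSupplier.get();
            }
            e.searchId = current;
        }

        // Step 3: return the enriched events in time order.
        return sorted;
    }

    public static void main(String[] args) {
        // Events deliberately given out of order.
        List<Event> events = Arrays.asList(
                new Event("x", 20L),
                new Event("search", 10L),
                new Event("search", 30L),
                new Event("y", 40L));

        int[] counter = {0};
        for (Event e : tag(events, () -> "search-" + (++counter[0]))) {
            System.out.println(e.eventTs + " " + e.eventName + " " + e.searchId);
        }
    }
}
```

Events that occur before the first search in a session keep a null
search_id in this sketch; whether that is the desired behaviour is a
design choice.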

I am trying this with the code below, but it is not working as expected, and
I am not able to understand what is happening.

dataStream
    .keyBy((KeySelector<GenericRecord, String>) record ->
        // Key by session and user so each window holds one user's session.
        String.valueOf(record.get("session_id")) + record.get("user_id"))
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
        @Override
        public void process(String key, Context context,
                            Iterable<GenericRecord> iterable,
                            Collector<GenericRecord> collector) throws Exception {
            // Copy the window contents into a list and sort it by event time.
            // A list, unlike a map keyed by event_ts, keeps events that
            // share the same timestamp.
            List<GenericRecord> records = new ArrayList<>();
            iterable.forEach(records::add);
            records.sort(Comparator.comparingLong(
                (GenericRecord r) -> (long) r.get("event_ts")));

            String searchId = null;
            for (GenericRecord record : records) {
                // Avro typically returns string fields as Utf8, not String,
                // so compare on the string value.
                if ("search".equals(String.valueOf(record.get("event_name")))) {
                    searchId = UUID.randomUUID().toString();
                }
                // Assumes the record schema already has a "search_id" field.
                record.put("search_id", searchId);
                collector.collect(record);
            }
        }
    })
    .print();


-- 
Thanks & Regards,
Anuj Jain


<http://www.cse.iitm.ac.in/%7Eanujjain/>
