I have a streaming job where I am doing window operation on *"user_id" *and then doing some summarization based on some time bases logic like :
1. end the session based on 30 mins inactivity of the user. 2. The End_trip event or cancellation event has arrived for the user. I am trying to rerun the job with some old offset for backfilling then I am getting wrong results. Some of the sessions is ending with same start and end time. How to control the streaming job when lot of data get accumulated in Kafka and I have to replay the job. Please help me what is going wrong. My assumption is it may be due to: 1. Out of order events 2. I am reading data from multiple topics so the end_trip event that is happening at a later time can be read before and end the session. I am using keyedProcessFunction like this : public class DemandFunnelProcessFunction extends KeyedProcessFunction<Tuple, Tuple2<Long, GenericRecord>, DemandFunnelSummaryTuple> { private static final Logger LOGGER = LoggerFactory.getLogger(DemandFunnelProcessFunction.class); private transient ValueState<DemandFunnelSummaryTuple> sessionSummary; private transient ValueState<ArrayList<Integer>> distanceListState; @SuppressWarnings("checkstyle:LocalVariableName") @Override public void processElement(Tuple2<Long, GenericRecord> recordTuple2, Context context, Collector<DemandFunnelSummaryTuple> collector) throws Exception { GenericRecord record = recordTuple2.f1; String event_name = record.get("event_name").toString(); long event_ts = (Long) record.get("event_ts"); DemandFunnelSummaryTuple currentTuple = sessionSummary.value(); ArrayList<Integer> distanceList = distanceListState.value() != null ? distanceListState.value() : new ArrayList<Integer>(); try { if (currentTuple == null) { currentTuple = new DemandFunnelSummaryTuple(); String demandSessionId = UUID.randomUUID().toString(); currentTuple.setDemandSessionId(demandSessionId); currentTuple.setStartTime(event_ts); currentTuple.setUserId(recordTuple2.f0); currentTuple.setEventName("demand_funnel_summary"); int geo_id = record.get("geo_id") != null ? (int) record.get("geo_id") : 0; currentTuple.setGeoId(geo_id); } long endTime = currentTuple.getEndTime(); if (event_name.equals("search_list_keyless")) { //System.out.println("inside search_list_keyless " + recordTuple2.f0); currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1); SearchEventUtil.searchSummaryCalculation(record, currentTuple, distanceList); } currentTuple.setEndTime(event_ts); sessionSummary.update(currentTuple); distanceListState.update(distanceList); if (event_name.equals("keyless_booking_cancellation") || event_name .equals("keyless_end_trip")) { try { DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value(); if (sessionSummaryTuple != null) { sessionSummaryTuple.setAvgResultCount( (double) distanceList.size() / sessionSummaryTuple.getTotalSearch()); if (distanceList.size() > 0) { int distanceSum = distanceList.stream() .collect(Collectors.summingInt(Integer::intValue)); sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / distanceList.size()); sessionSummaryTuple .setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50)); sessionSummaryTuple .setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90)); } sessionSummaryTuple.setEndTime(event_ts); collector.collect(sessionSummaryTuple); } } catch (Exception e) { DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value(); LOGGER.info("Error in collecting event for user_id " + sessionSummaryTuple.getUserId()); e.printStackTrace(); } sessionSummary.clear(); distanceListState.clear(); } } catch (Exception e) { LOGGER.info("error in processing event --" + recordTuple2.f1.toString()); LOGGER.info(e.toString()); e.printStackTrace(); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<DemandFunnelSummaryTuple> out) throws Exception { try { DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value(); if (sessionSummaryTuple != null) { System.out.println( "calling on timer" + sessionSummaryTuple.getUserId() + " " + sessionSummaryTuple .getEndTime() + " " + timestamp); ArrayList<Integer> distanceList = distanceListState.value(); if (distanceList != null && distanceList.size() > 0) { sessionSummaryTuple .setAvgResultCount( (double) distanceList.size() / sessionSummaryTuple.getTotalSearch()); int distanceSum = distanceList.stream().collect(Collectors.summingInt(Integer::intValue)); sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / distanceList.size()); sessionSummaryTuple.setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50)); sessionSummaryTuple.setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90)); } sessionSummaryTuple.setEndTime(timestamp); out.collect(sessionSummaryTuple); } } catch (Exception e) { DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value(); if (sessionSummaryTuple != null) { LOGGER.info("Error in collecting event for user_id " + sessionSummaryTuple.getUserId()); } e.printStackTrace(); } sessionSummary.clear(); distanceListState.clear(); } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<DemandFunnelSummaryTuple> descriptor = new ValueStateDescriptor<DemandFunnelSummaryTuple>( "demand_session", // the state name TypeInformation.of(new TypeHint<DemandFunnelSummaryTuple>() { })); sessionSummary = getRuntimeContext().getState(descriptor); ValueStateDescriptor<ArrayList<Integer>> disDescriptor = new ValueStateDescriptor<ArrayList<Integer>>( "distance_state", // the state name TypeInformation.of(new TypeHint<ArrayList<Integer>>() { })); distanceListState = getRuntimeContext().getState(disDescriptor); } } -- Thanks & Regards, Anuj Jain <http://www.cse.iitm.ac.in/%7Eanujjain/>