
It's hard for me to help you debug your code, but as long as:
- you are using event time for processing records (in operators like windows or timer-based process functions)
- you do not have late records
- you are replaying the same records
- your code is deterministic
- you do not rely on the order of the records

Flink should behave deterministically and the results should be the same.

Maybe try writing unit tests/integration tests for your operators/logic
and feeding them some precomputed input? Or try to reproduce the problem, then
narrow it down to a single user_id/key, create a unit test/integration test
case for it, and debug your code in a debugger on a local machine?

One thing to note: are you sure you are reprocessing the same records?
Kafka, for example, has the concept of a retention time, after which it can
drop older records from the topic.


> I have a streaming job where I am doing a window operation on *"user_id"* and
> then doing some summarization based on some time-based logic like:
> 1.  end the session based on 30 mins inactivity of the user.
> 2.  The End_trip event or cancellation event has arrived for the user.
> I am trying to rerun the job from an old offset for backfilling, but then I
> am getting wrong results. Some of the sessions are ending with the same start
> and end time.  How can I control the streaming job when a lot of data has
> accumulated in Kafka and I have to replay the job? Please help me understand
> what is going wrong.
> My assumption is it may be due to:
> 1. Out of order events
> 2. I am reading data from multiple topics so the end_trip event that is
> happening at a later time can be read before and end the session.
> I am using keyedProcessFunction like this :
> public class DemandFunnelProcessFunction extends
>     KeyedProcessFunction<Tuple, Tuple2<Long, GenericRecord>, 
> DemandFunnelSummaryTuple> {
>   private static final Logger LOGGER = 
> LoggerFactory.getLogger(DemandFunnelProcessFunction.class);
>   private transient ValueState<DemandFunnelSummaryTuple> sessionSummary;
>   private transient ValueState<ArrayList<Integer>> distanceListState;
>   @SuppressWarnings("checkstyle:LocalVariableName")
>   @Override
>   public void processElement(Tuple2<Long, GenericRecord> recordTuple2, 
> Context context,
>       Collector<DemandFunnelSummaryTuple> collector) throws Exception {
>     GenericRecord record = recordTuple2.f1;
>     String event_name = record.get("event_name").toString();
>     long event_ts = (Long) record.get("event_ts");
>     DemandFunnelSummaryTuple currentTuple = sessionSummary.value();
>     ArrayList<Integer> distanceList =
>         distanceListState.value() != null ? distanceListState.value() : new 
> ArrayList<Integer>();
>     try {
>       if (currentTuple == null) {
>         currentTuple = new DemandFunnelSummaryTuple();
>         String demandSessionId = UUID.randomUUID().toString();
>         currentTuple.setDemandSessionId(demandSessionId);
>         currentTuple.setStartTime(event_ts);
>         currentTuple.setUserId(recordTuple2.f0);
>         currentTuple.setEventName("demand_funnel_summary");
>         int geo_id = record.get("geo_id") != null ? (int) 
> record.get("geo_id") : 0;
>         currentTuple.setGeoId(geo_id);
>       }
>       long endTime = currentTuple.getEndTime();
>       if (event_name.equals("search_list_keyless")) {
>         //System.out.println("inside search_list_keyless " + recordTuple2.f0);
>         currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
>         SearchEventUtil.searchSummaryCalculation(record, currentTuple, 
> distanceList);
>       }
>       currentTuple.setEndTime(event_ts);
>       sessionSummary.update(currentTuple);
>       distanceListState.update(distanceList);
>       if (event_name.equals("keyless_booking_cancellation") || event_name
>           .equals("keyless_end_trip")) {
>         try {
>           DemandFunnelSummaryTuple sessionSummaryTuple = 
> sessionSummary.value();
>           if (sessionSummaryTuple != null) {
>             sessionSummaryTuple.setAvgResultCount(
>                 (double) distanceList.size() / 
> sessionSummaryTuple.getTotalSearch());
>             if (distanceList.size() > 0) {
>               int distanceSum = distanceList.stream()
>                   .collect(Collectors.summingInt(Integer::intValue));
>               sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / 
> distanceList.size());
>               sessionSummaryTuple
> .setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
>               sessionSummaryTuple
> .setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
>             }
>             sessionSummaryTuple.setEndTime(event_ts);
>             collector.collect(sessionSummaryTuple);
>           }
>         } catch (Exception e) {
>           DemandFunnelSummaryTuple sessionSummaryTuple = 
> sessionSummary.value();
>           LOGGER.info("Error in collecting event for user_id " + 
> sessionSummaryTuple.getUserId());
>           e.printStackTrace();
>         }
>         sessionSummary.clear();
>         distanceListState.clear();
>       }
>     } catch (Exception e) {
>       LOGGER.info("error in processing event --" + 
> recordTuple2.f1.toString());
>       LOGGER.info(e.toString());
>       e.printStackTrace();
>     }
>   }
>   @Override
>   public void onTimer(long timestamp, OnTimerContext ctx, 
> Collector<DemandFunnelSummaryTuple> out)
>       throws Exception {
>     try {
>       DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
>       if (sessionSummaryTuple != null) {
>         System.out.println(
>             "calling on timer" + sessionSummaryTuple.getUserId() + " " + 
> sessionSummaryTuple
>                 .getEndTime() + "  " + timestamp);
>         ArrayList<Integer> distanceList = distanceListState.value();
>         if (distanceList != null && distanceList.size() > 0) {
>           sessionSummaryTuple
>               .setAvgResultCount(
>                   (double) distanceList.size() / 
> sessionSummaryTuple.getTotalSearch());
>           int distanceSum = 
> distanceList.stream().collect(Collectors.summingInt(Integer::intValue));
>           sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / 
> distanceList.size());
> sessionSummaryTuple.setP50DistNearestBike(SearchEventUtil.percentile(distanceList,
>  50));
> sessionSummaryTuple.setP90DistNearestBike(SearchEventUtil.percentile(distanceList,
>  90));
>         }
>         sessionSummaryTuple.setEndTime(timestamp);
>         out.collect(sessionSummaryTuple);
>       }
>     } catch (Exception e) {
>       DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
>       if (sessionSummaryTuple != null) {
>         LOGGER.info("Error in collecting event for user_id " + 
> sessionSummaryTuple.getUserId());
>       }
>       e.printStackTrace();
>     }
>     sessionSummary.clear();
>     distanceListState.clear();
>   }
>   @Override
>   public void open(Configuration parameters) throws Exception {
>     ValueStateDescriptor<DemandFunnelSummaryTuple> descriptor =
>         new ValueStateDescriptor<DemandFunnelSummaryTuple>(
>             "demand_session", // the state name
>             TypeInformation.of(new TypeHint<DemandFunnelSummaryTuple>() {
>             }));
>     sessionSummary = getRuntimeContext().getState(descriptor);
>     ValueStateDescriptor<ArrayList<Integer>> disDescriptor =
>         new ValueStateDescriptor<ArrayList<Integer>>(
>             "distance_state", // the state name
>             TypeInformation.of(new TypeHint<ArrayList<Integer>>() {
>             }));
>     distanceListState = getRuntimeContext().getState(disDescriptor);
>   }
> }
