I have a streaming job that groups events by "user_id" and then summarizes
each session using time-based logic such as:

1.  End the session after 30 minutes of user inactivity.
2.  End the session when an end_trip or cancellation event arrives for the user.
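
The two rules above can be sketched as plain predicates. This is only an
illustration under assumptions: the class SessionRules and its method names
are hypothetical, the event names are the ones used in the process function
further down, and timestamps are assumed to be epoch milliseconds.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the job: encodes the two session-end rules.
final class SessionRules {

    // Rule 1: a session expires after 30 minutes without events.
    static final long INACTIVITY_GAP_MS = 30L * 60L * 1000L;

    // Rule 2: these events end the session immediately.
    static final Set<String> TERMINAL_EVENTS = new HashSet<>(
        Arrays.asList("keyless_end_trip", "keyless_booking_cancellation"));

    static boolean isTerminal(String eventName) {
        return TERMINAL_EVENTS.contains(eventName);
    }

    // True when at least 30 minutes separate the last event and the current time.
    static boolean isInactive(long lastEventTs, long currentTs) {
        return currentTs - lastEventTs >= INACTIVITY_GAP_MS;
    }
}
```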

When I rerun the job from an older offset to backfill data, I get wrong
results: some of the sessions end with the same start and end time. How can
I control the streaming job when a lot of data has accumulated in Kafka and
I have to replay it? Please help me understand what is going wrong.

My assumption is that it may be due to:
1. Out-of-order events.
2. I am reading data from multiple topics, so an end_trip event that
happened at a later time can be read earlier and end the session prematurely.
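
The second assumption can be checked without a cluster. The sketch below is
a plain-Java stand-in for the per-user state handling in the process function
that follows (no Flink dependency; OutOfOrderReplayDemo, Event, and Session
are hypothetical names, and the timestamps are made up for illustration). It
suggests that when a terminal event is processed before the earlier events of
the same session, a session is opened and closed by that single event, so its
start and end times are equal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of the open/close logic for a single user.
final class OutOfOrderReplayDemo {

    // Minimal stand-in for one input record: event name plus event time (ms).
    static final class Event {
        final String name;
        final long ts;
        Event(String name, long ts) { this.name = name; this.ts = ts; }
    }

    // Minimal stand-in for an emitted session summary.
    static final class Session {
        final long start;
        final long end;
        Session(long start, long end) { this.start = start; this.end = end; }
    }

    static boolean isTerminal(String name) {
        return name.equals("keyless_end_trip")
            || name.equals("keyless_booking_cancellation");
    }

    // Opens a session on the first event seen and closes it on a terminal event.
    static List<Session> sessionize(List<Event> events) {
        List<Session> closed = new ArrayList<>();
        Long start = null; // null means "no open session", like empty state
        for (Event e : events) {
            if (start == null) {
                start = e.ts;
            }
            if (isTerminal(e.name)) {
                closed.add(new Session(start, e.ts));
                start = null;
            }
        }
        return closed;
    }

    // Returns a copy of the events ordered by event time.
    static List<Event> sortedByEventTime(List<Event> events) {
        List<Event> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong(e -> e.ts));
        return copy;
    }

    public static void main(String[] args) {
        // Arrival order during a replay: the end_trip is read first.
        List<Event> arrival = Arrays.asList(
            new Event("keyless_end_trip", 3000L),
            new Event("search_list_keyless", 1000L),
            new Event("search_list_keyless", 2000L));

        for (Session s : sessionize(arrival)) {
            System.out.println("arrival order: " + s.start + " -> " + s.end);
        }
        for (Session s : sessionize(sortedByEventTime(arrival))) {
            System.out.println("event-time order: " + s.start + " -> " + s.end);
        }
    }
}
```

In the arrival-order run the only emitted session starts and ends at the
end_trip timestamp, and the two searches are left in an open session. In the
event-time-ordered run the session spans from the first search to the end_trip.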

I am using a KeyedProcessFunction like this:

public class DemandFunnelProcessFunction extends
    KeyedProcessFunction<Tuple, Tuple2<Long, GenericRecord>, DemandFunnelSummaryTuple> {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(DemandFunnelProcessFunction.class);

  private transient ValueState<DemandFunnelSummaryTuple> sessionSummary;
  private transient ValueState<ArrayList<Integer>> distanceListState;

  @SuppressWarnings("checkstyle:LocalVariableName")
  @Override
  public void processElement(Tuple2<Long, GenericRecord> recordTuple2, Context context,
      Collector<DemandFunnelSummaryTuple> collector) throws Exception {

    GenericRecord record = recordTuple2.f1;


    String event_name = record.get("event_name").toString();
    long event_ts = (Long) record.get("event_ts");

    DemandFunnelSummaryTuple currentTuple = sessionSummary.value();
    ArrayList<Integer> distanceList =
        distanceListState.value() != null ? distanceListState.value()
            : new ArrayList<Integer>();

    try {

      if (currentTuple == null) {
        currentTuple = new DemandFunnelSummaryTuple();
        String demandSessionId = UUID.randomUUID().toString();
        currentTuple.setDemandSessionId(demandSessionId);
        currentTuple.setStartTime(event_ts);
        currentTuple.setUserId(recordTuple2.f0);
        currentTuple.setEventName("demand_funnel_summary");
        int geo_id = record.get("geo_id") != null ? (int) record.get("geo_id") : 0;
        currentTuple.setGeoId(geo_id);
      }
      long endTime = currentTuple.getEndTime();

      if (event_name.equals("search_list_keyless")) {
        //System.out.println("inside search_list_keyless " + recordTuple2.f0);
        currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
        SearchEventUtil.searchSummaryCalculation(record, currentTuple, distanceList);
      }


      currentTuple.setEndTime(event_ts);
      sessionSummary.update(currentTuple);
      distanceListState.update(distanceList);

      if (event_name.equals("keyless_booking_cancellation") || event_name
          .equals("keyless_end_trip")) {
        try {

          DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();

          if (sessionSummaryTuple != null) {
            sessionSummaryTuple.setAvgResultCount(
                (double) distanceList.size() / sessionSummaryTuple.getTotalSearch());
            if (distanceList.size() > 0) {
              int distanceSum = distanceList.stream()
                  .collect(Collectors.summingInt(Integer::intValue));
              sessionSummaryTuple.setAvgBikeDistance(
                  (double) distanceSum / distanceList.size());
              sessionSummaryTuple
                  .setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
              sessionSummaryTuple
                  .setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
            }
            sessionSummaryTuple.setEndTime(event_ts);
            collector.collect(sessionSummaryTuple);
          }
        } catch (Exception e) {
          DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
          LOGGER.info("Error in collecting event for user_id "
              + sessionSummaryTuple.getUserId());
          e.printStackTrace();
        }
        sessionSummary.clear();
        distanceListState.clear();
      }
    } catch (Exception e) {
      LOGGER.info("error in processing event --" + recordTuple2.f1.toString());
      LOGGER.info(e.toString());
      e.printStackTrace();

    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
      Collector<DemandFunnelSummaryTuple> out) throws Exception {

    try {
      DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
      if (sessionSummaryTuple != null) {
        System.out.println(
            "calling on timer" + sessionSummaryTuple.getUserId() + " "
                + sessionSummaryTuple.getEndTime() + "  " + timestamp);
        ArrayList<Integer> distanceList = distanceListState.value();
        if (distanceList != null && distanceList.size() > 0) {
          sessionSummaryTuple.setAvgResultCount(
              (double) distanceList.size() / sessionSummaryTuple.getTotalSearch());
          int distanceSum =
              distanceList.stream().collect(Collectors.summingInt(Integer::intValue));
          sessionSummaryTuple.setAvgBikeDistance(
              (double) distanceSum / distanceList.size());
          sessionSummaryTuple
              .setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
          sessionSummaryTuple
              .setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
        }
        sessionSummaryTuple.setEndTime(timestamp);
        out.collect(sessionSummaryTuple);
      }

    } catch (Exception e) {
      DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
      if (sessionSummaryTuple != null) {
        LOGGER.info("Error in collecting event for user_id "
            + sessionSummaryTuple.getUserId());
      }
      e.printStackTrace();
    }
    sessionSummary.clear();
    distanceListState.clear();
  }

  @Override
  public void open(Configuration parameters) throws Exception {

    ValueStateDescriptor<DemandFunnelSummaryTuple> descriptor =
        new ValueStateDescriptor<DemandFunnelSummaryTuple>(
            "demand_session", // the state name
            TypeInformation.of(new TypeHint<DemandFunnelSummaryTuple>() {
            }));
    sessionSummary = getRuntimeContext().getState(descriptor);

    ValueStateDescriptor<ArrayList<Integer>> disDescriptor =
        new ValueStateDescriptor<ArrayList<Integer>>(
            "distance_state", // the state name
            TypeInformation.of(new TypeHint<ArrayList<Integer>>() {
            }));
    distanceListState = getRuntimeContext().getState(disDescriptor);

  }
}
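
The excerpt above does not show where the 30-minute timer is registered. A
common pattern, sketched below in plain Java under assumptions, is to re-arm
a timer on every event and to ignore timers that no longer match the latest
event. InactivityTimerSketch and its methods are hypothetical; in the real
job the timestamp returned by onEvent would presumably be passed to
context.timerService().registerEventTimeTimer(...), and the check in onTimer
would run against a value kept in keyed state.

```java
// Hypothetical bookkeeping for one user's inactivity timer (no Flink dependency).
final class InactivityTimerSketch {

    static final long GAP_MS = 30L * 60L * 1000L;

    private long lastEventTs = Long.MIN_VALUE;
    private boolean open = false;

    // Called for every event; returns the timestamp at which a timer should fire.
    long onEvent(long eventTs) {
        open = true;
        // Keep the maximum so a late, older event cannot move the deadline backward.
        lastEventTs = Math.max(lastEventTs, eventTs);
        return lastEventTs + GAP_MS;
    }

    // Called when a timer fires; returns true only if the session should close.
    boolean onTimer(long timerTs) {
        if (open && timerTs == lastEventTs + GAP_MS) {
            open = false;
            lastEventTs = Long.MIN_VALUE;
            return true;
        }
        // Stale timer: a newer event re-armed the deadline, or the session is closed.
        return false;
    }
}
```

With this check, a timer armed by an earlier event is ignored once a newer
event has arrived, so only the most recent deadline closes the session.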




-- 
Thanks & Regards,
Anuj Jain


<http://www.cse.iitm.ac.in/%7Eanujjain/>
