Hi Kostas,

Yes, I have a flag in my timestampextractor.

As you can see from the code below, I am checking whether 
currentTime - systemTimeSinceLastModification > 10 sec..... as new events
come then the watermark wouldn't be incremented. But as soon as I have a
difference of more than 10 seconds, I am incrementing the watermark by 1
sec, I feel this is very small and I would try incrementing the watermark
with a higher value but yeah this is what I am doing.


public class TimestampAndWatermarkGenerator implements
AssignerWithPeriodicWatermarks<BAMEvent>{

  private final long maxOutOfOrderness = 10000; // 10 seconds

  private long currentMaxTimestamp;
  private long systemTimeSinceLastModification;
  private boolean firstEventFlag = false;
  private Logger log =
LoggerFactory.getLogger(TimestampAndWatermarkGenerator.class);

  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
    long currentTime = System.currentTimeMillis();
    if(firstEventFlag && (currentTime - systemTimeSinceLastModification >
10000)){
      systemTimeSinceLastModification = currentTime;
      currentMaxTimestamp = currentMaxTimestamp + 1000;
      //log.info("Current Max Time - {}, Last Modification Time - {}",
currentMaxTimestamp, systemTimeSinceLastModification );
    }
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }

  @Override
  public long extractTimestamp(BAMEvent bamEvent, long l) {
    long timestamp = bamEvent.getTimestamp();
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    systemTimeSinceLastModification = System.currentTimeMillis();
    firstEventFlag = true;
    //log.info("Current Max Time - {}, Current Event Time - {}",
currentMaxTimestamp, systemTimeSinceLastModification);

    return timestamp;
  }
}



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13860.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to