Hi Ethan, I believe then it is because the Watermark and Timestamps in your implementation are uncorrelated. What Watermark really is a marker that says there will be no elements with timestamp smaller than the value of this watermark. For more info on the concept see [1] <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks> .
In your case as you say that events can "lag" for 30 minutes, you should try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for a case like yours. Regards, Dawid 2017-03-07 22:33 GMT+01:00 ext.eformichella <ext.eformiche...@riotgames.com> : > Hi Dawid, I'm working with Max on the project > Our code for the TimestampAndWatermarkAssigner is: > ``` > class TimestampAndWatermarkAssigner(val maxLateness: Long) extends > AssignerWithPeriodicWatermarks[Row] { > > override def extractTimestamp(element: Row, previousElementTimestamp: > Long): Long = { > element.minTime > } > > override def getCurrentWatermark(): Watermark = { > new Watermark(System.currentTimeMillis() - maxLateness) > } > } > ``` > > Where Row is a class representing the incoming JSON object coming from > Kafka, which includes the timestamp > > Thanks, > -Ethan > > > > -- > View this message in context: http://apache-flink-user- > mailing-list-archive.2336050.n4.nabble.com/Issues-with- > Event-Time-and-Kafka-tp12061p12090.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >