Hi Ethan,

I believe then it is because the Watermark and Timestamps in your
implementation are uncorrelated. What Watermark really is a marker that
says there will be no elements with timestamp smaller than the value of
this watermark. For more info on the concept see [1]
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks>
.

In your case as you say that events can "lag" for 30 minutes, you should
try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for
a case like yours.

Regards,
Dawid

2017-03-07 22:33 GMT+01:00 ext.eformichella <ext.eformiche...@riotgames.com>
:

> Hi Dawid, I'm working with Max on the project
> Our code for the TimestampAndWatermarkAssigner is:
> ```
> class TimestampAndWatermarkAssigner(val maxLateness: Long) extends
> AssignerWithPeriodicWatermarks[Row] {
>
>   override def extractTimestamp(element: Row, previousElementTimestamp:
> Long): Long = {
>     element.minTime
>   }
>
>   override def getCurrentWatermark(): Watermark = {
>     new Watermark(System.currentTimeMillis() - maxLateness)
>   }
> }
> ```
>
> Where Row is a class representing the incoming JSON object coming from
> Kafka, which includes the timestamp
>
> Thanks,
> -Ethan
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Issues-with-
> Event-Time-and-Kafka-tp12061p12090.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Reply via email to