Hi Ethan,

how late elements (elements whose timestamp is lower than the current watermark) are handled depends on the operator. Flink's window operators fire the affected window again for each late element that arrives within the window's "allowed lateness" period. Late elements that arrive after that period are dropped.
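
To make that rule concrete, here is a toy model of a single event-time window in plain Scala. It is not Flink code: `WindowModel`, `Outcome` and all field names are made up for illustration, and it assumes the default behaviour as I understand it, where the window contents are kept until the allowed lateness has passed, so a late firing includes the earlier elements. In a real job the lateness is configured with `allowedLateness(...)` on the windowed stream.

```scala
// Toy model of ONE tumbling event-time window. Hypothetical names, not Flink API.
object AllowedLatenessSketch {

  sealed trait Outcome
  case object Buffered extends Outcome                    // on time: kept until the window fires
  final case class LateFiring(sum: Long) extends Outcome  // late but accepted: window fires again
  case object Dropped extends Outcome                     // too late: ignored

  // windowEnd is exclusive; both arguments are in milliseconds.
  final class WindowModel(windowEnd: Long, allowedLateness: Long) {
    private val maxTimestamp = windowEnd - 1
    private var sum = 0L

    // Every element passed in is assumed to belong to this window.
    def onElement(value: Long, currentWatermark: Long): Outcome =
      if (maxTimestamp + allowedLateness <= currentWatermark) {
        Dropped // the allowed lateness has already expired
      } else {
        sum += value
        // If the watermark has already passed the window, fire again right away.
        if (maxTimestamp <= currentWatermark) LateFiring(sum) else Buffered
      }
  }

  def main(args: Array[String]): Unit = {
    val thirtyMinutes = 30 * 60 * 1000L
    val window = new WindowModel(windowEnd = 60000L, allowedLateness = thirtyMinutes)

    println(window.onElement(5, currentWatermark = 10000L))                  // Buffered
    println(window.onElement(7, currentWatermark = 120000L))                 // LateFiring(12)
    println(window.onElement(9, currentWatermark = 60000L + thirtyMinutes))  // Dropped
  }
}
```

With a 30 minute allowed lateness, the element that arrives while the watermark is still inside that period re-fires the window with the updated sum, and the one that arrives after it is dropped without any special handling.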

On Thu, Mar 9, 2017 at 5:30 PM, ext.eformichella <ext.eformiche...@riotgames.com> wrote:

> Thanks for the suggestion, we can definitely try that out.
>
> My one concern there is that events technically can lag for days or even
> months in some cases, but we only care about including the events that lag
> for 30 minutes or so, and would like the further lagging events to be
> ignored - I just want to make sure that doesn't require special handling.
>
> I also just want to make sure I'm understanding the maximum lateness
> watermark correctly. Suppose a watermark gets generated, and then an
> element with an older timestamp is found. My understanding was that that
> element should be ignored, but from our results it looks like the late
> element actually overwrites the aggregate of the on-time elements. Is this
> expected behavior?
>
> Thank you for your help!
> -Ethan
>
> On Tue, Mar 7, 2017 at 6:01 PM, Dawid Wysakowicz [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> Hi Ethan,
>>
>> I believe then it is because the Watermark and Timestamps in your
>> implementation are uncorrelated. What Watermark really is a marker that
>> says there will be no elements with timestamp smaller than the value of
>> this watermark. For more info on the concept see [1]
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks>.
>>
>> In your case as you say that events can "lag" for 30 minutes, you should
>> try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly
>> for a case like yours.
>>
>> Regards,
>> Dawid
>>
>> 2017-03-07 22:33 GMT+01:00 ext.eformichella <[hidden email]>:
>>
>>> Hi Dawid, I'm working with Max on the project
>>> Our code for the TimestampAndWatermarkAssigner is:
>>> ```
>>> class TimestampAndWatermarkAssigner(val maxLateness: Long) extends
>>>     AssignerWithPeriodicWatermarks[Row] {
>>>
>>>   override def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = {
>>>     element.minTime
>>>   }
>>>
>>>   override def getCurrentWatermark(): Watermark = {
>>>     new Watermark(System.currentTimeMillis() - maxLateness)
>>>   }
>>> }
>>> ```
>>>
>>> Where Row is a class representing the incoming JSON object coming from
>>> Kafka, which includes the timestamp
>>>
>>> Thanks,
>>> -Ethan
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issues-with-Event-Time-and-Kafka-tp12061p12090.html
>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.