Thanks for the suggestion, we can definitely try that out. My one concern is that events can technically lag for days or even months in some cases, but we only care about including events that lag by 30 minutes or so and would like events that lag further to be ignored. I just want to make sure that doesn't require special handling.
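To check that I'm reading the suggestion right, here is a rough sketch of what I think we would try. It assumes a simplified stand-in for our Row class (only the `minTime` field), a hypothetical class name `RowTimestampExtractor`, and a 30-minute bound; none of this is our production code.

```scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical stand-in for our Row class; only the minTime field matters here.
case class Row(minTime: Long)

// The watermark trails the largest event timestamp seen so far by maxLateness,
// so it follows event time instead of the wall clock.
class RowTimestampExtractor(maxLateness: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[Row](maxLateness) {

  // Event time comes from the record itself.
  override def extractTimestamp(element: Row): Long = element.minTime
}

// Usage (stream is assumed to be a DataStream[Row] read from Kafka):
// stream.assignTimestampsAndWatermarks(new RowTimestampExtractor(Time.minutes(30)))
```

If I understand correctly, this only changes how the watermark is derived; whether events that arrive behind the watermark are dropped would still depend on the window operator's lateness settings.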
I also want to make sure I understand the maximum-lateness watermark correctly. Suppose a watermark is generated and then an element with an older timestamp arrives. My understanding was that this element should be ignored, but our results suggest that the late element actually overwrites the aggregate of the on-time elements. Is this expected behavior?

Thank you for your help!
-Ethan

On Tue, Mar 7, 2017 at 6:01 PM, Dawid Wysakowicz [via Apache Flink User Mailing List archive.] <ml-node+s2336050n12092...@n4.nabble.com> wrote:

> Hi Ethan,
>
> I believe then it is because the Watermark and Timestamps in your implementation are uncorrelated. A Watermark is really a marker that says there will be no elements with a timestamp smaller than the value of this watermark. For more info on the concept see [1] <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks>.
>
> In your case, as you say that events can "lag" for 30 minutes, you should try the BoundedOutOfOrdernessTimestampExtractor. It is designed exactly for a case like yours.
>
> Regards,
> Dawid
>
> 2017-03-07 22:33 GMT+01:00 ext.eformichella <[hidden email]>:
>
>> Hi Dawid, I'm working with Max on the project.
>> Our code for the TimestampAndWatermarkAssigner is:
>> ```
>> class TimestampAndWatermarkAssigner(val maxLateness: Long) extends AssignerWithPeriodicWatermarks[Row] {
>>
>>   override def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = {
>>     element.minTime
>>   }
>>
>>   override def getCurrentWatermark(): Watermark = {
>>     new Watermark(System.currentTimeMillis() - maxLateness)
>>   }
>> }
>> ```
>>
>> Where Row is a class representing the incoming JSON object coming from Kafka, which includes the timestamp.
>>
>> Thanks,
>> -Ethan
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issues-with-Event-Time-and-Kafka-tp12061p12090.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issues-with-Event-Time-and-Kafka-tp12061p12139.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.