[ https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16158948#comment-16158948 ]
Kevin Zhang commented on SPARK-21944:
-------------------------------------

[~mgaido] By "define the watermark on the column 'time'", do you mean the following?

{code:java}
val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
  .withWatermark("time", "10 seconds")
  .dropDuplicates("id", "window")
  .groupBy("window")
  .count
{code}

I don't know whether this is right, because the documentation indicates we should use the same column as the one used in the watermark, that is, the "time" column (which is not what I want).

I tried this and the application doesn't throw any exception, but it didn't drop events older than the watermark as expected. In the following example, after the batch containing an event with time=1504774540 (2017/9/7 16:55:40 CST) is processed (the watermark should be adjusted to 2017/9/7 16:55:30 CST), I send an event with time=1504745724 (2017/9/7 8:55:24 CST). This event is processed instead of being dropped as expected.

{code:java}
+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
|[2017-09-07 08:55:20.0,2017-09-07 08:55:25.0]|1    |
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
+---------------------------------------------+-----+
{min=2017-09-07T00:55:24.000Z, avg=2017-09-07T00:55:24.000Z, watermark=2017-09-07T08:55:30.000Z, max=2017-09-07T00:55:24.000Z}
{code}

One important point: my time zone is CST, not UTC. The start and end times in the window are right, but the watermark is reported in UTC. I don't know whether this has any influence. If anything is unclear, please point it out and I will explain.
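As a cross-check on the time-zone question, the two epoch values quoted above can be converted with java.time. This is a minimal sketch of the arithmetic only, not of Spark's watermark implementation. It assumes that "CST" here means China Standard Time (UTC+8, zone id Asia/Shanghai) and that the watermark is simply the maximum event time seen so far minus the 10-second delay. The object and value names are made up for illustration.

```scala
import java.time.{Duration, Instant, ZoneId}

object WatermarkArithmetic {
  // Assumption: "CST" in the comment is China Standard Time (UTC+8).
  val cst: ZoneId = ZoneId.of("Asia/Shanghai")

  // Epoch seconds quoted in the comment.
  val maxEventTime: Instant = Instant.ofEpochSecond(1504774540L)
  val lateEventTime: Instant = Instant.ofEpochSecond(1504745724L)

  // Delay passed to withWatermark in the snippet above.
  val delay: Duration = Duration.ofSeconds(10)

  // Simplified model (an assumption): watermark = max event time - delay.
  val watermark: Instant = maxEventTime.minus(delay)

  // An event counts as late in this model if it is older than the watermark.
  val isLate: Boolean = lateEventTime.isBefore(watermark)

  def main(args: Array[String]): Unit = {
    println(s"max event:  $maxEventTime UTC = ${maxEventTime.atZone(cst).toLocalDateTime} CST")
    println(s"watermark:  $watermark UTC = ${watermark.atZone(cst).toLocalDateTime} CST")
    println(s"late event: $lateEventTime UTC = ${lateEventTime.atZone(cst).toLocalDateTime} CST")
    println(s"late event is before the watermark: $isLate")
  }
}
```

Under those assumptions the computed watermark is 2017-09-07T08:55:30Z, which matches the value reported in the stats above, and the second event (00:55:24 UTC) is older than it. So the UTC display of the watermark would not, on its own, account for the event being processed.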
Thanks

> Watermark on window column is wrong
> -----------------------------------
>
>                 Key: SPARK-21944
>                 URL: https://issues.apache.org/jira/browse/SPARK-21944
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Kevin Zhang
>
> When I use a watermark with dropDuplicates in the following way, the
> watermark is calculated wrong
> {code:java}
> val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
>   .withWatermark("window", "10 seconds")
>   .dropDuplicates("id", "window")
>   .groupBy("window")
>   .count
> {code}
> where events is a dataframe with a timestamp column "time" and long column
> "id".
> I registered a listener to print the event time stats in each batch, and the
> results is like the following
> {code:shell}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
> +---------------------------------------------+-----+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z,
> watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
> +---------------------------------------------+-----+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z,
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4    |
> +---------------------------------------------+-----+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z,
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> {code}
> As can be seen, the event time stats are wrong which are always in
> 1970-01-01, so the watermark is calculated wrong.