[ https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16158948#comment-16158948 ]

Kevin Zhang commented on SPARK-21944:
-------------------------------------

[~mgaido] Is the following what you mean by "define the watermark on the
column 'time'"?

{code:java}
val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
      .withWatermark("time", "10 seconds")
      .dropDuplicates("id", "window")
      .groupBy("window")
      .count
{code}
I'm not sure whether this is right, because the documentation indicates that we
should deduplicate on the same column that is used in the watermark, i.e. the
"time" column (which is not what I want). I tried this approach. The application
doesn't throw any exception, but it didn't drop events older than the watermark
as expected. In the following example, the batch containing an event with
time=1504774540 (2017/9/7 16:55:40 CST) is processed first, so the watermark
should be adjusted to 2017/9/7 16:55:30 CST. I then send an event with
time=1504745724 (2017/9/7 8:55:24 CST). This event is processed instead of being
dropped, as I expected it to be.
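
A minimal sketch of the watermark arithmetic in this example, written in plain Scala rather than Spark. It assumes the watermark is the maximum event time seen so far minus the 10-second delay, as the Structured Streaming guide describes, and it uses the epoch-second values above. `WatermarkArithmetic` and its fields are illustrative names only.

```scala
import java.time.Instant

// Illustrative sketch of the watermark rule, not Spark code.
object WatermarkArithmetic {
  val delaySec: Long     = 10L          // withWatermark("time", "10 seconds")
  val maxEventSec: Long  = 1504774540L  // 2017/9/7 16:55:40 CST
  val lateEventSec: Long = 1504745724L  // 2017/9/7 08:55:24 CST

  // Watermark = maximum event time seen so far minus the allowed delay.
  val watermarkSec: Long = maxEventSec - delaySec

  // The late event is older than the watermark, so it was expected to be dropped.
  val isLate: Boolean = lateEventSec < watermarkSec

  def main(args: Array[String]): Unit = {
    println(Instant.ofEpochSecond(watermarkSec)) // 2017-09-07T08:55:30Z
    println(Instant.ofEpochSecond(lateEventSec)) // 2017-09-07T00:55:24Z
    println(s"late = $isLate")                   // late = true
  }
}
```

The two printed instants match the `watermark` and `min` values reported by the listener below.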

{code:java}
+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
|[2017-09-07 08:55:20.0,2017-09-07 08:55:25.0]|1    |
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
+---------------------------------------------+-----+

{min=2017-09-07T00:55:24.000Z, avg=2017-09-07T00:55:24.000Z, 
watermark=2017-09-07T08:55:30.000Z, max=2017-09-07T00:55:24.000Z}
{code}
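
The 08:55:20 to 08:55:25 row comes from the late event. The following is a simplified sketch of 5-second tumbling-window assignment that rounds the timestamp down to a multiple of the window size. It is an illustration only, not Spark's `window` implementation, and it reproduces that boundary. It assumes CST means UTC+8, which is consistent with the window boundaries printed above. `TumblingWindow`, `assign`, and `show` are hypothetical names.

```scala
import java.time.{Instant, ZoneOffset}

// Simplified tumbling-window assignment (not Spark's actual code): a timestamp
// falls into [start, start + size), where start is the timestamp rounded down
// to a multiple of the window size.
object TumblingWindow {
  def assign(epochSec: Long, sizeSec: Long): (Long, Long) = {
    val start = Math.floorDiv(epochSec, sizeSec) * sizeSec
    (start, start + sizeSec)
  }

  // Assumption: CST here is UTC+8.
  private val cst = ZoneOffset.ofHours(8)

  // Render epoch seconds as a local CST date-time.
  def show(epochSec: Long): String =
    Instant.ofEpochSecond(epochSec).atOffset(cst).toLocalDateTime.toString

  def main(args: Array[String]): Unit = {
    val (s, e) = assign(1504745724L, 5L) // the late event at 08:55:24 CST
    println(s"[${show(s)}, ${show(e)}]")  // [2017-09-07T08:55:20, 2017-09-07T08:55:25]
  }
}
```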

One important thing to note: my time zone is CST (UTC+8), not UTC. The window
start and end times are shown correctly in local time, but the watermark is
reported in UTC. I don't know whether this has any effect.
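
To check whether the time-zone difference affects the reported value, the sketch below renders the listener's watermark instant in both UTC and UTC+8 using `java.time`. It assumes UTC+8 is the CST meant here. `SameInstant` is a hypothetical name.

```scala
import java.time.{Instant, OffsetDateTime, ZoneOffset}

object SameInstant {
  // Watermark as reported by the listener (UTC).
  val watermark: Instant = Instant.parse("2017-09-07T08:55:30Z")

  // The same instant viewed at UTC+8 (assumed to be CST).
  val inCst: OffsetDateTime = watermark.atOffset(ZoneOffset.ofHours(8))

  def main(args: Array[String]): Unit = {
    println(watermark)                    // 2017-09-07T08:55:30Z
    println(inCst)                        // 2017-09-07T16:55:30+08:00
    println(inCst.toInstant == watermark) // true
  }
}
```

Both strings denote the same point in time. The UTC rendering by itself does not shift the watermark away from the expected 16:55:30 CST.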

If anything is unclear, please point it out and I will explain. Thanks




> Watermark on window column is wrong
> -----------------------------------
>
>                 Key: SPARK-21944
>                 URL: https://issues.apache.org/jira/browse/SPARK-21944
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Kevin Zhang
>
> When I use a watermark with dropDuplicates in the following way, the 
> watermark is calculated incorrectly:
> {code:java}
> val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
>       .withWatermark("window", "10 seconds")
>       .dropDuplicates("id", "window")
>       .groupBy("window")
>       .count
> {code}
> where events is a DataFrame with a timestamp column "time" and a long column 
> "id".
> I registered a listener to print the event time stats in each batch, and the 
> results are as follows:
> {code:shell}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
> +---------------------------------------------+-----+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3    |
> +---------------------------------------------+-----+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------------------------------------------+-----+
> |window                                       |count|
> +---------------------------------------------+-----+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1    |
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4    |
> +---------------------------------------------+-----+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> {code}
> As can be seen, the event time stats are wrong: they always fall on 
> 1970-01-01, so the watermark is calculated incorrectly.
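
Here is a quick sanity check on the reported stats, written as a plain-Scala sketch. Converting the reported max back to epoch milliseconds shows that it is only about 19 hours after the epoch. The events in Batch 0, by contrast, fall into a window starting at 1504774520000 ms (assuming CST = UTC+8). `StatsCheck` is a hypothetical name. The 2^36 comparison is an arithmetic observation only and does not diagnose the cause.

```scala
import java.time.Instant

object StatsCheck {
  // Max event time reported by the listener in Batch 0.
  val reportedMax: Instant = Instant.parse("1970-01-01T19:05:19.476Z")
  val reportedMs: Long     = reportedMax.toEpochMilli // about 19 hours after the epoch

  // Start of the [16:55:20, 16:55:25) CST window that Batch 0's events fall into.
  val windowStart: Instant = Instant.parse("2017-09-07T08:55:20Z")

  // Arithmetic observation only: 2^36 microseconds truncated to milliseconds.
  val twoPow36Ms: Long = (1L << 36) / 1000L

  def main(args: Array[String]): Unit = {
    println(reportedMs)               // 68719476
    println(windowStart.toEpochMilli) // 1504774520000
    println(reportedMs == twoPow36Ms) // true
  }
}
```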



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
