[ 
https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-27340:
--------------------------------
    Description: 
When we use the Dataset API to write a Structured Streaming query, we usually 
specify a watermark on the event time column. If we define a window on the event 
time column, the delayKey metadata of the event time column is supposed to be 
propagated to the new column generated by the time window expression. But if we 
add an additional alias on the time window column, the delayKey metadata is lost.
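
A quick way to observe the loss (my own sketch, not from the original report; it 
assumes the delay metadata is surfaced through the DataFrame schema under the key 
"spark.watermarkDelayMs", i.e. the value of EventTimeWatermark.delayKey):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

val base = spark.readStream.format("rate").load()
  .select(col("timestamp").as("fooTime"))
  .withWatermark("fooTime", "2 seconds")

// Without an extra alias, the generated window column keeps the metadata:
println(base.select(window($"fooTime", "2 seconds"))
  .schema.head.metadata.contains("spark.watermarkDelayMs"))  // expected: true

// With an additional alias on the time window expression, it is dropped:
println(base.select(window($"fooTime", "2 seconds").alias("fooWindow"))
  .schema("fooWindow").metadata.contains("spark.watermarkDelayMs"))  // expected: false
{code}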

So far I have only found this bug to affect stream-stream joins with equality 
join keys on window columns. For aggregations, the grouping expressions are 
trimmed (in the CleanupAliases rule), so the additional aliases are removed and 
the metadata is kept.
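
For contrast, here is a minimal sketch (my own untested illustration, using the 
fooStream defined in the repro below) of a windowed aggregation with the same 
kind of alias; per the CleanupAliases behavior just described, the alias should 
be trimmed from the grouping expression, so the metadata survives and the 
streaming aggregation is accepted:

{code:scala}
// Hedged sketch: the same alias on the time window, but used as a grouping key.
// CleanupAliases is expected to strip the alias here, preserving the watermark
// metadata on the window column.
val windowedCounts = fooStream
  .groupBy($"fooId", window($"fooTime", "2 seconds").alias("fooWindow"))
  .count()
{code}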

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}

This program ends with the following exception, and from the analyzed plan we can 
see there is no delayKey metadata on 'fooWindow' (fooWindow#9 lacks the -T2000ms 
marker that window#10-T2000ms carries):

{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;
Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
:- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
:  +- Filter isnotnull(fooTime#5-T2000ms)
:     +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType, TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
:        +- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
:           +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
:              +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
+- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
   +- Filter isnotnull(barTime#15-T2000ms)
      +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as double) = (cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType, TimestampType)) AS window#20-T2000ms, barId#14L, barTime#15-T2000ms]
         +- EventTimeWatermark barTime#15: timestamp, interval 2 seconds
            +- Project [value#1L AS barId#14L, timestamp#0 AS barTime#15]
               +- Filter ((value#1L % cast(2 as bigint)) = cast(0 as bigint))
                  +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
{code}
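
Until the metadata propagation is fixed, one possible workaround (my own untested 
sketch, based on the range-condition alternative named in the exception message 
rather than on anything verified in this report) is to join on an explicit 
event-time range instead of window equality:

{code:scala}
// Hedged sketch: same join intent expressed as a time-range condition on the raw
// event-time columns, which the error text says stream-stream outer joins accept
// (watermark on the nullable side plus an appropriate range condition).
import org.apache.spark.sql.functions.expr

val joinedByRange = fooStream
  .join(
    barStream,
    expr("""
      fooId = barId AND
      barTime >= fooTime - interval 2 seconds AND
      barTime <= fooTime + interval 2 seconds
    """),
    joinType = "leftOuter"
  )
{code}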



> Alias on TimeWindow expression may cause watermark metadata to be lost
> -----------------------------------------------------------------
>
>                 Key: SPARK-27340
>                 URL: https://issues.apache.org/jira/browse/SPARK-27340
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Kevin Zhang
>            Priority: Major


