Hello there, I am trying to calculate a simple difference between adjacent rows (ts = ts - 10) of a column in a dataset by joining the dataset with itself. The SQL expression below works for a static dataset (trackT):

    Dataset<Row> trackDiff = spark.sql(
        "select a.*, "
        + "a.posX - coalesce(b.posX, 0) as delX "
        + "from trackT a left join trackT b "
        + "on a.ts = b.ts - 10");

However, if the dataset is a Structured Streaming dataset, Spark reports that "Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys".

Since the dataset joins with itself, I was thinking of using an arbitrary time interval as the watermark to create two streaming datasets and then joining them:

    Dataset<Row> trackWM1 = trackT.withColumn("ts1", trackT.col("timestamp"))
        .drop("timestamp").withWatermark("ts1", "10 second");
    Dataset<Row> trackWM2 = trackT.withColumn("ts2", trackT.col("timestamp"))
        .drop("timestamp").withWatermark("ts2", "10 second");
    Dataset<Row> joinDF = trackWM1.join(trackWM2, "???")

I am stuck here: I don't know how to express for the streaming dataset what I did for the static one. The join seems to mean something different once I add the time-interval watermark, since the original query joined each row with a different row of the same table.

Can someone explain how I can implement the original logic on a streaming dataset? Perhaps I don't even need a join?

Thanks.

Best,
Mann