Hello there,

I am trying to calculate the simple difference between adjacent rows
(ts = ts - 10) of a column in a dataset using a self-join. The SQL
expression works for static datasets (trackT):

Dataset<Row> trackDiff = spark.sql("select a.*, "
    + "a.posX - coalesce(b.posX, 0) as delX "
    + "from trackT a left join trackT b "
    + "on a.ts = b.ts - 10");
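For reference, a minimal plain-Java sketch (no Spark) of what the static query computes: a left self-join implemented as a hash lookup on ts + 10. The names Track, TrackWithDelta and selfJoinDiff are hypothetical, and the sketch assumes ts values are unique and use the same units as the 10 in the join condition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TrackDiff {
    // Hypothetical row type standing in for the two trackT columns the query uses.
    record Track(long ts, double posX) {}

    // Result row: the original columns plus delX.
    record TrackWithDelta(long ts, double posX, double delX) {}

    // Mirrors: select a.*, a.posX - coalesce(b.posX, 0) as delX
    //          from trackT a left join trackT b on a.ts = b.ts - 10
    static List<TrackWithDelta> selfJoinDiff(List<Track> rows, long step) {
        // Build side: index every row by its timestamp (assumes ts is unique).
        Map<Long, Double> posByTs = new HashMap<>();
        for (Track r : rows) {
            posByTs.put(r.ts(), r.posX());
        }
        List<TrackWithDelta> out = new ArrayList<>();
        for (Track a : rows) {
            // a.ts = b.ts - step  <=>  b.ts = a.ts + step
            Double bPos = posByTs.get(a.ts() + step);
            // Left join: a missing match yields NULL, which coalesce turns into 0.
            double delX = a.posX() - (bPos == null ? 0.0 : bPos);
            out.add(new TrackWithDelta(a.ts(), a.posX(), delX));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Track> rows = List.of(new Track(0, 1.0), new Track(10, 4.0), new Track(20, 9.0));
        for (TrackWithDelta r : selfJoinDiff(rows, 10)) {
            System.out.println(r);
        }
    }
}
```

With the three sample rows, delX comes out as -3.0, -5.0 and 9.0: each row is paired with the row 10 later, and the last row has no partner, so coalesce substitutes 0.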



However, if the dataset is a Structured Streaming dataset, Spark reports
that "Stream-stream outer join between two streaming DataFrame/Datasets
is not supported without a watermark in the join keys".
Since the dataset is joined with itself, I thought I could just use an
arbitrary time interval as the watermark, create two streaming datasets
and join them:


Dataset<Row> trackWM1 = trackT.withColumn("ts1", trackT.col("timestamp"))
        .drop("timestamp").withWatermark("ts1", "10 second");

Dataset<Row> trackWM2 = trackT.withColumn("ts2", trackT.col("timestamp"))
        .drop("timestamp").withWatermark("ts2", "10 second");


Dataset<Row> joinDF = trackWM1.join(trackWM2, "???");


I am stuck and don't know how to do for this streaming dataset what I
did for the static one. The join seems to mean something different once
I add the time-interval watermark, whereas the original query joined
each row with a different row of the same table. Can someone explain how
I can implement the original logic on a streaming dataset? Perhaps I
don't even need a join?


Thanks.


Best,


Mann
