Hi team, I need help with the windowing and watermark concepts in Spark Structured Streaming. This code is not working as expected.
package com.jiomoney.streaming

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

/**
 * Reads JSON messages from the Kafka topic `test`, sums field `a` per device (field `b`)
 * over 10-second tumbling event-time windows, and writes each finalized window to the
 * Kafka topic `sub_topic` as JSON (`window`, `deviceId`, `total`).
 *
 * Windowing / watermark notes:
 *  - Event time is the Kafka record `timestamp` column provided by the Kafka source
 *    (CreateTime or LogAppendTime, depending on the topic's configuration).
 *    Overwriting it with `current_timestamp()` and windowing on processing time makes
 *    the watermark meaningless, because no record can ever be "late".
 *  - In `append` output mode a window [start, end) is emitted exactly once, after the
 *    watermark (max event time seen so far minus 1 minute) passes `end`. Expect a
 *    window to appear no earlier than about 1 minute after it closes. The watermark
 *    only moves forward when newer records arrive, so if the topic goes quiet, the last
 *    windows are not emitted until more data comes in.
 *  - Changing the aggregation changes the stored state; delete `CheckpointPath` before
 *    restarting the query with a modified plan.
 */
object SlingStreaming {

  private val BootstrapServers = "localhost:9092"
  private val CheckpointPath = "/opt/checkpoints/"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Coupons_ViewingNow")
      .getOrCreate()

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", BootstrapServers)
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      // replica.fetch.max.bytes is a broker setting that consumers ignore;
      // max.partition.fetch.bytes is the consumer-side per-partition fetch size.
      .option("kafka.max.partition.fetch.bytes", "16777216")
      .load()

    val query = deviceTotals(raw)
      .select(to_json(struct(col("*"))) as "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", BootstrapServers)
      .option("topic", "sub_topic")
      .option("checkpointLocation", CheckpointPath)
      .outputMode("append")
      // Trigger.ProcessingTime replaces the deprecated streaming.ProcessingTime.
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }

  /**
   * Sums the JSON field `a` per device (JSON field `b`) over 10-second tumbling windows
   * keyed on the Kafka record timestamp, tolerating records up to 1 minute late.
   *
   * @param raw streaming DataFrame produced by the Kafka source (`value`, `timestamp`, ...)
   * @return columns `window` (struct of start/end), `deviceId`, and `total`
   */
  private def deviceTotals(raw: DataFrame): DataFrame = {
    val payload = col("value").cast("string")
    raw
      .select(
        col("timestamp") as "eventTime",
        get_json_object(payload, "$.a") as "count1",
        get_json_object(payload, "$.b") as "deviceId")
      .withWatermark("eventTime", "1 minute")
      .groupBy(window(col("eventTime"), "10 seconds"), col("deviceId"))
      .agg(sum(col("count1")) as "total")
  }
}
Thanks