To get any meaningful answers you may want to provide as much information/context as possible — e.g. the Spark version, which behavior/output you expected (and why you expected it), and how it actually behaves.
// On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj <samraj.mi...@gmail.com> wrote:
// "Hi Team, need help on windowing & watermark concept. This code is not
//  working as expected." (code reconstructed from the quoted email; Thanks.)
package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

/**
 * Reads JSON events from the Kafka topic "test", aggregates per-device counts
 * over 10-second processing-time windows, and writes the results as JSON back
 * to the Kafka topic "sub_topic".
 */
object SlingStreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Coupons_ViewingNow")
      .getOrCreate()

    import spark.implicits._

    val checkpoint_path = "/opt/checkpoints/"

    // Kafka source; failOnDataLoss=false tolerates deleted/expired offsets.
    val ks = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "16777216")
      .load()

    // get_json_object always returns StringType, so cast "count1" explicitly:
    // summing a string column relies on implicit coercion and silently turns
    // malformed values into null instead of surfacing them.
    val dfDeviceid = ks
      .withColumn("val", $"value".cast("string"))
      .withColumn("count1", get_json_object($"val", "$.a").cast("long"))
      .withColumn("deviceId", get_json_object($"val", "$.b"))
      .withColumn("timestamp", current_timestamp())

    // NOTE(review): the watermark is defined on *processing* time and the sink
    // below uses outputMode("append"). In append mode a window is emitted only
    // after the watermark passes the window's end, i.e. results for each
    // 10-second window appear roughly 1 minute late. This delay — not an
    // engine bug — is the most likely cause of the "not working as expected"
    // observation; use outputMode("update") for incremental results.
    val final_ids = dfDeviceid
      .withColumn("processing_time", current_timestamp())
      .withWatermark("processing_time", "1 minutes")
      .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
      .agg(sum($"count1") as "total")

    // Kafka sink. Trigger.ProcessingTime replaces the ProcessingTime case
    // class, which is deprecated since Spark 2.2 and removed in Spark 3.x.
    val t = final_ids
      .select(to_json(struct($"*")) as "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "sub_topic")
      .option("checkpointLocation", checkpoint_path)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .start()

    t.awaitTermination()
  }
}