To get meaningful answers, please provide as much information/context as
possible, e.g. the Spark version, which behavior/output you expected (and
why you expected it), and how the query actually behaves.

On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj <samraj.mi...@gmail.com> wrote:

> Hi Team,
>
> I need help with the windowing & watermark concepts.  This code is not
> working as expected.
>
> package com.jiomoney.streaming
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.ProcessingTime
>
> /** Structured Streaming job that reads JSON records from the Kafka topic
>   * `test`, sums the JSON field `a` per `deviceId` (JSON field `b`) over
>   * 10-second processing-time windows, and writes every aggregate row as a
>   * JSON string to the Kafka topic `sub_topic`.
>   */
> object SlingStreaming {
>
>   /** Builds the streaming query, starts it, and blocks until it terminates.
>     *
>     * @param args command-line arguments (unused)
>     */
>   def main(args: Array[String]): Unit = {
>     // The top-level `ProcessingTime` class is deprecated since Spark 2.2;
>     // `Trigger.ProcessingTime` is the supported replacement.
>     import org.apache.spark.sql.streaming.Trigger
>
>     val spark = SparkSession
>       .builder()
>       .master("local[*]")
>       .appName("Coupons_ViewingNow")
>       .getOrCreate()
>
>     import spark.implicits._
>
>     val checkpointPath = "/opt/checkpoints/"
>
>     // Source: raw Kafka records; only the `value` column is used below.
>     val kafkaSource = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "test")
>       .option("startingOffsets", "latest")
>       .option("failOnDataLoss", "false")
>       .option("kafka.replica.fetch.max.bytes", "16777216")
>       .load()
>
>     // Decode the payload and pull out the two JSON fields we aggregate on.
>     // get_json_object yields strings (null when the path is missing).
>     val parsed = kafkaSource
>       .withColumn("val", $"value".cast("string"))
>       .withColumn("count1", get_json_object($"val", "$.a"))
>       .withColumn("deviceId", get_json_object($"val", "$.b"))
>
>     // The window is driven by processing time, not by a time carried in
>     // the record. NOTE: in append mode a window's row is emitted only after
>     // the watermark (latest processing_time seen minus 1 minute) has moved
>     // past the end of that window, so output lags the input by more than a
>     // minute and needs later batches to arrive before it is released.
>     val windowedTotals = parsed
>       .withColumn("processing_time", current_timestamp())
>       .withWatermark("processing_time", "1 minutes")
>       .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
>       // count1 is a string column: cast explicitly rather than rely on the
>       // implicit string-to-double coercion applied by sum().
>       .agg(sum($"count1".cast("double")) as "total")
>
>     // Sink: one JSON document per aggregate row (window, deviceId, total).
>     val query = windowedTotals
>       .select(to_json(struct($"*")) as "value")
>       .writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("topic", "sub_topic")
>       .option("checkpointLocation", checkpointPath)
>       .outputMode("append")
>       .trigger(Trigger.ProcessingTime("1 seconds"))
>       .start()
>
>     query.awaitTermination()
>
>   }
>
> }
>
>
> Thanks
>
>

Reply via email to