Hi Team,

I need help with the windowing and watermark concepts: the code below is not working as
expected.

package com.jiomoney.streaming

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

object SlingStreaming {

  /** Kafka cluster used for both the source and the sink topics. */
  private val BootstrapServers = "localhost:9092"

  /** Checkpoint directory holding this query's offsets and windowed-aggregation state. */
  private val CheckpointPath = "/opt/checkpoints/"

  /**
   * Reads JSON events from Kafka topic `test`.
   *
   * Sums field `a` per device (field `b`) over 10-second event-time windows and writes each
   * finished window as JSON to topic `sub_topic`.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Coupons_ViewingNow")
      .getOrCreate()

    val totals = aggregatePerDevice(readEvents(spark))

    val query = totals
      .select(to_json(struct(col("*"))) as "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", BootstrapServers)
      .option("topic", "sub_topic")
      .option("checkpointLocation", CheckpointPath)
      // Append mode writes each window exactly once, after the watermark has passed the window's end.
      // Nothing is emitted for a window until that happens.
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }

  /**
   * Subscribes to Kafka topic `test` and projects the columns the aggregation needs.
   *
   * @param spark active session
   * @return streaming frame with columns `event_time` (timestamp), `count1` (double) and
   *         `deviceId` (string)
   */
  private def readEvents(spark: SparkSession): DataFrame = {
    val json = col("value").cast("string")
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", BootstrapServers)
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      // Consumer-side per-partition fetch limit.
      // `replica.fetch.max.bytes` is a broker setting that a consumer ignores.
      .option("kafka.max.partition.fetch.bytes", "16777216")
      .load()
      .select(
        // Use the Kafka record timestamp as event time instead of overwriting it with
        // current_timestamp(). A watermark over processing time can never observe late data;
        // it only delays output.
        col("timestamp").as("event_time"),
        // get_json_object yields strings. Cast explicitly, matching the implicit coercion sum() applied.
        get_json_object(json, "$.a").cast("double").as("count1"),
        get_json_object(json, "$.b").as("deviceId"))
  }

  /**
   * Sums `count1` per `deviceId` in tumbling 10-second windows over `event_time`.
   *
   * The watermark is the max `event_time` seen so far minus 1 minute. Records older than that
   * may be dropped, and a window is finalized (and, in append mode, emitted) only once the
   * watermark passes its end. The watermark advances only as newer records arrive. A window
   * therefore appears only after a record at least ~1 minute newer than the window's end has
   * been processed.
   *
   * @param events output of [[readEvents]]
   * @return columns `window` (struct of start/end), `deviceId` and `total`
   */
  private def aggregatePerDevice(events: DataFrame): DataFrame =
    events
      .withWatermark("event_time", "1 minute")
      .groupBy(window(col("event_time"), "10 seconds"), col("deviceId"))
      .agg(sum(col("count1")) as "total")
}


Thanks

Reply via email to