Well, you should get updates every 10 seconds as long as there are events
surviving your quite aggressive watermark condition. With a 500 millisecond
threshold, Spark will try to drop (it is not guaranteed) every event whose
timestamp is older than the current watermark, i.e. older than the maximum
event time seen so far minus 500 milliseconds. Try increasing the watermark
timespan and collecting max("timestamp") alongside the count on every
trigger to see what is going on in your stream. It could be that one
producer has a clock out of sync and is adding one message every two
minutes with a timestamp ahead of the others; each such message advances
the watermark and makes you drop all the other messages when you run with
such a low watermark tolerance.
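
If it helps, here is a rough, untested sketch of that diagnostic. The 10
minute threshold is just a guess to catch stragglers, and diagnosticDf /
maxEventTime are placeholder names; inputDf is the Kafka stream from your
code below:

    import org.apache.spark.sql.functions.{count, max, window}
    import spark.implicits._  // for the $"..." column syntax

    // Same 10 second windows as before, but with a far more tolerant
    // watermark, and max("timestamp") per window to reveal how far apart
    // your producers' event times really are.
    val diagnosticDf = inputDf
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "10 seconds"))
      .agg(count("*").as("count"), max("timestamp").as("maxEventTime"))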

Regards,

Magnus



On Wed, Jul 10, 2019 at 9:20 AM Kamalanathan Venkatesan <kamalanatha...@in.ey.com> wrote:

> Hello,
>
> Any observations on what I am doing wrong?
>
> Thanks,
>
> -Kamal
>
> From: Kamalanathan Venkatesan
> Sent: Tuesday, July 09, 2019 7:25 PM
> To: 'user@spark.apache.org' <user@spark.apache.org>
> Subject: Spark structural streaming sinks output late
>
> Hello,
>
> I have the Spark Structured Streaming code below, and I was expecting
> the results to be printed to the console every 10 seconds. Instead, I
> see the sink to the console happening only every ~2 minutes or more.
>
> May I know what I am doing wrong?
>
> def streaming(): Unit = {
>   System.setProperty("hadoop.home.dir", "/Documents/ ")
>   val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>   conf.set("spark.eventLog.enabled", "false")
>   val sc: SparkContext = new SparkContext(conf)
>   val sqlcontext = new SQLContext(sc)
>   val spark = SparkSession.builder().config(conf).getOrCreate()
>
>   import sqlcontext.implicits._
>   import org.apache.spark.sql.functions.window
>
>   val inputDf = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "wonderful")
>     .option("startingOffsets", "latest")
>     .load()
>
>   val personJsonDf = inputDf
>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
>     // events older than max(event time) - 500 ms may be dropped
>     .withWatermark("timestamp", "500 milliseconds")
>     .groupBy(window($"timestamp", "10 seconds"))
>     .count()
>
>   val consoleOutput = personJsonDf.writeStream
>     .format("console")
>     .option("truncate", "false")
>     .outputMode(OutputMode.Update())
>     .start()
>
>   consoleOutput.awaitTermination()
> }
>
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor()
>
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming()
>   }
> }
