Which version of Spark do you use?
You can get help by attaching a streaming query listener and printing the
QueryProgressEvent to track the watermark. The watermark value is
updated per batch, and the next batch utilizes that value.
If watermark exceeds the last timestamp but the value is still
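As a sketch of the listener approach (assuming a running `spark` session; the query itself is not shown), you can register a `StreamingQueryListener` and read the watermark out of each progress event's `eventTime` map:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Minimal sketch: print the event-time watermark after every micro-batch.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // eventTime contains a "watermark" entry once the query defines one
    Option(event.progress.eventTime.get("watermark"))
      .foreach(wm => println(s"Current watermark: $wm"))
  }
})
```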
Now I've added the same aggregation query as below, but it still didn't filter:
val lines_stream = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
  option("subscribe", "s1").
  load().
  withColumn("value", 'value.cast("string")).  // Kafka's value column is binary; cast before split
  withColumn("tokens", split('value, ",")).
Hi Sandeep,
Watermarks are used in aggregation queries to ensure correctness and to clean
up state. They don't let you drop records in map-only scenarios, which is
what you have in your example. If you run a test with `groupBy().count()`,
you will see that the count doesn't increase with the last
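To make that concrete, here is a hedged sketch (the `eventTime` column name is hypothetical, assuming the tokens have been parsed into a timestamp) of pairing the watermark with a stateful windowed aggregation, which is where it actually takes effect:

```scala
import org.apache.spark.sql.functions.window

// Sketch: watermark + windowed aggregation. Rows arriving more than
// 10 minutes behind the max observed eventTime are excluded from the
// aggregation once the watermark has advanced past their window.
val counts = lines_stream
  .withWatermark("eventTime", "10 minutes")      // hypothetical event-time column
  .groupBy(window($"eventTime", "5 minutes"))
  .count()

// In a map-only query (no groupBy), the same withWatermark call
// does NOT filter late rows; they pass through unchanged.
```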
I am trying to test the watermark concept in Structured Streaming using the
program below:
import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.streaming.Trigger
val lines_stream = spark.readStream.
format("kafka").
opt
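The program above is cut off, so as a self-contained alternative sketch (using the built-in `rate` source instead of Kafka, so no broker setup is assumed), a minimal watermark test might look like this:

```scala
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

// Hypothetical end-to-end watermark test against the rate source.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .withColumn("eventTime", col("timestamp"))

val windowedCounts = events
  .withWatermark("eventTime", "10 seconds")
  .groupBy(window(col("eventTime"), "5 seconds"))
  .count()

val query = windowedCounts.writeStream
  .outputMode("append")   // append emits a window only after the watermark passes it
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```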