Structured Streaming really makes this easy. You can simply set an option
on the Kafka source (startingOffsets) that specifies whether to start the
query from the earliest or the latest offsets.
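
As a rough sketch of what that looks like (not tested against your setup): the broker address, topic name, and checkpoint path below are placeholders I made up, and it assumes the spark-sql-kafka-0-10 package is on the classpath. Note that startingOffsets only applies when a query starts for the first time; a restarted query resumes from its checkpoint.

```scala
import org.apache.spark.sql.SparkSession

object StartingOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("starting-offsets-sketch")
      .master("local[*]")
      .getOrCreate()

    // Read from Kafka as a streaming DataFrame.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "events")                       // placeholder topic
      .option("startingOffsets", "earliest")               // or "latest"
      .load()

    // Kafka keys and values arrive as binary, so cast them before printing.
    val query = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/starting-offsets-sketch") // placeholder path
      .start()

    query.awaitTermination()
  }
}
```
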
Check out:
- https://www.slideshare.net/databricks/a-deep-dive-into-structured-streaming
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integratio
I'm consuming data from Kafka with createDirectStream and storing the
offsets in Kafka itself
(https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself):
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, S