Hi Nimrod,

I am also interested in your first point: what exactly does "false alarm"
mean?
Today we had the following scenario, which in my opinion is a false alarm.

Example:

- The topic contains 'N' messages
- The Spark Streaming application consumed all 'N' messages successfully
- The checkpoint of the streaming app logged the 'N' messages as committed
- Topic retention kicks in and all 'N' messages are removed from the topic
- A new message 'N + 1' arrives
- The Spark Streaming app sees the new message and constructs a new batch up
to 'N + 1'
- The Spark Streaming app fails with "data loss" at
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:344)
because it tries to fetch the messages of the previous batch with offset 'N'
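
To make the sequence above concrete, here is a toy model in Python. It is only
a sketch and not Spark's or Kafka's actual code: ToyTopicPartition and
plan_batch are hypothetical names, and the model assumes that the checkpoint
stores the next offset to read and that possible data loss is flagged when
that offset is lower than the earliest offset still retained.

```python
from dataclasses import dataclass


@dataclass
class ToyTopicPartition:
    """Toy stand-in for one Kafka partition (hypothetical, not a real API)."""
    earliest: int = 0  # first offset that is still retained
    latest: int = 0    # offset the next produced message will receive

    def produce(self, count: int = 1) -> None:
        self.latest += count

    def apply_retention(self, new_earliest: int) -> None:
        # Retention can only move the earliest offset forward,
        # and never past the end of the log.
        self.earliest = max(self.earliest, min(new_earliest, self.latest))


def plan_batch(committed_end: int, partition: ToyTopicPartition):
    """Return (start, end, data_loss_suspected) for the next micro-batch.

    Assumption: the batch resumes at the committed end offset (exclusive)
    and is flagged when that start offset is no longer retained.
    """
    start, end = committed_end, partition.latest
    return start, end, start < partition.earliest


partition = ToyTopicPartition()
partition.produce(5)          # N = 5 messages, offsets 0..4
committed_end = 5             # checkpoint: next offset to read is 5
partition.apply_retention(5)  # retention removes offsets 0..4
partition.produce(1)          # the new message gets offset 5

print(plan_batch(committed_end, partition))  # (5, 6, False)
print(plan_batch(4, partition))              # (4, 6, True)
```

In this toy model, loss is only flagged when the batch start lies below the
earliest retained offset, as in the second call.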

We are using Spark 3.5.5.

I don't understand why Spark tries to fetch a message at an already
consumed offset at all.
Especially in topics with very low data flow, topic retention will always
produce this data loss problem.
I am curious whether one of the devs could explain this.

Here is a stack trace from where it happened in our case:

```
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.org$apache$spark$sql$kafka010$consumer$KafkaDataConsumer$$reportDataLoss0(KafkaDataConsumer.scala:724)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.reportDataLoss(KafkaDataConsumer.scala:651)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:344)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:656)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:299)
        at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:79)
```

Best regards

Wolfgang Buchner

On 2025/07/10 10:04:14 Nimrod Ofek wrote:
> Hi everyone,
>
> I'm currently working with Spark Structured Streaming integrated with Kafka
> and had some questions regarding the failOnDataLoss option.
>
> The current documentation states:
>
> *"Whether to fail the query when it's possible that data is lost (e.g.,
> topics are deleted, or offsets are out of range). This may be a false
> alarm. You can disable it when it doesn't work as you expected."*
>
> ChatGPT has some explanation - but I would like to get a more detailed and
> certain answer, and I think that the documentation should have that
> explanation as well.
>
> I’d appreciate some clarification on the following points:
>
>    1. What exactly does “this may be a false alarm” mean in this context?
>       Under what circumstances would that occur? What should I expect when
>       that happens?
>
>    2. What does it mean to “fail the query”? Does this imply that the process
>       will skip the problematic offset and continue, or does it stop
>       entirely? How will the next offset get determined? What will happen
>       upon restart?
>
>    3. If the offset is out of range, how does Spark determine the next offset
>       to use? Would it default to latest, earliest, or something else?
>
> Understanding the expected behavior here would really help us configure
> this option appropriately for our use case.
>
> Thanks in advance for your help!
>
> Best regards,
> Nimrod
>
