Hi Nimrod,

I am also interested in your first point: what exactly does "false alarm" mean?
Today we had a scenario which, in my opinion, is such a false alarm.

The sequence of events:

- The topic contains 'N' messages
- The Spark Streaming application consumed all 'N' messages successfully
- The checkpoints of the streaming app have logged all 'N' messages as committed
- Topic retention kicks in and all 'N' messages are removed from the topic
- A new message 'N + 1' arrives
- The Spark Streaming app sees the new message and constructs a new batch up to 'N + 1'
- The Spark Streaming app fails with a "data loss" error at
  org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:344)
  because it tries to fetch the messages of the previous batch at offset 'N'

We are using Spark 3.5.5.
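For context, here is a rough sketch of the kind of query we run; it is not our real code, and the broker, topic, and checkpoint path are placeholders:

```scala
// Rough sketch only; broker, topic, and checkpoint path are placeholders.
import org.apache.spark.sql.SparkSession

object LowTrafficTopicRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("low-traffic-topic-repro")
      .getOrCreate()

    // failOnDataLoss is not set explicitly here, so it defaults to true.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "low-traffic-topic")         // placeholder
      .option("startingOffsets", "earliest")
      .load()

    // The committed offsets ('N' in the scenario above) end up in this checkpoint directory.
    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/low-traffic-topic") // placeholder
      .start()

    query.awaitTermination()
  }
}
```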

I don't understand why Spark tries to fetch a message at an already
consumed offset at all. Especially for topics with very low data flow,
topic retention will always produce this kind of data-loss error.
I am curious if some of the devs could explain this.
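For reference, the documented way to suppress the error is to disable the check, which I would rather not do without understanding the semantics. Continuing the sketch above (same placeholders):

```scala
// Disabling the check as the docs suggest; we would prefer to understand the
// "false alarm" semantics before relying on this in production.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "low-traffic-topic")         // placeholder
  .option("failOnDataLoss", "false")                // the option under discussion
  .load()
```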

Here is the stack trace from our case:

```
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.org$apache$spark$sql$kafka010$consumer$KafkaDataConsumer$$reportDataLoss0(KafkaDataConsumer.scala:724)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.reportDataLoss(KafkaDataConsumer.scala:651)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:344)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:656)
        at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:299)
        at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:79)
```

Best regards

Wolfgang Buchner

On 2025/07/10 10:04:14 Nimrod Ofek wrote:
> Hi everyone,
>
> I'm currently working with Spark Structured Streaming integrated with Kafka
> and had some questions regarding the failOnDataLoss option.
>
> The current documentation states:
>
> *"Whether to fail the query when it's possible that data is lost (e.g.,
> topics are deleted, or offsets are out of range). This may be a false
> alarm. You can disable it when it doesn't work as you expected."*
>
> ChatGPT has some explanation, but I would like to get a more detailed and
> certain answer, and I think that the documentation should have that
> explanation as well.
>
> I’d appreciate some clarification on the following points:
>
> 1. What exactly does "this may be a false alarm" mean in this context?
>    Under what circumstances would that occur? What should I expect when
>    that happens?
>
> 2. What does it mean to "fail the query"? Does this imply that the process
>    will skip the problematic offset and continue, or does it stop entirely?
>    How will the next offset get determined? What will happen upon restart?
>
> 3. If the offset is out of range, how does Spark determine the next offset
>    to use? Would it default to latest, earliest, or something else?
>
> Understanding the expected behavior here would really help us configure
> this option appropriately for our use case.
>
> Thanks in advance for your help!
>
> Best regards,
> Nimrod
>


