[ 
https://issues.apache.org/jira/browse/SPARK-49878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17905450#comment-17905450
 ] 

Thomas S commented on SPARK-49878:
----------------------------------

[~kabhwan] We already run the queries with `failOnDataLoss` set to `true`, so I 
can confirm that it does use the incorrect offsets even with this setting 
enabled. The code supports this: nothing checks that property when looking 
for incorrect offsets. 

> Spark Structured Streaming will process incorrect Kafka offsets
> ---------------------------------------------------------------
>
>                 Key: SPARK-49878
>                 URL: https://issues.apache.org/jira/browse/SPARK-49878
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.0, 3.5.3
>            Reporter: Thomas S
>            Priority: Critical
>
> When using the Kafka structured streaming integration, 
> KafkaOffsetReaderConsumer#fetchLatestOffsets checks if any of the offsets are 
> incorrect, which is good behaviour. It has a set number of retries (which is 
> configurable - `fetchoffset.numretries`), and a fetch retry interval (also 
> configurable - `fetchoffset.retryintervalms`).
> However, at the end of these retries, if the offsets are still incorrect, 
> Spark uses the incorrect offsets to do the processing. For us this has caused 
> processing to start at the beginning of the Kafka topic partition (as the 
> incorrect offsets are usually 0), which is extremely undesirable and 
> difficult to recover from (as the checkpointing then sets those values). 
> The processing should stop after the set number of retries, as we do not want 
> to end up querying data that is possibly years old. 
> We are running version 3.2, but I can see that this logic is still in the 
> master branch. In our current patched version, we have thrown an 
> IllegalStateException if there are still incorrect offsets present in the 
> incorrectOffsets collection, along these lines:
> {code:java}
> if (incorrectOffsets.nonEmpty) {
>   throw new IllegalStateException(
>     s"Tried $attempt times (fetchoffset.numretries) to" +
>     s" obtain valid offsets." +
>     s" knownOffsets: $knownOffsets," +
>     s" incorrectOffsets: $incorrectOffsets." +
>     s" Aborting.")
> }
> {code}
> With this change, when the retry limit is exceeded and some offsets are 
> still invalid, an IllegalStateException propagates up through 
> KafkaMicroBatchStream#latestOffset(), 
> MicroBatchExecution#constructNextBatch(), and runActivatedStream() to 
> StreamExecution#runStream(), where it is caught and the query is terminated. 
> There might be more elegant solutions available. 
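
For illustration only, the retry-then-abort behaviour described in the issue can be sketched outside Spark. This is not the Spark implementation: `Partition`, `FailFastOffsetFetch`, `findIncorrect`, and `fetchLatestOrFail` are hypothetical names, and the sketch assumes an offset is "incorrect" when the fetched value is lower than the previously known one.

```scala
// Hypothetical stand-in for a Kafka topic partition (not Kafka's TopicPartition).
final case class Partition(topic: String, id: Int)

object FailFastOffsetFetch {
  type Offsets = Map[Partition, Long]

  // Assumption: a fetched offset is incorrect if it is lower than the known offset.
  def findIncorrect(known: Offsets, fetched: Offsets): Offsets =
    fetched.filter { case (tp, offset) => known.get(tp).exists(_ > offset) }

  // Fetches up to maxAttempts times, sleeping between attempts, and throws
  // instead of returning offsets that are still incorrect after the last attempt.
  def fetchLatestOrFail(known: Offsets, maxAttempts: Int, retryIntervalMs: Long)(
      fetch: () => Offsets): Offsets = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 0
    var fetched: Offsets = Map.empty
    var incorrect: Offsets = Map.empty
    var done = false
    while (!done) {
      fetched = fetch()
      attempt += 1
      incorrect = findIncorrect(known, fetched)
      done = incorrect.isEmpty || attempt >= maxAttempts
      if (!done) Thread.sleep(retryIntervalMs)
    }
    if (incorrect.nonEmpty) {
      // Fail fast rather than hand the incorrect offsets to the caller.
      throw new IllegalStateException(
        s"Tried $attempt times to obtain valid offsets." +
          s" knownOffsets: $known, incorrectOffsets: $incorrect. Aborting.")
    }
    fetched
  }
}
```

In this sketch, a fetch that keeps returning offset 0 for a partition whose known offset is 100 ends in an IllegalStateException once the attempts are used up, whereas a fetch that recovers on a later attempt returns the valid offsets as usual.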


