[ https://issues.apache.org/jira/browse/SPARK-49878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913318#comment-17913318 ]

Dmytro Tsyliuryk commented on SPARK-49878:
------------------------------------------

Hi [~tjshippey]. I've done a bit of research, and I'm not sure how we can 
reproduce the issue you are facing.

There are numerous test cases in the Kafka connector package that would fail 
if we added the exception to KafkaOffsetReaderConsumer#fetchLatestOffsets(). 
One of them, and I think the closest to the issue you are facing, is [delete a 
topic when a Spark job is 
running|https://github.com/apache/spark/blob/master/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L1057],
 where a topic is deleted during execution of a Spark query. An important note 
here is that when we change failOnDataLoss to true, the streaming query fails 
with KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE, as part of 
[this|https://github.com/apache/spark/blob/master/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala#L95]
 check.

Considering that, I would not say it's a bug in Spark, but rather "expected" 
behaviour: we want to keep the stream running even when the fetched offsets 
are out of range (with failOnDataLoss set to false).
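For reference, this behaviour can be toggled from the query side via the 
failOnDataLoss source option. A minimal sketch (the topic and bootstrap-server 
names are placeholders, not from the issue):

{code:scala}
// Hypothetical topic and broker addresses, for illustration only.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  // With failOnDataLoss=true, the query fails with
  // KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE when offsets are out of
  // range; with false (as in the deleted-topic test), it keeps running.
  .option("failOnDataLoss", "true")
  .load()
{code}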

> Spark Structured Streaming will process incorrect Kafka offsets
> ---------------------------------------------------------------
>
>                 Key: SPARK-49878
>                 URL: https://issues.apache.org/jira/browse/SPARK-49878
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.0, 3.5.3
>            Reporter: Thomas S
>            Priority: Critical
>
> When using the Kafka structured streaming integration, 
> KafkaOffsetReaderConsumer#fetchLatestOffsets checks if any of the offsets are 
> incorrect, which is good behaviour. It has a set number of retries (which is 
> configurable - `fetchOffset.numRetries`), and a fetch retry interval (also 
> configurable - `fetchOffset.retryIntervalMs`).
> However, at the end of these retries, if the offsets are still incorrect, 
> Spark uses the incorrect offsets to do the processing. For us this has caused 
> processing to start at the beginning of the Kafka topic partition (as the 
> incorrect offsets are usually 0), which is extremely undesirable and 
> difficult to recover from (as the checkpointing then sets those values). 
> The processing should stop after the set number of retries, as we do not want 
> to be in a position where we start querying (possibly years-)old data. 
> We are running version 3.2, but I can see that this logic is still in the 
> master branch. In our current patched version, we have thrown an 
> IllegalStateException if there are still incorrect offsets present in the 
> incorrectOffsets collection. A la
> {code:scala}
> if (incorrectOffsets.nonEmpty) {
>   throw new IllegalStateException(
>     s"Tried $attempt times (fetchOffset.numRetries) to obtain valid offsets." +
>     s" knownOffsets: $knownOffsets," +
>     s" incorrectOffsets: $incorrectOffsets." +
>     s" Aborting.")
> }
> {code}
> This change will, when the retries limit is exceeded and some offsets are 
> still invalid, throw an IllegalStateException up through 
> KafkaMicroBatchStream#latestOffset(), 
> MicroBatchExecution#constructNextBatch(), and runActivatedStream() to 
> StreamExecution#runStream() where it will be caught and terminate the query. 
> There might be more elegant solutions available. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
