[ https://issues.apache.org/jira/browse/SPARK-49878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913318#comment-17913318 ]
Dmytro Tsyliuryk commented on SPARK-49878:
------------------------------------------

Hi [~tjshippey]. I've done a bit of research and I'm not sure how we can reproduce the issue you are facing. There are numerous test cases in the Kafka connector package that would fail if we added the exception to KafkaOffsetReaderConsumer#fetchLatestOffsets(). One of them, and I think the closest to the issue you are facing, is [delete a topic when a Spark job is running|https://github.com/apache/spark/blob/master/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L1057], where a topic is deleted during execution of a Spark query. An important note here is that when we change failOnDataLoss to true, the streaming query will fail with KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE, as part of [this|https://github.com/apache/spark/blob/master/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala#L95] check. Considering that, I would not say it's a bug in Spark but rather "expected" behaviour: we want to keep the stream running even when fetched offsets are out of range (with failOnDataLoss set to false).

> Spark Structured Streaming will process incorrect Kafka offsets
> ---------------------------------------------------------------
>
> Key: SPARK-49878
> URL: https://issues.apache.org/jira/browse/SPARK-49878
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.2.0, 3.5.3
> Reporter: Thomas S
> Priority: Critical
>
> When using the Kafka structured streaming integration,
> KafkaOffsetReaderConsumer#fetchLatestOffsets checks whether any of the offsets
> are incorrect, which is good behaviour. It has a set number of retries
> (configurable via `fetchoffset.numretries`) and a fetch retry interval (also
> configurable via `fetchoffset.retryintervalms`).
> However, at the end of these retries, if the offsets are still incorrect,
> Spark uses the incorrect offsets to do the processing. For us this has caused
> processing to start at the beginning of the Kafka topic partition (as the
> incorrect offsets are usually 0), which is extremely undesirable and
> difficult to recover from (as checkpointing then persists those values).
> Processing should stop after the set number of retries, as we do not want to
> be in a position where we start querying data that is possibly years old.
> We are running version 3.2, but I can see that this logic is still in the
> master branch. In our current patched version, we throw an
> IllegalStateException if there are still incorrect offsets present in the
> incorrectOffsets collection:
> {code:java}
> if (incorrectOffsets.nonEmpty) {
>   throw new IllegalStateException(s"Tried $attempt times (fetchoffset.numretries) to" +
>     s" obtain valid offsets." +
>     s" knownOffsets: $knownOffsets," +
>     s" incorrectOffsets: $incorrectOffsets." +
>     s" Aborting.")
> }
> {code}
> When the retry limit is exceeded and some offsets are still invalid, this
> change throws an IllegalStateException up through
> KafkaMicroBatchStream#latestOffset(),
> MicroBatchExecution#constructNextBatch(), and runActivatedStream() to
> StreamExecution#runStream(), where it is caught and terminates the query.
> There might be more elegant solutions available.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
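The fail-fast behaviour the reporter describes can be sketched as a self-contained retry loop. This is only an illustrative sketch, not the actual KafkaOffsetReaderConsumer code: the names fetchValidatedOffsets, fetch, isValid, maxRetries, and retryIntervalMs are hypothetical stand-ins for the real fetchLatestOffsets internals and the fetchoffset.numretries / fetchoffset.retryintervalms settings.

```scala
// Illustrative sketch of "retry, then abort instead of processing bad offsets".
// All names here are hypothetical; they mimic, but are not, Spark internals.
object OffsetFetchSketch {
  type Offsets = Map[Int, Long] // partition -> offset

  def fetchValidatedOffsets(
      fetch: () => Offsets,               // fetches the latest offsets from the broker
      isValid: (Int, Long) => Boolean,    // validates a (partition, offset) pair
      maxRetries: Int,                    // analogous to fetchoffset.numretries
      retryIntervalMs: Long): Offsets = { // analogous to fetchoffset.retryintervalms
    var attempt = 0
    var offsets = fetch()
    var incorrect = offsets.filterNot { case (p, o) => isValid(p, o) }
    while (incorrect.nonEmpty && attempt < maxRetries) {
      attempt += 1
      Thread.sleep(retryIntervalMs)
      offsets = fetch()
      incorrect = offsets.filterNot { case (p, o) => isValid(p, o) }
    }
    if (incorrect.nonEmpty) {
      // Fail fast rather than letting processing restart from offset 0.
      throw new IllegalStateException(
        s"Tried $attempt retries to obtain valid offsets;" +
          s" still incorrect: $incorrect. Aborting.")
    }
    offsets
  }
}
```

In this sketch the exception surfaces to whatever drives the micro-batch, mirroring how the reporter's patch propagates an IllegalStateException up to StreamExecution#runStream(), where it terminates the query.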