[ https://issues.apache.org/jira/browse/SPARK-49878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17905450#comment-17905450 ]
Thomas S commented on SPARK-49878:
----------------------------------

[~kabhwan] We already run the queries with `failOnDataLoss` set to `true`, so I can confirm that it does use the incorrect offsets even with this setting enabled. Looking at the code, I think this is confirmed, as nothing checks that property when looking for incorrect offsets.

> Spark Structured Streaming will process incorrect Kafka offsets
> ---------------------------------------------------------------
>
>                 Key: SPARK-49878
>                 URL: https://issues.apache.org/jira/browse/SPARK-49878
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.0, 3.5.3
>            Reporter: Thomas S
>            Priority: Critical
>
> When using the Kafka structured streaming integration,
> KafkaOffsetReaderConsumer#fetchLatestOffsets checks whether any of the fetched
> offsets are incorrect, which is good behaviour. It has a configurable number
> of retries (`fetchoffset.numretries`) and a configurable fetch retry interval
> (`fetchoffset.retryintervalms`).
>
> However, if the offsets are still incorrect once these retries are exhausted,
> Spark uses the incorrect offsets for processing. For us this has caused
> processing to restart at the beginning of the Kafka topic partition (the
> incorrect offsets are usually 0), which is extremely undesirable and
> difficult to recover from (checkpointing then persists those values).
> Processing should stop after the configured number of retries, as we do not
> want to be in a position where we start querying (possibly years-old) data.
>
> We are running version 3.2, but I can see that this logic is still on the
> master branch. In our current patched version, we throw an
> IllegalStateException if there are still incorrect offsets present in the
> incorrectOffsets collection. A la
> {code:scala}
> if (incorrectOffsets.nonEmpty) {
>   throw new IllegalStateException(s"Tried $attempt times (fetchoffset.numretries) to" +
>     s" obtain valid offsets." +
>     s" knownOffsets: $knownOffsets," +
>     s" incorrectOffsets: $incorrectOffsets." +
>     s" Aborting.")
> }
> {code}
> When the retry limit is exceeded and some offsets are still invalid, this
> change throws an IllegalStateException up through
> KafkaMicroBatchStream#latestOffset(), MicroBatchExecution#constructNextBatch(),
> and runActivatedStream() to StreamExecution#runStream(), where it is caught
> and the query is terminated. There might be more elegant solutions available.
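For illustration, here is a self-contained sketch of the retry loop the description refers to, with the proposed check in place. This is not the actual Spark source: the types, the `fetchOnce` callback, and the definition of an "incorrect" offset are simplified stand-ins for what KafkaOffsetReaderConsumer#fetchLatestOffsets actually does.

{code:scala}
// Simplified stand-ins; the real code works with Kafka TopicPartition objects.
object FetchLatestOffsetsSketch {
  type TopicPartition = String
  type PartitionOffsets = Map[TopicPartition, Long]

  def fetchLatestOffsets(
      knownOffsets: PartitionOffsets,
      fetchOnce: () => PartitionOffsets, // one round-trip to the brokers
      maxRetries: Int,                   // fetchoffset.numretries
      retryIntervalMs: Long              // fetchoffset.retryintervalms
  ): PartitionOffsets = {
    // Treat an offset as incorrect when it is behind one we already know,
    // e.g. a broker transiently reporting 0 for a populated partition.
    def incorrect(fetched: PartitionOffsets): PartitionOffsets =
      fetched.filter { case (tp, off) => knownOffsets.get(tp).exists(off < _) }

    var attempt = 0
    var latest = fetchOnce()
    var incorrectOffsets = incorrect(latest)
    while (incorrectOffsets.nonEmpty && attempt < maxRetries) {
      attempt += 1
      Thread.sleep(retryIntervalMs)
      latest = fetchOnce()
      incorrectOffsets = incorrect(latest)
    }
    // Proposed fix: abort rather than silently handing back bad offsets,
    // which would otherwise end up persisted in the checkpoint.
    if (incorrectOffsets.nonEmpty) {
      throw new IllegalStateException(
        s"Tried $attempt times (fetchoffset.numretries) to obtain valid offsets." +
          s" knownOffsets: $knownOffsets, incorrectOffsets: $incorrectOffsets. Aborting.")
    }
    latest
  }
}
{code}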
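For completeness, a minimal sketch of where the options discussed above are set on a Kafka source; the broker address, topic name, and checkpoint path are placeholders. The option spellings documented in the Spark Kafka integration guide are `fetchOffset.numRetries` and `fetchOffset.retryIntervalMs`.

{code:scala}
import org.apache.spark.sql.SparkSession

object KafkaOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-offset-options-example")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "events")                     // placeholder topic
      // Fail the query on detected data loss (deleted topics, out-of-range
      // offsets). As the comment above confirms, this does not guard against
      // the incorrect offsets described in this ticket.
      .option("failOnDataLoss", "true")
      // Retry behaviour of the latest-offset fetch discussed above.
      .option("fetchOffset.numRetries", "3")
      .option("fetchOffset.retryIntervalMs", "1000")
      .load()

    df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/events") // placeholder
      .start()
      .awaitTermination()
  }
}
{code}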