[
https://issues.apache.org/jira/browse/SPARK-49878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904760#comment-17904760
]
Thomas S commented on SPARK-49878:
----------------------------------
Yes, by all means. Thank you
> Spark Structured Streaming will process incorrect Kafka offsets
> ---------------------------------------------------------------
>
> Key: SPARK-49878
> URL: https://issues.apache.org/jira/browse/SPARK-49878
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.2.0, 3.5.3
> Reporter: Thomas S
> Priority: Critical
>
> When using the Kafka structured streaming integration,
> KafkaOffsetReaderConsumer#fetchLatestOffsets checks whether any of the fetched
> offsets are incorrect, which is good behaviour. It retries a configurable
> number of times (`fetchOffset.numRetries`), with a configurable interval
> between attempts (`fetchOffset.retryIntervalMs`).
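These retry settings are passed as options on the Kafka source. A minimal sketch, assuming the standard `kafka` format from the spark-sql-kafka integration and an existing SparkSession `spark` (broker address and topic name are placeholders):

```scala
// Sketch: configuring offset-fetch retry behaviour on the Kafka source.
// "broker:9092" and "my-topic" are illustrative placeholders.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "my-topic")
  .option("fetchOffset.numRetries", "3")         // retry count for offset fetches
  .option("fetchOffset.retryIntervalMs", "1000") // wait between retries, in ms
  .load()
```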
> However, if the offsets are still incorrect once these retries are exhausted,
> Spark proceeds to process using the incorrect offsets. For us this has caused
> processing to restart at the beginning of the Kafka topic partition (as the
> incorrect offsets are usually 0), which is extremely undesirable and
> difficult to recover from (as checkpointing then persists those values).
> Processing should instead stop after the configured number of retries, since
> we do not want to end up re-reading possibly years-old data.
> We are running version 3.2, but I can see that this logic is still present on
> the master branch. In our current patched version, we throw an
> IllegalStateException if there are still incorrect offsets present in the
> incorrectOffsets collection, along the lines of:
> {code:java}
> if (incorrectOffsets.nonEmpty) {
>   throw new IllegalStateException(
>     s"Tried $attempt times (fetchOffset.numRetries) to obtain valid offsets." +
>     s" knownOffsets: $knownOffsets," +
>     s" incorrectOffsets: $incorrectOffsets." +
>     s" Aborting.")
> }
> {code}
> With this change, when the retry limit is exceeded and some offsets are still
> invalid, an IllegalStateException propagates up through
> KafkaMicroBatchStream#latestOffset(),
> MicroBatchExecution#constructNextBatch() and runActivatedStream() to
> StreamExecution#runStream(), where it is caught and the query is terminated.
> There might be more elegant solutions available.
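To illustrate the shape of the proposed fix in isolation, the following is a self-contained sketch of a retry loop with the guard at the end. It is not the actual KafkaOffsetReaderConsumer code; `fetchPartitionOffsets` is a stub that always returns offset 0 (an "incorrect" value for a partition known to hold data), and the map keys stand in for topic partitions:

```scala
// Hypothetical sketch of fetchLatestOffsets-style retry logic with the
// proposed fail-fast guard. All names here are illustrative, not Spark's.
object OffsetRetrySketch {
  val maxRetries = 3 // stands in for fetchOffset.numRetries

  // Stub consumer: always reports offset 0 for both partitions,
  // simulating the incorrect-offset condition described above.
  def fetchPartitionOffsets(): Map[Int, Long] = Map(0 -> 0L, 1 -> 0L)

  def fetchLatestOffsets(knownOffsets: Map[Int, Long]): Map[Int, Long] = {
    var attempt = 0
    var latest = Map.empty[Int, Long]
    var incorrect = Seq.empty[(Int, Long, Long)]
    var done = false
    while (!done) {
      attempt += 1
      latest = fetchPartitionOffsets()
      // An offset is "incorrect" if it has moved backwards from what we know.
      incorrect = latest.toSeq.collect {
        case (tp, off) if knownOffsets.get(tp).exists(off < _) =>
          (tp, knownOffsets(tp), off)
      }
      done = incorrect.isEmpty || attempt >= maxRetries
    }
    // Proposed fix: abort instead of returning known-bad offsets,
    // which would otherwise be written into the checkpoint.
    if (incorrect.nonEmpty) {
      throw new IllegalStateException(
        s"Tried $attempt times (fetchOffset.numRetries) to obtain valid offsets." +
        s" knownOffsets: $knownOffsets, incorrectOffsets: $incorrect. Aborting.")
    }
    latest
  }
}
```

Calling `fetchLatestOffsets(Map(0 -> 100L))` exhausts the retries and throws, whereas offsets that never regress are returned normally.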
--
This message was sent by Atlassian Jira
(v8.20.10#820010)