[ https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013964#comment-18013964 ]
Matthias J. Sax commented on KAFKA-19430:
-----------------------------------------

Thanks Lianet!

{quote}Correct, on the consumer path is only internal and not considered retriable in any way.
{quote}

This is interesting. Why does it extend RetriableException if it's not considered retriable? Also wondering what the root cause of such an error actually is? Would it be some "bit flip" that should actually not happen again if we retry? Or some other env-related issue? Or is it some permanent corruption that won't go away (wondering where such a permanent corruption would originate from)?

{quote}The consumer remains in a valid state btw, what's "invalid" (needs change) is the position it is trying to poll from. That's why it will break the poll loop, as some manual adjustment needs to be made to the position in order to make progress.
{quote}

So the position does not advance at all, and if we called `poll()` again we would just fetch the same batch (and potentially fail again with the same error)?

About solving the issue (at least for KS): we could always store the current position before we call `poll()`, and if an error comes back, maybe retry incrementally by advancing the position by one before each `poll()` retry, until we get over the hump (rough sketch at the bottom of this comment)? Would it help to also reduce `max.poll.records` for this case, to maybe get back smaller chunks of data, to read as much data as possible _before_ the bad batch?

Not sure if we need a KIP? Guess it depends on what we want to change? For example, atm the consumer throws a `new KafkaException` with a `null` "root cause" exception. Just passing in `CorruptRecordException` as the nested error of the thrown `KafkaException` could give enough signal for KS, and should not require a KIP?

I guess `max.poll.records` is a fixed config though, so currently it would require creating a new consumer to change it... Maybe we could make `max.poll.records` an (optional) parameter to `poll()`, as a more dynamic way to fetch fewer records? This would be a public API change and require a KIP of course.

Just trying to keep brainstorming... Are any of these ideas feasible?

> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when
> de-serializing the record key/value byte[] inside Kafka Streams. This implies
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this error
> automatically (as `RecordCorruptedException extends RetriableException`), and
> finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the
> uncaught exception handler, but this is a rather heavyweight approach.
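To make the "advance the position by one" idea above a bit more concrete, here is a rough sketch at the plain-consumer level (not KS internals). It assumes the consumer were changed to chain `CorruptRecordException` as the cause of the thrown `KafkaException` (today the cause is `null`, as noted above); `SkippingPoller` and `pollSkippingCorrupted` are made-up names, not existing API:

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;

// Hypothetical helper, not existing API: poll and, on a corrupted batch,
// step over it one offset at a time.
public class SkippingPoller {

    public static <K, V> ConsumerRecords<K, V> pollSkippingCorrupted(
            final Consumer<K, V> consumer,
            final Duration timeout) {
        while (true) {
            // remember where each assigned partition is positioned before the fetch,
            // since the position does not advance when poll() fails
            final Map<TopicPartition, Long> positions = new HashMap<>();
            for (final TopicPartition tp : consumer.assignment()) {
                positions.put(tp, consumer.position(tp));
            }
            try {
                return consumer.poll(timeout);
            } catch (final KafkaException e) {
                // assumption: the consumer chains CorruptRecordException as the
                // cause (today the cause is null, cf. the discussion above)
                if (!(e.getCause() instanceof CorruptRecordException)) {
                    throw e;
                }
                // coarse skip: advance every partition that did not move by one
                // record and retry; repeated failures walk past a multi-record
                // bad batch one offset at a time
                for (final Map.Entry<TopicPartition, Long> entry : positions.entrySet()) {
                    if (consumer.position(entry.getKey()) == entry.getValue()) {
                        consumer.seek(entry.getKey(), entry.getValue() + 1);
                    }
                }
            }
        }
    }
}
{code}

One obvious gap in this sketch: without knowing which partition holds the bad batch, blindly seeking forward can skip a valid record on a partition that simply had no new data. If the chained exception also carried partition and offset (similar to what `RecordDeserializationException` provides for deserialization errors), the skip could be targeted precisely.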