[ https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014279#comment-18014279 ]
Uladzislau Blok edited comment on KAFKA-19430 at 8/19/25 7:55 PM:
------------------------------------------------------------------

Good morning. I'll start working on the new ticket today.

I was thinking more about handling this type of error in KS: if RecordCorruptedException is not retriable and the consumer/producer paths are different, it's probably not a good idea to have one interface handle both flows. Moreover, you've also said that DeserializationExceptionHandler already serves as the handler for the consumer path, so we effectively have two separate handlers for the consumer and producer paths. I think we can keep it that way and handle RecordCorruptedException not in a handler, but directly in the StreamsThread#pollRequests method. I like your proposal to keep the offset and advance it by 1 until we successfully read records (see the sketch after the quoted issue description below).

Advantages of this solution:
 * It's simple
 * No KIP required, as there is no change to the public API

Disadvantages:
 * No 'generic' way to handle consumer errors (i.e., errors raised while polling records). If new errors appear in the future, we will still need to introduce some sort of separate interface to handle them.


was (Author: JIRAUSER309258):
Good morning. I'll start working on the new ticket today. I was thinking more about handling this type of error in KS: if RecordCorruptedException is not retriable and the consumer/producer paths are different, it's probably not a good idea to have one interface handle both flows. You've also said that DeserializationExceptionHandler is the handler for the consumer path. I think we can keep it that way and handle RecordCorruptedException right where we try to read the message (using the approach you've proposed).

Advantages:
 * It's simple
 * No KIP required, as there is no change to the public API

Disadvantages:
 * No 'generic' way to handle consumer errors. If new errors appear in the future, we will still need to introduce some sort of separate interface to handle them.


> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when
> deserializing the record key/value byte[] inside Kafka Streams. This implies
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this
> error automatically (as `RecordCorruptedException extends RetriableException`),
> and finding a way to pump the error into the existing exception handler.
> If the error is transient, the user can still use `REPLACE_THREAD` in the
> uncaught exception handler, but this is a rather heavyweight approach.
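To illustrate the "keep the offset and advance it by 1" idea from the comment above, here is a minimal sketch of a poll loop that seeks one offset past an unreadable record and retries. It is not the actual StreamsThread#pollRequests change; it uses RecordDeserializationException as a stand-in only because that exception already exposes the affected partition and offset via public accessors, whereas whether RecordCorruptedException carries the same information would still need to be checked.

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

public final class SkipCorruptedPollLoop {

    /**
     * Polls the consumer; if the record at some offset cannot be read,
     * seeks one offset past it and polls again until a batch is returned.
     */
    public static <K, V> ConsumerRecords<K, V> pollSkippingBadRecords(
            final Consumer<K, V> consumer,
            final Duration timeout) {
        while (true) {
            try {
                return consumer.poll(timeout);
            } catch (final RecordDeserializationException e) {
                // Advance past the single offending record. If several records in a row
                // are unreadable, the loop skips them one offset per iteration.
                consumer.seek(e.topicPartition(), e.offset() + 1);
            }
        }
    }
}
{code}

Doing the equivalent inside the Streams poll path would keep the stream thread alive instead of letting the error bubble up to the uncaught exception handler.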
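For context on the last sentence of the quoted description, the sketch below shows the workaround that is available today: an uncaught exception handler that replaces the failed stream thread. The handler API is existing public Kafka Streams API; the surrounding class and method names are only for illustration.

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public final class ReplaceThreadWorkaround {

    public static KafkaStreams buildStreams(final StreamsBuilder builder, final Properties props) {
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Replace the failed stream thread instead of shutting the client down.
        // Restarting a whole thread because of a single bad record is why the
        // ticket calls this a rather heavyweight approach.
        streams.setUncaughtExceptionHandler(exception ->
            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
        return streams;
    }
}
{code}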