[ https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013924#comment-18013924 ]
Lianet Magrans commented on KAFKA-19430:
----------------------------------------

Hey,

{quote}we might not even get `CorruptRecordException` directly, but that `CorruptRecordException` is used internally only, and the consumer throws a `KafkaException` from `poll()` for this case instead – this would make it even more difficult to handle this case. – And it makes it questionable whether RETRY is actually a valid option?{quote}

Correct, on the consumer path it is only internal and is not considered retriable in any way. The consumer itself remains in a valid state, by the way; what is "invalid" (and needs to change) is the position it is trying to poll from. That is why it breaks the poll loop: some manual adjustment to the position has to be made in order to make progress.

With that, I agree we should dig deeper into the idea of exposing the root cause, expecting that is what would let the user understand the situation and take the actions that allow reusing the same consumer to call poll again (skipping the corrupted records). But I am not sure how we could get the info about the offsets that need to be skipped; for instance, we probably need to investigate exactly what info the FetchResponse contains when it comes back with the CORRUPT_MESSAGE error, to see what could be done. (All of this would require a KIP; even the smallest change in this area, just exposing the new exception on poll, would need one.)

{quote}As I understand Kafka consumers/producers, it could also be the case that only one record is corrupted{quote}

I believe so. A fetch request including a partition and offset should fail with CORRUPT_MESSAGE if any record that needs to be included in that response (according to the fetch max-bytes limits, etc.) is found corrupted on the broker when reading the log to generate the fetch response (but the broker-side handling of fetch should be double-checked in case I am missing something).

> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
>
> Currently, the existing `DeserializationExceptionHandler` is applied when
> deserializing the record key/value byte[] inside Kafka Streams. This implies
> that a `RecordCorruptedException` is not handled.
>
> We should explore not letting Kafka Streams crash, but maybe retrying this error
> automatically (as `RecordCorruptedException extends RetriableException`), and
> finding a way to pump the error into the existing exception handler.
>
> If the error is transient, the user can still use `REPLACE_THREAD` in the
> uncaught exception handler, but this is a rather heavyweight approach.
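For illustration, here is a rough sketch of the "manual position adjustment" pattern the comment above describes for the plain consumer. This is not something the current API supports end to end: today `poll()` surfaces this case as a generic `KafkaException`, and the failing partition/offset are not exposed in a structured way, so the `corruptPartition`/`corruptOffset` helpers below are hypothetical placeholders for exactly the information a KIP would need to expose.

{code:java}
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecordSketch {

    public static void pollLoop(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("input-topic"));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            } catch (KafkaException e) {
                // Today this is a generic KafkaException that does NOT carry the failing
                // partition/offset in a structured way -- that is the gap discussed above.
                // The two accessors below are hypothetical and would require a KIP.
                TopicPartition tp = corruptPartition(e);   // hypothetical accessor
                long badOffset = corruptOffset(e);         // hypothetical accessor
                // Manual adjustment of the position so the next poll can make progress.
                consumer.seek(tp, badOffset + 1);
            }
        }
    }

    // Placeholders for information the consumer does not currently expose.
    private static TopicPartition corruptPartition(KafkaException e) {
        throw new UnsupportedOperationException("not available in the current API");
    }

    private static long corruptOffset(KafkaException e) {
        throw new UnsupportedOperationException("not available in the current API");
    }
}
{code}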
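And a minimal sketch of the `REPLACE_THREAD` workaround mentioned in the issue description, using the existing `StreamsUncaughtExceptionHandler` API; the `topology` and `props` arguments are assumed to be defined by the application.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadSketch {

    public static KafkaStreams start(Topology topology, Properties props) {
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Replace the failed stream thread instead of shutting down the client.
        // If the corrupted record is not transient, the new thread will hit the
        // same error again, which is why this is a rather heavyweight workaround.
        streams.setUncaughtExceptionHandler(
            exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
        streams.start();
        return streams;
    }
}
{code}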