[ 
https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013924#comment-18013924
 ] 

Lianet Magrans commented on KAFKA-19430:
----------------------------------------

Hey,
{quote}we might not even get `CorruptRecordException` directly, but that 
`CorruptRecordException` is used internally only, and the consumer throws a 
`KafkaException` from `poll()` for this case instead – this would make it even 
more difficult to handle this case. – And it makes it questionable if RETRY is 
actually a valid option?
{quote}
Correct, on the consumer path it is only internal and not considered retriable in 
any way. The consumer itself remains in a valid state, by the way; what's "invalid" 
(and needs to change) is the position it is trying to poll from. That's why it 
breaks the poll loop: some manual adjustment has to be made to the position in 
order to make progress.
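
For reference, a minimal sketch of what that manual adjustment could look like on the application side. The topic/partition and the "skip one record" choice are assumptions, since the exception thrown from poll() does not carry the corrupted partition or offset today:
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SkipCorruptedRecordLoop {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "corrupt-record-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("input-topic"));

            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                } catch (KafkaException e) {
                    // poll() failed, but the consumer object itself is still usable;
                    // only its position on the affected partition has to be moved
                    // before it can make progress again.
                    // Assumption: we know the affected partition out of band (e.g. from
                    // the log message), because the exception does not expose it or the
                    // offset to skip.
                    TopicPartition corrupted = new TopicPartition("input-topic", 0);
                    long resumeAt = consumer.position(corrupted) + 1; // skip a single record
                    consumer.seek(corrupted, resumeAt);
                }
            }
        }
    }
}
{code}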

With this, I agree that we should probably dig deeper into the idea of exposing 
the root cause, expecting that's what would let the user understand the failure 
and take the actions needed to reuse the same consumer and call poll again 
(skipping the corrupted records). But I'm not sure how we could get the info 
about the offsets that need to be skipped; for instance, we probably need to 
investigate exactly what info the FetchResponse contains when it comes with the 
CORRUPT_MESSAGE error, to see what could be done. (All of this would require 
KIPs; even the smallest change in this area, just exposing the new exception on 
poll, would need one.)
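
For illustration, a rough sketch of the caller-side check, assuming the cause chain of the KafkaException thrown by poll() were populated with the root cause; whether that actually holds today, or would need to be added, is exactly the part to verify (and potentially KIP):
{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;

final class CorruptionCheck {
    // Walk the cause chain of the KafkaException thrown by poll() and report
    // whether a CorruptRecordException is anywhere in it.
    // Assumption: the consumer attaches the root cause, which is the open
    // question discussed above.
    static boolean causedByCorruptRecord(KafkaException e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof CorruptRecordException) {
                return true;
            }
        }
        return false;
    }
}
{code}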
{quote}As I understand Kafka consumers/producers, it could also be the case that 
only one record is corrupted
{quote}
I believe so. A fetch request including a partition and offset should fail with 
CORRUPT_MESSAGE if any record that needs to be included in that response 
(according to the fetch max bytes limits, etc.) is found corrupted on the broker 
while reading the log to generate the fetch response (but we'd need to 
double-check the broker-side handling of fetch in case I'm missing something).

> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when 
> de-serializing the record key/value byte[] inside Kafka Streams. This implies 
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this error 
> automatically (as `RecordCorruptedException extends RetriableException`), and 
> finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the 
> uncaught exception handler, but this is a rather heavyweight approach.
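
For context, the `REPLACE_THREAD` workaround mentioned in the issue description looks roughly like this (application id, bootstrap servers, and topology below are placeholders); it keeps the application alive, but the replacement thread resumes from the same position, so a genuinely corrupted record will simply fail again:
{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadWorkaround {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "corrupt-record-workaround");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Replace the failed StreamThread instead of letting the application die.
        streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

        streams.start();
    }
}
{code}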



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
