[ https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013964#comment-18013964 ]

Matthias J. Sax commented on KAFKA-19430:
-----------------------------------------

Thanks Lianet!
{quote}Correct, on the consumer path is only internal and not considered 
retriable in any way.
{quote}
This is interesting. Why does it extend RetriableException if it's not 
considered retriable? Also wondering what the root cause of such an error 
actually is. Would it be some "bit flip" that should actually not happen again 
if we retry? Or some other env-related issue? Or is it some permanent corruption 
that won't go away (wondering where such a permanent corruption would originate 
from)?
{quote}The consumer remains in a valid state btw, what's "invalid" (needs 
change) is the position it is trying to poll from. That's why it will break the 
poll loop, as some manual adjustment needs to be made to the position in order 
to make progress.
{quote}
So the position does not advance at all, and if we called `poll()` again we 
would just fetch the same batch (and potentially fail again with the same error)?

About solving the issue (at least for KS): we could always store the current 
position before we call poll(), and if an error comes back, maybe retry 
incrementally by advancing the position by one before each poll() retry, until 
we get over the hump? (A sketch of this idea is below.) Would it help to also 
reduce `max.poll.records` for this case, to maybe get back smaller chunks of 
data, so we read as much data as possible _before_ the bad batch?
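
To make that concrete, here is a minimal sketch of such a checkpoint-and-skip 
loop (not actual KS code; `suspectPartition` is a hypothetical stand-in, since 
the consumer currently does not report which partition hit the bad batch):

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class CheckpointAndSkip {
    // Sketch only: checkpoint all positions before poll(); on failure,
    // advance the suspect partition by one record and retry, inching
    // past the corrupted batch.
    static ConsumerRecords<byte[], byte[]> pollSkippingCorruption(
            Consumer<byte[], byte[]> consumer,
            TopicPartition suspectPartition) { // hypothetical: not exposed today
        while (true) {
            Map<TopicPartition, Long> checkpoint = new HashMap<>();
            for (TopicPartition tp : consumer.assignment()) {
                checkpoint.put(tp, consumer.position(tp));
            }
            try {
                return consumer.poll(Duration.ofMillis(100));
            } catch (KafkaException e) {
                // the position did not advance, so a plain retry would hit
                // the same batch; skip one record and try again
                consumer.seek(suspectPartition,
                    checkpoint.get(suspectPartition) + 1);
            }
        }
    }
}
{code}

Skipping forward like this silently drops data of course, so it would probably 
need to be opt-in, or routed through a handler so the user sees what was skipped.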

Not sure if we need a KIP? Guess it depends on what we want to change? For 
example, atm the consumer throws a `new KafkaException` with a `null` "root 
cause exception". Just passing `CorruptRecordException` as the nested cause into 
the thrown `KafkaException` could give enough signal for KS, and should not 
require a KIP?
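
To illustrate, with the cause attached, KS could key off of it with something 
like this (sketch; the recovery path itself is hypothetical):

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;

public class CorruptionAwarePoll {
    // Sketch: inspect the nested cause to distinguish corruption from
    // other fatal errors, without any new public API.
    static void pollOnce(Consumer<byte[], byte[]> consumer) {
        try {
            consumer.poll(Duration.ofMillis(100));
        } catch (KafkaException e) {
            if (e.getCause() instanceof CorruptRecordException) {
                // enough signal to route into a recovery path (skip the
                // position, invoke a handler, ...) instead of crashing
            } else {
                throw e; // everything else stays fatal
            }
        }
    }
}
{code}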

I guess `max.poll.records` is a fixed config though, so currently it would 
require creating a new consumer to change it... maybe we could make 
`max.poll.records` an (optional) parameter to `poll()` as a more dynamic way to 
fetch fewer records? This would be a public API change and require a KIP of 
course.
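
Purely illustrative, the overload might look like this (hypothetical signature; 
the exact semantics would be for the KIP to define):

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical overload (does NOT exist today; would need a KIP): lets the
// caller temporarily fetch fewer records than `max.poll.records`, e.g. to
// inch up to a corrupted batch in small steps.
public interface DynamicPollConsumer<K, V> {
    ConsumerRecords<K, V> poll(Duration timeout, int maxRecords);
}
{code}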

Just trying to keep the brainstorming going... Are any of these ideas feasible?

> Don't fail on RecordCorruptedException
> --------------------------------------
>
>                 Key: KAFKA-19430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Uladzislau Blok
>            Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when 
> deserializing the record key/value byte[] inside Kafka Streams. This implies 
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this 
> error automatically (as `RecordCorruptedException extends RetriableException`), 
> and finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the 
> uncaught exception handler, but this is a rather heavyweight approach.
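
For reference, the `REPLACE_THREAD` workaround mentioned in the description 
uses today's API roughly like this:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadWorkaround {
    // Heavyweight recovery: replace the whole stream thread when it dies,
    // which restarts processing from the last committed offsets.
    static void install(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(
            exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
    }
}
{code}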


