Hi Eno,

That does make a lot more sense to me. When you pop stuff out of a topic, you can at least also put its coordinates (topic-partition, offset) into the log, which is probably nice for fetching the record from the CLI and checking what's going on.
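A minimal sketch of the logging idea, assuming a hypothetical `BadRecordLog` helper: it formats the coordinates of a failed record and builds a console-consumer command that re-reads exactly that record. The flags used (`--bootstrap-server`, `--topic`, `--partition`, `--offset`, `--max-messages`) are standard `kafka-console-consumer.sh` options; the topic name and bootstrap address are placeholders.

```java
// Hypothetical helper: turns the coordinates of a bad record into a log line
// plus a console-consumer command that fetches exactly that one record.
final class BadRecordLog {
    private BadRecordLog() {}

    // Compact "topic-partition@offset" form for log messages.
    static String coordinates(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    // Command an operator can paste into a shell to inspect the record.
    static String fetchCommand(String bootstrap, String topic, int partition, long offset) {
        return "kafka-console-consumer.sh --bootstrap-server " + bootstrap
                + " --topic " + topic
                + " --partition " + partition
                + " --offset " + offset
                + " --max-messages 1";
    }
}
```

Logging `orders-3@42` alongside such a command lets someone look at the poison pill directly instead of reconstructing which record the handler saw.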

One additional question:

Is this handler only going to cover Serde exceptions, or MessageSet iterator exceptions as well? I'm speaking of checksum errors. We can't rely on the deserializer to throw properly when we hand it data with a bad checksum, and checksum errors are the only poison pills I have seen in production so far.
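To illustrate why a checksum failure is a different kind of error from a deserialization failure, here is a sketch that validates a CRC32 over the raw bytes before the deserializer ever runs. The `ChecksumGuard` name and the "payload plus stored checksum" layout are assumptions for illustration; Kafka's actual record format and its consumer-side CRC check (the `check.crcs` consumer setting) are not reproduced here.

```java
import java.util.function.Function;
import java.util.zip.CRC32;

// Hypothetical guard: verify the checksum first, deserialize second,
// so the two failure modes can be reported separately.
final class ChecksumGuard {
    static final class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String message) { super(message); }
    }

    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    static <T> T read(byte[] payload, long storedCrc, Function<byte[], T> deserializer) {
        long actual = crcOf(payload);
        if (actual != storedCrc) {
            // The bytes are damaged; the deserializer is never called, because
            // it might "succeed" on garbage and hide the corruption.
            throw new CorruptRecordException(
                    "checksum mismatch: stored=" + storedCrc + " actual=" + actual);
        }
        return deserializer.apply(payload);
    }
}
```

With this split, a handler can see a `CorruptRecordException` for damaged bytes and a deserializer exception only for bytes that are intact but not in the expected format.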

Best Jan


On 26.05.2017 17:31, Eno Thereska wrote:
Hi Jan,

You're right. I think I got carried away and broadened the scope of this KIP beyond its original purpose. This handler will only be there for deserialization errors, i.e., "poison pills", and is not intended to be a catch-all handler for all sorts of other problems (e.g., an NPE in user code). Deserialization errors can happen either when polling or when deserializing from a state store. That narrows down the scope of the KIP; I will update it.
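Under the narrowed scope, the handler only needs to know which bytes failed and where they came from (a polled topic or a state store). A sketch, assuming hypothetical `DeserializationHandler`, `Origin`, and `Response` types that are not the KIP's final API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: where the bytes that failed to deserialize came from.
enum Origin { SOURCE_TOPIC, STATE_STORE }

enum Response { CONTINUE, FAIL }

// Hypothetical handler that is only ever invoked for deserialization errors.
interface DeserializationHandler {
    Response handle(Origin origin, String name, byte[] key, byte[] value, Exception exception);
}

// Example policy: skip poison pills read from a topic, but stop when a
// state store holds bytes that cannot be read back.
final class SkipTopicPoisonPills implements DeserializationHandler {
    final List<String> skipped = new ArrayList<>();

    @Override
    public Response handle(Origin origin, String name, byte[] key, byte[] value, Exception exception) {
        if (origin == Origin.SOURCE_TOPIC) {
            skipped.add(name + ": " + exception.getMessage());
            return Response.CONTINUE;
        }
        return Response.FAIL;
    }
}
```

Failing on state-store errors is only one possible policy; the point is that the handler receives raw bytes plus their origin, not a record that some processor was working on.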

Thanks
Eno

On 26 May 2017, at 11:31, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi

Unfortunately, no. Think about "caching" these records popping out of there, or multi-step tasks (join, aggregate, repartition all in one go): the last repartitioner might throw because it can't determine the partition, only because a get on the join store caused a flush through the aggregates. This has nothing to do with a ConsumerRecord at all, especially not the one we most recently processed.
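The caching point can be made concrete with a toy write-back cache: puts are buffered and only forwarded downstream when a later operation forces a flush, so the entry that blows up is not the record currently being processed. `FlushingCache` is a hypothetical stand-in, not the actual Kafka Streams record cache.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy write-back cache: puts are buffered; flush() forwards them downstream.
final class FlushingCache {
    private final Map<String, String> dirty = new LinkedHashMap<>();
    private final BiConsumer<String, String> downstream;
    private final int capacity;

    FlushingCache(int capacity, BiConsumer<String, String> downstream) {
        this.capacity = capacity;
        this.downstream = downstream;
    }

    void put(String key, String value) {
        dirty.put(key, value);
        if (dirty.size() > capacity) {
            flush(); // a put for one key can forward older, unrelated entries
        }
    }

    void flush() {
        // Forward in insertion order; an exception from downstream propagates
        // to whoever triggered the flush.
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            downstream.accept(e.getKey(), e.getValue());
        }
        dirty.clear();
    }
}
```

If `a` is buffered with a bad value and `b` is put afterwards, the exception surfaces while `b` is being processed, yet the entry at fault is `a`; a handler that only receives the current `ConsumerRecord` would blame `b`.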

To be completely honest: anything other than grinding to a halt is not appealing to me at all. Sure, maybe lag monitoring will call me on a Sunday, but at least I can be confident it's working the rest of the time.

Best Jan

PS.:

Hope you get my point. I am mostly complaining about this:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public interface RecordExceptionHandler {
        /**
         * Inspect a record and the exception received
         */
        HandlerResponse handle(ConsumerRecord<byte[], byte[]> record, // <<< that guy here
                               Exception exception);
    }

    public enum HandlerResponse {
        /* continue with processing */
        CONTINUE(1),
        /* fail the processing and stop */
        FAIL(2);

        // numeric code for each response
        private final int id;

        HandlerResponse(int id) {
            this.id = id;
        }
    }



On 26.05.2017 11:18, Eno Thereska wrote:
Thanks Jan,

The record passed to the handler will always be the problematic record. There are two cases/types of exceptions for the purposes of this KIP:
1) Any exception during deserialization. The bad record and the exception (i.e., DeserializeException) will be passed to the handler, so the handler will be able to tell this was a deserialization error.
2) Any exception during processing of this record. Whenever a processor receives the record (after some caching, etc.), starts to process it, and fails, it will call the handler with this record.
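The two cases can be told apart inside the handler by exception type. A sketch, assuming a hypothetical `DeserializeException` class matching the name used above; the record parameter is left out so the example runs without Kafka on the classpath.

```java
// Hypothetical exception type standing in for the "DeserializeException" named above.
class DeserializeException extends RuntimeException {
    DeserializeException(String message, Throwable cause) { super(message, cause); }
}

enum HandlerResponse { CONTINUE, FAIL }

// Example policy: skip records that could not be deserialized (case 1),
// but stop on any failure inside processing logic (case 2).
final class ExampleRecordHandler {
    static HandlerResponse handle(Exception exception) {
        if (exception instanceof DeserializeException) {
            return HandlerResponse.CONTINUE;
        }
        return HandlerResponse.FAIL;
    }
}
```

A real handler would also receive the record, but the branch on exception type is what separates case 1 from case 2.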

Does that match your thinking?

Thanks,
Eno


On 26 May 2017, at 09:51, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,

Quick question: from the KIP, it doesn't quite make sense to me how this fits with caching. With caching, the consumer record might not be related at all to a processor that throws while processing.

Wouldn't it make more sense to get the processor name plus the key/value objects for processing errors, and the state store or topic name plus the raw byte[] key/value for serializers? Maybe also pass in the serdes used?
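The alternative proposed here can be sketched as two separate callbacks, each receiving the context that actually exists at the point of failure. All names are hypothetical, and a `Function<byte[], ?>` stands in for "the serde used" so the example does not depend on Kafka's `Serde` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

@FunctionalInterface
interface ProcessingErrorHandler {
    // The processor that threw, plus the deserialized key and value it was handling.
    boolean shouldContinue(String processorName, Object key, Object value, Exception e);
}

@FunctionalInterface
interface SerdeErrorHandler {
    // The topic or state store the bytes came from, the raw bytes, and the
    // deserializer that failed on them.
    boolean shouldContinue(String topicOrStore, byte[] key, byte[] value,
                           Function<byte[], ?> deserializer, Exception e);
}

// Example: remember where bad bytes came from and keep going.
final class LoggingSerdeErrorHandler implements SerdeErrorHandler {
    final List<String> log = new ArrayList<>();

    @Override
    public boolean shouldContinue(String topicOrStore, byte[] key, byte[] value,
                                  Function<byte[], ?> deserializer, Exception e) {
        int size = value == null ? 0 : value.length;
        log.add(topicOrStore + " (" + size + " bytes): " + e.getMessage());
        return true;
    }
}
```

Splitting the callbacks means a processing failure is reported against the processor that threw, with no pretense that a particular `ConsumerRecord` caused it.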

Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:
Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers

Discussion and feedback is welcome, thank you.
Eno
