Hi Jan,

You're right. I think I got carried away and broadened the scope of this KIP 
beyond its original purpose. This handler will only be there for 
deserialization errors, i.e., "poison pills", and is not intended to be a 
catch-all handler for all sorts of other problems (e.g., an NPE in user 
code). Deserialization errors can happen either when polling or when 
deserializing from a state store. That narrows down the scope of the KIP; 
I will update it.
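
To make the narrowed scope concrete, here is a rough sketch of the kind of 
handler this implies (names and signatures are illustrative only, not the 
final KIP API): it would only ever be invoked with the raw record whose key 
or value failed to deserialize, whether that record came from a poll or from 
a state store read.

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch only -- a handler consulted solely for deserialization
// failures ("poison pills"), never for arbitrary exceptions in user code.
public interface DeserializationExceptionHandler {

    enum Response {
        /* skip the bad record and continue processing */
        CONTINUE,
        /* stop processing and fail */
        FAIL
    }

    /**
     * Called with the raw record whose key or value could not be deserialized,
     * together with the exception thrown by the deserializer.
     */
    Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
}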

Thanks
Eno

> On 26 May 2017, at 11:31, Jan Filipiak <jan.filip...@trivago.com> wrote:
> 
> Hi
> 
> Unfortunately no. Think about "caching": records popping out of there, or 
> multi-step tasks (join, aggregate, repartition all in one go). The last 
> repartitioner might throw because it can't determine the partition, only 
> because a get on the join store caused a flush through the aggregates. This 
> has nothing to do with a ConsumerRecord at all, especially not the one we 
> most recently processed.
> 
> To be completely honest, anything but grinding to a halt is not appealing to 
> me at all. Sure, maybe lag monitoring will call me on a Sunday, but I can at 
> least be confident it's working the rest of the time.
> 
> Best Jan
> 
> PS.:
> 
> Hope you get my point. I am mostly complaining about:
> 
> public interface RecordExceptionHandler {
>     /**
>      * Inspect a record and the exception received
>      */
>     HandlerResponse handle(/* that guy here >>>>>>> */ ConsumerRecord<byte[], byte[]> record, Exception exception);
> }
> 
> public enum HandlerResponse {
>     /* continue with processing */
>     CONTINUE(1),
>     /* fail the processing and stop */
>     FAIL(2);
> }
> 
> 
> 
> On 26.05.2017 11:18, Eno Thereska wrote:
>> Thanks Jan,
>> 
>> The record passed to the handler will always be the problematic record. 
>> There are two cases/types of exceptions for the purposes of this KIP: 
>> 1) any exception during deserialization. The bad record + the exception 
>> (i.e., DeserializeException) will be passed to the handler. The handler will 
>> be able to tell this was a deserialization error.
>> 2) any exception during processing of this record. So whenever a processor 
>> gets the record (after some caching, etc.), starts to process it, and then 
>> fails, it will call the handler with this record.
>> 
>> Does that match your thinking?
>> 
>> Thanks,
>> Eno
>> 
>> 
>>> On 26 May 2017, at 09:51, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>> 
>>> Hi,
>>> 
>>> quick question: From the KIP it doesn't quite make sense to me how this 
>>> fits with caching.
>>> With caching, the consumer record might not be related at all to the 
>>> processor that throws while processing.
>>> 
>>> Would it not make more sense to pass the processor name + the key/value 
>>> objects for processing errors, and the state store or topic name + the 
>>> key/value byte[]s for the serializers/deserializers? Maybe also passing in 
>>> the serdes used?
>>> 
>>> Best Jan
>>> 
>>> 
>>> 
>>> On 25.05.2017 11:47, Eno Thereska wrote:
>>>> Hi there,
>>>> 
>>>> I’ve added a KIP on improving exception handling in streams:
>>>> KIP-161: streams record processing exception handlers. 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
>>>> 
>>>> Discussion and feedback are welcome, thank you.
>>>> Eno
> 
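
For reference, a rough sketch of the alternative shape Jan describes above 
(all names and signatures here are hypothetical, not something proposed in 
the KIP): a processing-error callback that receives the processor name and 
the already-deserialized key/value, and a separate deserialization callback 
that receives the topic or store name plus the raw bytes and the serdes in 
use.

import org.apache.kafka.common.serialization.Serde;

// Hypothetical sketch of Jan's suggestion above; illustrative only.
public interface SplitExceptionHandler {

    enum Response { CONTINUE, FAIL }

    /** A processor threw: gets the processor name and the deserialized key/value it held. */
    Response onProcessingError(String processorName, Object key, Object value,
                               Exception exception);

    /** Deserialization failed: gets the topic/store name, the raw bytes and the serdes used. */
    Response onDeserializationError(String topicOrStoreName,
                                    byte[] keyBytes, byte[] valueBytes,
                                    Serde<?> keySerde, Serde<?> valueSerde,
                                    Exception exception);
}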
