Thanks, everyone, for the input. Shall I start the vote on this KIP?

Thanks and Regards
Arpit Goyal
8861094754

On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]> wrote:

> Thanks Matthias.
> That makes sense. Clients can use a single handler implementation to
> support error handling for both the stream thread and the global thread.
> There is no need to introduce a ThreadType parameter or another
> configuration for this.
> @Lucas Brutschy <[email protected]>  Does that answer your query?
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]> wrote:
>
>> I don't think we would need multiple handlers. The handler is invoked
>> passing in `ErrorHandlerContext` parameter, which provides enough
>> information to distinguish the case (eg, topic(), processorNodeId(), and
>> taskId()), so users can implement different logic inside the same
>> handler for the different cases if necessary.
>>
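
As an illustration of the single-handler approach described above, here is a minimal sketch in plain Java. ContextView, FixedContext, Decision, and SingleHandlerSketch are hypothetical stand-ins, not Kafka Streams classes; only the accessor names topic(), processorNodeId(), and taskId() come from the message above. The set of global topic names and the two policies are assumptions made for the example.

```java
import java.util.Set;

// Hypothetical stand-in exposing the three accessors named above;
// this is NOT the Kafka Streams ErrorHandlerContext interface.
interface ContextView {
    String topic();
    String processorNodeId();
    String taskId();
}

// Trivial immutable implementation, used only to drive the sketch.
final class FixedContext implements ContextView {
    private final String topic;
    private final String processorNodeId;
    private final String taskId;

    FixedContext(String topic, String processorNodeId, String taskId) {
        this.topic = topic;
        this.processorNodeId = processorNodeId;
        this.taskId = taskId;
    }

    public String topic() { return topic; }
    public String processorNodeId() { return processorNodeId; }
    public String taskId() { return taskId; }
}

enum Decision { RESUME, FAIL }

// One handler, two policies: the branch is chosen from the context alone.
final class SingleHandlerSketch {
    // Assumption: the application knows which topics back its GlobalKTables.
    private final Set<String> globalTopics;

    SingleHandlerSketch(Set<String> globalTopics) {
        this.globalTopics = globalTopics;
    }

    Decision decide(ContextView context, Exception error) {
        if (globalTopics.contains(context.topic())) {
            // Assumed policy: skip bad records while updating a GlobalKTable.
            return Decision.RESUME;
        }
        // Assumed policy: fail fast for regular stream processing.
        return Decision.FAIL;
    }
}
```

Keying the branch on topic() is only one choice; processorNodeId() or taskId() could serve the same purpose if they identify the global topology more reliably in a given application.
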
>>
>> -Matthias
>>
>>
>> On 1/23/26 10:05 AM, Arpit Goyal wrote:
>> > Thanks Bill and Lucas for the feedback.
>> >
>> > LB1:  I was wondering precisely what we are logging in the DLQ case. Do
>> >           you intend to log the full record content to make the record
>> >           content recoverable, or only some metadata? I suppose it's the
>> >           latter.
>> >
>> >
>> >    >   Since GlobalKTable lacks producer infrastructure, DLQ records will be
>> >        logged with full metadata but NOT sent to a Kafka topic.
>> > LB2:   Maybe I missed it (also not super fluent in that part of the
>> >           code), but will the implementers of the
>> >           `ProcessingExceptionHandler` be able to tell whether the error
>> >           originated from a globalThread or a StreamsThread? The
>> >           implementer may want to specialize handling for each case.
>> >           This is not a must, but would be a nice to have for sure.
>> >
>> >   >   Great question! We have two options here.
>> >
>> >       Option 1: Single Handler Configuration
>> >
>> >           Users define one implementation of ProcessingExceptionHandler
>> >           that handles errors for all stream types (KStream, KTable, and
>> >           GlobalKTable). This maintains consistency with the existing
>> >           DeserializationExceptionHandler pattern.
>> >
>> >           Limitation: This enforces the same handling behavior for
>> >           GlobalKTable exceptions that is defined for KStream processing
>> >           exceptions. It keeps things simple but is not flexible enough
>> >           for users who may want different behavior for GlobalKTable.
>> >
>> >       Option 2: Separate Optional Configuration
>> >
>> >           Introduce a new optional configuration:
>> >           global.processing.exception.handler. If configured, it applies
>> >           specifically to GlobalKTable processing errors; if not
>> >           configured, exceptions bubble up to the uncaught exception
>> >           handler (maintaining current behavior and backward
>> >           compatibility).
>> >
>> >           Limitation: Requires two configuration properties if users
>> >           want exception handling for both regular streams and
>> >           GlobalKTable.
>> >
>> >       With Option 1, ProcessingExceptionHandler currently has no way to
>> >       identify which thread is invoking it. We may need to add a
>> >       ThreadType (Stream or Global) to ErrorHandlerContext.
>> >
>> >       With Option 2, the client would always know which class it has
>> >       implemented for GlobalKTables.
>> >
>> > Thanks and Regards
>> > Arpit Goyal
>> > 8861094754
>> >
>> >
>> > On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> wrote:
>> >
>> >> Hi All,
>> >>
>> >> Thanks for the KIP! Makes sense to me and helps make KS more robust.
>> >> I don't have any additional comments beyond what's been said so far.
>> >>
>> >> -Bill
>> >>
>> >> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
>> >> [email protected]>
>> >> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> Overall, this makes sense to me. Thanks for the KIP!
>> >>>
>> >>> LB1: I was wondering precisely what we are logging in the DLQ case. Do
>> >>> you intend to log the full record content to make the record content
>> >>> recoverable, or only some metadata? I suppose it's the latter.
>> >>>
>> >>> LB2: Maybe I missed it (also not super fluent in that part of the
>> >>> code), but will the implementers of the `ProcessingExceptionHandler` be
>> >>> able to tell whether the error originated from a globalThread or a
>> >>> StreamsThread? The implementer may want to specialize handling for
>> >>> each case. This is not a must, but would be a nice to have for sure.
>> >>>
>> >>> Cheers,
>> >>> Lucas
>> >>>
>> >>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]>
>> >>> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>> Looking for more inputs and feedback. This would help to move this KIP
>> >>>> forward.
>> >>>>
>> >>>>
>> >>>> Thanks and Regards
>> >>>> Arpit Goyal
>> >>>> 8861094754
>> >>>>
>> >>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]>
>> >>>> wrote:
>> >>>>
>> >>>>> Thanks for the response, Matthias.
>> >>>>> I have updated the KIP to describe the automatic backward
>> >>>>> compatibility that handleError() provides via KIP-1034. The DLQ part is
>> >>>>> already covered under the Limitation section. Let me know if it needs
>> >>>>> to be improved further.
>> >>>>> Thanks and Regards
>> >>>>> Arpit Goyal
>> >>>>> 8861094754
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Thanks for the clarification. Makes sense to me.
>> >>>>>>
>> >>>>>> Might be good to add some of these details (no code reference to
>> >>>>>> `ProcessorNode` etc necessary as it's an impl detail) to the KIP. Ie,
>> >>>>>> state explicitly that the new handleError() will be used and that it
>> >>>>>> provides backward compatibility automatically based on its current
>> >>>>>> implementation from KIP-1034.
>> >>>>>>
>> >>>>>> And that DLQ records, if returned, would be ignored and dropped, and
>> >>>>>> a warning is logged about it for this case.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
>> >>>>>>> Thank you for the detailed questions! Let me clarify the
>> >>>>>>> implementation approach:
>> >>>>>>>
>> >>>>>>>     Which Method Will Be Called?
>> >>>>>>>
>> >>>>>>>     GlobalThread will call the NEW handleError() method (not the
>> >>>>>>>     deprecated handle()).
>> >>>>>>>
>> >>>>>>>     Key Point: The exception handler is not called directly by
>> >>>>>>>     GlobalThread. Instead, it's called by ProcessorNode.process(),
>> >>>>>>>     which already invokes handleError() for regular processors.
>> >>>>>>>
>> >>>>>>>     The implementation is straightforward:
>> >>>>>>>
>> >>>>>>>     Current code (GlobalStateUpdateTask.initTopology - line 161):
>> >>>>>>>     // No handler passed
>> >>>>>>>     node.init((InternalProcessorContext) this.processorContext);
>> >>>>>>>
>> >>>>>>>     Proposed change:
>> >>>>>>>     // Pass handler
>> >>>>>>>     node.init((InternalProcessorContext) this.processorContext,
>> >>>>>>>               processingExceptionHandler);
>> >>>>>>>
>> >>>>>>>     Once the handler is passed to ProcessorNode, the same code path
>> >>>>>>>     that handles exceptions for regular KStream/KTable processors
>> >>>>>>>     (ProcessorNode.process() line 236) will automatically handle
>> >>>>>>>     GlobalKTable exceptions:
>> >>>>>>>
>> >>>>>>>     Response response =
>> >>>>>>>         processingExceptionHandler.handleError(errorHandlerContext,
>> >>>>>>>                                                record, exception);
>> >>>>>>>
>> >>>>>>>     There's no separate code path for GlobalThread - it reuses the
>> >>>>>>>     existing ProcessorNode exception handling mechanism.
>> >>>>>>>
>> >>>>>>>     Backward Compatibility
>> >>>>>>>
>> >>>>>>>     The handleError() method provides automatic backward
>> >>>>>>>     compatibility via its default implementation:
>> >>>>>>>
>> >>>>>>>     default Response handleError(...) {
>> >>>>>>>         return new Response(Result.from(handle(...)),
>> >>>>>>>                             Collections.emptyList());
>> >>>>>>>     }
>> >>>>>>>
>> >>>>>>>     - If users implement the old handle() method: handleError()
>> >>>>>>>       delegates to it automatically
>> >>>>>>>     - If users implement the new handleError() method: it's used
>> >>>>>>>       directly
>> >>>>>>>     - No code changes required for existing applications
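
To make the delegation described above concrete, here is a self-contained sketch of the default-method pattern. OldResponse, NewResult, NewResponse, and CompatHandler are simplified, hypothetical stand-ins for the types mentioned in this thread (the record is reduced to a String), so the signatures below are assumptions and not the Kafka Streams API.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-ins for the deprecated and the new result types.
enum OldResponse { CONTINUE, FAIL }

enum NewResult {
    RESUME, FAIL;

    // Maps the legacy outcome onto the new enum.
    static NewResult from(OldResponse old) {
        return old == OldResponse.FAIL ? FAIL : RESUME;
    }
}

final class NewResponse {
    final NewResult result;
    final List<String> deadLetterRecords;

    NewResponse(NewResult result, List<String> deadLetterRecords) {
        this.result = result;
        this.deadLetterRecords = deadLetterRecords;
    }
}

interface CompatHandler {
    // Legacy entry point; implementers of the new method never need it.
    @Deprecated
    default OldResponse handle(String record, Exception error) {
        throw new UnsupportedOperationException("implement handle() or handleError()");
    }

    // New entry point; by default it wraps the legacy result and
    // returns no dead letter queue records.
    default NewResponse handleError(String record, Exception error) {
        return new NewResponse(NewResult.from(handle(record, error)),
                               Collections.emptyList());
    }
}

// An existing application that only knows the old method keeps working.
final class OldStyleHandler implements CompatHandler {
    @Override
    @Deprecated
    public OldResponse handle(String record, Exception error) {
        return OldResponse.CONTINUE;
    }
}

// A new application overrides handleError() and is called directly.
final class NewStyleHandler implements CompatHandler {
    @Override
    public NewResponse handleError(String record, Exception error) {
        return new NewResponse(NewResult.FAIL, Collections.emptyList());
    }
}
```

Because the caller only ever invokes handleError(), both handler styles go through the same call site, which is why no separate code path is needed for the global thread.
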
>> >>>>>>>
>> >>>>>>>     Dead Letter Queue (DLQ) Support
>> >>>>>>>
>> >>>>>>>     This is where GlobalKTable differs from regular processors:
>> >>>>>>>
>> >>>>>>>     The Limitation: GlobalThread does not have a Producer, so it
>> >>>>>>>     cannot send DLQ records to Kafka.
>> >>>>>>>
>> >>>>>>>     Proposed Approach:
>> >>>>>>>
>> >>>>>>>     1. For KIP-1270: When ProcessorNode detects DLQ records but the
>> >>>>>>>        context doesn't support RecordCollector (i.e., GlobalThread),
>> >>>>>>>        it will log a warning instead of sending:
>> >>>>>>>
>> >>>>>>>     log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
>> >>>>>>>              "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
>> >>>>>>>              "Record details logged: topic={}, headers={}", ...);
>> >>>>>>>
>> >>>>>>>     2. Future KIP: Full DLQ support for GlobalKTable (requires
>> >>>>>>>        adding Producer infrastructure) will be proposed separately,
>> >>>>>>>        as it's a larger architectural change.
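
The "warn instead of send" behavior proposed above could be sketched roughly as follows. DlqDispatchSketch is a hypothetical class, the boolean flag stands in for "the context supports a RecordCollector", and records and warnings are collected in plain lists so the example runs without Kafka or a logging library; none of this is the actual ProcessorNode code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatcher: sends DLQ records when a producer exists,
// otherwise drops them and records a warning.
final class DlqDispatchSketch {
    final List<String> sent = new ArrayList<>();
    final List<String> warnings = new ArrayList<>();

    // Assumption: true on a stream thread, false on the global thread.
    private final boolean producerAvailable;

    DlqDispatchSketch(boolean producerAvailable) {
        this.producerAvailable = producerAvailable;
    }

    void dispatch(String sourceTopic, List<String> dlqRecords) {
        if (dlqRecords.isEmpty()) {
            return; // nothing to send and nothing to warn about
        }
        if (producerAvailable) {
            sent.addAll(dlqRecords); // stands in for producing to the DLQ topic
            return;
        }
        // No producer: the records are dropped, and one warning notes it.
        warnings.add("Dead letter queue records cannot be sent for GlobalKTable "
                + "processors (no producer available); dropped "
                + dlqRecords.size() + " record(s) from topic " + sourceTopic);
    }
}
```

Emitting a single warning per failed record batch, as here, is a design choice of the sketch; the thread does not specify how often the warning would be logged.
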
>> >>>>>>>
>> >>>>>>>     How This Avoids User Confusion
>> >>>>>>>
>> >>>>>>>     1. Single handler for all processors: Users configure ONE
>> >>>>>>>        ProcessingExceptionHandler that works for both regular and
>> >>>>>>>        global processors
>> >>>>>>>     2. Consistent behavior: Result.RESUME continues, Result.FAIL
>> >>>>>>>        stops - same for both
>> >>>>>>>     3. Clear limitation: DLQ records are logged (not sent) for
>> >>>>>>>        GlobalKTable, with explicit warning message
>> >>>>>>>     4. Documentation: Config docs will clearly state DLQ sending
>> >>>>>>>        limitation for GlobalKTable
>> >>>>>>> Thanks and Regards
>> >>>>>>> Arpit Goyal
>> >>>>>>> 8861094754
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the KIP Arpit.
>> >>>>>>>>
>> >>>>>>>> Can you elaborate a little bit more on the details? With the newly
>> >>>>>>>> added DLQ support for regular `Processor`, the existing
>> >>>>>>>> `ProcessingHandlerResponse` and the corresponding method `handle()`
>> >>>>>>>> are deprecated with the upcoming 4.2 release.
>> >>>>>>>>
>> >>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not
>> >>>>>>>> implement the old `handle()` (even if it's still supported, as long
>> >>>>>>>> as the new `handleError()` is not overridden).
>> >>>>>>>>
>> >>>>>>>> Are you proposing, for now, to only add support for the deprecated
>> >>>>>>>> `handle()` method, ie, the new `handleError()` method would not be
>> >>>>>>>> called by the global-thread code? If yes, this might be confusing
>> >>>>>>>> for users?
>> >>>>>>>>
>> >>>>>>>> If you do not propose this, would it imply that the global-thread
>> >>>>>>>> code would call the new `handleError()` method? For this case, the
>> >>>>>>>> question is what the runtime would do if users try to use the DLQ
>> >>>>>>>> feature?
>> >>>>>>>>
>> >>>>>>>> Overall, it's unclear to me what you propose in detail and how we
>> >>>>>>>> can avoid confusing users, keep it backward compatible, and make it
>> >>>>>>>> easy to understand how the handler will work.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote:
>> >>>>>>>>> Hi Team,
>> >>>>>>>>> Just reaching out again. I need your input to move this forward.
>> >>>>>>>>>
>> >>>>>>>>> Thanks and Regards
>> >>>>>>>>> Arpit Goyal
>> >>>>>>>>> 8861094754
>> >>>>>>>>>
>> >>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi All,
>> >>>>>>>>>> I would like to start a discussion for KIP-1270. This KIP extends
>> >>>>>>>>>> ProcessingExceptionHandler support to GlobalKTable processors,
>> >>>>>>>>>> enabling consistent exception handling across all stream
>> >>>>>>>>>> processing types.
>> >>>>>>>>>>
>> >>>>>>>>>> *Current Behavior*
>> >>>>>>>>>>
>> >>>>>>>>>>      When a processing exception occurs in a GlobalKTable processor:
>> >>>>>>>>>>      - The exception propagates to GlobalStreamThread
>> >>>>>>>>>>      - The GlobalStreamThread terminates
>> >>>>>>>>>>      - The entire Kafka Streams application shuts down
>> >>>>>>>>>>      - No user-configurable exception handling is available
>> >>>>>>>>>>
>> >>>>>>>>>> *Proposed Behavior*
>> >>>>>>>>>>
>> >>>>>>>>>>      After this KIP, when a processing exception occurs in a
>> >>>>>>>>>>      GlobalKTable processor:
>> >>>>>>>>>>      - The configured ProcessingExceptionHandler.handleError() will
>> >>>>>>>>>>        be invoked
>> >>>>>>>>>>      - If the handler returns Result.RESUME, processing continues
>> >>>>>>>>>>        with the next record
>> >>>>>>>>>>      - If the handler returns Result.FAIL, the exception propagates
>> >>>>>>>>>>        (same as current behavior)
>> >>>>>>>>>>      - If no handler is configured, behavior remains unchanged
>> >>>>>>>>>>        (backward compatible)
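
The proposed behavior listed above can be illustrated with a small, self-contained loop. GlobalUpdateLoopSketch and LoopResult are hypothetical names, records are plain strings, and the handler is modeled as a BiFunction; this is a sketch of the RESUME/FAIL semantics only, not the GlobalStreamThread implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

enum LoopResult { RESUME, FAIL }

final class GlobalUpdateLoopSketch {
    // Processes records in order and returns those applied successfully.
    // A null handler models "no handler configured".
    static List<String> run(List<String> records,
                            Consumer<String> processor,
                            BiFunction<String, RuntimeException, LoopResult> handler) {
        List<String> applied = new ArrayList<>();
        for (String record : records) {
            try {
                processor.accept(record);
                applied.add(record);
            } catch (RuntimeException error) {
                // No handler, or handler says FAIL: propagate as before the KIP.
                if (handler == null || handler.apply(record, error) == LoopResult.FAIL) {
                    throw error;
                }
                // RESUME: skip this record and continue with the next one.
            }
        }
        return applied;
    }
}
```

With a handler that always returns RESUME, a failing record is skipped and the records after it are still applied; with FAIL or no handler, the first exception ends the loop.
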
>> >>>>>>>>>>
>> >>>>>>>>>> KIP:
>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks and Regards
>> >>>>>>>>>> Arpit Goyal
>> >>>>>>>>>> 8861094754
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>
>> >>
>> >
>>
>>
