Thank you for the detailed questions! Let me clarify the implementation
approach:

  Which Method Will Be Called?

  The global-thread code path will invoke the NEW handleError() method (not
the deprecated handle()).

  Key Point: GlobalThread does not call the exception handler directly.
Instead, the handler is called by ProcessorNode.process(), which already
invokes handleError() for regular processors.

  The implementation is straightforward:

  Current code (GlobalStateUpdateTask.initTopology - line 161):
  node.init((InternalProcessorContext) this.processorContext);  // No handler passed

  Proposed change:
  node.init((InternalProcessorContext) this.processorContext,
            processingExceptionHandler);  // Pass handler

  Once the handler is passed to ProcessorNode, the same code path that
handles exceptions for regular KStream/KTable processors
(ProcessorNode.process() line 236) will automatically handle GlobalKTable
exceptions:

  Response response =
      processingExceptionHandler.handleError(errorHandlerContext, record, exception);

  There's no separate code path for GlobalThread - it reuses the existing
ProcessorNode exception handling mechanism.
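
  To make the flow concrete, here is a minimal, self-contained sketch of the
idea. It does not use the real Kafka Streams classes: Node, Handler, Processor
and run() are simplified, hypothetical stand-ins for ProcessorNode,
ProcessingExceptionHandler and the global state update loop, and the real
handleError() takes an error handler context, a record and an exception
rather than a String. How a FAIL result is wrapped and rethrown is also an
assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class GlobalHandlerSketch {
    enum Result { RESUME, FAIL }

    // Hypothetical, simplified stand-in for ProcessingExceptionHandler.
    interface Handler {
        Result handleError(String record, Exception exception);
    }

    interface Processor {
        void process(String record);
    }

    // Simplified stand-in for ProcessorNode: the handler is supplied via init().
    static class Node {
        private final Processor processor;
        private Handler handler; // stays null when no handler is passed

        Node(Processor processor) {
            this.processor = processor;
        }

        void init(Handler handler) {
            this.handler = handler;
        }

        void process(String record) {
            try {
                processor.process(record);
            } catch (RuntimeException e) {
                if (handler == null) {
                    throw e; // no handler: the exception propagates unchanged
                }
                if (handler.handleError(record, e) == Result.FAIL) {
                    throw new IllegalStateException("Handler returned FAIL for " + record, e);
                }
                // RESUME: skip this record and continue with the next one
            }
        }
    }

    // Stand-in for the global update loop: returns the records that were applied.
    static List<String> run(Handler handler, List<String> records) {
        List<String> applied = new ArrayList<>();
        Node node = new Node(r -> {
            if (r.startsWith("bad")) {
                throw new IllegalArgumentException("cannot process " + r);
            }
            applied.add(r);
        });
        node.init(handler);
        for (String r : records) {
            node.process(r);
        }
        return applied;
    }

    public static void main(String[] args) {
        // Prints [a, b]: the failing record is skipped because the handler returns RESUME.
        System.out.println(run((r, e) -> Result.RESUME, List.of("a", "bad-1", "b")));
    }
}
```

  The point of the sketch is only that the node, not the thread, owns the
try/catch, so whichever thread drives the node gets the same handling.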

  Backward Compatibility

  The handleError() method provides automatic backward compatibility via
its default implementation:

  default Response handleError(...) {
      return new Response(Result.from(handle(...)), Collections.emptyList());
  }

  - If users implement the old handle() method: handleError() delegates to
it automatically
  - If users implement the new handleError() method: it's used directly
  - No code changes required for existing applications
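
  The delegation described above can be illustrated with a small standalone
sketch. The types here (Handler, LegacyResponse, Response, Result) are
simplified stand-ins written for this mail, not the actual Kafka Streams
interfaces, and the behavior of the old handle() when it is not overridden
(throwing UnsupportedOperationException) is an assumption of the sketch.

```java
import java.util.Collections;
import java.util.List;

public class BackwardCompatSketch {
    enum Result { RESUME, FAIL }

    // Stand-in for the deprecated response enum returned by handle().
    enum LegacyResponse { CONTINUE, FAIL }

    static final class Response {
        final Result result;
        final List<String> deadLetterRecords;

        Response(Result result, List<String> deadLetterRecords) {
            this.result = result;
            this.deadLetterRecords = deadLetterRecords;
        }
    }

    // Hypothetical, simplified handler interface with both generations of the API.
    interface Handler {
        // Old method; assumed here to throw unless the user overrides it.
        @Deprecated
        default LegacyResponse handle(String record, Exception exception) {
            throw new UnsupportedOperationException("handle() not implemented");
        }

        // New method; by default it delegates to handle() and attaches no DLQ records.
        default Response handleError(String record, Exception exception) {
            Result result = handle(record, exception) == LegacyResponse.CONTINUE
                ? Result.RESUME
                : Result.FAIL;
            return new Response(result, Collections.emptyList());
        }
    }

    // An existing application that only implements the old method.
    static class LegacyHandler implements Handler {
        @Override
        @Deprecated
        public LegacyResponse handle(String record, Exception exception) {
            return LegacyResponse.CONTINUE;
        }
    }

    // A newer application that implements handleError() directly.
    static class NewHandler implements Handler {
        @Override
        public Response handleError(String record, Exception exception) {
            return new Response(Result.FAIL, List.of("dlq:" + record));
        }
    }

    public static void main(String[] args) {
        Exception e = new RuntimeException("boom");
        System.out.println(new LegacyHandler().handleError("r1", e).result); // RESUME
        System.out.println(new NewHandler().handleError("r1", e).result);    // FAIL
    }
}
```

  In both cases the caller only ever invokes handleError(), which is why the
global-thread path would not need to know which method the user implemented.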

  Dead Letter Queue (DLQ) Support

  This is where GlobalKTable differs from regular processors:

  The Limitation: GlobalThread does not have a Producer, so it cannot send
DLQ records to Kafka.

  Proposed Approach:

  1. For KIP-1270: When the handler's Response contains DLQ records but the
context doesn't support RecordCollector (i.e., on GlobalThread),
ProcessorNode will log a warning instead of sending them:

  log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
           "(no producer available). DLQ support for GlobalKTable will be addressed " +
           "in a future KIP. Record details logged: topic={}, headers={}", ...);

  2. Future KIP: Full DLQ support for GlobalKTable (requires adding
Producer infrastructure) will be proposed separately, as it's a larger
architectural change.
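
  A rough sketch of the fallback in point 1, under stated assumptions:
dispatch() is a hypothetical helper, the Optional<Consumer<String>> stands in
for "a RecordCollector is available or not", and the warn callback stands in
for the logger. None of these names come from the Kafka code base.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public class DlqFallbackSketch {

    // Sends DLQ records when a collector exists; otherwise emits one warning.
    // Returns the number of records actually sent.
    static int dispatch(List<String> dlqRecords,
                        Optional<Consumer<String>> collector,
                        Consumer<String> warn) {
        if (dlqRecords.isEmpty()) {
            return 0; // nothing to do, and no warning either
        }
        if (!collector.isPresent()) {
            // Global-thread case in this sketch: no producer, so log and skip.
            warn.accept("Dead letter queue records cannot be sent for GlobalKTable processors "
                + "(no producer available); skipped " + dlqRecords.size() + " record(s)");
            return 0;
        }
        dlqRecords.forEach(collector.get());
        return dlqRecords.size();
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        List<String> warnings = new ArrayList<>();
        Consumer<String> collector = sent::add;

        // Regular processor: a collector is available, so the record is sent.
        dispatch(List.of("dlq-1"), Optional.of(collector), warnings::add);

        // Global processor: no collector, so the records are only reported.
        dispatch(List.of("dlq-2", "dlq-3"), Optional.empty(), warnings::add);

        System.out.println("sent=" + sent);
        System.out.println("warnings=" + warnings.size());
    }
}
```

  The handler result (RESUME or FAIL) is deliberately untouched by this
helper; only the delivery of the DLQ records differs between the two cases.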

  How This Avoids User Confusion

  1. Single handler for all processors: Users configure ONE
ProcessingExceptionHandler that works for both regular and global processors
  2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
for both
  3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
with explicit warning message
  4. Documentation: Config docs will clearly state DLQ sending limitation
for GlobalKTable

Thanks and Regards
Arpit Goyal
8861094754


On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:

> Thanks for the KIP Arpit.
>
> Can you elaborate a little bit more on details? With the newly added DLQ
> support for regular `Processor`, the existing
> `ProcessingHandlerResponse` and corresponding method `handle()` are
> deprecated with upcoming 4.2 release.
>
> Thus, from AK 4.2+ going forward, users are expected to not implement
> the old `handle()` (even if it's still supported, as long as the new
> `handleError` is not overwritten).
>
> Are you proposing, for now, to only add support for the deprecated
> `handle()` method, ie, the new `handleError()` method would not be
> called by the global-thread code? If yes, this might be confusing for
> users?
>
> If you do not propose this, would it imply that the global-thread code
> would call the new `handleError()` method? For this case, the question
> is what the runtime would do if users try to use the DLQ feature?
>
> Overall, it's unclear to me what you propose in detail and how we can
> avoid confusing users, keep it backward compatible, and make it easy to
> understand how the handler will work.
>
>
> -Matthias
>
> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> > Hi Team
> > Just reaching out again. I need your input to move this forward.
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> > On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
> >
> >> Hi All,
> >> I would like to start a discussion for KIP-1270.  This KIP extends
> >> ProcessingExceptionHandler support to GlobalKTable processors, enabling
> >> consistent exception handling across all stream processing types.
> >>
> >> *Current Behavior*
> >>
> >>    When a processing exception occurs in a GlobalKTable processor:
> >>    - The exception propagates to GlobalStreamThread
> >>    - The GlobalStreamThread terminates
> >>    - The entire Kafka Streams application shuts down
> >>    - No user-configurable exception handling is available
> >>
> >> *Proposed Behavior*
> >>
> >>    After this KIP, when a processing exception occurs in a GlobalKTable
> >>    processor:
> >>    - The configured ProcessingExceptionHandler.handleError() will be invoked
> >>    - If the handler returns Result.RESUME, processing continues with the
> >>      next record
> >>    - If the handler returns Result.FAIL, the exception propagates (same as
> >>      current behavior)
> >>    - If no handler is configured, behavior remains unchanged (backward
> >>      compatible)
> >>
> >> KIP:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >>
> >> Thanks and Regards
> >> Arpit Goyal
> >> 8861094754
> >>
> >
>
>
