Thanks for the clarification. Makes sense to me.
It might be good to add some of these details to the KIP (no code
references to `ProcessorNode` etc. are necessary, as that is an
implementation detail). I.e., state explicitly that the new handleError()
will be used and that it provides backward compatibility automatically
based on its current implementation from KIP-1034.
Also state that DLQ records, if returned, would be ignored and dropped,
and that a warning would be logged in this case.
-Matthias
On 1/12/26 2:29 AM, Arpit Goyal wrote:
Thank you for the detailed questions! Let me clarify the implementation
approach:
Which Method Will Be Called?
GlobalThread will call the NEW handleError() method (not the deprecated
handle()).
Key Point: The exception handler is not called directly by GlobalThread.
Instead, it's called by ProcessorNode.process(), which already invokes
handleError() for regular processors.
The implementation is straightforward:
Current code (GlobalStateUpdateTask.initTopology - line 161):
node.init((InternalProcessorContext) this.processorContext); // No handler passed
Proposed change:
node.init((InternalProcessorContext) this.processorContext, processingExceptionHandler); // Pass handler
Once the handler is passed to ProcessorNode, the same code path that
handles exceptions for regular KStream/KTable processors
(ProcessorNode.process() line 236) will automatically handle GlobalKTable
exceptions:
Response response =
processingExceptionHandler.handleError(errorHandlerContext, record,
exception);
There's no separate code path for GlobalThread - it reuses the existing
ProcessorNode exception handling mechanism.
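To make the shared code path concrete, here is a minimal sketch. It uses simplified stand-in types (`SharedPathSketch`, `Node`, `Handler`, and a String record are all hypothetical names), not the real Kafka Streams classes, and only assumes the behavior described above: the node wraps the user logic, and whichever thread drives the node gets the same handling once a handler is passed to init().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only; these are NOT the Kafka Streams classes.
final class SharedPathSketch {
    enum Result { RESUME, FAIL }

    interface Handler {
        Result handleError(String record, Exception error);
    }

    // Plays the role of a processor node: it wraps user logic and owns the error handling.
    static final class Node {
        private final Consumer<String> userLogic;
        private Handler handler; // null models "no handler passed"

        Node(Consumer<String> userLogic) {
            this.userLogic = userLogic;
        }

        void init(Handler handler) {
            this.handler = handler;
        }

        void process(String record) {
            try {
                userLogic.accept(record);
            } catch (RuntimeException e) {
                // Without a handler, or when the handler says FAIL, the exception propagates.
                if (handler == null || handler.handleError(record, e) == Result.FAIL) {
                    throw e;
                }
                // RESUME: the failed record is skipped and the caller moves on.
            }
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        Node node = new Node(r -> {
            if (r.isEmpty()) throw new IllegalArgumentException("empty record");
            processed.add(r);
        });
        node.init((record, error) -> Result.RESUME);
        for (String r : List.of("a", "", "b")) {
            node.process(r);
        }
        System.out.println(processed);
    }
}
```

The point of the sketch is that the caller of process() needs no error handling of its own; passing the handler at init time is the only change on the calling side.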
Backward Compatibility
The handleError() method provides automatic backward compatibility via
its default implementation:
default Response handleError(...) {
return new Response(Result.from(handle(...)),
Collections.emptyList());
}
- If users implement the old handle() method: handleError() delegates to
it automatically
- If users implement the new handleError() method: it's used directly
- No code changes required for existing applications
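As an illustration of the delegation, here is a self-contained sketch with stand-in types. All names (`DelegationSketch`, `LegacyHandler`, `NewStyleHandler`) are hypothetical, records are plain Strings, and the conversion step `Result.from(...)` from the snippet above is collapsed by letting the old method return `Result` directly, so this is not the actual Kafka Streams interface.

```java
import java.util.Collections;
import java.util.List;

// Stand-in types for illustration only; NOT the real Kafka Streams API.
final class DelegationSketch {
    enum Result { RESUME, FAIL }

    static final class Response {
        final Result result;
        final List<String> deadLetterRecords;

        Response(Result result, List<String> deadLetterRecords) {
            this.result = result;
            this.deadLetterRecords = deadLetterRecords;
        }
    }

    interface Handler {
        // Old-style method (assumed here to have a throwing default so new handlers can skip it).
        default Result handle(String record, Exception error) {
            throw new UnsupportedOperationException("neither method implemented");
        }

        // New-style method: by default it wraps the old method's answer and adds no DLQ records.
        default Response handleError(String record, Exception error) {
            return new Response(handle(record, error), Collections.emptyList());
        }
    }

    // Existing user code that only knows the old method keeps working unchanged.
    static final class LegacyHandler implements Handler {
        @Override
        public Result handle(String record, Exception error) {
            return Result.RESUME;
        }
    }

    // New user code overrides handleError() and may return DLQ records.
    static final class NewStyleHandler implements Handler {
        @Override
        public Response handleError(String record, Exception error) {
            return new Response(Result.FAIL, List.of("dlq-" + record));
        }
    }

    public static void main(String[] args) {
        Exception error = new RuntimeException("boom");
        Response legacy = new LegacyHandler().handleError("r1", error);
        Response modern = new NewStyleHandler().handleError("r2", error);
        System.out.println(legacy.result + " " + legacy.deadLetterRecords);
        System.out.println(modern.result + " " + modern.deadLetterRecords);
    }
}
```

In both cases the runtime only ever calls handleError(), which is why the caller does not need to know which method the user implemented.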
Dead Letter Queue (DLQ) Support
This is where GlobalKTable differs from regular processors:
The Limitation: GlobalThread does not have a Producer, so it cannot send
DLQ records to Kafka.
Proposed Approach:
1. For KIP-1270: When ProcessorNode detects DLQ records but the context
doesn't support RecordCollector (i.e., GlobalThread), it will log a warning
instead of sending:
log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
"(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
"Record details logged: topic={}, headers={}", ...);
2. Future KIP: Full DLQ support for GlobalKTable (requires adding
Producer infrastructure) will be proposed separately, as it's a larger
architectural change.
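The warn-and-drop behavior in step 1 could look roughly like the sketch below. It is only an illustration under stated assumptions: `DlqFallbackSketch`, `Collector`, and `dispatch` are hypothetical names, a null collector models "this context has no producer", and warnings are collected in a list instead of going through a real logger.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only; hypothetical names, not the Kafka Streams internals.
final class DlqFallbackSketch {
    // Stand-in for whatever component can send records to Kafka.
    interface Collector {
        void send(String record);
    }

    // Warnings are kept in memory here so the behavior is easy to inspect.
    final List<String> warnings = new ArrayList<>();

    // Returns how many DLQ records were actually sent.
    int dispatch(List<String> dlqRecords, Collector collector) {
        if (dlqRecords.isEmpty()) {
            return 0; // nothing to do, and nothing to warn about
        }
        if (collector == null) {
            // No producer available: drop the records and leave a warning behind.
            warnings.add("Dropping " + dlqRecords.size() + " DLQ record(s): no producer available");
            return 0;
        }
        dlqRecords.forEach(collector::send);
        return dlqRecords.size();
    }

    public static void main(String[] args) {
        DlqFallbackSketch sketch = new DlqFallbackSketch();
        int sent = sketch.dispatch(List.of("r1", "r2"), null);
        System.out.println(sent + " sent, warnings: " + sketch.warnings);
    }
}
```

Note that the handler's RESUME/FAIL decision is unaffected by this branch; only the returned DLQ records are dropped.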
How This Avoids User Confusion
1. Single handler for all processors: Users configure ONE
ProcessingExceptionHandler that works for both regular and global processors
2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
for both
3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
with explicit warning message
4. Documentation: Config docs will clearly state DLQ sending limitation
for GlobalKTable
Thanks and Regards
Arpit Goyal
8861094754
On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
Thanks for the KIP Arpit.
Can you elaborate a little bit more on details? With the newly added DLQ
support for regular `Processor`, the existing
`ProcessingHandlerResponse` and corresponding method `handle()` are
deprecated with upcoming 4.2 release.
Thus, from AK 4.2+ going forward, users are expected not to implement
the old `handle()` (even if it's still supported, as long as the new
`handleError()` is not overridden).
Are you proposing, for now, to only add support for the deprecated
`handle()` method, ie, the new `handleError()` method would not be
called by the global-thread code? If yes, this might be confusing for
users?
If you do not propose this, would it imply that the global-thread code
would call the new `handleError()` method? For this case, the question
is what the runtime would do if users try to use the DLQ feature.
Overall, it's unclear to me what you propose in detail and how we can
avoid confusing users, keep it backward compatible, and make it easy to
understand how the handler will work.
-Matthias
On 1/10/26 8:33 AM, Arpit Goyal wrote:
Hi Team
Just reaching out again. I need your input to move this forward.
Thanks and Regards
Arpit Goyal
8861094754
On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]>
wrote:
Hi All,
I would like to start a discussion for KIP-1270. This KIP extends
ProcessingExceptionHandler support to GlobalKTable processors, enabling
consistent exception handling across all stream processing types.
*Current Behavior*
When a processing exception occurs in a GlobalKTable processor:
- The exception propagates to GlobalStreamThread
- The GlobalStreamThread terminates
- The entire Kafka Streams application shuts down
- No user-configurable exception handling is available
*Proposed Behavior*
After this KIP, when a processing exception occurs in a GlobalKTable
processor:
- The configured ProcessingExceptionHandler.handleError() will be invoked
- If the handler returns Result.RESUME, processing continues with the next record
- If the handler returns Result.FAIL, the exception propagates (same as current behavior)
- If no handler is configured, behavior remains unchanged (backward compatible)
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
Thanks and Regards
Arpit Goyal
8861094754