I don't think we would need multiple handlers. The handler is invoked
passing in `ErrorHandlerContext` parameter, which provides enough
information to distinguish the case (eg, topic(), processorNodeId(), and
taskId()), so users can implement different logic inside the same
handler for the different cases if necessary.
-Matthias
On 1/23/26 10:05 AM, Arpit Goyal wrote:
Thanks Bill and Lucas for the feedback.
LB1: I was wondering precisely what we are logging in the DLQ case. Do
you intend to log the full record content to make he record content
recoverable, or only some metadata. I suppose it's the latter.
> Since GlobalKTable lacks producer infrastructure, DLQ records will be
logged with full metadata but NOT sent to a Kafka topic
LB2: Maybe I missed it (also not super fluent in that part of the
code), but will the implementers of the
`ProcessExceptionalHandler` be
able to tell whether the error originated from a globalThread or a
StreamsThread? The implementer may want to specialize handling for
each case. This is not a must, but would be a nice to have for
sure.
>. Great question! We have two options here
Option 1: Single Handler Configuration
Users define one implementation of
ProcessingExceptionHandler that handles errors for all stream types
(KStream, KTable, and GlobalKTable). This maintains
consistency with the existing
DeserializationExceptionHandler pattern.
Limitation: This will enforce the same handling behavior for
global exception handling that we defined for KStream processing exception
handling. This keeps things
simple but is not flexible enough for users who
may want different behavior for GlobalKTable.
Option 2: Separate Optional Configuration
Introduce a new optional configuration:
global.processing.exception.handler. If configured, it applies specifically
to GlobalKTable processing errors; if not
configured, exceptions bubble up to the uncaught
exception handler (maintaining current behavior and backward
compatibility).
Limitation: Requires two configuration properties if users want
exception handling for both regular streams and GlobalKTable.
With Option 1 - ProcessExceptionalHandler does not have a way
to identify which thread is invoking it as of now. We may need to introduce
ThreadType(Stream or Global) in errorHandlerContext with ThreadType
information.
With Option 2 - Client would always be aware of the class it
has implemented for GlobalKTables.
Thanks and Regards
Arpit Goyal
8861094754
On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> wrote:
Hi All,
Thanks for the KIP! Makes sense to me and helps make KS more robust.
I don't have any additional comments beyond what's been said so far.
-Bill
On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
[email protected]>
wrote:
Hi,
Overall, this makes sense to me. Thanks for the KIP!
LB1: I was wondering precisely what we are logging in the DLQ case. Do
you intend to log the full record content to make he record content
recoverable, or only some metadata. I suppose it's the latter.
LB2: Maybe I missed it (also not super fluent in that part of the
code), but will the implementers of the `ProcessExceptionalHandler` be
able to tell whether the error originated from a globalThread or a
StreamsThread? The implementer may want to specialize handling for
each case. This is not a must, but would be a nice to have for sure.
Cheers,
Lucas
On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]>
wrote:
Hi All,
Looking for more inputs and feedback. This would help to move this KIP
forward.
Thanks and Regards
Arpit Goyal
8861094754
On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]>
wrote:
Thanks for the response Matthias.
I have updated the KIP to include KIP-1034 handleError() automatic
backward compatibility. DLQ part I already mentioned under the
Limitation
section. Let me know if it needs to be improved further.
Thanks and Regards
Arpit Goyal
8861094754
On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]>
wrote:
Thanks for the clarification. Make sense to me.
Might be good to add some of these details (no code reference to
`ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie,
state
explicitly that the new handleError() will be used and that it
provides
backward compatibility automatically based on it's current
implementaion
from KIP-1034.
And that DLQ records, if returned, would be ignored and dropped and
a
warning is logged about it for this case.
-Matthias
On 1/12/26 2:29 AM, Arpit Goyal wrote:
Thank you for the detailed questions! Let me clarify the
implementation
approach:
Which Method Will Be Called?
GlobalThread will call the NEW handleError() method (not the
deprecated
handle()).
Key Point: The exception handler is not called directly by
GlobalThread.
Instead, it's called by ProcessorNode.process(), which already
invokes
handleError() for regular processors.
The implementation is straightforward:
Current code (GlobalStateUpdateTask.initTopology - line 161):
node.init((InternalProcessorContext) this.processorContext);
//
No
handler passed
Proposed change:
node.init((InternalProcessorContext) this.processorContext,
processingExceptionHandler); // Pass handler
Once the handler is passed to ProcessorNode, the same code path
that
handles exceptions for regular KStream/KTable processors
(ProcessorNode.process() line 236) will automatically handle
GlobalKTable
exceptions:
Response response =
processingExceptionHandler.handleError(errorHandlerContext,
record,
exception);
There's no separate code path for GlobalThread - it reuses the
existing
ProcessorNode exception handling mechanism.
Backward Compatibility
The handleError() method provides automatic backward
compatibility
via
its default implementation:
default Response handleError(...) {
return new Response(Result.from(handle(...)),
Collections.emptyList());
}
- If users implement the old handle() method: handleError()
delegates to
it automatically
- If users implement the new handleError() method: it's used
directly
- No code changes required for existing applications
Dead Letter Queue (DLQ) Support
This is where GlobalKTable differs from regular processors:
The Limitation: GlobalThread does not have a Producer, so it
cannot
send
DLQ records to Kafka.
Proposed Approach:
1. For KIP-1270: When ProcessorNode detects DLQ records but the
context
doesn't support RecordCollector (i.e., GlobalThread), it will log
a
warning
instead of sending:
log.warn("Dead letter queue records cannot be sent for
GlobalKTable
processors " +
"(no producer available). DLQ support for GlobalKTable
will
be
addressed in a future KIP. " +
"Record details logged: topic={}, headers={}", ...);
2. Future KIP: Full DLQ support for GlobalKTable (requires
adding
Producer infrastructure) will be proposed separately, as it's a
larger
architectural change.
How This Avoids User Confusion
1. Single handler for all processors: Users configure ONE
ProcessingExceptionHandler that works for both regular and global
processors
2. Consistent behavior: Result.RESUME continues, Result.FAIL
stops -
same
for both
3. Clear limitation: DLQ records are logged (not sent) for
GlobalKTable,
with explicit warning message
4. Documentation: Config docs will clearly state DLQ sending
limitation
for GlobalKTable
Thanks and Regards
Arpit Goyal
8861094754
On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]
wrote:
Thanks for the KIP Arpit.
Can you elaborate a little bit more on details? With the newly
added
DLQ
support for regular `Processor`, the existing
`ProcessingHandlerResponse` and corresponding method `handle()`
are
deprecated with upcoming 4.2 release.
Thus, from AK 4.2+ going forward, users are expected to not
implement
the old `handle()` (even if it's still supported, as long as the
new
`handleError` is not overwritten).
Are you proposing, for now, to only add support for the
deprecated
`handle()` method, ie, the new `handleError()` method would not
be
called by the global-thread code? If yes, this might be confusing
for
users?
If you do not propose this, would it imply that the global-thread
code
would call the new `handlerError()` method? For this case, the
question
is what the runtime would do if users try to use the DLQ feature?
Overall, it's unclear to me what you propose in detail and how we
can
avoid to confuse users, keep it backward compatible, and make it
easy
to
understanding how the handler will work.
-Matthias
On 1/10/26 8:33 AM, Arpit Goyal wrote:
Hi Team
Just reaching out again.Need your inputs to move it forward
Thanks and Regards
Arpit Goyal
8861094754
On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <
[email protected]>
wrote:
Hi All,
I would like to start a discussion for KIP-1270. This KIP
extends
ProcessingExceptionHandler support to GlobalKTable processors,
enabling
consistent exception handling across all stream processing
types.
* Current Behavior*
When a processing exception occurs in a GlobalKTable
processor:
- The exception propagates to GlobalStreamThread
- The GlobalStreamThread terminates
- The entire Kafka Streams application shuts down
- No user-configurable exception handling is available
* Proposed Behavior*
After this KIP, when a processing exception occurs in a
GlobalKTable
processor:
- The configured ProcessingExceptionHandler.handleError()
will be
invoked
- If the handler returns Result.RESUME, processing
continues
with the
next record
- If the handler returns Result.FAIL, the exception
propagates
(same
as
current behavior)
- If no handler is configured, behavior remains unchanged
(backward
compatible)
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
Thanks and Regards
Arpit Goyal
8861094754