Hi All,

Thanks for the KIP! Makes sense to me and helps make KS more robust. I don't have any additional comments beyond what's been said so far.

-Bill

On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <[email protected]> wrote:

> Hi,
>
> Overall, this makes sense to me. Thanks for the KIP!
>
> LB1: I was wondering precisely what we are logging in the DLQ case. Do
> you intend to log the full record content to make the record content
> recoverable, or only some metadata? I suppose it's the latter.
>
> LB2: Maybe I missed it (also not super fluent in that part of the
> code), but will the implementers of the `ProcessExceptionalHandler` be
> able to tell whether the error originated from a globalThread or a
> StreamsThread? The implementer may want to specialize handling for
> each case. This is not a must, but would be a nice to have for sure.
>
> Cheers,
> Lucas
>
> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]> wrote:
> >
> > Hi All,
> > Looking for more inputs and feedback. This would help to move this KIP
> > forward.
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> > On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:
> >
> > > Thanks for the response Matthias.
> > > I have updated the KIP to include KIP-1034 handleError() automatic
> > > backward compatibility. The DLQ part I already mentioned under the
> > > Limitation section. Let me know if it needs to be improved further.
> > > Thanks and Regards
> > > Arpit Goyal
> > > 8861094754
> > >
> > > On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:
> > >
> > >> Thanks for the clarification. Makes sense to me.
> > >>
> > >> Might be good to add some of these details (no code reference to
> > >> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie, state
> > >> explicitly that the new handleError() will be used and that it provides
> > >> backward compatibility automatically based on its current implementation
> > >> from KIP-1034.
> > >>
> > >> And that DLQ records, if returned, would be ignored and dropped and a
> > >> warning is logged about it for this case.
> > >>
> > >> -Matthias
> > >>
> > >> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> > >> > Thank you for the detailed questions! Let me clarify the implementation
> > >> > approach:
> > >> >
> > >> > Which Method Will Be Called?
> > >> >
> > >> > GlobalThread will call the NEW handleError() method (not the deprecated
> > >> > handle()).
> > >> >
> > >> > Key Point: The exception handler is not called directly by GlobalThread.
> > >> > Instead, it's called by ProcessorNode.process(), which already invokes
> > >> > handleError() for regular processors.
> > >> >
> > >> > The implementation is straightforward:
> > >> >
> > >> > Current code (GlobalStateUpdateTask.initTopology - line 161):
> > >> >     node.init((InternalProcessorContext) this.processorContext); // No handler passed
> > >> >
> > >> > Proposed change:
> > >> >     node.init((InternalProcessorContext) this.processorContext,
> > >> >         processingExceptionHandler); // Pass handler
> > >> >
> > >> > Once the handler is passed to ProcessorNode, the same code path that
> > >> > handles exceptions for regular KStream/KTable processors
> > >> > (ProcessorNode.process() line 236) will automatically handle GlobalKTable
> > >> > exceptions:
> > >> >
> > >> >     Response response =
> > >> >         processingExceptionHandler.handleError(errorHandlerContext, record, exception);
> > >> >
> > >> > There's no separate code path for GlobalThread - it reuses the existing
> > >> > ProcessorNode exception handling mechanism.
> > >> >
> > >> > Backward Compatibility
> > >> >
> > >> > The handleError() method provides automatic backward compatibility via
> > >> > its default implementation:
> > >> >
> > >> >     default Response handleError(...) {
> > >> >         return new Response(Result.from(handle(...)), Collections.emptyList());
> > >> >     }
> > >> >
> > >> > - If users implement the old handle() method: handleError() delegates to
> > >> >   it automatically
> > >> > - If users implement the new handleError() method: it's used directly
> > >> > - No code changes required for existing applications
> > >> >
> > >> > Dead Letter Queue (DLQ) Support
> > >> >
> > >> > This is where GlobalKTable differs from regular processors:
> > >> >
> > >> > The Limitation: GlobalThread does not have a Producer, so it cannot send
> > >> > DLQ records to Kafka.
> > >> >
> > >> > Proposed Approach:
> > >> >
> > >> > 1. For KIP-1270: When ProcessorNode detects DLQ records but the context
> > >> >    doesn't support RecordCollector (i.e., GlobalThread), it will log a
> > >> >    warning instead of sending:
> > >> >
> > >> >     log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
> > >> >         "(no producer available). DLQ support for GlobalKTable will be " +
> > >> >         "addressed in a future KIP. " +
> > >> >         "Record details logged: topic={}, headers={}", ...);
> > >> >
> > >> > 2. Future KIP: Full DLQ support for GlobalKTable (requires adding
> > >> >    Producer infrastructure) will be proposed separately, as it's a larger
> > >> >    architectural change.
> > >> >
> > >> > How This Avoids User Confusion
> > >> >
> > >> > 1. Single handler for all processors: Users configure ONE
> > >> >    ProcessingExceptionHandler that works for both regular and global
> > >> >    processors
> > >> > 2. Consistent behavior: Result.RESUME continues, Result.FAIL stops - same
> > >> >    for both
> > >> > 3. Clear limitation: DLQ records are logged (not sent) for GlobalKTable,
> > >> >    with explicit warning message
> > >> > 4. Documentation: Config docs will clearly state DLQ sending limitation
> > >> >    for GlobalKTable
> > >> >
> > >> > Thanks and Regards
> > >> > Arpit Goyal
> > >> > 8861094754
> > >> >
> > >> > On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:
> > >> >
> > >> >> Thanks for the KIP Arpit.
> > >> >>
> > >> >> Can you elaborate a little bit more on details? With the newly added DLQ
> > >> >> support for regular `Processor`, the existing
> > >> >> `ProcessingHandlerResponse` and corresponding method `handle()` are
> > >> >> deprecated with upcoming 4.2 release.
> > >> >>
> > >> >> Thus, from AK 4.2+ going forward, users are expected to not implement
> > >> >> the old `handle()` (even if it's still supported, as long as the new
> > >> >> `handleError` is not overwritten).
> > >> >>
> > >> >> Are you proposing, for now, to only add support for the deprecated
> > >> >> `handle()` method, ie, the new `handleError()` method would not be
> > >> >> called by the global-thread code? If yes, this might be confusing for
> > >> >> users?
> > >> >>
> > >> >> If you do not propose this, would it imply that the global-thread code
> > >> >> would call the new `handleError()` method? For this case, the question
> > >> >> is what the runtime would do if users try to use the DLQ feature?
> > >> >>
> > >> >> Overall, it's unclear to me what you propose in detail and how we can
> > >> >> avoid confusing users, keep it backward compatible, and make it easy to
> > >> >> understand how the handler will work.
> > >> >>
> > >> >> -Matthias
> > >> >>
> > >> >> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> > >> >>> Hi Team
> > >> >>> Just reaching out again. Need your inputs to move it forward.
> > >> >>>
> > >> >>> Thanks and Regards
> > >> >>> Arpit Goyal
> > >> >>> 8861094754
> > >> >>>
> > >> >>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:
> > >> >>>
> > >> >>>> Hi All,
> > >> >>>> I would like to start a discussion for KIP-1270. This KIP extends
> > >> >>>> ProcessingExceptionHandler support to GlobalKTable processors, enabling
> > >> >>>> consistent exception handling across all stream processing types.
> > >> >>>>
> > >> >>>> *Current Behavior*
> > >> >>>>
> > >> >>>> When a processing exception occurs in a GlobalKTable processor:
> > >> >>>> - The exception propagates to GlobalStreamThread
> > >> >>>> - The GlobalStreamThread terminates
> > >> >>>> - The entire Kafka Streams application shuts down
> > >> >>>> - No user-configurable exception handling is available
> > >> >>>>
> > >> >>>> *Proposed Behavior*
> > >> >>>>
> > >> >>>> After this KIP, when a processing exception occurs in a GlobalKTable
> > >> >>>> processor:
> > >> >>>> - The configured ProcessingExceptionHandler.handleError() will be invoked
> > >> >>>> - If the handler returns Result.RESUME, processing continues with the
> > >> >>>>   next record
> > >> >>>> - If the handler returns Result.FAIL, the exception propagates (same as
> > >> >>>>   current behavior)
> > >> >>>> - If no handler is configured, behavior remains unchanged (backward
> > >> >>>>   compatible)
> > >> >>>>
> > >> >>>> KIP:
> > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> > >> >>>>
> > >> >>>> Thanks and Regards
> > >> >>>> Arpit Goyal
> > >> >>>> 8861094754

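For anyone skimming the thread, the flow discussed above (handleError() delegating to the old handle() by default, RESUME vs. FAIL, and DLQ records being dropped with a warning when no producer is available) can be sketched roughly as follows. This is only an illustration under stated assumptions: `GlobalHandlerSketch`, its nested `Handler`, `Response`, and `Result` types, and the `process` method are hypothetical, simplified stand-ins and not the actual Kafka Streams classes or signatures. Warnings are collected in a list instead of going through a logger so the sketch needs nothing beyond the JDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-ins for illustration; NOT the real Kafka Streams classes.
final class GlobalHandlerSketch {

    enum Result { RESUME, FAIL }

    // Hypothetical response type: a result plus optional DLQ payloads.
    static final class Response {
        final Result result;
        final List<String> dlqRecords;

        Response(Result result, List<String> dlqRecords) {
            this.result = result;
            this.dlqRecords = dlqRecords;
        }
    }

    interface Handler {
        // Old-style method; legacy implementations override only this one.
        default Result handle(String record, Exception error) {
            return Result.FAIL;
        }

        // New-style method; the default delegates to handle(), so legacy
        // handlers keep working without code changes.
        default Response handleError(String record, Exception error) {
            return new Response(handle(record, error), Collections.<String>emptyList());
        }
    }

    /**
     * Processes one record on a thread that has no producer (assumed here for
     * the global thread). Returns true if the record was processed, false if
     * it was skipped after RESUME; rethrows the exception on FAIL.
     */
    static boolean process(String record, Handler handler, List<String> warnings) {
        try {
            if (record.isEmpty()) {
                throw new IllegalArgumentException("empty record"); // simulated failure
            }
            return true;
        } catch (RuntimeException error) {
            Response response = handler.handleError(record, error);
            if (!response.dlqRecords.isEmpty()) {
                // No producer: drop the DLQ records and record a warning instead.
                warnings.add("dropped " + response.dlqRecords.size() + " DLQ record(s)");
            }
            if (response.result == Result.FAIL) {
                throw error;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        // A legacy handler that only overrides the old handle() method.
        Handler legacy = new Handler() {
            @Override
            public Result handle(String record, Exception error) {
                return Result.RESUME;
            }
        };
        System.out.println("processed=" + process("", legacy, warnings));
        System.out.println("warnings=" + warnings);
    }
}
```

A handler that overrides only handle() is picked up through the default handleError(), while a handler that returns DLQ payloads still resumes or fails as usual and only leaves a warning behind. Whether the warning should carry the full record or just metadata is the open question raised in LB1 and is not settled by this sketch.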