Thanks and Regards
Arpit Goyal
8861094754

On Sat, 30 May, 2026, 10:32 pm Arpit Goyal wrote:
> Hi @Lucas Brutschy, thanks for reviewing it.
> Here are my views on it. Let me know what you think.
>
> LB1: I don't think we should add EOS-specific logic to the global thread just to avoid duplicates in the DLQ topic. It would also be a big architectural change, as the global thread maintains its checkpoints in a separate file rather than in consumer offsets. I can update the KIP with these details if you are also aligned on it.
>
> LB2: We will not have an EOS producer in the global thread. The global thread will use a dedicated non-transactional producer; no transactional.id is set, so there is no collision risk with stream thread producers under EOS.
> The client.id will follow a distinct naming convention (e.g. <threadId>-global-producer) to avoid any confusion with stream thread producers, which use the <threadId>-producer suffix. Maybe we need to introduce a global producer config that does not allow enabling EOS for the global producer.
>
> LB3: sendException is never checked in the poll loop today. I will add a checkForException() call inside StateConsumer.pollAndUpdate() (mirroring what stream threads do after each process() call), so that async DLQ send failures, such as auth errors, surface promptly rather than being silently swallowed.
>
> LB4: PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG has been deprecated since 4.3. The semantic change from "log a warning and drop the record" to "invoke the handler and optionally produce to DLQ" is a meaningful behaviour change for anyone who has set this config. I will add an explicit entry in the KIP's Compatibility section documenting this, and update the @deprecated javadoc on the config to accurately describe what it now does (and does not do) rather than leaving the old description in place.
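A minimal sketch of the LB2 producer setup, assuming a hypothetical helper globalProducerConfig (not existing Kafka Streams code). It derives client.id with the -global-producer suffix, never sets transactional.id, and rejects a user-supplied transactional.id so EOS cannot be switched on for the global producer. Only the keys client.id and transactional.id are real producer settings; the thread id is an example value.

```java
import java.util.Properties;

// Hypothetical helper (not Kafka Streams code) sketching the LB2 proposal:
// a dedicated, non-transactional producer for the global thread's DLQ sends.
public class GlobalDlqProducerConfigSketch {

    static final String CLIENT_ID = "client.id";               // standard producer key
    static final String TRANSACTIONAL_ID = "transactional.id"; // standard producer key

    // Builds producer properties for the global thread from user overrides.
    static Properties globalProducerConfig(String threadId, Properties overrides) {
        // Refuse EOS: a transactional.id would need transaction and fencing
        // semantics that the global thread does not have today.
        if (overrides.containsKey(TRANSACTIONAL_ID)) {
            throw new IllegalArgumentException(
                TRANSACTIONAL_ID + " is not supported for the global producer");
        }
        Properties props = new Properties();
        props.putAll(overrides);
        // Distinct suffix, so it cannot be mistaken for a "<threadId>-producer" client.
        props.setProperty(CLIENT_ID, threadId + "-global-producer");
        return props;
    }

    public static void main(String[] args) {
        Properties overrides = new Properties();
        overrides.setProperty("acks", "all");
        Properties props = globalProducerConfig("app-GlobalStreamThread", overrides);
        System.out.println(props.getProperty(CLIENT_ID));        // app-GlobalStreamThread-global-producer
        System.out.println(props.containsKey(TRANSACTIONAL_ID)); // false
    }
}
```

Here a user-supplied client.id is overwritten to keep the naming convention predictable; whether overrides should win instead is an open design choice.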
>
> LB5: Yes, RecordDeserializer.handleDeserializationFailure unconditionally casts processorContext to RecordCollector.Supplier at line 124, and today this throws a ClassCastException when the deserialization handler returns DLQ records during global state restoration, because GlobalProcessorContextImpl does not implement RecordCollector.Supplier. I flagged this in a previous PR comment: https://github.com/apache/kafka/pull/21535#issuecomment-4012433576. Once GlobalProcessorContextImpl implements RecordCollector.Supplier as part of this KIP, the cast succeeds and the deserialization DLQ path works correctly.
>
> LB6: I will add integration tests mirroring the coverage in DeadLetterQueueIntegrationTest.
>
> LB7: Good catch; I will rename the field from recordCollector to collector to match ProcessorContextImpl and keep the naming consistent across both context implementations.
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Mon, May 25, 2026 at 11:11 AM Arpit Goyal wrote:
>
>> Hi Lucas
>> Never mind, I got the link: https://lists.apache.org/thread/t7x46hl4ykozrtj641woz3g52cqpwlms
>>
>> Thanks and Regards
>> Arpit Goyal
>> 8861094754
>>
>>
>> On Mon, May 25, 2026 at 11:10 AM Arpit Goyal wrote:
>>
>>> Hi Lucas
>>> Could you help update it or provide the link? I don't have access to it.
>>> [image: Screenshot 2026-05-25 at 11.08.57 AM.png]
>>> Thanks and Regards
>>> Arpit Goyal
>>> 8861094754
>>>
>>>
>>> On Fri, May 22, 2026 at 3:20 PM Lucas Brutschy via dev wrote:
>>>
>>>> Hi Arpit,
>>>>
>>>> Thanks for picking this up. A few questions before this goes to VOTE.
>>>>
>>>> Nit: the discussion thread link on the KIP is incorrect, can you fix it?
>>>>
>>>> LB1: How does this interact with EOS? Under EOS, the normal DLQ write is part of the same transaction as the failing record's offset commit (via StreamsProducer.maybeBeginTransaction()).
>>>> The global thread has no consumer-group offset commit, and the checkpoint file sits outside any Kafka transaction. Do we run the global producer at-least-once even under EOS, or do we wrap DLQ sends in a transaction? If the latter, how do we handle ordering against maybeCheckpoint(), the new transactional.id, and fencing semantics that don't exist for the global thread today?
>>>>
>>>> LB2: What does the producer config look like - in particular client.id and transactional.id? How do we avoid colliding with the per-thread task producers under EOS?
>>>>
>>>> LB3: What's the shutdown ordering with the new producer, and what happens if sendException fires (e.g. DLQ topic auth failure)?
>>>>
>>>> LB4: processing.exception.handler.global.enabled is already deprecated. Is the semantic change from "drop with warning" to "produce to DLQ" called out in Compatibility, and does the deprecation javadoc still reflect what the config does?
>>>>
>>>> LB5: How does this interact with the deserialization handler path? RecordDeserializer.handleDeserializationFailure casts the context to RecordCollector.Supplier unconditionally, and GlobalProcessorContextImpl isn't one today - so a deserialization handler returning DLQ records during global-state restoration currently hits a ClassCastException rather than warn-and-drop. Does the KIP intend to cover this case too (it seems to fall out for free once GlobalProcessorContextImpl becomes a RecordCollector.Supplier)?
>>>>
>>>> LB6: What test scenarios are planned? For comparison, DeadLetterQueueIntegrationTest on the normal path covers DSL/ProcessorAPI x FAIL/CONTINUE plus a deserialization variant - are we mirroring that coverage for the global thread?
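A small simulation of the duplicate risk behind LB1, assuming the global producer runs at-least-once as proposed: DLQ sends go out immediately, while the checkpoint only advances periodically, so a crash between the two replays records and re-sends their DLQ entries. The class, method names, and fixed checkpoint interval are hypothetical simplifications, not Kafka Streams behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (not Kafka Streams code) of an at-least-once DLQ
// producer whose sends are not atomic with the global thread's checkpoint.
public class AtLeastOnceDlqSketch {

    static boolean isBad(long[] badOffsets, long offset) {
        for (long bad : badOffsets) {
            if (bad == offset) {
                return true;
            }
        }
        return false;
    }

    // Processes offsets [0, endOffset), crashing once right after crashAfterOffset,
    // and returns the offsets of every DLQ send across the crash and the restart.
    static List<Long> run(long[] badOffsets, long endOffset, long checkpointInterval, long crashAfterOffset) {
        List<Long> dlqSends = new ArrayList<>();
        long checkpoint = 0; // next offset to read, as stored in the checkpoint file

        // First run: DLQ sends happen immediately; the checkpoint only advances
        // every checkpointInterval records (a simplification of maybeCheckpoint()).
        for (long offset = 0; offset <= crashAfterOffset; offset++) {
            if (isBad(badOffsets, offset)) {
                dlqSends.add(offset);
            }
            if ((offset + 1) % checkpointInterval == 0) {
                checkpoint = offset + 1;
            }
        }

        // Restart: resume from the last checkpoint, replaying (and re-sending) records.
        for (long offset = checkpoint; offset < endOffset; offset++) {
            if (isBad(badOffsets, offset)) {
                dlqSends.add(offset);
            }
        }
        return dlqSends;
    }

    public static void main(String[] args) {
        // Records 3 and 7 fail; checkpoint every 5 records; crash after offset 8.
        System.out.println(run(new long[] {3, 7}, 10, 5, 8)); // [3, 7, 7]
    }
}
```

With a crash after offset 8 and the last checkpoint at offset 5, record 7 reaches the DLQ twice; a crash right after a checkpoint (for example after offset 4) produces no duplicates.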
>>>>
>>>> LB7: Small naming nit - the KIP introduces a recordCollector field on GlobalProcessorContextImpl, but the analogous field on ProcessorContextImpl is just called collector. Any reason not to match?
>>>>
>>>> Cheers,
>>>> Lucas
>>>>
>>>
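A minimal sketch of the LB5 failure mode, using stand-in types (RecordCollector, RecordCollectorSupplier, GlobalContextToday and GlobalContextProposed are hypothetical shapes, not the real Kafka Streams classes): an unconditional cast to the supplier interface throws ClassCastException for a context that does not implement it, and succeeds once the context does.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types named after the classes in the thread; these are NOT the
// real Kafka Streams classes, only minimal shapes to show the failure mode.
public class SupplierCastSketch {

    interface RecordCollector {
        void send(String topic, String value);
    }

    // Stand-in for RecordCollector.Supplier.
    interface RecordCollectorSupplier {
        RecordCollector recordCollector();
    }

    interface ProcessorContext { }

    // Today: the global context does not supply a collector.
    static class GlobalContextToday implements ProcessorContext { }

    // Proposed: the global context also implements the supplier interface.
    static class GlobalContextProposed implements ProcessorContext, RecordCollectorSupplier {
        final List<String> sent = new ArrayList<>();

        @Override
        public RecordCollector recordCollector() {
            return (topic, value) -> sent.add(topic + ":" + value);
        }
    }

    // Mirrors an unconditional cast like the one described for handleDeserializationFailure.
    static void sendToDlq(ProcessorContext context, String dlqTopic, String value) {
        ((RecordCollectorSupplier) context).recordCollector().send(dlqTopic, value);
    }

    public static void main(String[] args) {
        try {
            sendToDlq(new GlobalContextToday(), "dlq-topic", "bad-bytes");
        } catch (ClassCastException e) {
            System.out.println("today: ClassCastException");
        }
        GlobalContextProposed proposed = new GlobalContextProposed();
        sendToDlq(proposed, "dlq-topic", "bad-bytes");
        System.out.println("proposed: " + proposed.sent); // proposed: [dlq-topic:bad-bytes]
    }
}
```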

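A minimal sketch of the LB3 idea, assuming illustrative names throughout (DlqCollector, onSendCompletion and this pollAndUpdate are hypothetical, not the real StateConsumer code): an async send callback records the first failure, and the polling loop calls checkForException() after each step so a failure such as a DLQ topic authorization error is rethrown on the polling thread instead of being swallowed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch (illustrative names, not Kafka Streams code): async send
// callbacks record the first failure, and the polling thread rethrows it via
// checkForException() instead of silently dropping it.
public class CheckForExceptionSketch {

    static class DlqCollector {
        // In a real client this would be written from the producer's I/O thread.
        private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

        // Simulates a send-completion callback; null means success.
        void onSendCompletion(RuntimeException error) {
            if (error != null) {
                sendException.compareAndSet(null, error); // keep only the first failure
            }
        }

        // Rethrows a recorded failure on the calling (polling) thread.
        void checkForException() {
            RuntimeException e = sendException.get();
            if (e != null) {
                throw e;
            }
        }
    }

    // Stand-in for a poll loop: handle each send result, then check for failures.
    static int pollAndUpdate(DlqCollector collector, List<RuntimeException> sendResults) {
        int processed = 0;
        for (RuntimeException result : sendResults) {
            collector.onSendCompletion(result); // pretend a DLQ send just completed
            processed++;
            collector.checkForException();
        }
        return processed;
    }

    public static void main(String[] args) {
        DlqCollector collector = new DlqCollector();
        try {
            pollAndUpdate(collector, Arrays.<RuntimeException>asList(
                null, new IllegalStateException("DLQ topic authorization failed"), null));
        } catch (IllegalStateException e) {
            System.out.println("surfaced: " + e.getMessage());
        }
    }
}
```

In a real producer the callback may complete after the check in the same iteration, so a failure would surface on a later poll rather than immediately; the sketch collapses that timing for brevity.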