Hi @Lucas Brutschy, thanks for reviewing it. Here are my thoughts on each
point. Let me know what you think.

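LB1: I don't think we should add EOS-specific logic to the global thread
just to avoid duplicate records in the DLQ topic. It would also be a big
architectural change, because the global thread tracks its progress in a
separate checkpoint file rather than in committed consumer offsets, so
there is no offset commit for a DLQ write to join atomically. The global
producer would therefore run at-least-once even under EOS, and the DLQ
topic may receive duplicates after a restart. I can update the KIP with
these details if you agree.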
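A minimal in-memory sketch of why the DLQ can receive duplicates under this approach, in plain Java without the Kafka client: the DLQ send and the checkpoint write are independent steps, so a crash between them makes the global thread re-read the record from the last checkpoint and send it to the DLQ again. GlobalDlqReplaySketch and restoreFrom are hypothetical names; the list stands in for the DLQ topic, the long stands in for the checkpoint file, and every record is treated as failing. Checkpointing after every record is a simplification; checkpointing less often only widens the duplicate window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: failing records go to a DLQ without a transaction,
// and progress is checkpointed separately afterwards.
final class GlobalDlqReplaySketch {
    final List<String> dlqTopic = new ArrayList<>(); // stands in for the DLQ topic
    long checkpointedOffset = 0;                     // stands in for the checkpoint file

    /**
     * Re-processes input from the last checkpoint. If an offset equals
     * crashAfterSendAt, we "crash" after the DLQ send but before the checkpoint.
     */
    void restoreFrom(List<String> input, long crashAfterSendAt) {
        for (long offset = checkpointedOffset; offset < input.size(); offset++) {
            dlqTopic.add(input.get((int) offset)); // models a non-transactional DLQ send
            if (offset == crashAfterSendAt) {
                return; // crash: the checkpoint below is never written
            }
            checkpointedOffset = offset + 1; // checkpoint written after the send
        }
    }

    public static void main(String[] args) {
        GlobalDlqReplaySketch thread = new GlobalDlqReplaySketch();
        List<String> input = List.of("bad-0", "bad-1");
        thread.restoreFrom(input, 1);  // crashes right after sending bad-1
        thread.restoreFrom(input, -1); // restart resumes from checkpoint 1
        System.out.println(thread.dlqTopic); // [bad-0, bad-1, bad-1]
    }
}
```

The duplicate "bad-1" is exactly the at-least-once behaviour described above; avoiding it would require tying the DLQ send and the checkpoint into one atomic unit, which the global thread does not have today.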

LB2: There will be no EOS producer in the global thread. The global
thread will use a dedicated non-transactional producer: no
transactional.id is set, so there is no collision risk with stream thread
producers under EOS.
   The client.id will follow a distinct naming convention (e.g.
<threadId>-global-producer) to avoid any confusion with stream thread
producers, which use the <threadId>-producer suffix. We may also need to
introduce a dedicated global producer config that does not allow enabling
EOS for the global producer.
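A minimal sketch of how such a global producer config could be built and validated. GlobalProducerConfigSketch.globalProducerConfig is a hypothetical helper, not an existing Kafka Streams API; it uses the real producer config keys client.id and transactional.id as plain strings, applies the <threadId>-global-producer naming, and rejects transactional.id so EOS cannot be enabled for the global producer. The thread id in main is only an example value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the config for the global thread's DLQ producer.
final class GlobalProducerConfigSketch {
    static final String CLIENT_ID = "client.id";
    static final String TRANSACTIONAL_ID = "transactional.id";

    static Map<String, Object> globalProducerConfig(String threadId, Map<String, ?> overrides) {
        // The global producer must stay non-transactional, so refuse a transactional.id.
        if (overrides.containsKey(TRANSACTIONAL_ID)) {
            throw new IllegalArgumentException(
                "transactional.id is not allowed for the global producer");
        }
        Map<String, Object> config = new HashMap<>(overrides);
        // Distinct suffix so it is never confused with "<threadId>-producer".
        config.put(CLIENT_ID, threadId + "-global-producer");
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> cfg =
            globalProducerConfig("app-GlobalStreamThread", Map.of("acks", "all"));
        System.out.println(cfg.get(CLIENT_ID));                // app-GlobalStreamThread-global-producer
        System.out.println(cfg.containsKey(TRANSACTIONAL_ID)); // false
    }
}
```

Failing fast at config time keeps the "no EOS for the global producer" rule explicit instead of silently ignoring a user-supplied transactional.id.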

LB3: sendException is never checked in the poll loop today. I will add a
checkForException() call inside StateConsumer.pollAndUpdate() (mirroring
what stream threads do after each process() call), so that async DLQ send
failures such as authorization errors surface promptly instead of being
silently swallowed.
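A minimal sketch of that pattern in plain Java without the Kafka client: the send callback records the first failure in an AtomicReference, and the poll loop rethrows it. GlobalSendErrorSketch, onSendCompletion, and the RuntimeException wrapper are hypothetical. In the real client the callback runs on the producer's I/O thread (here it is called inline), and an authorization failure would be a Kafka exception rather than the IllegalStateException used as a stand-in below.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of surfacing async DLQ send failures in the global poll loop.
final class GlobalSendErrorSketch {
    // Written by the send callback, read by the global thread.
    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    /** Models the producer send callback: remember only the first failure. */
    void onSendCompletion(Exception error) {
        if (error != null) {
            sendException.compareAndSet(null, new RuntimeException("DLQ send failed", error));
        }
    }

    /** Rethrows a recorded send failure so it is not silently swallowed. */
    void checkForException() {
        RuntimeException e = sendException.get();
        if (e != null) {
            throw e;
        }
    }

    /** Models one iteration of StateConsumer.pollAndUpdate(). */
    void pollAndUpdate(Runnable processPolledRecords) {
        processPolledRecords.run(); // may trigger DLQ sends
        checkForException();        // surface failures recorded by earlier sends
    }

    public static void main(String[] args) {
        GlobalSendErrorSketch collector = new GlobalSendErrorSketch();
        try {
            collector.pollAndUpdate(() -> collector.onSendCompletion(
                new IllegalStateException("topic authorization failed")));
        } catch (RuntimeException e) {
            // prints "DLQ send failed: topic authorization failed"
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

Keeping only the first failure (compareAndSet) reports the root cause rather than a cascade of follow-on send errors.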

LB4: PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG has been deprecated
since 4.3. The semantic change from "log a warning and drop the record" to
"invoke the handler and optionally produce to DLQ" is a meaningful
behaviour change for anyone who has set this config. I will add an explicit
entry in the KIP's Compatibility section documenting this, and update the
@deprecated javadoc on the config to accurately describe what it now does
(and does not do) rather than leaving the old description in place.

LB5: Yes. RecordDeserializer.handleDeserializationFailure unconditionally
casts processorContext to RecordCollector.Supplier (line 124), so today a
deserialization handler that returns DLQ records during global state
restoration hits a ClassCastException, because GlobalProcessorContextImpl
does not implement RecordCollector.Supplier. I flagged this in an earlier
PR comment:
https://github.com/apache/kafka/pull/21535#issuecomment-4012433576. Once
GlobalProcessorContextImpl implements RecordCollector.Supplier as part of
this KIP, the cast succeeds and the deserialization DLQ path works
correctly, so the KIP covers this case as well.
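A self-contained sketch of the failure and the fix, using simplified, hypothetical stand-ins for RecordCollector, RecordCollector.Supplier, and the two context variants (the real types have much larger APIs, and sendToDlq is invented for this sketch). The static helper reproduces only the unconditional cast described above; the field is named collector to match ProcessorContextImpl, as discussed under LB7.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the Kafka Streams internals.
interface RecordCollector {
    void sendToDlq(String record); // invented for this sketch

    interface Supplier {
        RecordCollector recordCollector();
    }
}

interface ProcessorContextStandIn {}

// Today: the global context is not a RecordCollector.Supplier.
final class GlobalContextBefore implements ProcessorContextStandIn {}

// With the KIP: the global context exposes its collector.
final class GlobalContextAfter implements ProcessorContextStandIn, RecordCollector.Supplier {
    final List<String> dlq = new ArrayList<>();
    private final RecordCollector collector = dlq::add; // named like ProcessorContextImpl

    @Override
    public RecordCollector recordCollector() {
        return collector;
    }
}

final class DeserializationFailureSketch {
    // Mirrors the unconditional cast in RecordDeserializer.handleDeserializationFailure.
    static void handleDeserializationFailure(ProcessorContextStandIn context, String dlqRecord) {
        ((RecordCollector.Supplier) context).recordCollector().sendToDlq(dlqRecord);
    }

    public static void main(String[] args) {
        try {
            handleDeserializationFailure(new GlobalContextBefore(), "bad-record");
        } catch (ClassCastException e) {
            System.out.println("before: ClassCastException");
        }
        GlobalContextAfter after = new GlobalContextAfter();
        handleDeserializationFailure(after, "bad-record");
        System.out.println("after: " + after.dlq); // after: [bad-record]
    }
}
```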

LB6: I will add integration tests for the global thread mirroring the
coverage in DeadLetterQueueIntegrationTest (DSL/Processor API x
FAIL/CONTINUE, plus a deserialization variant).
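The planned matrix can be enumerated as below. This is only a hypothetical planning sketch: the enum names and scenario labels are invented for illustration and are not taken from DeadLetterQueueIntegrationTest; it just makes the 2 x 2 + 1 shape of the coverage explicit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enumeration of global-thread DLQ integration scenarios.
final class GlobalDlqTestMatrixSketch {
    enum Api { DSL, PROCESSOR_API }
    enum HandlerResponse { FAIL, CONTINUE }

    static List<String> scenarios() {
        List<String> result = new ArrayList<>();
        // Cross product of API style and processing-handler response.
        for (Api api : Api.values()) {
            for (HandlerResponse response : HandlerResponse.values()) {
                result.add(api + " x " + response);
            }
        }
        // Plus the deserialization variant (global state restoration path).
        result.add("DESERIALIZATION");
        return result;
    }

    public static void main(String[] args) {
        scenarios().forEach(System.out::println);
    }
}
```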

LB7: Good catch; I will rename the field from recordCollector to collector
to match ProcessorContextImpl and keep the naming consistent across both
context implementations.

Thanks and Regards
Arpit Goyal
8861094754



On Sun, May 31, 2026 at 7:21 AM Arpit Goyal wrote:

>
> On Sat, 30 May, 2026, 10:32 pm Arpit Goyal wrote:
>
>>
>>
>> On Mon, May 25, 2026 at 11:11 AM Arpit Goyal wrote:
>>
>>> Hi Lucas,
>>> Never mind, I got the link:
>>> https://lists.apache.org/thread/t7x46hl4ykozrtj641woz3g52cqpwlms .
>>>
>>>
>>>
>>> On Mon, May 25, 2026 at 11:10 AM Arpit Goyal wrote:
>>>
>>>> Hi Lucas
>>>> Could you help update it or provide the link? I don't have access to
>>>> it.
>>>>
>>>>
>>>> On Fri, May 22, 2026 at 3:20 PM Lucas Brutschy via dev wrote:
>>>>
>>>>> Hi Arpit,
>>>>>
>>>>> Thanks for picking this up. A few questions before this goes to VOTE.
>>>>>
>>>>> Nit: the discussion thread link on the KIP is incorrect, can you fix
>>>>> it?
>>>>>
>>>>> LB1: How does this interact with EOS? Under EOS, the normal DLQ write
>>>>> is part of the same transaction as the failing record's offset commit
>>>>> (via StreamsProducer.maybeBeginTransaction()). The global thread has
>>>>> no consumer-group offset commit, and the checkpoint file sits outside
>>>>> any Kafka transaction. Do we run the global producer at-least-once
>>>>> even under EOS, or do we wrap DLQ sends in a transaction? If the
>>>>> latter, how do we handle ordering against maybeCheckpoint(), the new
>>>>> transactional.id, and fencing semantics that don't exist for the
>>>>> global thread today?
>>>>>
>>>>> LB2: What does the producer config look like - in particular client.id
>>>>> and transactional.id? How do we avoid colliding with the per-thread
>>>>> task producers under EOS?
>>>>>
>>>>> LB3: What's the shutdown ordering with the new producer, and what
>>>>> happens if sendException fires (e.g. DLQ topic auth failure)?
>>>>>
>>>>> LB4: processing.exception.handler.global.enabled is already
>>>>> deprecated. Is the semantic change from "drop with warning" to
>>>>> "produce to DLQ" called out in Compatibility, and does the deprecation
>>>>> javadoc still reflect what the config does?
>>>>>
>>>>> LB5: How does this interact with the deserialization handler path?
>>>>> RecordDeserializer.handleDeserializationFailure casts the context to
>>>>> RecordCollector.Supplier unconditionally, and
>>>>> GlobalProcessorContextImpl isn't one today - so a deserialization
>>>>> handler returning DLQ records during global-state restoration
>>>>> currently hits a ClassCastException rather than warn-and-drop. Does
>>>>> the KIP intend to cover this case too (it seems to fall out for free
>>>>> once GlobalProcessorContextImpl becomes a RecordCollector.Supplier)?
>>>>>
>>>>> LB6: What test scenarios are planned? For comparison,
>>>>> DeadLetterQueueIntegrationTest on the normal path covers
>>>>> DSL/ProcessorAPI x FAIL/CONTINUE plus a deserialization variant - are
>>>>> we mirroring that coverage for the global thread?
>>>>>
>>>>> LB7: Small naming nit - the KIP introduces a recordCollector field on
>>>>> GlobalProcessorContextImpl, but the analogous field on
>>>>> ProcessorContextImpl is just called collector. Any reason not to
>>>>> match?
>>>>>
>>>>> Cheers,
>>>>> Lucas
>>>>>
>>>>
