Hi Bruno,

Thanks a lot for your comments!

> BC1
> Do you also plan to add an argument to the StreamsResetter tool to
> delete the default DLQ topic when a Streams app is reset?
Good point, I did not think about the StreamsResetter tool. Thinking
out loud, I am not sure if it is a good idea to add an option to clean
them up. In my opinion DLQ topics should be viewed as a sink topic and
AFAIK, this tool does not clean up sink topics.

> BC2
In the case of custom exception handlers, they can get the
errors.deadletterqueue.topic.name configuration by overriding `void
configure(Map<String, ?> configs);`. As it is the place where all
the configuration can be accessed, I think it's the best way, no? I
will think about it a bit further; it might be useful/convenient for
users.
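
To illustrate the configure() route, here is a rough, self-contained
sketch. It deliberately does not use the real Kafka Streams handler
interfaces: ConfigurableHandler and DlqAwareHandler are hypothetical
stand-ins, and treating a blank value as "no DLQ configured" is my
assumption. Only the config key itself comes from the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configurable handler interface; the real
// Kafka Streams handler interfaces are not reproduced here.
interface ConfigurableHandler {
    void configure(Map<String, ?> configs);
}

// Sketch of a custom handler that remembers the configured DLQ topic.
class DlqAwareHandler implements ConfigurableHandler {
    // Config key as named in this thread.
    static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";

    private String dlqTopic; // null when no DLQ topic is configured

    @Override
    public void configure(final Map<String, ?> configs) {
        final Object value = configs.get(DLQ_TOPIC_CONFIG);
        // Assumption: a missing or blank value means "no DLQ configured".
        dlqTopic = (value == null || value.toString().trim().isEmpty())
            ? null
            : value.toString();
    }

    String dlqTopic() {
        return dlqTopic;
    }
}

class DlqConfigDemo {
    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(DlqAwareHandler.DLQ_TOPIC_CONFIG, "app-dlq");

        final DlqAwareHandler handler = new DlqAwareHandler();
        handler.configure(configs);
        System.out.println(handler.dlqTopic());
    }
}
```

With this shape, a handler that only customizes one failure path can
still reuse the topic configured for the bundled handlers.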

> BC3
The "with" syntax is used in many locations in the Kafka Streams
public classes, that's why I used it, e.g. Materialized.with(),
Consumed.withKeySerde(...).withValueSerde(...), Grouped.withName(...).
I do agree that .andAddToDeadLetterQueue is more intuitive, but I
would argue that being consistent is better in this situation. What do
you think?
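
For reference, a minimal sketch of how the "with" style could read.
This is not the KIP's actual API: HandlerResponse and DlqRecord are
hypothetical stand-ins (the KIP uses the handler response enums and
ProducerRecord<byte[], byte[]>), and returning a copy rather than
mutating the shared constant is my assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a DLQ record: topic plus raw key/value bytes.
final class DlqRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    DlqRecord(final String topic, final byte[] key, final byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical response type: a result plus optional DLQ records.
final class HandlerResponse {
    enum Result { CONTINUE, FAIL }

    static final HandlerResponse CONTINUE =
        new HandlerResponse(Result.CONTINUE, Collections.<DlqRecord>emptyList());
    static final HandlerResponse FAIL =
        new HandlerResponse(Result.FAIL, Collections.<DlqRecord>emptyList());

    final Result result;
    final List<DlqRecord> deadLetterQueueRecords;

    private HandlerResponse(final Result result, final List<DlqRecord> records) {
        this.result = result;
        this.deadLetterQueueRecords = records;
    }

    // "with" style: returns a copy, leaving the shared constant untouched.
    HandlerResponse withDeadLetterQueueRecords(final List<DlqRecord> records) {
        return new HandlerResponse(
            result, Collections.unmodifiableList(new ArrayList<>(records)));
    }
}

class FluentResponseDemo {
    public static void main(final String[] args) {
        final HandlerResponse response = HandlerResponse.CONTINUE
            .withDeadLetterQueueRecords(Collections.singletonList(
                new DlqRecord(
                    "app-dlq",
                    "Hello".getBytes(StandardCharsets.UTF_8),
                    "World".getBytes(StandardCharsets.UTF_8))));
        System.out.println(response.result + " with "
            + response.deadLetterQueueRecords.size() + " DLQ record(s)");
    }
}
```

Renaming the method to andAddToDeadLetterQueue() would not change this
structure, only the call site.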

Cheers,
Damien

On Tue, 3 Sept 2024 at 14:59, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi,
>
> Thanks for the updates!
>
> I have a couple of comments.
>
> BC1
> Do you also plan to add an argument to the StreamsResetter tool to
> delete the default DLQ topic when a Streams app is reset?
>
>
> BC2
> Would it make sense to add errors.deadletterqueue.topic.name to the
> ErrorHandlerContext in case that a custom exception handler wants to
> write to the configured DLQ topic?
> For example if only one of the handler needs to be customized but all
> handlers should write to the configured DLQ topic.
>
>
> BC3
> What do you think about renaming withDeadLetterQueueRecords() to
> andAddToDeadLetterQueue()?
> A customized handler would look like
>
> public ProcessingHandlerResponse handle(
>      final ErrorHandlerContext context,
>      final Record<?, ?> record,
>      final Exception exception
> ) {
>      return ProcessingHandlerResponse.CONTINUE
>          .andAddToDeadLetterQueue(
>              Collections.singletonList(new ProducerRecord<>(
>                  "app-dlq",
>                  "Hello".getBytes(StandardCharsets.UTF_8),
>                  "World".getBytes(StandardCharsets.UTF_8)
>              ))
>          );
> }
>
> I think the code becomes more readable.
>
>
> Best,
> Bruno
>
> On 8/30/24 3:37 PM, Damien Gasparina wrote:
> > Hi everyone,
> >
> > We just updated KIP-1034, we changed the following:
> >    - We included the ProcessingExceptionHandler (KIP-1033) directly in the 
> > KIP;
> >    - We provided examples to clarify the new configuration, and how it
> > could be leveraged.
> >
> > I think we can resume the conversation on this KIP.
> >
> > Cheers,
> > Damien, Sebastien and Loic
> >
> >
> > On Tue, 27 Aug 2024 at 15:37, Sebastien Viale
> > <sebastien.vi...@michelin.com> wrote:
> >>
> >>
> >> Hi Bruno,
> >>
> > >> We have planned a meeting for next Friday to discuss it with Loic and
> >> Damien.
> >>
> >> We will be able to restart discussions about it soon.
> >>
> >> regards
> >>
> >> ________________________________
> > >> From: Bruno Cadonna <cado...@apache.org>
> > >> Sent: Monday, 26 August 2024 11:32
> > >> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > >> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>
> >> Hi Loïc, Sebastien, and Damien,
> >>
> >> Now that KIP-1033 is going to be released in 3.9, what is the plan to
> >> progress with this KIP?
> >>
> >> Is the KIP up-to-date, so that we can restart discussion?
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >>
> >> On 6/13/24 6:16 PM, Damien Gasparina wrote:
> >>> Hi Bruno,
> >>>
> >>> We focused our effort (well, mostly Seb and Loic :)) on KIP-1033,
> >>> that's why not much progress has been made on this one yet.
> >>> Regarding your points:
> >>>
> >>> B1: It is up to the user to specify the DLQ topic name and to
> >>> implement a potential differentiation. I tend to think that having one
> >>> DLQ per application ID is the wisest, but I encountered cases and
> >>> applications that preferred having a shared DLQ topic between multiple
> >>> applications, e.g. to reduce the number of partitions, or to ease
> >>> monitoring
> >>>
> > >>> B2: Good catch, it should be
> >>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, we removed the "DEFAULT"
> >>> prefix during the discussion, looks like I forgot to update all
> >>> occurrences in the KIP.
> >>>
> > >>> B3: The trigger for sending to the DLQ would be if
> >>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user
> >>> implemented a custom exception handler that returns DLQ records.
> >>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just override the
> >>> behavior of the default handler, thus custom exception handlers will
> >>> completely ignore this parameter.
> >>>
> >>> I think it's a good trade-off between providing a production-ready
> >>> default implementation, yet providing sufficient flexibility for
> >>> complex use-cases.
> >>> This behavior definitely needs to be documented, but I guess it's safe
> >>> to push the responsibility of the DLQ records to the user if they
> >>> implement custom handlers.
> >>>
> >>> Cheers,
> >>> Damien
> >>>
> >>>
> >>> On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> since there was not too much activity in this thread recently, I was
> >>>> wondering what the status of this discussion is.
> >>>>
> >>>> I cannot find the examples in the KIP Sébastien mentioned in the last
> >>>> message to this thread. I can also not find the corresponding definition
> >>>> of the following method call in the KIP:
> >>>>
> >>>> FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>
> >>>> I have also some comments:
> >>>>
> >>>> B1
> > >>>> Did you consider prefixing the dead letter queue topic names with the
> >>>> application ID to distinguish the topics between Streams apps? Or is the
> >>>> user responsible for the differentiation? If the user is responsible, we
> >>>> risk that faulty records of different Streams apps end up in the same
> >>>> dead letter queue.
> >>>>
> >>>> B2
> >>>> Is the name of the dead letter queue topic config
> >>>> DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or
> >>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.
> >>>>
> >>>> B3
> >>>> What is exactly the trigger to send a record to the dead letter queue?
> >>>> Is setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or is it adding a
> >>>> record to the return value of the exception handler?
> >>>> What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do
> >>>> not add a record to the return value of the handler? What happens if I
> >>>> do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to
> >>>> the return value of the handler?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 4/22/24 10:19 PM, Sebastien Viale wrote:
> >>>>> Hi,
> >>>>>
> >>>>> Thanks for your remarks
> >>>>>
> >>>>> L1. I would say "who can do the most can do the least", even though 
> >>>>> most people will fail and stop, we found it interesting to offer the 
> >>>>> possibility to fail-and-send-to-DLQ
> >>>>>
> >>>>> L2: We did not consider extending the TimestampExtractor because we 
> >>>>> estimate it out of scope for this KIP. Perhaps it will be possible to 
> >>>>> include it in an ExceptionHandler later.
> >>>>>
> >>>>> L3: we will include an example in the KIP, but as we mentioned earlier, 
> >>>>> the DLQ topic can be different in each custom Exception Handler:
> >>>>>
> >>>>> When providing custom handlers, users would have the possibility to 
> >>>>> return:
> >>>>> * FAIL
> >>>>> * CONTINUE
> >>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>
> >>>>> cheers !
> >>>>> Sébastien
> >>>>>
> >>>>>
> >>>>> ________________________________
> > >>>>> From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
> > >>>>> Sent: Monday, 22 April 2024 14:36
> > >>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> > >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>>
> >>>>> Hi!
> >>>>>
> >>>>> Thanks for the KIP, great stuff.
> >>>>>
> >>>>> L1. I was a bit confused that the default configuration (once you set
> >>>>> a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
> >>>>> correctly. Is this something that will be a common use-case, and is it
> > >>>>> a configuration that we want to encourage? I expected that you would
> > >>>>> either want to fail or skip-and-send-to-DLQ.
> >>>>>
> >>>>> L2. Have you considered extending the `TimestampExtractor` interface
> >>>>> so that it can also produce to DLQ? AFAIK it's not covered by any of
> >>>>> the existing exception handlers, but it can cause similar failures
> > >>>>> (potentially custom logic, depends on the validity of the input record). There
> >>>>> could also be a default implementation as a subclass of
> >>>>> `ExtractRecordMetadataTimestamp`.
> >>>>>
> >>>>> L3. It would be nice to include an example of how to produce to
> >>>>> multiple topics in the KIP, as I can imagine that this will be a
> >>>>> common use-case. I wasn't sure how much code would be involved to make
> >>>>> it work. If a lot of code is required, we may want to consider
> >>>>> exposing some utils that make it easier.
> >>>>>
> >>>>> Cheers,
> >>>>> Lucas
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina 
> >>>>> <d.gaspar...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> Following all the discussion on this KIP and KIP-1033, we introduced a
> >>>>>> new container class containing only processing context metadata:
> > >>>>>> ProcessingMetadata. This new container class is actually part of
> > >>>>>> KIP-1033; thus, I added a hard dependency for this KIP on KIP-1033. I
> > >>>>>> think it's the wisest choice implementation-wise.
> >>>>>>
> >>>>>> I also clarified the interface of the enums:
> >>>>>> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[],
> >>>>>> byte[]>> deadLetterQueueRecords) . Very likely most users would just
> >>>>>> send one DLQ record, but there might be specific use-cases and what
> >>>>>> can do more can do less, so I added an Iterable.
> >>>>>>
> >>>>>> I took some time to think about the impact of storing the
> >>>>>> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> >>>>>> the topic/offset/partition should be fine, but I am concerned about
> >>>>>> storing the rawSourceKey/Value. I think it could impact some specific
> >>>>>> use-cases, for example, a high-throughput Kafka Streams application
> >>>>>> "counting" messages could have huge source input messages, and very
> >>>>>> small sink messages, here, I assume storing the rawSourceKey/Value
> >>>>>> could significantly require more memory than the actual Kafka Producer
> >>>>>> buffer.
> >>>>>>
> >>>>>> I think the safest approach is actually to only store the fixed-size
> >>>>>> metadata for the ProductionExceptionHandler.handle:
> > >>>>>> topic/partition/offset/processorNodeId/taskId. It might be confusing
> > >>>>>> for the user, but 1) it is still better than today, where there is
> > >>>>>> no context information at all, 2) it would be clearly stated in the
> > >>>>>> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> > >>>>>> punctuate case).
> >>>>>>
> >>>>>> Do you think it would be a suitable design Sophie?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Damien
> >>>>>>
> >>>>>> On Sun, 14 Apr 2024 at 21:30, Loic Greffier 
> >>>>>> <loic.greff...@michelin.com> wrote:
> >>>>>>>
> >>>>>>> Hi Sophie,
> >>>>>>>
> >>>>>>> Thanks for your feedback.
> > >>>>>>> Completing Damien's comments here for points S1 and S5B.
> >>>>>>>
> >>>>>>> S1:
> >>>>>>>> I'm confused -- are you saying that we're introducing a new kind of 
> >>>>>>>> ProducerRecord class for this?
> >>>>>>>
> >>>>>>> I am wondering if it makes sense to alter the ProducerRecord from 
> >>>>>>> Clients API with a "deadLetterQueueTopicName" attribute dedicated to 
> >>>>>>> Kafka Streams DLQ.
> >>>>>>> Adding "deadLetterQueueTopicName" as an additional parameter to 
> >>>>>>> "withDeadLetterQueueRecord" is a good option, and may allow users to 
> >>>>>>> send records to different DLQ topics depending on conditions:
> > >>>>>>> @Override
> > >>>>>>> public ProductionExceptionHandlerResponse handle(
> > >>>>>>>     final ProcessingContext context,
> > >>>>>>>     ProducerRecord<byte[], byte[]> record,
> > >>>>>>>     Exception exception
> > >>>>>>> ) {
> > >>>>>>>     if (condition1) {
> > >>>>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> > >>>>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> > >>>>>>>     }
> > >>>>>>>     if (condition2) {
> > >>>>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> > >>>>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> > >>>>>>>     }
> > >>>>>>>     return ProductionExceptionHandlerResponse.CONTINUE
> > >>>>>>>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> > >>>>>>> }
> >>>>>>>
> >>>>>>> S5B:
> >>>>>>>> I was having a bit of trouble understanding what the behavior would 
> >>>>>>>> be if someone configured a "errors.deadletterqueue.topic.name" but 
> >>>>>>>> didn't implement the handlers.
> >>>>>>>
> >>>>>>> The provided LogAndContinueExceptionHandler, 
> >>>>>>> LogAndFailExceptionHandler and DefaultProductionExceptionHandler 
> >>>>>>> should be able to tell if records should be sent to DLQ or not.
> > >>>>>>> The "errors.deadletterqueue.topic.name" configuration serves to:
> > >>>>>>>
> > >>>>>>> * Specify whether the provided handlers should send records to the
> > >>>>>>>   DLQ:
> > >>>>>>>   * If the value is empty, the handlers should not send records to DLQ.
> > >>>>>>>   * If the value is not empty, the handlers should send records to DLQ.
> > >>>>>>> * Define the name of the DLQ topic that should be used by the
> > >>>>>>>   provided handlers.
> >>>>>>>
> >>>>>>> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided 
> >>>>>>> handlers should return either:
> >>>>>>>
> >>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> > >>>>>>> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue).
> > >>>>>>>
> > >>>>>>> If "errors.deadletterqueue.topic.name" is defined but neither
> > >>>>>>> DeserializationExceptionHandler nor ProductionExceptionHandler
> > >>>>>>> classes are defined in the configuration, then nothing should happen,
> > >>>>>>> as sending to DLQ is based on the handlers’ response.
> > >>>>>>>
> > >>>>>>> When providing custom handlers, users would have the possibility to
> > >>>>>>> return:
> >>>>>>>
> >>>>>>> * FAIL
> >>>>>>> * CONTINUE
> >>>>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>>
> > >>>>>>> A DLQ topic name is currently required when using the last two
> > >>>>>>> response types.
> >>>>>>> I am wondering if it could benefit users to ease the use of the 
> >>>>>>> default DLQ topic "errors.deadletterqueue.topic.name" when 
> >>>>>>> implementing custom handlers, with such kind of implementation:
> >>>>>>>
> >>>>>>> * FAIL.withDefaultDeadLetterQueueRecord(record)
> >>>>>>> * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Loïc
> >>>>>>>
> > >>>>>>> From: Damien Gasparina <d.gaspar...@gmail.com>
> > >>>>>>> Sent: Sunday, 14 April 2024 20:24
> > >>>>>>> To: dev@kafka.apache.org
> > >>>>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>>>>
> >>>>>>> Hi Sophie,
> >>>>>>>
> >>>>>>> Thanks a lot for your feedback and your detailed comments.
> >>>>>>>
> >>>>>>> S1.
> > >>>>>>>> I'm confused -- are you saying that we're introducing a new kind of
> > >>>>>>>> ProducerRecord class for this?
> >>>>>>>
> >>>>>>> Sorry for the poor wording, that's not what I meant. While writing the
> >>>>>>> KIP, I was hesitating between 1. leveraging the Kafka Producer
> >>>>>>> ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in
> >>>>>>> a separate parameter, 3. a new custom interface (e.g.
> >>>>>>> DeadLetterQueueRecord).
> >>>>>>> As the KafkaProducer ProducerRecord is not used in the Kafka Streams
> >>>>>>> API (except ProductionExceptionHandler) and I would like to avoid a
> >>>>>>> new interface if not strictly required, I leaned toward option 2.
> >>>>>>> Thinking about it, maybe option 1. would be best, but I assume it
> >>>>>>> could create confusion with KafkaStreams ProducerRecord. Let me sleep
> >>>>>>> on it.
> >>>>>>>
> >>>>>>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and
> >>>>>>> your point in S4, it seems more and more likely that we will create a
> >>>>>>> new container class containing only the metadata for the exception
> >>>>>>> handlers. To be consistent, I think we should use this new
> >>>>>>> implementation in all exception handlers.
> > >>>>>>> The only issue I could think of is that the new interface would
> > >>>>>>> expose less data than the current ProcessorContext in the
> > >>>>>>> DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()),
> > >>>>>>> thus it could be hard for some users to migrate to the new interface.
> >>>>>>> I do expect that only a few users would be impacted as the javadoc is
> >>>>>>> very clear: `Note, that the passed in {@link ProcessorContext} only
> >>>>>>> allows access to metadata like the task ID.`
> >>>>>>>
> >>>>>>> S3. I completely agree with you, it is something that might not be
> >>>>>>> trivial and should be thoroughly covered by unit tests during the
> >>>>>>> implementation.
> >>>>>>>
> >>>>>>> S4. Good point, I did not notice that the ProductionExceptionHandler
> >>>>>>> is also invoked in the producer.send() callback.
> >>>>>>> Capturing the ProcessingContext for each in-flight message is probably
> > >>>>>>> not possible. I think there is no other way than to write a custom
> > >>>>>>> container class holding only the essential metadata. I am
> >>>>>>> thinking of storing the following attributes: source topic, partition,
> >>>>>>> offset, rawKey, rawValue and taskId.
> >>>>>>> Those metadata should be relatively small, but I assume that there
> > >>>>>>> could be a high number of in-flight messages, especially with the
> > >>>>>>> at-least-once processing guarantee. Do you think it would be fine memory-wise?
> >>>>>>>
> >>>>>>> S5. As many exceptions are only accessible in exception handlers, and
> >>>>>>> we wanted to 1) allow users to customize the DLQ records and 2) have a
> >>>>>>> suitable DLQ out of the box implementation, we felt it natural to rely
> >>>>>>> on exception handlers, that's also why we created KIP-1033.
> >>>>>>> Piggybacking on the enum response was the cleanest way we could think
> > >>>>>>> of, but we are completely open to suggestions.
> >>>>>>>
> >>>>>>> S5a. Completely agree with you on this point, for this DLQ approach to
> >>>>>>> be complete, the ProcessingExceptionHandler introduced in KIP-1033 is
> >>>>>>> required. KIP-1033 is definitely our first priority. We decided to
> >>>>>>> kick-off the KIP-1034 discussion as we expected the discussions to be
> >>>>>>> dynamic and could potentially impact some choices of KIP-1033.
> >>>>>>>
> >>>>>>> S5b. In this KIP, we wanted to 1. provide as much flexibility to the
> >>>>>>> user as possible; 2. provide a good default implementation
> >>>>>>> for the DLQ without having to write custom exception handlers.
> >>>>>>> For the default implementation, we introduced a new configuration:
> >>>>>>> errors.deadletterqueue.topic.name.
> >>>>>>>
> >>>>>>> If this configuration is set, it changes the behavior of the provided
> >>>>>>> exception handlers to return a DLQ record containing the raw key/value
> >>>>>>> + headers + exception metadata in headers.
> > >>>>>>> If the out-of-the-box implementation is not suitable for a user, e.g.
> > >>>>>>> the payload needs to be masked in the DLQ, they could implement their
> > >>>>>>> own exception handlers. The errors.deadletterqueue.topic.name would
> > >>>>>>> only impact Kafka Streams bundled exception handlers (e.g.
> > >>>>>>> org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).
> >>>>>>>
> >>>>>>> Let me update the KIP to make it clear and also provide examples.
> >>>>>>>
> >>>>>>> S6/S7. Good point, mea culpa for the camel case, it must have been a
> >>>>>>> sugar rush :)
> >>>>>>>
> >>>>>>> Thanks again for your detailed comments and pointing out S4
> >>>>>>> (production exception & Processing Context)!
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Damien
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman 
> > >>>>>>> <sop...@responsive.dev> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the KIP, this will make a lot of people very happy.
> >>>>>>>>
> > >>>>>>>> Wanted to chime in on a few points that have been raised so far and add
> > >>>>>>>> some of my own (numbering with an S to distinguish my points from the
> > >>>>>>>> previous ones)
> >>>>>>>>
> >>>>>>>> S1.
> >>>>>>>>
> > >>>>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to
> > >>>>>>>>> downstream processors in the PAPI. The only information missing in
> > >>>>>>>>> this class is the topic name. I also considered relying on the Kafka
> > >>>>>>>>> Producer ProducerRecord, but I assume it would not be consistent with
> > >>>>>>>>> the KafkaStreams API.
> >>>>>>>>
> > >>>>>>>> I'm confused -- are you saying that we're introducing a new kind of
> > >>>>>>>> ProducerRecord class for this? Why not just use the existing one, i.e. the
> > >>>>>>>> o.a.k.clients.producer.ProducerRecord class? This is what the
> > >>>>>>>> ProductionExceptionHandler uses, so it's definitely "consistent". In other
> > >>>>>>>> words, we can remove the "String deadLetterQueueTopicName".
> >>>>>>>>
> >>>>>>>> S2.
> > >>>>>>>> I think this would be a good opportunity to also deprecate the existing
> > >>>>>>>> #handle method of the DeserializationExceptionHandler, and replace it with
> > >>>>>>>> one that uses a ProcessingContext instead of the ProcessorContext. Partly
> > >>>>>>>> for the same reasons about guarding access to the #forward methods, partly
> > >>>>>>>> because this method needs to be migrated to the new PAPI interface
> > >>>>>>>> anyways, and ProcessingContext is part of the new one.
> >>>>>>>>
> >>>>>>>> S3.
> >>>>>>>> Regarding 2a. -- I'm inclined to agree that records which a 
> >>>>>>>> Punctuator
> >>>>>>>> failed to produce should also be sent to the DLQ via the
> >>>>>>>> ProductionExceptionHandler. Users will just need to be careful about
> >>>>>>>> accessing certain fields of the ProcessingContext that aren't 
> >>>>>>>> available in
> >>>>>>>> the punctuator, and need to check the Optional returned by the
> >>>>>>>> ProcessingContext#recordMetadata API.
> >>>>>>>> Also, from an implementation standpoint, it will be really hard to
> >>>>>>>> distinguish between a record created by a punctuator vs a processor 
> >>>>>>>> from
> >>>>>>>> within the RecordCollector, which is the class that actually handles
> >>>>>>>> sending records to the Streams Producer and invoking the
> >>>>>>>> ProductionExceptionHandler. This is because the RecordCollector is 
> >>>>>>>> at the
> >>>>>>>> "end" of the topology graph and doesn't have any context about which 
> >>>>>>>> of the
> >>>>>>>> upstream processors actually attempted to forward a record.
> >>>>>>>>
> >>>>>>>> This in itself is at least theoretically solvable, but it leads into 
> >>>>>>>> my
> >>>>>>>> first major new point:
> >>>>>>>>
> >>>>>>>> S4:
> >>>>>>>> I'm deeply worried about passing the ProcessingContext in as a means 
> >>>>>>>> of
> >>>>>>>> forwarding metadata. The problem is that the processing/processor 
> >>>>>>>> context
> >>>>>>>> is a mutable class and is inherently meaningless outside the context 
> >>>>>>>> of a
> >>>>>>>> specific task. And when I said earlier that the RecordCollector sits 
> >>>>>>>> at
> >>>>>>>> the "end" of the topology, I meant that it's literally outside the 
> >>>>>>>> task's
> >>>>>>>> subtopology and is used/shared by all tasks on that StreamThread. So 
> >>>>>>>> to
> >>>>>>>> begin with, there's no guarantee what will actually be returned for
> >>>>>>>> essential methods such as the new #rawSourceKey/Value or the existing
> >>>>>>>> #recordMetadata
> >>>>>>>>
> >>>>>>>> For serialization exceptions it'll probably be correct, but for 
> >>>>>>>> general
> >>>>>>>> send errors it almost definitely won't be. In short, this is because 
> >>>>>>>> we
> >>>>>>>> send records to the producer after the sink node, but don't check 
> >>>>>>>> for send
> >>>>>>>> errors right away since obviously it takes some time for the 
> >>>>>>>> producer to
> >>>>>>>> actually send. In other words, sending/producing records is actually 
> >>>>>>>> done
> >>>>>>>> asynchronously with processing, and we simply check for errors on any
> >>>>>>>> previously-sent records
> > >>>>>>>> during the send on a new record in a sink node. This means the context we
> > >>>>>>>> would be passing in to a (non-serialization) exception would pretty much
> > >>>>>>>> always correspond not to the record that experienced the error, but the
> > >>>>>>>> random record that happened to be being sent when we checked and saw the
> > >>>>>>>> error for the failed record.
> >>>>>>>>
> >>>>>>>> This discrepancy, in addition to the whole "sourceRawKey/Value and
> >>>>>>>> recordMetadata are null for punctuators" issue, seems like an
> >>>>>>>> insurmountable inconsistency that is more likely to cause users 
> >>>>>>>> confusion
> >>>>>>>> or problems than be helpful.
> >>>>>>>> We could create a new metadata object and copy over the relevant 
> >>>>>>>> info from
> >>>>>>>> the ProcessingContext, but I worry that has the potential to explode 
> >>>>>>>> memory
> >>>>>>>> since we'd need to hold on to it for all in-flight records up until 
> >>>>>>>> they
> >>>>>>>> are either successfully sent or failed and passed in to the
> >>>>>>>> ProductionExceptionHandler. But if the metadata is relatively small, 
> >>>>>>>> it's
> >>>>>>>> probably fine. Especially if it's just the raw source key/value. Are
> >>>>>>>> there any other parts of the ProcessingContext you think should be 
> >>>>>>>> made
> >>>>>>>> available?
> >>>>>>>>
> >>>>>>>> Note that this only applies to the ProductionExceptionHandler, as the
> >>>>>>>> DeserializationExceptionHandler (and the newly proposed
> >>>>>>>> ProcessingExceptionHandler) would both be invoked immediately and 
> >>>>>>>> therefore
> >>>>>>>> with the failed record's context. However, I'm also a bit 
> >>>>>>>> uncomfortable
> >>>>>>>> with adding the rawSourceKey/rawSourceValue to the 
> >>>>>>>> ProcessingContext. So
> >>>>>>>> I'd propose to just wrap those (and any other metadata you might 
> >>>>>>>> want) in a
> >>>>>>>> container class and pass that in instead of the ProcessingContext, 
> >>>>>>>> to all
> >>>>>>>> of the exception handlers.
> >>>>>>>>
> >>>>>>>> S5:
> >>>>>>>> For some reason I'm finding the proposed API a little bit awkward, 
> >>>>>>>> although
> >>>>>>>> it's entirely possible that the problem is with me, not the proposal 
> >>>>>>>> :)
> >>>>>>>> Specifically I'm struggling with the approach of piggybacking on the
> >>>>>>>> exception handlers and their response enums to dictate how records 
> >>>>>>>> are
> >>>>>>>> forwarded to the DLQ. I think this comes down to two things, though 
> >>>>>>>> again,
> >>>>>>>> these aren't necessarily problems with the API and probably just 
> >>>>>>>> need to be
> >>>>>>>> hashed out:
> >>>>>>>>
> >>>>>>>> S5a.
> >>>>>>>> When I envision a DLQ, to me, the most common use case would be to 
> >>>>>>>> forward
> >>>>>>>> input records that failed somewhere along the processing graph. But 
> >>>>>>>> it
> >>>>>>>> seems like all the focus here is on the two far ends of the 
> >>>>>>>> subtopology --
> >>>>>>>> the input/consumer, and the output/producer. I get that
> >>>>>>>> the ProcessingExceptionHandler is really the missing piece here, and 
> >>>>>>>> it's
> >>>>>>>> hard to say anything specific since it's not yet accepted, but maybe 
> >>>>>>>> a
> >>>>>>>> somewhat more concrete example would help. FWIW I think/hope to get 
> >>>>>>>> that
> >>>>>>>> KIP accepted and implementation ASAP, so I'm not worried about the 
> >>>>>>>> "what if
> >>>>>>>> it doesn't happen" case -- more just want to know what it will look 
> >>>>>>>> like
> >>>>>>>> when it does. Imo it's fine to build KIPs on top of future ones, it 
> >>>>>>>> feels
> >>>>>>>> clear that this part will just have to wait for that KIP to actually 
> >>>>>>>> be
> >>>>>>>> added.
> >>>>>>>>
> >>>>>>>> S5b:
> >>>>>>>> Why do users have to define the entire ProducerRecord -- shouldn't 
> >>>>>>>> Streams
> >>>>>>>> handle all this for them? Or will we just automatically send every 
> >>>>>>>> record
> >>>>>>>> on failure to the default global DLQ, and users only have to 
> >>>>>>>> implement the
> >>>>>>>> handlers if they want to change the headers or send to a different 
> >>>>>>>> topic? I
> >>>>>>>> was having a bit of trouble understanding what the behavior would be 
> >>>>>>>> if
> >>>>>>>> someone configured a "errors.deadletterqueue.topic.name" but didn't
> >>>>>>>> implement the handlers. Apologies if it's somewhere in the KIP and I
> >>>>>>>> happened to miss it!
> >>>>>>>>
> >>>>>>>> Either way, I really think an example would help me to better 
> >>>>>>>> imagine what
> >>>>>>>> this will look like in practice, and evaluate whether it actually 
> >>>>>>>> involves
> >>>>>>>> as much overhead as I'm worried it will. Can you add a section that
> >>>>>>>> includes a basic implementation of all the features here? Nothing too
> >>>>>>>> complicated, just the most bare-bones code needed to actually 
> >>>>>>>> implement
> >>>>>>>> forwarding to a dead-letter-queue via the handlers.
> >>>>>>>>
> >>>>>>>> Lastly, two super small things:
> >>>>>>>>
> >>>>>>>> S6:
> > >>>>>>>> We use camel case in Streams, so it should be rawSourceKey/Value rather
> > >>>>>>>> than raw_source_key/value
> >>>>>>>>
> >>>>>>>> S7:
> > >>>>>>>> Can you add javadocs for the #withDeadLetterQueueRecord? For example, it
> > >>>>>>>> seems to me that if the topic to be sent to here is different than the
> > >>>>>>>> default/global DLQ, then the user will need to make sure to have created
> > >>>>>>>> this themselves up front.
> >>>>>>>>
> > >>>>>>>> That's it from me...sorry for the long response, it's just because I'm
> > >>>>>>>> excited for this feature and have been waiting on a KIP for this for years.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Sophie
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina 
> > >>>>>>>> <d.gaspar...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for your review, plenty of good points!
> >>>>>>>>>
> >>>>>>>>> 11. Typo fixed, good catch.
> >>>>>>>>>
> >>>>>>>>> 12. I do agree with you and Nick also mentioned it, I updated the 
> >>>>>>>>> KIP
> >>>>>>>>> to mention that context headers should be forwarded.
> >>>>>>>>>
> >>>>>>>>> 13. Good catch, to be consistent with KIP-298, and without a strong
> >>>>>>>>> opinion from my side, I updated the KIP with your prefix proposal.
> >>>>>>>>>
> >>>>>>>>> 14. I am not sure about this point, a big difference between KIP-298
> >>>>>>>>> and this one is that the handlers can easily be overridden, 
> >>>>>>>>> something
> >>>>>>>>> that is not doable in Kafka Connect.
> >>>>>>>>> If someone would like a different behavior, e.g. to mask the payload
> >>>>>>>>> or include further headers, I think we should encourage them to 
> >>>>>>>>> write
> >>>>>>>>> their own exception handlers to build the DLQ Record the way they
> >>>>>>>>> expect.
> >>>>>>>>>
> >>>>>>>>> 15. Yeah, that's a good point, I was not fully convinced about 
> >>>>>>>>> putting
> >>>>>>>>> a String in it, I do assume that "null" is also a valid value. I do
> >>>>>>>>> assume that the Stacktrace and the Exception in this case are the 
> >>>>>>>>> key
> >>>>>>>>> metadata for the user to troubleshoot the problem.
> >>>>>>>>> I updated the KIP to mention that the value should be null if
> >>>>>>>>> triggered in a punctuate.
> >>>>>>>>>
> >>>>>>>>> 16. I added a section to mention that Kafka Streams would not try to
> >>>>>>>>> automatically create the topic and the topic should either be
> >>>>>>>>> automatically created, or pre-created.
> >>>>>>>>>
> >>>>>>>>> 17. If a DLQ record can not be sent, the exception should go to the
> >>>>>>>>> uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >>>>>>>>>
> >>>>>>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina 
> >>>>>>>>> <d.gaspar...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Nick,
> >>>>>>>>>>
> >>>>>>>>>> 1. Good point, that's less impactful than a custom interface, I 
> >>>>>>>>>> just
> >>>>>>>>>> updated the KIP with the new signature.
> >>>>>>>>>>
> >>>>>>>>>> 1.a I really meant ProducerRecord, that's the class used to 
> >>>>>>>>>> forward to
> >>>>>>>>>> downstream processors in the PAPI. The only information missing in
> >>>>>>>>>> this class is the topic name. I also considered relying on the 
> >>>>>>>>>> Kafka
> >>>>>>>>>> Producer ProducerRecord, but I assume it would not be consistent 
> >>>>>>>>>> with
> >>>>>>>>>> the KafkaStreams API.
> >>>>>>>>>>
> >>>>>>>>>> 2. Agreed
> >>>>>>>>>>
> >>>>>>>>>> 2.a I do think exceptions occurring during punctuate should be
> >>>>>>>>>> included in the DLQ.
> >>>>>>>>>> Even if building a suitable payload is almost impossible, even with
> >>>>>>>>>> custom code; those exceptions are still fatal for Kafka Streams by
> >>>>>>>>>> default and are something that can not be ignored safely.
> >>>>>>>>>> I do assume that most users would want to be informed if an error
> >>>>>>>>>> happened during a punctuate, even if only the metadata (e.g.
> >>>>>>>>>> stacktrace, exception) is provided.
> >>>>>>>>>> I am only concerned about flooding the DLQ topic as, if a scheduled
> >>>>>>>>>> operation failed, it will very likely fail during the next
> >>>>>>>>>> invocation, but
> >>>>>>>>>>
> >>>>>>>>>> 4. Good point, I clarified the wording in the KIP to make it 
> >>>>>>>>>> explicit.
> >>>>>>>>>>
> >>>>>>>>>> 5. Good point, I will clearly mention that it is out of scope as 
> >>>>>>>>>> part
> >>>>>>>>>> of the KIP and might not be as trivial as people could expect. I 
> >>>>>>>>>> will
> >>>>>>>>>> update the KIP once I do have some spare time.
> >>>>>>>>>>
> >>>>>>>>>> 6. Oh yeah, I didn't think about it, but forwarding input headers
> >>>>>>>>>> would definitely make sense. Confluent Schema Registry ID is 
> >>>>>>>>>> actually
> >>>>>>>>>> part of the payload, but many correlation ID and technical metadata
> >>>>>>>>>> are passed through headers, it makes sense to forward them, 
> >>>>>>>>>> especially
> >>>>>>>>>> as it is the default behavior of Kafka Streams.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford 
> >>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Damien and Sebastien,
> >>>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> I think you can just add a `String topic` argument to the existing
> >>>>>>>>>>> `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>>>>> deadLetterQueueRecord)` method, and then the implementation of the
> >>>>>>>>>>> exception handler could choose the topic to send records to using
> >>>>>>>>> whatever
> >>>>>>>>>>> logic the user desires. You could perhaps provide a built-in
> >>>>>>>>> implementation
> >>>>>>>>>>> that leverages your new config to send all records to an untyped 
> >>>>>>>>>>> DLQ
> >>>>>>>>> topic?
> >>>>>>>>>>>
> >>>>>>>>>>> 1a.
> >>>>>>>>>>> BTW you have a typo: in your DeserializationExceptionHandler, the 
> >>>>>>>>>>> type
> >>>>>>>>> of
> >>>>>>>>>>> your `deadLetterQueueRecord` argument is `ProducerRecord`, when it
> >>>>>>>>> should
> >>>>>>>>>>> probably be `ConsumerRecord`.
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Agreed. I think it's a good idea to provide an implementation that
> >>>>>>>>> sends to
> >>>>>>>>>>> a single DLQ by default, but it's important to enable users to
> >>>>>>>>> customize
> >>>>>>>>>>> this with their own exception handlers.
> >>>>>>>>>>>
> >>>>>>>>>>> 2a.
> >>>>>>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should be 
> >>>>>>>>>>> sent
> >>>>>>>>> to a
> >>>>>>>>>>> DLQ topic like it's a bad record. To me, a DLQ should only contain
> >>>>>>>>> records
> >>>>>>>>>>> that failed to process. I'm not even sure how a user would
> >>>>>>>>>>> re-process/action one of these other errors; it seems like the 
> >>>>>>>>>>> purview
> >>>>>>>>> of
> >>>>>>>>>>> error logging to me?
> >>>>>>>>>>>
> >>>>>>>>>>> 4.
> >>>>>>>>>>> My point here was that I think it would be useful for the KIP to
> >>>>>>>>> contain an
> >>>>>>>>>>> explanation of the behavior both with KIP-1033 and without it. 
> >>>>>>>>>>> i.e.
> >>>>>>>>> clarify
> >>>>>>>>>>> if/how records that throw an exception in a processor are 
> >>>>>>>>>>> handled. At
> >>>>>>>>> the
> >>>>>>>>>>> moment, I'm assuming that without KIP-1033, processing exceptions
> >>>>>>>>> would not
> >>>>>>>>>>> cause records to be sent to the DLQ, but with KIP-1033, they 
> >>>>>>>>>>> would. If
> >>>>>>>>> this
> >>>>>>>>>>> assumption is correct, I think it should be made explicit in the 
> >>>>>>>>>>> KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> 5.
> >>>>>>>>>>> Understood. You may want to make this explicit in the 
> >>>>>>>>>>> documentation for
> >>>>>>>>>>> users, so they understand the consequences of re-processing data 
> >>>>>>>>>>> sent
> >>>>>>>>> to
> >>>>>>>>>>> their DLQ. The main reason I raised this point is it's something 
> >>>>>>>>>>> that's
> >>>>>>>>>>> tripped me up in numerous KIPs that committers frequently 
> >>>>>>>>>>> remind
> >>>>>>>>> me
> >>>>>>>>>>> of; so I wanted to get ahead of it for once! :D
> >>>>>>>>>>>
> >>>>>>>>>>> And one new point:
> >>>>>>>>>>> 6.
> >>>>>>>>>>> The DLQ record schema appears to discard all custom headers set 
> >>>>>>>>>>> on the
> >>>>>>>>>>> source record. Is there a way these can be included? In 
> >>>>>>>>>>> particular, I'm
> >>>>>>>>>>> concerned with "schema pointer" headers (like those set by Schema
> >>>>>>>>>>> Registry), that may need to be propagated, especially if the 
> >>>>>>>>>>> records
> >>>>>>>>> are
> >>>>>>>>>>> fed back into the source topics for re-processing by the user.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
> >>>>>>>>>>> <d.gaspar...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks a lot for your review and your useful comments!
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. It is a good point, as you mentioned, I think it would make 
> >>>>>>>>>>>> sense
> >>>>>>>>>>>> in some use cases to have potentially multiple DLQ topics, so we
> >>>>>>>>>>>> should provide an API to let users do it.
> >>>>>>>>>>>> Thinking out-loud here, maybe it is a better approach to create 
> >>>>>>>>>>>> a new
> >>>>>>>>>>>> Record class containing the topic name, e.g. 
> >>>>>>>>>>>> DeadLetterQueueRecord
> >>>>>>>>> and
> >>>>>>>>>>>> changing the signature to
> >>>>>>>>>>>> withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> >>>>>>>>>>>> deadLetterQueueRecords) instead of
> >>>>>>>>>>>> withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>>>>>> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord
> >>>>>>>>> would
> >>>>>>>>>>>> be something like "class DeadLetterQueueRecord extends
> >>>>>>>>>>>> org.apache.kafka.streams.processor.api.ProducerRecords { String
> >>>>>>>>>>>> topic; /* + getter/setter + */ } "
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. I think the root question here is: should we have one DLQ 
> >>>>>>>>>>>> topic or
> >>>>>>>>>>>> multiple DLQ topics by default. This question highly depends on 
> >>>>>>>>>>>> the
> >>>>>>>>>>>> context, but implementing a default implementation to handle 
> >>>>>>>>>>>> multiple
> >>>>>>>>>>>> DLQ topics would be opinionated, e.g. how to manage errors in a
> >>>>>>>>>>>> punctuate?
> >>>>>>>>>>>> I think it makes sense to have the default implementation 
> >>>>>>>>>>>> writing all
> >>>>>>>>>>>> faulty records to a single DLQ, that's at least the approach I 
> >>>>>>>>>>>> used
> >>>>>>>>> in
> >>>>>>>>>>>> past applications: one DLQ per Kafka Streams application. Of 
> >>>>>>>>>>>> course
> >>>>>>>>>>>> the message format could change in the DLQ e.g. due to the source
> >>>>>>>>>>>> topic, but those DLQ records will very likely be troubleshot, and
> >>>>>>>>>>>> maybe replayed, manually anyway.
> >>>>>>>>>>>> If a user needs to have multiple DLQ topics or wants to enforce a
> >>>>>>>>>>>> specific schema, it's still possible, but they would need to
> >>>>>>>>> implement
> >>>>>>>>>>>> custom Exception Handlers.
> >>>>>>>>>>>> Coming back to 1. I do agree that it would make sense to have the
> >>>>>>>>> user
> >>>>>>>>>>>> set the DLQ topic name in the handlers for more flexibility.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. Good point, sorry it was a typo, the ProcessingContext makes 
> >>>>>>>>>>>> much
> >>>>>>>>>>>> more sense here indeed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4. I do assume that we could implement KIP-1033 (Processing 
> >>>>>>>>>>>> exception
> >>>>>>>>>>>> handler) independently from KIP-1034. I do hope that KIP-1033 
> >>>>>>>>>>>> would
> >>>>>>>>> be
> >>>>>>>>>>>> adopted and implemented before KIP-1034, but if that's not the 
> >>>>>>>>>>>> case,
> >>>>>>>>>>>> we could implement KIP-1034 independently and update KIP-1033 to 
> >>>>>>>>>>>> include
> >>>>>>>>>>>> the DLQ record afterward (in the same KIP or in a new one if not
> >>>>>>>>>>>> possible).
> >>>>>>>>>>>>
> >>>>>>>>>>>> 5. I think we should be clear that this KIP only covers the DLQ
> >>>>>>>>> record
> >>>>>>>>>>>> produced.
> >>>>>>>>>>>> Everything related to replay messages or recovery plan should be
> >>>>>>>>>>>> considered out-of-scope as it is use-case and error specific.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me know if that's not clear, there are definitely points that
> >>>>>>>>>>>> are highly debatable.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Damien
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford 
> >>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Oh, and one more thing:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5.
> >>>>>>>>>>>>> Whenever you take a record out of the stream, and then 
> >>>>>>>>>>>>> potentially
> >>>>>>>>>>>>> re-introduce it at a later date, you introduce the potential for
> >>>>>>>>> record
> >>>>>>>>>>>>> ordering issues. For example, that record could have been 
> >>>>>>>>>>>>> destined
> >>>>>>>>> for a
> >>>>>>>>>>>>> Window that has been closed by the time it's re-processed. I'd
> >>>>>>>>> like to
> >>>>>>>>>>>> see
> >>>>>>>>>>>>> a section that considers these consequences, and perhaps make
> >>>>>>>>> those risks
> >>>>>>>>>>>>> clear to users. For the record, this is exactly what sunk 
> >>>>>>>>>>>>> KIP-990,
> >>>>>>>>> which
> >>>>>>>>>>>>> was an alternative approach to error handling that introduced 
> >>>>>>>>>>>>> the
> >>>>>>>>> same
> >>>>>>>>>>>>> issues.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford 
> >>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Damien,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the KIP! Dead-letter queues are something that I
> >>>>>>>>> think a
> >>>>>>>>>>>> lot of
> >>>>>>>>>>>>>> users would like.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think there are a few points with this KIP that concern me:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>> It looks like you can only define a single, global DLQ for the
> >>>>>>>>> entire
> >>>>>>>>>>>>>> Kafka Streams application? What about applications that would
> >>>>>>>>> like to
> >>>>>>>>>>>>>> define different DLQs for different data flows? This is
> >>>>>>>>> especially
> >>>>>>>>>>>>>> important when dealing with multiple source topics that have
> >>>>>>>>> different
> >>>>>>>>>>>>>> record schemas.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>> Your DLQ payload value can either be the record value that
> >>>>>>>>> failed, or
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>> error string (such as "error during punctuate"). This is likely
> >>>>>>>>> to
> >>>>>>>>>>>> cause
> >>>>>>>>>>>>>> problems when users try to process the records from the DLQ, as
> >>>>>>>>> they
> >>>>>>>>>>>> can't
> >>>>>>>>>>>>>> guarantee the format of every record value will be the same.
> >>>>>>>>> This is
> >>>>>>>>>>>> very
> >>>>>>>>>>>>>> loosely related to point 1. above.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>> You provide a ProcessorContext to both exception handlers, but
> >>>>>>>>> state
> >>>>>>>>>>>> they
> >>>>>>>>>>>>>> cannot be used to forward records. In that case, I believe you
> >>>>>>>>> should
> >>>>>>>>>>>> use
> >>>>>>>>>>>>>> ProcessingContext instead, which statically guarantees that it
> >>>>>>>>> can't be
> >>>>>>>>>>>>>> used to forward records.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's
> >>>>>>>>> the
> >>>>>>>>>>>> plan
> >>>>>>>>>>>>>> if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <
> >>>>>>>>> d.gaspar...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In a general way, if the user does not configure the right 
> >>>>>>>>>>>>>>> ACL,
> >>>>>>>>> that
> >>>>>>>>>>>>>>> would be a security issue, but that's true for any topic.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without
> >>>>>>>>> writing
> >>>>>>>>>>>>>>> custom Java code in Kafka Streams, not at the topic level.
> >>>>>>>>>>>>>>> A lot of applications are already implementing this pattern,
> >>>>>>>>> but the
> >>>>>>>>>>>>>>> required code to do it is quite painful and error prone, for
> >>>>>>>>> example
> >>>>>>>>>>>>>>> most apps I have seen created a new KafkaProducer to send
> >>>>>>>>> records to
> >>>>>>>>>>>>>>> their DLQ.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As it would be disabled by default for backward compatibility,
> >>>>>>>>> I doubt
> >>>>>>>>>>>>>>> it would generate any security concern.
> >>>>>>>>>>>>>>> If a user explicitly configures a Dead Letter Queue, it would
> >>>>>>>>> be up to
> >>>>>>>>>>>>>>> them to configure the relevant ACLs to ensure that the right
> >>>>>>>>> principal
> >>>>>>>>>>>>>>> can access it.
> >>>>>>>>>>>>>>> It is already the case for all internal, input and output 
> >>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>> Streams topics (e.g. repartition, changelog topics) that also
> >>>>>>>>> could
> >>>>>>>>>>>>>>> contain confidential data, so I do not think we should
> >>>>>>>>> implement a
> >>>>>>>>>>>>>>> different behavior for this one.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In this KIP, we configured the default DLQ record to have the
> >>>>>>>>> initial
> >>>>>>>>>>>>>>> record key/value as we assume that it is the expected and 
> >>>>>>>>>>>>>>> wanted
> >>>>>>>>>>>>>>> behavior for most applications.
> >>>>>>>>>>>>>>> If a user does not want to have the key/value in the DLQ 
> >>>>>>>>>>>>>>> record
> >>>>>>>>> for
> >>>>>>>>>>>>>>> any reason, they could still implement exception handlers to
> >>>>>>>>> build
> >>>>>>>>>>>>>>> their own DLQ record.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regarding ACL, maybe something smarter could be done in Kafka
> >>>>>>>>> Streams,
> >>>>>>>>>>>>>>> but this is out of scope for this KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren 
> >>>>>>>>>>>>>>> <cla...@xenei.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> My concern is that someone would create a dead letter queue
> >>>>>>>>> on a
> >>>>>>>>>>>>>>> sensitive
> >>>>>>>>>>>>>>>> topic and not get the ACL correct from the start. Thus
> >>>>>>>>> causing
> >>>>>>>>>>>>>>> potential
> >>>>>>>>>>>>>>>> confidential data leak. Is there anything in the proposal
> >>>>>>>>> that
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> prevent that from happening? If so I did not recognize it as
> >>>>>>>>> such.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <
> >>>>>>>>>>>> d.gaspar...@gmail.com
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Claude,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a
> >>>>>>>>> standard
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> independent topic, thus normal ACL applies to it like any
> >>>>>>>>> other
> >>>>>>>>>>>> topic.
> >>>>>>>>>>>>>>>>> This should not introduce any security issues; obviously,
> >>>>>>>>> the
> >>>>>>>>>>>> right
> >>>>>>>>>>>>>>>>> ACL would need to be provided to write to the DLQ if
> >>>>>>>>> configured.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Damien
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> >>>>>>>>>>>>>>>>> <claude.war...@aiven.io.invalid>
> >>>>>>>>>>>>>>>>>  wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any
> >>>>>>>>> ignorance
> >>>>>>>>>>>> on my
> >>>>>>>>>>>>>>> part.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> When a dead letter queue is established is there a
> >>>>>>>>> process to
> >>>>>>>>>>>>>>> ensure that
> >>>>>>>>>>>>>>>>>> it at least is defined with the same ACL as the original
> >>>>>>>>> queue?
> >>>>>>>>>>>>>>> Without
> >>>>>>>>>>>>>>>>>> such a guarantee at the start it seems that managing dead
> >>>>>>>>> letter
> >>>>>>>>>>>>>>> queues
> >>>>>>>>>>>>>>>>>> will be fraught with security issues.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <
> >>>>>>>>>>>>>>> d.gaspar...@gmail.com
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> To continue on our effort to improve Kafka Streams error
> >>>>>>>>>>>>>>> handling, we
> >>>>>>>>>>>>>>>>>>> propose a new KIP to add out of the box support for Dead
> >>>>>>>>>>>> Letter
> >>>>>>>>>>>>>>> Queue.
> >>>>>>>>>>>>>>>>>>> The goal of this KIP is to provide a default
> >>>>>>>>> implementation
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> should be suitable for most applications and allow
> >>>>>>>>> users to
> >>>>>>>>>>>>>>> override
> >>>>>>>>>>>>>>>>>>> it if they have specific requirements.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> In order to build a suitable payload, some additional
> >>>>>>>>> changes
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>> included in this KIP:
> >>>>>>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, when
> >>>>>>>>> available, the
> >>>>>>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>> node raw key/value byte[]
> >>>>>>>>>>>>>>>>>>> 2. expose the ProcessingContext to the
> >>>>>>>>>>>>>>> ProductionExceptionHandler,
> >>>>>>>>>>>>>>>>>>> it is currently not available in the handle parameters.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> ProductionExceptionHandler, we considered two choices:
> >>>>>>>>>>>>>>>>>>> 1. exposing the ProcessingContext as a parameter in
> >>>>>>>>> the
> >>>>>>>>>>>> handle()
> >>>>>>>>>>>>>>>>>>> method. That's the cleanest way IMHO, but we would need
> >>>>>>>>> to
> >>>>>>>>>>>>>>> deprecate
> >>>>>>>>>>>>>>>>>>> the old method.
> >>>>>>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an attribute in
> >>>>>>>>> the
> >>>>>>>>>>>>>>> interface.
> >>>>>>>>>>>>>>>>>>> This way, no method is deprecated, but we would not be
> >>>>>>>>>>>> consistent
> >>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> the other ExceptionHandler.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> In the KIP, we chose solution 1 (new handle
> >>>>>>>>> signature
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>>>>> one deprecated), but we could use other opinions on
> >>>>>>>>> this part.
> >>>>>>>>>>>>>>>>>>> More information is available directly on the KIP.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> KIP link:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Feedback and suggestions are welcome,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>> Damien, Sebastien and Loic
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> LinkedIn: 
> >>>>>>>>>>>>>>>> http://www.linkedin.com/in/claudewarren
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>
