Hi everyone,

We just updated KIP-1034 with the following changes:
  - We included the ProcessingExceptionHandler (KIP-1033) directly in the KIP;
  - We provided examples to clarify the new configuration, and how it
could be leveraged.
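
As a rough illustration of the configuration side, the sketch below sets the DLQ topic on plain java.util.Properties. The key "errors.deadletterqueue.topic.name" is the one quoted in this thread and may differ from the final KIP; the helper class, the application ID, the bootstrap address and the topic name are placeholders chosen for the example.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
final class DlqConfigSketch {
    // Config key as quoted in this thread; the final name in the KIP may differ.
    static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";

    // Builds Streams-style properties; the DLQ key is only set when a topic is given.
    static Properties streamsProperties(String applicationId, String dlqTopic) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        if (dlqTopic != null && !dlqTopic.isEmpty()) {
            props.setProperty(DLQ_TOPIC_CONFIG, dlqTopic);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("orders-app", "orders-app-dlq");
        System.out.println(props.getProperty(DLQ_TOPIC_CONFIG));
    }
}
```

Leaving the topic empty keeps the key unset, which matches the "no DLQ unless configured" behavior described further down the thread.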

I think we can resume the conversation on this KIP.

Cheers,
Damien, Sebastien and Loic


On Tue, 27 Aug 2024 at 15:37, Sebastien Viale
<sebastien.vi...@michelin.com> wrote:
>
>
> Hi Bruno,
>
> We have planned a meeting for next Friday to discuss it with Loic and Damien.
>
> We will be able to restart discussions about it soon.
>
> regards
>
> ________________________________
> From: Bruno Cadonna <cado...@apache.org>
> Sent: Monday, 26 August 2024 11:32
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
>
> Hi Loïc, Sebastien, and Damien,
>
> Now that KIP-1033 is going to be released in 3.9, what is the plan to
> progress with this KIP?
>
> Is the KIP up-to-date, so that we can restart discussion?
>
> Best,
> Bruno
>
>
>
>
> On 6/13/24 6:16 PM, Damien Gasparina wrote:
> > Hi Bruno,
> >
> > We focused our effort (well, mostly Seb and Loic :)) on KIP-1033,
> > that's why not much progress has been made on this one yet.
> > Regarding your points:
> >
> > B1: It is up to the user to specify the DLQ topic name and to
> > implement a potential differentiation. I tend to think that having one
> > DLQ per application ID is the wisest, but I encountered cases and
> > applications that preferred having a shared DLQ topic between multiple
> > applications, e.g. to reduce the number of partitions, or to ease
> > monitoring
> >
> > B2: Good catch, it should be
> > ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, we removed the "DEFAULT"
> > prefix during the discussion, looks like I forgot to update all
> > occurrences in the KIP.
> >
> > B3: The trigger for sending to the DLQ would be if
> > ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user
> > implemented a custom exception handler that returns DLQ records.
> > ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just override the
> > behavior of the default handler, thus custom exception handlers will
> > completely ignore this parameter.
> >
> > I think it's a good trade-off between providing a production-ready
> > default implementation, yet providing sufficient flexibility for
> > complex use-cases.
> > This behavior definitely needs to be documented, but I guess it's safe
> > to push the responsibility of the DLQ records to the user if they
> > implement custom handlers.
> >
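The B3 rule above can be sketched in plain Java, without any Kafka dependency. All names here (DlqTriggerSketch, bundledHandlerTargets, customHandlerTargets) are hypothetical and only model the decision; they are not the KIP's API.

```java
import java.util.List;

// Hypothetical model of the B3 rule; not part of Kafka Streams.
final class DlqTriggerSketch {
    // Bundled (default) handlers: the configured topic alone decides whether
    // a DLQ record is produced, and where it goes.
    static List<String> bundledHandlerTargets(String configuredDlqTopic) {
        if (configuredDlqTopic == null || configuredDlqTopic.isEmpty()) {
            return List.of();
        }
        return List.of(configuredDlqTopic);
    }

    // Custom handlers: the configuration is ignored; only the topics of the
    // DLQ records returned by the handler matter.
    static List<String> customHandlerTargets(String configuredDlqTopic,
                                             List<String> topicsReturnedByHandler) {
        return List.copyOf(topicsReturnedByHandler);
    }

    public static void main(String[] args) {
        System.out.println(bundledHandlerTargets("app-dlq"));
        System.out.println(customHandlerTargets("app-dlq", List.of()));
    }
}
```

With this model, setting the config while using a custom handler that returns no DLQ record sends nothing, and a custom handler that returns records sends them even when the config is unset.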
> > Cheers,
> > Damien
> >
> >
> > On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >> Hi,
> >>
> >> since there was not too much activity in this thread recently, I was
> >> wondering what the status of this discussion is.
> >>
> >> I cannot find the examples in the KIP Sébastien mentioned in the last
> >> message to this thread. I also cannot find the corresponding definition
> >> of the following method call in the KIP:
> >>
> >> FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>
> >> I also have some comments:
> >>
> >> B1
> >> Did you consider prefixing the dead letter queue topic names with the
> >> application ID to distinguish the topics between Streams apps? Or is the
> >> user responsible for the differentiation? If the user is responsible, we
> >> risk that faulty records of different Streams apps end up in the same
> >> dead letter queue.
> >>
> >> B2
> >> Is the name of the dead letter queue topic config
> >> DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or
> >> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.
> >>
> >> B3
> >> What exactly is the trigger to send a record to the dead letter queue?
> >> Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, or is it adding a
> >> record to the return value of the exception handler?
> >> What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do
> >> not add a record to the return value of the handler? What happens if I
> >> do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to
> >> the return value of the handler?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 4/22/24 10:19 PM, Sebastien Viale wrote:
> >>> Hi,
> >>>
> >>> Thanks for your remarks
> >>>
> >>> L1. I would say "who can do the most can do the least": even though most 
> >>> people will fail and stop, we found it interesting to offer the 
> >>> possibility to fail-and-send-to-DLQ.
> >>>
> >>> L2: We did not consider extending the TimestampExtractor because we 
> >>> consider it out of scope for this KIP. Perhaps it will be possible to 
> >>> include it in an ExceptionHandler later.
> >>>
> >>> L3: we will include an example in the KIP, but as we mentioned earlier, 
> >>> the DLQ topic can be different in each custom Exception Handler:
> >>>
> >>> When providing custom handlers, users would have the possibility to 
> >>> return:
> >>> * FAIL
> >>> * CONTINUE
> >>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>
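The four return options listed above could be modelled as follows. This is only a sketch under assumptions: it uses an immutable value class with hypothetical names (HandlerResponseSketch, DlqRecord, continueProcessing) instead of the enum proposed in the KIP, and plain byte arrays instead of Kafka's ProducerRecord.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a handler response; not the KIP's API.
final class HandlerResponseSketch {
    enum Result { FAIL, CONTINUE }

    // Stand-in for a DLQ record: target topic plus raw key and value.
    static final class DlqRecord {
        final String topic;
        final byte[] key;
        final byte[] value;

        DlqRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    final Result result;
    final List<DlqRecord> dlqRecords;

    private HandlerResponseSketch(Result result, List<DlqRecord> dlqRecords) {
        this.result = result;
        this.dlqRecords = dlqRecords;
    }

    static HandlerResponseSketch fail() {
        return new HandlerResponseSketch(Result.FAIL, Collections.emptyList());
    }

    // "continue" is a reserved word in Java, hence the longer name.
    static HandlerResponseSketch continueProcessing() {
        return new HandlerResponseSketch(Result.CONTINUE, Collections.emptyList());
    }

    // Returns a new response carrying one more DLQ record; the receiver is unchanged.
    HandlerResponseSketch withDeadLetterQueueRecord(byte[] key, byte[] value, String topic) {
        List<DlqRecord> copy = new ArrayList<>(dlqRecords);
        copy.add(new DlqRecord(topic, key, value));
        return new HandlerResponseSketch(result, Collections.unmodifiableList(copy));
    }

    public static void main(String[] args) {
        HandlerResponseSketch response = continueProcessing()
            .withDeadLetterQueueRecord(new byte[] {1}, new byte[] {2}, "dlq-topic");
        System.out.println(response.result + " with " + response.dlqRecords.size() + " DLQ record(s)");
    }
}
```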
> >>> cheers !
> >>> Sébastien
> >>>
> >>>
> >>> ________________________________
> >>> From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
> >>> Sent: Monday, 22 April 2024 14:36
> >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>
> >>> Hi!
> >>>
> >>> Thanks for the KIP, great stuff.
> >>>
> >>> L1. I was a bit confused that the default configuration (once you set
> >>> a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
> >>> correctly. Is this something that will be a common use-case, and is it
> >>> a configuration that we want to encourage? I expected that you would
> >>> either want to fail or skip-and-send-to-DLQ.
> >>>
> >>> L2. Have you considered extending the `TimestampExtractor` interface
> >>> so that it can also produce to DLQ? AFAIK it's not covered by any of
> >>> the existing exception handlers, but it can cause similar failures
> >>> (potentially custom logic, depends on the validity of the input record). There
> >>> could also be a default implementation as a subclass of
> >>> `ExtractRecordMetadataTimestamp`.
> >>>
> >>> L3. It would be nice to include an example of how to produce to
> >>> multiple topics in the KIP, as I can imagine that this will be a
> >>> common use-case. I wasn't sure how much code would be involved to make
> >>> it work. If a lot of code is required, we may want to consider
> >>> exposing some utils that make it easier.
> >>>
> >>> Cheers,
> >>> Lucas
> >>>
> >>>
> >>>
> >>> On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> 
> >>> wrote:
> >>>>
> >>>> Hi everyone,
> >>>>
> >>>> Following all the discussion on this KIP and KIP-1033, we introduced a
> >>>> new container class containing only processing context metadata:
> >>>> ProcessingMetadata. This new container class is actually part of
> >>>> KIP-1033, so I added a hard dependency for this KIP on KIP-1033. I
> >>>> think it's the wisest choice implementation-wise.
> >>>>
> >>>> I also clarified the interface of the enums:
> >>>> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[],
> >>>> byte[]>> deadLetterQueueRecords). Very likely most users would just
> >>>> send one DLQ record, but there might be specific use-cases and what
> >>>> can do more can do less, so I added an Iterable.
> >>>>
> >>>> I took some time to think about the impact of storing the
> >>>> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> >>>> the topic/offset/partition should be fine, but I am concerned about
> >>>> storing the rawSourceKey/Value. I think it could impact some specific
> >>>> use-cases. For example, a high-throughput Kafka Streams application
> >>>> "counting" messages could have huge source input messages and very
> >>>> small sink messages. Here, I assume storing the rawSourceKey/Value
> >>>> could require significantly more memory than the actual Kafka Producer
> >>>> buffer.
> >>>>
> >>>> I think the safest approach is actually to only store the fixed-size
> >>>> metadata for the ProductionExceptionHandler.handle:
> >>>> topic/partition/offset/processorNodeId/taskId. It might be confusing
> >>>> for the user, but 1) it is still better than today, where there is
> >>>> no context information at all, 2) it would be clearly stated in the
> >>>> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> >>>> punctuate case).
> >>>>
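A minimal sketch of such a fixed-size container, assuming only the five fields named above. The class name is hypothetical; it is not the ProcessingMetadata class from KIP-1033.

```java
// Hypothetical container; not the ProcessingMetadata class from KIP-1033.
final class ProductionErrorMetadataSketch {
    final String topic;           // source topic of the record being processed
    final int partition;          // source partition
    final long offset;            // source offset
    final String processorNodeId; // node that forwarded the record
    final String taskId;          // task that processed the record

    // Deliberately no rawSourceKey/rawSourceValue fields: the footprint per
    // in-flight record does not grow with the size of the source payload.
    ProductionErrorMetadataSketch(String topic, int partition, long offset,
                                  String processorNodeId, String taskId) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.processorNodeId = processorNodeId;
        this.taskId = taskId;
    }

    @Override
    public String toString() {
        return topic + "-" + partition + "@" + offset
            + " (node=" + processorNodeId + ", task=" + taskId + ")";
    }

    public static void main(String[] args) {
        System.out.println(new ProductionErrorMetadataSketch("orders", 3, 42L, "sink-node", "0_3"));
    }
}
```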
> >>>> Do you think it would be a suitable design, Sophie?
> >>>>
> >>>> Cheers,
> >>>> Damien
> >>>>
> >>>> On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> 
> >>>> wrote:
> >>>>>
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks for your feedback.
> >>>>> Complementing Damien's comments here for points S1 and S5B.
> >>>>>
> >>>>> S1:
> >>>>>> I'm confused -- are you saying that we're introducing a new kind of 
> >>>>>> ProducerRecord class for this?
> >>>>>
> >>>>> I am wondering if it makes sense to alter the ProducerRecord from 
> >>>>> Clients API with a "deadLetterQueueTopicName" attribute dedicated to 
> >>>>> Kafka Streams DLQ.
> >>>>> Adding "deadLetterQueueTopicName" as an additional parameter to 
> >>>>> "withDeadLetterQueueRecord" is a good option, and may allow users to 
> >>>>> send records to different DLQ topics depending on conditions:
> >>>>> @Override
> >>>>> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >>>>>         ProducerRecord<byte[], byte[]> record,
> >>>>>         Exception exception) {
> >>>>>     if (condition1) {
> >>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >>>>>     }
> >>>>>     if (condition2) {
> >>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >>>>>     }
> >>>>>     return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> >>>>> }
> >>>>>
> >>>>> S5B:
> >>>>>> I was having a bit of trouble understanding what the behavior would be 
> >>>>>> if someone configured a "errors.deadletterqueue.topic.name" but didn't 
> >>>>>> implement the handlers.
> >>>>>
> >>>>> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler 
> >>>>> and DefaultProductionExceptionHandler should be able to tell if records 
> >>>>> should be sent to DLQ or not.
> >>>>> The "errors.deadletterqueue.topic.name" configuration serves two purposes:
> >>>>>
> >>>>> * Specify whether the provided handlers should send records to the DLQ:
> >>>>>   * If the value is empty, the handlers should not send records to the DLQ.
> >>>>>   * If the value is not empty, the handlers should send records to the DLQ.
> >>>>> * Define the name of the DLQ topic that should be used by the provided 
> >>>>> handlers.
> >>>>>
> >>>>> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided 
> >>>>> handlers should return either:
> >>>>>
> >>>>> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>>
> >>>>> If "errors.deadletterqueue.topic.name" is defined but neither 
> >>>>> DeserializationExceptionHandler nor ProductionExceptionHandler classes 
> >>>>> are defined in the configuration, then nothing should happen as sending 
> >>>>> to DLQ is based on handlers’ response.
> >>>>> When providing custom handlers, users would have the possibility to 
> >>>>> return:
> >>>>>
> >>>>> * FAIL
> >>>>> * CONTINUE
> >>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>
> >>>>> A DLQ topic name is currently required when using the last two response 
> >>>>> types.
> >>>>> I am wondering if it would benefit users to make the default DLQ topic 
> >>>>> "errors.deadletterqueue.topic.name" easier to use when implementing 
> >>>>> custom handlers, with something like:
> >>>>>
> >>>>> * FAIL.withDefaultDeadLetterQueueRecord(record)
> >>>>> * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >>>>>
> >>>>> Regards,
> >>>>> Loïc
> >>>>>
> >>>>> From: Damien Gasparina <d.gaspar...@gmail.com>
> >>>>> Sent: Sunday, 14 April 2024 20:24
> >>>>> To: dev@kafka.apache.org
> >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>>
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks a lot for your feedback and your detailed comments.
> >>>>>
> >>>>> S1.
> >>>>>> I'm confused -- are you saying that we're introducing a new kind of
> >>>>>> ProducerRecord class for this?
> >>>>>
> >>>>> Sorry for the poor wording, that's not what I meant. While writing the
> >>>>> KIP, I was hesitating between 1. leveraging the Kafka Producer
> >>>>> ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in
> >>>>> a separate parameter, 3. a new custom interface (e.g.
> >>>>> DeadLetterQueueRecord).
> >>>>> As the KafkaProducer ProducerRecord is not used in the Kafka Streams
> >>>>> API (except ProductionExceptionHandler) and I would like to avoid a
> >>>>> new interface if not strictly required, I leaned toward option 2.
> >>>>> Thinking about it, maybe option 1. would be best, but I assume it
> >>>>> could create confusion with KafkaStreams ProducerRecord. Let me sleep
> >>>>> on it.
> >>>>>
> >>>>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and
> >>>>> your point in S4, it seems more and more likely that we will create a
> >>>>> new container class containing only the metadata for the exception
> >>>>> handlers. To be consistent, I think we should use this new
> >>>>> implementation in all exception handlers.
> >>>>> The only issue I could think of is that the new interface would
> >>>>> expose less data than the current ProcessorContext in the
> >>>>> DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()),
> >>>>> thus it could be hard for some users to migrate to the new interface.
> >>>>> I do expect that only a few users would be impacted as the javadoc is
> >>>>> very clear: `Note, that the passed in {@link ProcessorContext} only
> >>>>> allows access to metadata like the task ID.`
> >>>>>
> >>>>> S3. I completely agree with you, it is something that might not be
> >>>>> trivial and should be thoroughly covered by unit tests during the
> >>>>> implementation.
> >>>>>
> >>>>> S4. Good point, I did not notice that the ProductionExceptionHandler
> >>>>> is also invoked in the producer.send() callback.
> >>>>> Capturing the ProcessingContext for each in-flight message is probably
> >>>>> not possible. I think there is no other way than to write a custom
> >>>>> container class holding only the essential metadata. I am
> >>>>> thinking of storing the following attributes: source topic, partition,
> >>>>> offset, rawKey, rawValue and taskId.
> >>>>> Those metadata should be relatively small, but I assume that there
> >>>>> could be a high number of in-flight messages, especially with the
> >>>>> at-least-once processing guarantee. Do you think it would be fine memory-wise?
> >>>>>
> >>>>> S5. As many exceptions are only accessible in exception handlers, and
> >>>>> we wanted to 1) allow users to customize the DLQ records and 2) have a
> >>>>> suitable DLQ out of the box implementation, we felt it natural to rely
> >>>>> on exception handlers, that's also why we created KIP-1033.
> >>>>> Piggybacking on the enum response was the cleanest way we could think
> >>>>> of, but we are completely open to suggestions.
> >>>>>
> >>>>> S5a. Completely agree with you on this point, for this DLQ approach to
> >>>>> be complete, the ProcessingExceptionHandler introduced in KIP-1033 is
> >>>>> required. KIP-1033 is definitely our first priority. We decided to
> >>>>> kick-off the KIP-1034 discussion as we expected the discussions to be
> >>>>> dynamic and could potentially impact some choices of KIP-1033.
> >>>>>
> >>>>> S5b. In this KIP, we wanted to 1. provide as much flexibility to the
> >>>>> user as possible; 2. provide a good default implementation
> >>>>> for the DLQ without having to write custom exception handlers.
> >>>>> For the default implementation, we introduced a new configuration:
> >>>>> errors.deadletterqueue.topic.name.
> >>>>>
> >>>>> If this configuration is set, it changes the behavior of the provided
> >>>>> exception handlers to return a DLQ record containing the raw key/value
> >>>>> + headers + exception metadata in headers.
> >>>>> If the out-of-the-box implementation is not suitable for a user, e.g.
> >>>>> the payload needs to be masked in the DLQ, they could implement their
> >>>>> own exception handlers. The errors.deadletterqueue.topic.name would
> >>>>> only impact Kafka Streams bundled exception handlers (e.g.
> >>>>> org.apache.kafka.streams.errors.LogAndContinueExceptionHandler)
> >>>>>
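To make the "raw key/value + headers + exception metadata in headers" idea concrete, here is a small sketch of how the DLQ headers could be assembled. The header names ("example.dlq.exception.class", "example.dlq.exception.message") and the helper class are invented for this example; the KIP defines its own names, and plain maps stand in for Kafka's Headers type.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; header names below are placeholders, not the KIP's.
final class DlqHeadersSketch {
    // Copies the source headers, then adds the exception metadata on top.
    static Map<String, byte[]> dlqHeaders(Map<String, byte[]> sourceHeaders, Exception exception) {
        Map<String, byte[]> headers = new LinkedHashMap<>(sourceHeaders);
        headers.put("example.dlq.exception.class", utf8(exception.getClass().getName()));
        // getMessage() may be null; String.valueOf turns that into the text "null".
        headers.put("example.dlq.exception.message", utf8(String.valueOf(exception.getMessage())));
        return headers;
    }

    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> source = new LinkedHashMap<>();
        source.put("correlation-id", utf8("abc-123"));
        Map<String, byte[]> headers = dlqHeaders(source, new IllegalStateException("boom"));
        System.out.println(headers.keySet());
    }
}
```

A custom handler that needs to mask the payload would build its own record instead of relying on this default shape.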
> >>>>> Let me update the KIP to make it clear and also provide examples.
> >>>>>
> >>>>> S6/S7. Good point, mea culpa for the camel case, it must have been a
> >>>>> sugar rush :)
> >>>>>
> >>>>> Thanks again for your detailed comments and pointing out S4
> >>>>> (production exception & Processing Context)!
> >>>>>
> >>>>> Cheers,
> >>>>> Damien
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman 
> >>>>> <sop...@responsive.dev> wrote:
> >>>>>>
> >>>>>> Thanks for the KIP, this will make a lot of people very happy.
> >>>>>>
> >>>>>> Wanted to chime in on a few points that have been raised so far and add
> >>>>>> some of my own (numbering with an S to distinguish my points from the
> >>>>>> previous ones)
> >>>>>>
> >>>>>> S1.
> >>>>>>
> >>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to
> >>>>>>> downstream processors in the PAPI. The only information missing in
> >>>>>>> this class is the topic name. I also considered relying on the Kafka
> >>>>>>> Producer ProducerRecord, but I assume it would not be consistent with
> >>>>>>> the KafkaStreams API.
> >>>>>>
> >>>>>> I'm confused -- are you saying that we're introducing a new kind of
> >>>>>> ProducerRecord class for this? Why not just use the existing one, ie 
> >>>>>> the
> >>>>>> o.a.k.clients.producer.ProducerRecord class? This is what the
> >>>>>> ProductionExceptionHandler uses, so it's definitely "consistent". In 
> >>>>>> other
> >>>>>> words, we can remove the "String deadLetterQueueTopicName"
> >>>>>>
> >>>>>> S2.
> >>>>>> I think this would be a good opportunity to also deprecate the existing
> >>>>>> #handle method of the DeserializationExceptionHandler, and replace it 
> >>>>>> with
> >>>>>> one that uses a ProcessingContext instead of the ProcessorContext. 
> >>>>>> Partly
> >>>>>> for the same reasons about guarding access to the #forward methods, 
> >>>>>> partly
> >>>>>> because this method needs to be migrated to the new PAPI interface
> >>>>>> anyways, and ProcessingContext is part of the new one.
> >>>>>>
> >>>>>> S3.
> >>>>>> Regarding 2a. -- I'm inclined to agree that records which a Punctuator
> >>>>>> failed to produce should also be sent to the DLQ via the
> >>>>>> ProductionExceptionHandler. Users will just need to be careful about
> >>>>>> accessing certain fields of the ProcessingContext that aren't 
> >>>>>> available in
> >>>>>> the punctuator, and need to check the Optional returned by the
> >>>>>> ProcessingContext#recordMetadata API.
> >>>>>> Also, from an implementation standpoint, it will be really hard to
> >>>>>> distinguish between a record created by a punctuator vs a processor 
> >>>>>> from
> >>>>>> within the RecordCollector, which is the class that actually handles
> >>>>>> sending records to the Streams Producer and invoking the
> >>>>>> ProductionExceptionHandler. This is because the RecordCollector is at 
> >>>>>> the
> >>>>>> "end" of the topology graph and doesn't have any context about which 
> >>>>>> of the
> >>>>>> upstream processors actually attempted to forward a record.
> >>>>>>
> >>>>>> This in itself is at least theoretically solvable, but it leads into my
> >>>>>> first major new point:
> >>>>>>
> >>>>>> S4:
> >>>>>> I'm deeply worried about passing the ProcessingContext in as a means of
> >>>>>> forwarding metadata. The problem is that the processing/processor 
> >>>>>> context
> >>>>>> is a mutable class and is inherently meaningless outside the context 
> >>>>>> of a
> >>>>>> specific task. And when I said earlier that the RecordCollector sits at
> >>>>>> the "end" of the topology, I meant that it's literally outside the 
> >>>>>> task's
> >>>>>> subtopology and is used/shared by all tasks on that StreamThread. So to
> >>>>>> begin with, there's no guarantee what will actually be returned for
> >>>>>> essential methods such as the new #rawSourceKey/Value or the existing
> >>>>>> #recordMetadata
> >>>>>>
> >>>>>> For serialization exceptions it'll probably be correct, but for general
> >>>>>> send errors it almost definitely won't be. In short, this is because we
> >>>>>> send records to the producer after the sink node, but don't check for 
> >>>>>> send
> >>>>>> errors right away since obviously it takes some time for the producer 
> >>>>>> to
> >>>>>> actually send. In other words, sending/producing records is actually 
> >>>>>> done
> >>>>>> asynchronously with processing, and we simply check for errors on any
> >>>>>> previously-sent records
> >>>>>> during the send on a new record in a sink node. This means the context 
> >>>>>> we
> >>>>>> would be passing in to a (non-serialization) exception would pretty 
> >>>>>> much
> >>>>>> always correspond not to the record that experienced the error, but 
> >>>>>> the
> >>>>>> random record that happened to be being sent when we checked and saw 
> >>>>>> the
> >>>>>> error for the failed record.
> >>>>>>
> >>>>>> This discrepancy, in addition to the whole "sourceRawKey/Value and
> >>>>>> recordMetadata are null for punctuators" issue, seems like an
> >>>>>> insurmountable inconsistency that is more likely to cause users 
> >>>>>> confusion
> >>>>>> or problems than be helpful.
> >>>>>> We could create a new metadata object and copy over the relevant info 
> >>>>>> from
> >>>>>> the ProcessingContext, but I worry that has the potential to explode 
> >>>>>> memory
> >>>>>> since we'd need to hold on to it for all in-flight records up until 
> >>>>>> they
> >>>>>> are either successfully sent or failed and passed in to the
> >>>>>> ProductionExceptionHandler. But if the metadata is relatively small, 
> >>>>>> it's
> >>>>>> probably fine. Especially if it's just the raw source key/value. Are
> >>>>>> there any other parts of the ProcessingContext you think should be made
> >>>>>> available?
> >>>>>>
> >>>>>> Note that this only applies to the ProductionExceptionHandler, as the
> >>>>>> DeserializationExceptionHandler (and the newly proposed
> >>>>>> ProcessingExceptionHandler) would both be invoked immediately and 
> >>>>>> therefore
> >>>>>> with the failed record's context. However, I'm also a bit uncomfortable
> >>>>>> with adding the rawSourceKey/rawSourceValue to the ProcessingContext. 
> >>>>>> So
> >>>>>> I'd propose to just wrap those (and any other metadata you might want) 
> >>>>>> in a
> >>>>>> container class and pass that in instead of the ProcessingContext, to 
> >>>>>> all
> >>>>>> of the exception handlers.
> >>>>>>
> >>>>>> S5:
> >>>>>> For some reason I'm finding the proposed API a little bit awkward, 
> >>>>>> although
> >>>>>> it's entirely possible that the problem is with me, not the proposal :)
> >>>>>> Specifically I'm struggling with the approach of piggybacking on the
> >>>>>> exception handlers and their response enums to dictate how records are
> >>>>>> forwarded to the DLQ. I think this comes down to two things, though 
> >>>>>> again,
> >>>>>> these aren't necessarily problems with the API and probably just need 
> >>>>>> to be
> >>>>>> hashed out:
> >>>>>>
> >>>>>> S5a.
> >>>>>> When I envision a DLQ, to me, the most common use case would be to 
> >>>>>> forward
> >>>>>> input records that failed somewhere along the processing graph. But it
> >>>>>> seems like all the focus here is on the two far ends of the 
> >>>>>> subtopology --
> >>>>>> the input/consumer, and the output/producer. I get that
> >>>>>> the ProcessingExceptionHandler is really the missing piece here, and 
> >>>>>> it's
> >>>>>> hard to say anything specific since it's not yet accepted, but maybe a
> >>>>>> somewhat more concrete example would help. FWIW I think/hope to get 
> >>>>>> that
> >>>>>> KIP accepted and implementation ASAP, so I'm not worried about the 
> >>>>>> "what if
> >>>>>> it doesn't happen" case -- more just want to know what it will look 
> >>>>>> like
> >>>>>> when it does. Imo it's fine to build KIPs on top of future ones, it 
> >>>>>> feels
> >>>>>> clear that this part will just have to wait for that KIP to actually be
> >>>>>> added.
> >>>>>>
> >>>>>> S5b:
> >>>>>> Why do users have to define the entire ProducerRecord -- shouldn't 
> >>>>>> Streams
> >>>>>> handle all this for them? Or will we just automatically send every 
> >>>>>> record
> >>>>>> on failure to the default global DLQ, and users only have to implement 
> >>>>>> the
> >>>>>> handlers if they want to change the headers or send to a different 
> >>>>>> topic? I
> >>>>>> was having a bit of trouble understanding what the behavior would be if
> >>>>>> someone configured a "errors.deadletterqueue.topic.name" but didn't
> >>>>>> implement the handlers. Apologies if it's somewhere in the KIP and I
> >>>>>> happened to miss it!
> >>>>>>
> >>>>>> Either way, I really think an example would help me to better imagine 
> >>>>>> what
> >>>>>> this will look like in practice, and evaluate whether it actually 
> >>>>>> involves
> >>>>>> as much overhead as I'm worried it will. Can you add a section that
> >>>>>> includes a basic implementation of all the features here? Nothing too
> >>>>>> complicated, just the most bare-bones code needed to actually implement
> >>>>>> forwarding to a dead-letter-queue via the handlers.
> >>>>>>
> >>>>>> Lastly, two super small things:
> >>>>>>
> >>>>>> S6:
> >>>>>> We use camel case in Streams, so it should be rawSourceKey/Value rather
> >>>>>> than raw_source_key/value
> >>>>>>
> >>>>>> S7:
> >>>>>> Can you add javadocs for the #withDeadLetterQueueRecord? For example, 
> >>>>>> it
> >>>>>> seems to me that if the topic to be sent to here is different than the
> >>>>>> default/global DLQ, then the user will need to make sure to have 
> >>>>>> created
> >>>>>> this themselves up front.
> >>>>>>
> >>>>>> That's it from me...sorry for the long response, it's just because I'm
> >>>>>> excited for this feature and have been waiting on a KIP for this for 
> >>>>>> years.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Sophie
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina 
> >>>>>> <d.gaspar...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Andrew,
> >>>>>>>
> >>>>>>> Thanks a lot for your review, plenty of good points!
> >>>>>>>
> >>>>>>> 11. Typo fixed, good catch.
> >>>>>>>
> >>>>>>> 12. I do agree with you and Nick also mentioned it, I updated the KIP
> >>>>>>> to mention that context headers should be forwarded.
> >>>>>>>
> >>>>>>> 13. Good catch, to be consistent with KIP-298, and without a strong
> >>>>>>> opinion from my side, I updated the KIP with your prefix proposal.
> >>>>>>>
> >>>>>>> 14. I am not sure about this point, a big difference between KIP-298
> >>>>>>> and this one is that the handlers can easily be overridden, something
> >>>>>>> that is not doable in Kafka Connect.
> >>>>>>> If someone would like a different behavior, e.g. to mask the payload
> >>>>>>> or include further headers, I think we should encourage them to write
> >>>>>>> their own exception handlers to build the DLQ Record the way they
> >>>>>>> expect.
> >>>>>>>
> >>>>>>> 15. Yeah, that's a good point, I was not fully convinced about putting
> >>>>>>> a String in it, I do assume that "null" is also a valid value. I do
> >>>>>>> assume that the Stacktrace and the Exception in this case are the key
> >>>>>>> metadata for the user to troubleshoot the problem.
> >>>>>>> I updated the KIP to mention that the value should be null if
> >>>>>>> triggered in a punctuate.
> >>>>>>>
> >>>>>>> 16. I added a section to mention that Kafka Streams would not try to
> >>>>>>> automatically create the topic and the topic should either be
> >>>>>>> automatically created, or pre-created.
> >>>>>>>
> >>>>>>> 17. If a DLQ record can not be sent, the exception should go to the
> >>>>>>> uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >>>>>>>
> >>>>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina 
> >>>>>>> <d.gaspar...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi Nick,
> >>>>>>>>
> >>>>>>>> 1. Good point, that's less impactful than a custom interface, I just
> >>>>>>>> updated the KIP with the new signature.
> >>>>>>>>
> >>>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward 
> >>>>>>>> to
> >>>>>>>> downstream processors in the PAPI. The only information missing in
> >>>>>>>> this class is the topic name. I also considered relying on the Kafka
> >>>>>>>> Producer ProducerRecord, but I assume it would not be consistent with
> >>>>>>>> the KafkaStreams API.
> >>>>>>>>
> >>>>>>>> 2. Agreed
> >>>>>>>>
> >>>>>>>> 2.a I do think exceptions occurring during punctuate should be
> >>>>>>>> included in the DLQ.
> >>>>>>>> Even if building a suitable payload is almost impossible, even with
> >>>>>>>> custom code; those exceptions are still fatal for Kafka Streams by
> >>>>>>>> default and are something that can not be ignored safely.
> >>>>>>>> I do assume that most users would want to be informed if an error
> >>>>>>>> happened during a punctuate, even if only the metadata (e.g.
> >>>>>>>> stacktrace, exception) is provided.
> >>>>>>>> I am only concerned about flooding the DLQ topic since, if a scheduled
> >>>>>>>> operation failed, it will very likely fail again during the next
> >>>>>>>> invocation, but
> >>>>>>>>
> >>>>>>>> 4. Good point, I clarified the wording in the KIP to make it 
> >>>>>>>> explicit.
> >>>>>>>>
> >>>>>>>> 5. Good point, I will clearly mention that it is out of scope as part
> >>>>>>>> of the KIP and might not be as trivial as people could expect. I will
> >>>>>>>> update the KIP once I do have some spare time.
> >>>>>>>>
> >>>>>>>> 6. Oh yeah, I didn't think about it, but forwarding input headers
> >>>>>>>> would definitely make sense. Confluent Schema Registry ID is actually
> >>>>>>>> part of the payload, but many correlation IDs and technical metadata
> >>>>>>>> are passed through headers. It makes sense to forward them, especially
> >>>>>>>> as it is the default behavior of Kafka Streams.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford <nick.telf...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Damien and Sebastien,
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> I think you can just add a `String topic` argument to the existing
> >>>>>>>>> `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>>> deadLetterQueueRecord)` method, and then the implementation of the
> >>>>>>>>> exception handler could choose the topic to send records to using
> >>>>>>>>> whatever logic the user desires. You could perhaps provide a built-in
> >>>>>>>>> implementation that leverages your new config to send all records to
> >>>>>>>>> an untyped DLQ topic?
> >>>>>>>>>
> >>>>>>>>> 1a.
> >>>>>>>>> BTW you have a typo: in your DeserializationExceptionHandler, the
> >>>>>>>>> type of your `deadLetterQueueRecord` argument is `ProducerRecord`,
> >>>>>>>>> when it should probably be `ConsumerRecord`.
> >>>>>>>>>
> >>>>>>>>> 2.
> >>>>>>>>> Agreed. I think it's a good idea to provide an implementation that
> >>>>>>>>> sends to a single DLQ by default, but it's important to enable users
> >>>>>>>>> to customize this with their own exception handlers.
> >>>>>>>>>
> >>>>>>>>> 2a.
> >>>>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should be
> >>>>>>>>> sent to a DLQ topic like it's a bad record. To me, a DLQ should only
> >>>>>>>>> contain records that failed to process. I'm not even sure how a user
> >>>>>>>>> would re-process/action one of these other errors; it seems like the
> >>>>>>>>> purview of error logging to me?
> >>>>>>>>>
> >>>>>>>>> 4.
> >>>>>>>>> My point here was that I think it would be useful for the KIP to
> >>>>>>>>> contain an explanation of the behavior both with KIP-1033 and without
> >>>>>>>>> it, i.e. clarify if/how records that throw an exception in a processor
> >>>>>>>>> are handled. At the moment, I'm assuming that without KIP-1033,
> >>>>>>>>> processing exceptions would not cause records to be sent to the DLQ,
> >>>>>>>>> but with KIP-1033, they would. If this assumption is correct, I think
> >>>>>>>>> it should be made explicit in the KIP.
> >>>>>>>>>
> >>>>>>>>> 5.
> >>>>>>>>> Understood. You may want to make this explicit in the documentation
> >>>>>>>>> for users, so they understand the consequences of re-processing data
> >>>>>>>>> sent to their DLQ. The main reason I raised this point is it's
> >>>>>>>>> something that's tripped me up in numerous KIPs that committers
> >>>>>>>>> frequently remind me of; so I wanted to get ahead of it for once! :D
> >>>>>>>>>
> >>>>>>>>> And one new point:
> >>>>>>>>> 6.
> >>>>>>>>> The DLQ record schema appears to discard all custom headers set on
> >>>>>>>>> the source record. Is there a way these can be included? In
> >>>>>>>>> particular, I'm concerned with "schema pointer" headers (like those
> >>>>>>>>> set by Schema Registry), that may need to be propagated, especially
> >>>>>>>>> if the records are fed back into the source topics for re-processing
> >>>>>>>>> by the user.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Nick,
> >>>>>>>>>>
> >>>>>>>>>> Thanks a lot for your review and your useful comments!
> >>>>>>>>>>
> >>>>>>>>>> 1. It is a good point. As you mentioned, I think it would make sense
> >>>>>>>>>> in some use cases to have potentially multiple DLQ topics, so we
> >>>>>>>>>> should provide an API to let users do it.
> >>>>>>>>>> Thinking out loud here, maybe it is a better approach to create a new
> >>>>>>>>>> Record class containing the topic name, e.g. DeadLetterQueueRecord,
> >>>>>>>>>> and changing the signature to
> >>>>>>>>>> withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> >>>>>>>>>> deadLetterQueueRecords) instead of
> >>>>>>>>>> withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>>>> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord
> >>>>>>>>>> would be something like "class DeadLetterQueueRecord extends
> >>>>>>>>>> org.apache.kafka.streams.processor.api.ProducerRecords { String
> >>>>>>>>>> topic; /* + getter/setter + */ }"
> >>>>>>>>>>
> >>>>>>>>>> 2. I think the root question here is: should we have one DLQ topic
> >>>>>>>>>> or multiple DLQ topics by default? This question highly depends on
> >>>>>>>>>> the context, but a default implementation that handles multiple DLQ
> >>>>>>>>>> topics would be opinionated, e.g. how to manage errors in a
> >>>>>>>>>> punctuate?
> >>>>>>>>>> I think it makes sense to have the default implementation write all
> >>>>>>>>>> faulty records to a single DLQ; that's at least the approach I used
> >>>>>>>>>> in past applications: one DLQ per Kafka Streams application. Of
> >>>>>>>>>> course the message format could change in the DLQ, e.g. due to the
> >>>>>>>>>> source topic, but those DLQ records will very likely be
> >>>>>>>>>> troubleshot, and maybe replayed, manually anyway.
> >>>>>>>>>> If a user needs to have multiple DLQ topics or wants to enforce a
> >>>>>>>>>> specific schema, it's still possible, but they would need to
> >>>>>>>>>> implement custom Exception Handlers.
> >>>>>>>>>> Coming back to 1., I do agree that it would make sense to have the
> >>>>>>>>>> user set the DLQ topic name in the handlers for more flexibility.
> >>>>>>>>>>
> >>>>>>>>>> 3. Good point, sorry it was a typo, the ProcessingContext makes much
> >>>>>>>>>> more sense here indeed.
> >>>>>>>>>>
> >>>>>>>>>> 4. I do assume that we could implement KIP-1033 (Processing
> >>>>>>>>>> exception handler) independently from KIP-1034. I do hope that
> >>>>>>>>>> KIP-1033 would be adopted and implemented before KIP-1034, but if
> >>>>>>>>>> that's not the case, we could implement KIP-1034 independently and
> >>>>>>>>>> update KIP-1033 to include the DLQ record afterward (in the same KIP
> >>>>>>>>>> or in a new one if not possible).
> >>>>>>>>>>
> >>>>>>>>>> 5. I think we should be clear that this KIP only covers the DLQ
> >>>>>>>>>> record produced.
> >>>>>>>>>> Everything related to replaying messages or recovery plans should be
> >>>>>>>>>> considered out-of-scope as it is use-case and error specific.
> >>>>>>>>>>
> >>>>>>>>>> Let me know if that's not clear, there are definitely points that
> >>>>>>>>>> are highly debatable.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Damien
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Oh, and one more thing:
> >>>>>>>>>>>
> >>>>>>>>>>> 5.
> >>>>>>>>>>> Whenever you take a record out of the stream, and then potentially
> >>>>>>>>>>> re-introduce it at a later date, you introduce the potential for
> >>>>>>>>>>> record ordering issues. For example, that record could have been
> >>>>>>>>>>> destined for a Window that has been closed by the time it's
> >>>>>>>>>>> re-processed. I'd like to see a section that considers these
> >>>>>>>>>>> consequences, and perhaps make those risks clear to users. For the
> >>>>>>>>>>> record, this is exactly what sunk KIP-990, which was an alternative
> >>>>>>>>>>> approach to error handling that introduced the same issues.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Damien,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP! Dead-letter queues are something that I think
> >>>>>>>>>>>> a lot of users would like.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think there are a few points with this KIP that concern me:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1.
> >>>>>>>>>>>> It looks like you can only define a single, global DLQ for the
> >>>>>>>>>>>> entire Kafka Streams application? What about applications that
> >>>>>>>>>>>> would like to define different DLQs for different data flows? This
> >>>>>>>>>>>> is especially important when dealing with multiple source topics
> >>>>>>>>>>>> that have different record schemas.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2.
> >>>>>>>>>>>> Your DLQ payload value can either be the record value that failed,
> >>>>>>>>>>>> or an error string (such as "error during punctuate"). This is
> >>>>>>>>>>>> likely to cause problems when users try to process the records from
> >>>>>>>>>>>> the DLQ, as they can't guarantee the format of every record value
> >>>>>>>>>>>> will be the same. This is very loosely related to point 1. above.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3.
> >>>>>>>>>>>> You provide a ProcessorContext to both exception handlers, but
> >>>>>>>>>>>> state they cannot be used to forward records. In that case, I
> >>>>>>>>>>>> believe you should use ProcessingContext instead, which statically
> >>>>>>>>>>>> guarantees that it can't be used to forward records.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4.
> >>>>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's the
> >>>>>>>>>>>> plan if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Nick
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> In a general way, if the user does not configure the right ACL,
> >>>>>>>>>>>>> that would be a security issue, but that's true for any topic.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without
> >>>>>>>>>>>>> writing custom Java code in Kafka Streams, not at the topic level.
> >>>>>>>>>>>>> A lot of applications are already implementing this pattern, but
> >>>>>>>>>>>>> the required code to do it is quite painful and error prone; for
> >>>>>>>>>>>>> example, most apps I have seen created a new KafkaProducer to send
> >>>>>>>>>>>>> records to their DLQ.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As it would be disabled by default for backward compatibility, I
> >>>>>>>>>>>>> doubt it would generate any security concern.
> >>>>>>>>>>>>> If a user explicitly configures a Dead Letter Queue, it would be
> >>>>>>>>>>>>> up to them to configure the relevant ACLs to ensure that the right
> >>>>>>>>>>>>> principal can access it.
> >>>>>>>>>>>>> It is already the case for all internal, input and output Kafka
> >>>>>>>>>>>>> Streams topics (e.g. repartition, changelog topics) that also
> >>>>>>>>>>>>> could contain confidential data, so I do not think we should
> >>>>>>>>>>>>> implement a different behavior for this one.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In this KIP, we configured the default DLQ record to have the
> >>>>>>>>>>>>> initial record key/value as we assume that it is the expected and
> >>>>>>>>>>>>> wanted behavior for most applications.
> >>>>>>>>>>>>> If a user does not want to have the key/value in the DLQ record
> >>>>>>>>>>>>> for any reason, they could still implement exception handlers to
> >>>>>>>>>>>>> build their own DLQ record.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding ACL, maybe something smarter could be done in Kafka
> >>>>>>>>>>>>> Streams, but this is out of scope for this KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> My concern is that someone would create a dead letter queue on a
> >>>>>>>>>>>>>> sensitive topic and not get the ACL correct from the start, thus
> >>>>>>>>>>>>>> causing a potential confidential data leak. Is there anything in
> >>>>>>>>>>>>>> the proposal that would prevent that from happening? If so I did
> >>>>>>>>>>>>>> not recognize it as such.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Claude,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a standard
> >>>>>>>>>>>>>>> and independent topic, thus normal ACLs apply to it like any
> >>>>>>>>>>>>>>> other topic.
> >>>>>>>>>>>>>>> This should not introduce any security issues; obviously, the
> >>>>>>>>>>>>>>> right ACL would need to be provided to write to the DLQ if
> >>>>>>>>>>>>>>> configured.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Damien
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr <claude.war...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any ignorance
> >>>>>>>>>>>>>>>> on my part.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> When a dead letter queue is established is there a process to
> >>>>>>>>>>>>>>>> ensure that it at least is defined with the same ACL as the
> >>>>>>>>>>>>>>>> original queue? Without such a guarantee at the start it seems
> >>>>>>>>>>>>>>>> that managing dead letter queues will be fraught with security
> >>>>>>>>>>>>>>>> issues.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> To continue on our effort to improve Kafka Streams error
> >>>>>>>>>>>>>>>>> handling, we propose a new KIP to add out of the box support
> >>>>>>>>>>>>>>>>> for Dead Letter Queue.
> >>>>>>>>>>>>>>>>> The goal of this KIP is to provide a default implementation
> >>>>>>>>>>>>>>>>> that should be suitable for most applications and allow users
> >>>>>>>>>>>>>>>>> to override it if they have specific requirements.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In order to build a suitable payload, some additional changes
> >>>>>>>>>>>>>>>>> are included in this KIP:
> >>>>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, when available, the
> >>>>>>>>>>>>>>>>> source node raw key/value byte[]
> >>>>>>>>>>>>>>>>> 2. expose the ProcessingContext to the
> >>>>>>>>>>>>>>>>> ProductionExceptionHandler, it is currently not available in
> >>>>>>>>>>>>>>>>> the handle parameters.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to the
> >>>>>>>>>>>>>>>>> ProductionExceptionHandler, we considered two choices:
> >>>>>>>>>>>>>>>>> 1. exposing the ProcessingContext as a parameter in the
> >>>>>>>>>>>>>>>>> handle() method. That's the cleanest way IMHO, but we would
> >>>>>>>>>>>>>>>>> need to deprecate the old method.
> >>>>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an attribute in the
> >>>>>>>>>>>>>>>>> interface. This way, no method is deprecated, but we would not
> >>>>>>>>>>>>>>>>> be consistent with the other ExceptionHandler.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In the KIP, we chose the 1. solution (new handle signature
> >>>>>>>>>>>>>>>>> with old one deprecated), but we could use other opinions on
> >>>>>>>>>>>>>>>>> this part.
> >>>>>>>>>>>>>>>>> More information is available directly on the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KIP link:
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Feedback and suggestions are welcome,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Damien, Sebastien and Loic
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> LinkedIn: http://www.linkedin.com/in/claudewarren
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>
> >>>>
> >
