Hi everyone,

We just updated KIP-1034; we changed the following:
- We included the ProcessingExceptionHandler (KIP-1033) directly in the KIP;
- We provided examples to clarify the new configuration and how it could be leveraged (a flavour of it below).
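To give a feel for it, here is a minimal configuration sketch based on the current KIP draft (the config key and the handler behaviour are still under discussion, so treat the names as illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DlqConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Proposed by the KIP: once a DLQ topic name is set, the bundled handlers
        // (LogAndContinueExceptionHandler, LogAndFailExceptionHandler,
        // DefaultProductionExceptionHandler) also produce a DLQ record carrying the
        // raw key/value plus exception metadata in headers. Custom handlers ignore it.
        props.put("errors.deadletterqueue.topic.name", "my-app-dlq");
        // ... build the topology and start KafkaStreams as usual ...
    }
}
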
I think we can resume the conversation on this KIP.

Cheers,
Damien, Sebastien and Loic

On Tue, 27 Aug 2024 at 15:37, Sebastien Viale <sebastien.vi...@michelin.com> wrote:
>
> Hi Bruno,
>
> We have planned a meeting for next Friday to discuss it with Loic and Damien.
> We will be able to restart discussions about it soon.
>
> regards
>
> ________________________________
> From: Bruno Cadonna <cado...@apache.org>
> Sent: Monday, 26 August 2024 11:32
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
>
> Hi Loïc, Sebastien, and Damien,
>
> Now that KIP-1033 is going to be released in 3.9, what is the plan to progress with this KIP?
> Is the KIP up-to-date, so that we can restart discussion?
>
> Best,
> Bruno
>
> On 6/13/24 6:16 PM, Damien Gasparina wrote:
> > Hi Bruno,
> >
> > We focused our effort (well, mostly Seb and Loic :)) on KIP-1033, that's why not much progress has been made on this one yet. Regarding your points:
> >
> > B1: It is up to the user to specify the DLQ topic name and to implement a potential differentiation. I tend to think that having one DLQ per application ID is the wisest, but I have encountered cases and applications that preferred having a shared DLQ topic between multiple applications, e.g. to reduce the number of partitions or to ease monitoring.
> >
> > B2: Good catch, it should be ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG; we removed the "DEFAULT" prefix during the discussion, and it looks like I forgot to update all occurrences in the KIP.
> >
> > B3: The trigger for sending to the DLQ would be if ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user implemented a custom exception handler that returns DLQ records. ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just override the behavior of the default handler, thus custom exception handlers will completely ignore this parameter.
> >
> > I think it's a good trade-off between providing a production-ready default implementation and providing sufficient flexibility for complex use-cases. This behavior definitely needs to be documented, but I guess it's safe to push the responsibility for the DLQ records to the user if they implement custom handlers.
> >
> > Cheers,
> > Damien
> >
> > On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >> Hi,
> >>
> >> since there was not too much activity in this thread recently, I was wondering what the status of this discussion is.
> >>
> >> I cannot find the examples in the KIP Sébastien mentioned in the last message to this thread. I also cannot find the corresponding definition of the following method call in the KIP:
> >>
> >> FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>
> >> I also have some comments:
> >>
> >> B1
> >> Did you consider prefixing the dead letter queue topic names with the application ID to distinguish the topics between Streams apps? Or is the user responsible for the differentiation? If the user is responsible, we risk that faulty records of different Streams apps end up in the same dead letter queue.
> >>
> >> B2
> >> Is the name of the dead letter queue topic config DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.
> >>
> >> B3
> >> What exactly is the trigger to send a record to the dead letter queue? Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, or is it adding a record to the return value of the exception handler? What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do not add a record to the return value of the handler? What happens if I do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to the return value of the handler?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 4/22/24 10:19 PM, Sebastien Viale wrote:
> >>> Hi,
> >>>
> >>> Thanks for your remarks.
> >>>
> >>> L1. I would say "who can do the most can do the least": even though most people will fail and stop, we found it interesting to offer the possibility to fail-and-send-to-DLQ.
> >>>
> >>> L2: We did not consider extending the TimestampExtractor because we consider it out of scope for this KIP. Perhaps it will be possible to include it in an ExceptionHandler later.
> >>>
> >>> L3: we will include an example in the KIP, but as we mentioned earlier, the DLQ topic can be different in each custom Exception Handler:
> >>>
> >>> When providing custom handlers, users would have the possibility to return:
> >>> * FAIL
> >>> * CONTINUE
> >>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>
> >>> cheers!
> >>> Sébastien
> >>>
> >>> ________________________________
> >>> From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
> >>> Sent: Monday, 22 April 2024 14:36
> >>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>
> >>> Hi!
> >>>
> >>> Thanks for the KIP, great stuff.
> >>>
> >>> L1. I was a bit confused that the default configuration (once you set a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood correctly. Is this something that will be a common use-case, and is it a configuration that we want to encourage? I would expect that you either want to fail or skip-and-send-to-DLQ.
> >>>
> >>> L2. Have you considered extending the `TimestampExtractor` interface so that it can also produce to the DLQ? AFAIK it's not covered by any of the existing exception handlers, but it can cause similar failures (potentially custom logic that depends on the validity of the input record). There could also be a default implementation as a subclass of `ExtractRecordMetadataTimestamp`.
> >>>
> >>> L3. It would be nice to include an example of how to produce to multiple topics in the KIP, as I can imagine that this will be a common use-case. I wasn't sure how much code would be involved to make it work. If a lot of code is required, we may want to consider exposing some utils that make it easier.
> >>>
> >>> Cheers,
> >>> Lucas
> >>>
> >>> On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>
> >>>> Hi everyone,
> >>>>
> >>>> Following all the discussion on this KIP and KIP-1033, we introduced a new container class containing only processing context metadata: ProcessingMetadata. This new container class is actually part of KIP-1033, thus I added a hard dependency for this KIP on KIP-1033; I think that's the wisest approach implementation-wise.
> >>>>
> >>>> I also clarified the interface of the enums: withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just send one DLQ record, but there might be specific use-cases, and who can do the most can do the least, so I added an Iterable.
> >>>>
> >>>> I took some time to think about the impact of storing the ProcessingMetadata on the ProductionExceptionHandler. I think storing the topic/offset/partition should be fine, but I am concerned about storing the rawSourceKey/Value. I think it could impact some specific use-cases; for example, a high-throughput Kafka Streams application "counting" messages could have huge source input messages and very small sink messages. Here, I assume storing the rawSourceKey/Value could require significantly more memory than the actual Kafka Producer buffer.
> >>>>
> >>>> I think the safest approach is actually to only store the fixed-size metadata for ProductionExceptionHandler.handle: topic/partition/offset/processorNodeId/taskId. It might be confusing for the user, but 1) it is still better than today, where there is no context information at all, 2) it would be clearly stated in the javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the punctuate case).
> >>>>
> >>>> Do you think it would be a suitable design, Sophie?
> >>>>
> >>>> Cheers,
> >>>> Damien
> >>>>
> >>>> On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> wrote:
> >>>>>
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks for your feedback. Completing Damien's comments here for points S1 and S5B.
> >>>>>
> >>>>> S1:
> >>>>>> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this?
> >>>>>
> >>>>> I am wondering if it makes sense to alter the ProducerRecord from the Clients API with a "deadLetterQueueTopicName" attribute dedicated to the Kafka Streams DLQ.
> >>>>> Adding "deadLetterQueueTopicName" as an additional parameter to "withDeadLetterQueueRecord" is a good option, and may allow users to send records to different DLQ topics depending on conditions:
> >>>>>
> >>>>> @Override
> >>>>> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >>>>>                                                  ProducerRecord<byte[], byte[]> record,
> >>>>>                                                  Exception exception) {
> >>>>>     if (condition1) {
> >>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >>>>>     }
> >>>>>     if (condition2) {
> >>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >>>>>     }
> >>>>>     return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> >>>>> }
> >>>>>
> >>>>> S5B:
> >>>>>> I was having a bit of trouble understanding what the behavior would be if someone configured a "errors.deadletterqueue.topic.name" but didn't implement the handlers.
> >>>>>
> >>>>> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and DefaultProductionExceptionHandler should be able to tell whether records should be sent to the DLQ or not. The "errors.deadletterqueue.topic.name" configuration serves two purposes:
> >>>>>
> >>>>> * Specify whether the provided handlers should send records to the DLQ:
> >>>>>   * If the value is empty, the handlers should not send records to the DLQ.
> >>>>>   * If the value is not empty, the handlers should send records to the DLQ.
> >>>>> * Define the name of the DLQ topic that should be used by the provided handlers.
> >>>>>
> >>>>> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided handlers should return either:
> >>>>>
> >>>>> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>>
> >>>>> If "errors.deadletterqueue.topic.name" is defined but neither DeserializationExceptionHandler nor ProductionExceptionHandler classes are defined in the configuration, then nothing should happen, as sending to the DLQ is based on the handlers' response.
> >>>>> When providing custom handlers, users would have the possibility to return:
> >>>>>
> >>>>> * FAIL
> >>>>> * CONTINUE
> >>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>
> >>>>> A DLQ topic name is currently required for the last two response types. I am wondering if it could benefit users to ease the use of the default DLQ topic ("errors.deadletterqueue.topic.name") when implementing custom handlers, with this kind of implementation:
> >>>>>
> >>>>> * FAIL.withDefaultDeadLetterQueueRecord(record)
> >>>>> * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >>>>>
> >>>>> Regards,
> >>>>> Loïc
> >>>>>
> >>>>> From: Damien Gasparina <d.gaspar...@gmail.com>
> >>>>> Sent: Sunday, 14 April 2024 20:24
> >>>>> To: dev@kafka.apache.org
> >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>>
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks a lot for your feedback and your detailed comments.
> >>>>>
> >>>>> S1.
> >>>>>> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this?
> >>>>>
> >>>>> Sorry for the poor wording, that's not what I meant. While writing the KIP, I was hesitating between 1. leveraging the Kafka Producer ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in a separate parameter, 3. a new custom interface (e.g. DeadLetterQueueRecord).
> >>>>> As the KafkaProducer ProducerRecord is not used in the Kafka Streams API (except in the ProductionExceptionHandler) and I would like to avoid a new interface if not strictly required, I leaned toward option 2. Thinking about it, maybe option 1 would be best, but I assume it could create confusion with the KafkaStreams ProducerRecord. Let me sleep on it.
> >>>>>
> >>>>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and your point in S4, it seems more and more likely that we will create a new container class containing only the metadata for the exception handlers. To be consistent, I think we should use this new implementation in all exception handlers.
> >>>>> The only issue I can think of is that the new interface would expose less data than the current ProcessorContext in the DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()), thus it could be hard for some users to migrate to the new interface. I do expect that only a few users would be impacted, as the javadoc is very clear: `Note, that the passed in {@link ProcessorContext} only allows access to metadata like the task ID.`
> >>>>>
> >>>>> S3. I completely agree with you, it is something that might not be trivial and should be thoroughly covered by unit tests during the implementation.
> >>>>>
> >>>>> S4. Good point, I did not notice that the ProductionExceptionHandler is also invoked in the producer.send() callback.
> >>>>> Capturing the ProcessingContext for each in-flight message is probably not possible. I think there is no other way than to write a custom container class holding only the metadata that is essential; I am thinking of storing the following attributes: source topic, partition, offset, rawKey, rawValue and taskId.
> >>>>> This metadata should be relatively small, but I assume that there could be a high number of in-flight messages, especially with the at-least-once processing guarantee. Do you think it would be fine memory-wise?
> >>>>>
> >>>>> S5. As many exceptions are only accessible in exception handlers, and we wanted to 1) allow users to customize the DLQ records and 2) have a suitable out-of-the-box DLQ implementation, we felt it natural to rely on exception handlers; that's also why we created KIP-1033.
> >>>>> Piggybacking on the enum response was the cleanest way we could think of, but we are completely open to suggestions.
> >>>>>
> >>>>> S5a. Completely agree with you on this point: for this DLQ approach to be complete, the ProcessingExceptionHandler introduced in KIP-1033 is required. KIP-1033 is definitely our first priority. We decided to kick off the KIP-1034 discussion as we expected the discussions to be dynamic and to potentially impact some choices in KIP-1033.
> >>>>>
> >>>>> S5b. In this KIP, we wanted to 1) provide as much flexibility to the user as possible and 2)
> >>>>> provide a good default implementation for the DLQ without having to write custom exception handlers.
> >>>>> For the default implementation, we introduced a new configuration: errors.deadletterqueue.topic.name.
> >>>>>
> >>>>> If this configuration is set, it changes the behavior of the provided exception handlers to return a DLQ record containing the raw key/value and headers, plus exception metadata in headers.
> >>>>> If the out-of-the-box implementation is not suitable for a user, e.g. the payload needs to be masked in the DLQ, they could implement their own exception handlers. The errors.deadletterqueue.topic.name would only impact the Kafka Streams bundled exception handlers (e.g. org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).
> >>>>>
> >>>>> Let me update the KIP to make it clear and also provide examples.
> >>>>>
> >>>>> S6/S7. Good point, mea culpa for the camel case, it must have been a sugar rush :)
> >>>>>
> >>>>> Thanks again for your detailed comments and for pointing out S4 (production exception & ProcessingContext)!
> >>>>>
> >>>>> Cheers,
> >>>>> Damien
> >>>>>
> >>>>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>>
> >>>>>> Thanks for the KIP, this will make a lot of people very happy.
> >>>>>>
> >>>>>> Wanted to chime in on a few points that have been raised so far and add some of my own (numbering with an S to distinguish my points from the previous ones).
> >>>>>>
> >>>>>> S1.
> >>>>>>
> >>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka Producer ProducerRecord, but I assume it would not be consistent with the KafkaStreams API.
> >>>>>>
> >>>>>> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this? Why not just use the existing one, i.e. the o.a.k.clients.producer.ProducerRecord class? This is what the ProductionExceptionHandler uses, so it's definitely "consistent". In other words, we can remove the "String deadLetterQueueTopicName".
> >>>>>>
> >>>>>> S2.
> >>>>>> I think this would be a good opportunity to also deprecate the existing #handle method of the DeserializationExceptionHandler, and replace it with one that uses a ProcessingContext instead of the ProcessorContext. Partly for the same reasons about guarding access to the #forward methods, partly because this method needs to be migrated to the new PAPI interface anyways, and ProcessingContext is part of the new one.
> >>>>>>
> >>>>>> S3.
> >>>>>> Regarding 2a. -- I'm inclined to agree that records which a Punctuator failed to produce should also be sent to the DLQ via the ProductionExceptionHandler. Users will just need to be careful about accessing certain fields of the ProcessingContext that aren't available in the punctuator, and need to check the Optional returned by the ProcessingContext#recordMetadata API.
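> >>>>>>
> >>>>>> For example, the kind of null-safe access I mean (just a sketch; `log` stands for whatever logger is at hand):
> >>>>>>
> >>>>>> // recordMetadata() is empty for punctuator-created records, so never unwrap it blindly.
> >>>>>> context.recordMetadata().ifPresentOrElse(
> >>>>>>     meta -> log.warn("failed record from {}-{}@{}", meta.topic(), meta.partition(), meta.offset()),
> >>>>>>     () -> log.warn("failed record was created by a punctuator; no source metadata available"));
> >>>>>>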
> >>>>>> Also, from an implementation standpoint, it will be really hard to distinguish between a record created by a punctuator vs a processor from within the RecordCollector, which is the class that actually handles sending records to the Streams Producer and invoking the ProductionExceptionHandler. This is because the RecordCollector is at the "end" of the topology graph and doesn't have any context about which of the upstream processors actually attempted to forward a record.
> >>>>>>
> >>>>>> This in itself is at least theoretically solvable, but it leads into my first major new point:
> >>>>>>
> >>>>>> S4:
> >>>>>> I'm deeply worried about passing the ProcessingContext in as a means of forwarding metadata. The problem is that the processing/processor context is a mutable class and is inherently meaningless outside the context of a specific task. And when I said earlier that the RecordCollector sits at the "end" of the topology, I meant that it's literally outside the task's subtopology and is used/shared by all tasks on that StreamThread. So to begin with, there's no guarantee what will actually be returned for essential methods such as the new #rawSourceKey/Value or the existing #recordMetadata.
> >>>>>>
> >>>>>> For serialization exceptions it'll probably be correct, but for general send errors it almost definitely won't be. In short, this is because we send records to the producer after the sink node, but don't check for send errors right away, since obviously it takes some time for the producer to actually send. In other words, sending/producing records is actually done asynchronously with processing, and we simply check for errors on any previously-sent records during the send of a new record in a sink node. This means the context we would be passing in for a (non-serialization) exception would pretty much always correspond not to the record that experienced the error, but to the random record that happened to be being sent when we checked and saw the error for the failed record.
> >>>>>>
> >>>>>> This discrepancy, in addition to the whole "sourceRawKey/Value and recordMetadata are null for punctuators" issue, seems like an insurmountable inconsistency that is more likely to cause users confusion or problems than be helpful.
> >>>>>> We could create a new metadata object and copy over the relevant info from the ProcessingContext, but I worry that has the potential to explode memory, since we'd need to hold on to it for all in-flight records up until they are either successfully sent, or failed and passed in to the ProductionExceptionHandler. But if the metadata is relatively small, it's probably fine. Especially if it's just the raw source key/value. Are there any other parts of the ProcessingContext you think should be made available?
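> >>>>>>
> >>>>>> Just to make the shape of that metadata object concrete, I'm imagining a plain fixed-size holder roughly like this (all names made up; raw source key/value plus the usual topic/partition/offset/task info):
> >>>>>>
> >>>>>> import org.apache.kafka.streams.processor.TaskId;
> >>>>>>
> >>>>>> // Sketch: immutable metadata captured per in-flight record.
> >>>>>> public final class FailedRecordMetadata {
> >>>>>>     private final String sourceTopic;
> >>>>>>     private final int sourcePartition;
> >>>>>>     private final long sourceOffset;
> >>>>>>     private final byte[] rawSourceKey;   // may be null, e.g. for punctuator-created records
> >>>>>>     private final byte[] rawSourceValue; // may be null, e.g. for punctuator-created records
> >>>>>>     private final TaskId taskId;
> >>>>>>
> >>>>>>     public FailedRecordMetadata(final String sourceTopic, final int sourcePartition,
> >>>>>>                                 final long sourceOffset, final byte[] rawSourceKey,
> >>>>>>                                 final byte[] rawSourceValue, final TaskId taskId) {
> >>>>>>         this.sourceTopic = sourceTopic;
> >>>>>>         this.sourcePartition = sourcePartition;
> >>>>>>         this.sourceOffset = sourceOffset;
> >>>>>>         this.rawSourceKey = rawSourceKey;
> >>>>>>         this.rawSourceValue = rawSourceValue;
> >>>>>>         this.taskId = taskId;
> >>>>>>     }
> >>>>>>
> >>>>>>     public String sourceTopic() { return sourceTopic; }
> >>>>>>     public byte[] rawSourceKey() { return rawSourceKey; }
> >>>>>>     // ... accessors for the remaining fields ...
> >>>>>> }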
> >>>>>>
> >>>>>> Note that this only applies to the ProductionExceptionHandler, as the DeserializationExceptionHandler (and the newly proposed ProcessingExceptionHandler) would both be invoked immediately, and therefore with the failed record's context. However, I'm also a bit uncomfortable with adding the rawSourceKey/rawSourceValue to the ProcessingContext. So I'd propose to just wrap those (and any other metadata you might want) in a container class and pass that in instead of the ProcessingContext, to all of the exception handlers.
> >>>>>>
> >>>>>> S5:
> >>>>>> For some reason I'm finding the proposed API a little bit awkward, although it's entirely possible that the problem is with me, not the proposal :) Specifically, I'm struggling with the approach of piggybacking on the exception handlers and their response enums to dictate how records are forwarded to the DLQ. I think this comes down to two things, though again, these aren't necessarily problems with the API and probably just need to be hashed out:
> >>>>>>
> >>>>>> S5a.
> >>>>>> When I envision a DLQ, to me, the most common use case would be to forward input records that failed somewhere along the processing graph. But it seems like all the focus here is on the two far ends of the subtopology -- the input/consumer, and the output/producer. I get that the ProcessingExceptionHandler is really the missing piece here, and it's hard to say anything specific since it's not yet accepted, but maybe a somewhat more concrete example would help. FWIW I think/hope to get that KIP accepted and implemented ASAP, so I'm not worried about the "what if it doesn't happen" case -- I just want to know what it will look like when it does. Imo it's fine to build KIPs on top of future ones; it feels clear that this part will just have to wait for that KIP to actually be added.
> >>>>>>
> >>>>>> S5b:
> >>>>>> Why do users have to define the entire ProducerRecord -- shouldn't Streams handle all this for them? Or will we just automatically send every record on failure to the default global DLQ, and users only have to implement the handlers if they want to change the headers or send to a different topic? I was having a bit of trouble understanding what the behavior would be if someone configured an "errors.deadletterqueue.topic.name" but didn't implement the handlers. Apologies if it's somewhere in the KIP and I happened to miss it!
> >>>>>>
> >>>>>> Either way, I really think an example would help me to better imagine what this will look like in practice, and evaluate whether it actually involves as much overhead as I'm worried it will. Can you add a section that includes a basic implementation of all the features here? Nothing too complicated, just the most bare-bones code needed to actually implement forwarding to a dead-letter-queue via the handlers.
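> >>>>>>
> >>>>>> For scale, something like the following sketch is roughly what I have in mind (the handler signature and the withDeadLetterQueueRecord method are taken from the KIP draft, so treat them as illustrative, not final API):
> >>>>>>
> >>>>>> import java.util.Map;
> >>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
> >>>>>> import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
> >>>>>> import org.apache.kafka.streams.processor.api.ProcessingContext;
> >>>>>>
> >>>>>> // Sketch: skip-and-send-to-DLQ for records that fail deserialization.
> >>>>>> public class SkipAndSendToDlqHandler implements DeserializationExceptionHandler {
> >>>>>>     @Override
> >>>>>>     public DeserializationHandlerResponse handle(final ProcessingContext context,
> >>>>>>                                                  final ConsumerRecord<byte[], byte[]> record,
> >>>>>>                                                  final Exception exception) {
> >>>>>>         // Skip the poison pill, but keep the raw record for later inspection/replay.
> >>>>>>         return DeserializationHandlerResponse.CONTINUE
> >>>>>>             .withDeadLetterQueueRecord(record, "my-app-dlq");
> >>>>>>     }
> >>>>>>
> >>>>>>     @Override
> >>>>>>     public void configure(final Map<String, ?> configs) { }
> >>>>>> }
> >>>>>>
> >>>>>> If that's all it takes, great; if it needs much more than that, it would be good to know.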
> >>>>>>
> >>>>>> Lastly, two super small things:
> >>>>>>
> >>>>>> S6:
> >>>>>> We use camel case in Streams, so it should be rawSourceKey/Value rather than raw_source_key/value.
> >>>>>>
> >>>>>> S7:
> >>>>>> Can you add javadocs for the #withDeadLetterQueueRecord? For example, it seems to me that if the topic to be sent to here is different than the default/global DLQ, then the user will need to make sure to have created this themselves up front.
> >>>>>>
> >>>>>> That's it from me... sorry for the long response, it's just because I'm excited for this feature and have been waiting on a KIP for this for years.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Sophie
> >>>>>>
> >>>>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Andrew,
> >>>>>>>
> >>>>>>> Thanks a lot for your review, plenty of good points!
> >>>>>>>
> >>>>>>> 11. Typo fixed, good catch.
> >>>>>>>
> >>>>>>> 12. I do agree with you, and Nick also mentioned it; I updated the KIP to mention that context headers should be forwarded.
> >>>>>>>
> >>>>>>> 13. Good catch; to be consistent with KIP-298, and without a strong opinion from my side, I updated the KIP with your prefix proposal.
> >>>>>>>
> >>>>>>> 14. I am not sure about this point; a big difference between KIP-298 and this one is that the handlers can easily be overridden, something that is not doable in Kafka Connect. If someone would like a different behavior, e.g. to mask the payload or include further headers, I think we should encourage them to write their own exception handlers to build the DLQ Record the way they expect.
> >>>>>>>
> >>>>>>> 15. Yeah, that's a good point. I was not fully convinced about putting a String in it; I do assume that "null" is also a valid value, and that the Stacktrace and the Exception in this case are the key metadata for the user to troubleshoot the problem. I updated the KIP to mention that the value should be null if triggered in a punctuate.
> >>>>>>>
> >>>>>>> 16. I added a section to mention that Kafka Streams would not try to create the topic itself; the topic should either be automatically created (e.g. by the broker) or be pre-created.
> >>>>>>>
> >>>>>>> 17. If a DLQ record cannot be sent, the exception should go to the uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >>>>>>>
> >>>>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi Nick,
> >>>>>>>>
> >>>>>>>> 1. Good point, that's less impactful than a custom interface; I just updated the KIP with the new signature.
> >>>>>>>>
> >>>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka Producer ProducerRecord, but I assume it would not be consistent with the KafkaStreams API.
> >>>>>>>>
> >>>>>>>> 2. Agreed.
> >>>>>>>>
> >>>>>>>> 2.a I do think exceptions occurring during punctuate should be included in the DLQ.
> >>>>>>>> Even if building a suitable payload is almost impossible, even with custom code, those exceptions are still fatal for Kafka Streams by default and are something that cannot be ignored safely.
> >>>>>>>> I do assume that most users would want to be informed if an error happened during a punctuate, even if only the metadata (e.g. stacktrace, exception) is provided.
> >>>>>>>> I am only concerned about flooding the DLQ topic: if a scheduled operation failed, it will very likely fail again during the next invocation.
> >>>>>>>>
> >>>>>>>> 4. Good point, I clarified the wording in the KIP to make it explicit.
> >>>>>>>>
> >>>>>>>> 5. Good point, I will clearly mention that it is out of scope as part of the KIP and might not be as trivial as people could expect. I will update the KIP once I have some spare time.
> >>>>>>>>
> >>>>>>>> 6. Oh yeah, I didn't think about it, but forwarding input headers would definitely make sense. The Confluent Schema Registry ID is actually part of the payload, but many correlation IDs and technical metadata are passed through headers, so it makes sense to forward them, especially as it is the default behavior of Kafka Streams.
> >>>>>>>>
> >>>>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Damien and Sebastien,
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> I think you can just add a `String topic` argument to the existing `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> deadLetterQueueRecord)` method, and then the implementation of the exception handler could choose the topic to send records to using whatever logic the user desires. You could perhaps provide a built-in implementation that leverages your new config to send all records to an untyped DLQ topic?
> >>>>>>>>>
> >>>>>>>>> 1a.
> >>>>>>>>> BTW you have a typo: in your DeserializationExceptionHandler, the type of your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should probably be `ConsumerRecord`.
> >>>>>>>>>
> >>>>>>>>> 2.
> >>>>>>>>> Agreed. I think it's a good idea to provide an implementation that sends to a single DLQ by default, but it's important to enable users to customize this with their own exception handlers.
> >>>>>>>>>
> >>>>>>>>> 2a.
> >>>>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a DLQ topic like it's a bad record. To me, a DLQ should only contain records that failed to process. I'm not even sure how a user would re-process/action one of these other errors; it seems like the purview of error logging to me?
> >>>>>>>>>
> >>>>>>>>> 4.
> >>>>>>>>> My point here was that I think it would be useful for the KIP to contain an explanation of the behavior both with KIP-1033 and without it, i.e. clarify if/how records that throw an exception in a processor are handled.
> >>>>>>>>> At the moment, I'm assuming that without KIP-1033, processing exceptions would not cause records to be sent to the DLQ, but with KIP-1033, they would. If this assumption is correct, I think it should be made explicit in the KIP.
> >>>>>>>>>
> >>>>>>>>> 5.
> >>>>>>>>> Understood. You may want to make this explicit in the documentation for users, so they understand the consequences of re-processing data sent to their DLQ. The main reason I raised this point is that it's something that's tripped me up in numerous KIPs and that committers frequently remind me of; so I wanted to get ahead of it for once! :D
> >>>>>>>>>
> >>>>>>>>> And one new point:
> >>>>>>>>> 6.
> >>>>>>>>> The DLQ record schema appears to discard all custom headers set on the source record. Is there a way these can be included? In particular, I'm concerned with "schema pointer" headers (like those set by Schema Registry) that may need to be propagated, especially if the records are fed back into the source topics for re-processing by the user.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Nick,
> >>>>>>>>>>
> >>>>>>>>>> Thanks a lot for your review and your useful comments!
> >>>>>>>>>>
> >>>>>>>>>> 1. It is a good point; as you mentioned, I think it would make sense in some use cases to have potentially multiple DLQ topics, so we should provide an API to let users do it.
> >>>>>>>>>> Thinking out loud here, maybe it is a better approach to create a new Record class containing the topic name, e.g. DeadLetterQueueRecord, and change the signature to withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord> deadLetterQueueRecords) instead of withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would be something like "class DeadLetterQueueRecord extends org.apache.kafka.streams.processor.api.ProducerRecord { String topic; /* + getter/setter */ }"
> >>>>>>>>>>
> >>>>>>>>>> 2. I think the root question here is: should we have one DLQ topic or multiple DLQ topics by default? This question highly depends on the context, but implementing a default implementation to handle multiple DLQ topics would be opinionated (e.g. how to manage errors in a punctuate?).
> >>>>>>>>>> I think it makes sense to have the default implementation write all faulty records to a single DLQ; that's at least the approach I used in past applications: one DLQ per Kafka Streams application. Of course the message format could change in the DLQ, e.g. due to the source topic, but those DLQ records will very likely be troubleshooted, and maybe replayed, manually anyway.
> >>>>>>>>>> If a user needs to have multiple DLQ topics or wants to enforce a specific schema, it's still possible, but they would need to implement custom Exception Handlers.
> >>>>>>>>>> Coming back to 1., I do agree that it would make sense to have the user set the DLQ topic name in the handlers for more flexibility.
> >>>>>>>>>>
> >>>>>>>>>> 3. Good point, sorry, it was a typo; the ProcessingContext makes much more sense here indeed.
> >>>>>>>>>>
> >>>>>>>>>> 4. I do assume that we could implement KIP-1033 (Processing exception handler) independently from KIP-1034. I do hope that KIP-1033 would be adopted and implemented before KIP-1034, but if that's not the case, we could implement KIP-1034 independently and update KIP-1033 to include the DLQ record afterward (in the same KIP, or in a new one if not possible).
> >>>>>>>>>>
> >>>>>>>>>> 5. I think we should be clear that this KIP only covers the DLQ record produced. Everything related to replaying messages or recovery plans should be considered out of scope, as it is use-case and error specific.
> >>>>>>>>>>
> >>>>>>>>>> Let me know if that's not clear; there are definitely points that are highly debatable.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Damien
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Oh, and one more thing:
> >>>>>>>>>>>
> >>>>>>>>>>> 5.
> >>>>>>>>>>> Whenever you take a record out of the stream, and then potentially re-introduce it at a later date, you introduce the potential for record ordering issues. For example, that record could have been destined for a Window that has been closed by the time it's re-processed. I'd like to see a section that considers these consequences, and perhaps makes those risks clear to users. For the record, this is exactly what sunk KIP-990, which was an alternative approach to error handling that introduced the same issues.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Damien,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP! Dead-letter queues are something that I think a lot of users would like.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think there are a few points with this KIP that concern me:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1.
> >>>>>>>>>>>> It looks like you can only define a single, global DLQ for the entire Kafka Streams application? What about applications that would like to define different DLQs for different data flows? This is especially important when dealing with multiple source topics that have different record schemas.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2.
> >>>>>>>>>>>> Your DLQ payload value can either be the record value that failed, or an error string (such as "error during punctuate"). This is likely to cause problems when users try to process the records from the DLQ, as they can't guarantee the format of every record value will be the same. This is very loosely related to point 1. above.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3.
> >>>>>>>>>>>> You provide a ProcessorContext to both exception handlers, but state they cannot be used to forward records. In that case, I believe you should use ProcessingContext instead, which statically guarantees that it can't be used to forward records.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4.
> >>>>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Nick
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In a general way, if the user does not configure the right ACL, that would be a security issue, but that's true for any topic.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without writing custom Java code in Kafka Streams, not at the topic level. A lot of applications are already implementing this pattern, but the required code to do it is quite painful and error-prone; for example, most apps I have seen created a new KafkaProducer to send records to their DLQ.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As it would be disabled by default for backward compatibility, I doubt it would generate any security concern. If a user explicitly configures a Dead Letter Queue, it would be up to them to configure the relevant ACLs to ensure that the right principal can access it. It is already the case for all internal, input and output Kafka Streams topics (e.g. repartition, changelog topics) that could also contain confidential data, so I do not think we should implement a different behavior for this one.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In this KIP, we configured the default DLQ record to have the initial record key/value, as we assume that it is the expected and wanted behavior for most applications. If a user does not want to have the key/value in the DLQ record for any reason, they could still implement exception handlers to build their own DLQ record.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding ACLs, maybe something smarter could be done in Kafka Streams, but this is out of scope for this KIP.
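> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For what it's worth, nothing DLQ-specific is needed there; granting the application's principal write access to a pre-created DLQ topic is a standard Admin API call. A sketch, with made-up principal and topic names:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Allow the Streams application's principal to write to the DLQ topic
> >>>>>>>>>>>>> // (exception handling omitted for brevity).
> >>>>>>>>>>>>> try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"))) {
> >>>>>>>>>>>>>     AclBinding binding = new AclBinding(
> >>>>>>>>>>>>>         new ResourcePattern(ResourceType.TOPIC, "my-app-dlq", PatternType.LITERAL),
> >>>>>>>>>>>>>         new AccessControlEntry("User:my-streams-app", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
> >>>>>>>>>>>>>     admin.createAcls(List.of(binding)).all().get();
> >>>>>>>>>>>>> }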
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> My concern is that someone would create a dead letter queue on a sensitive topic and not get the ACL correct from the start, thus causing a potential confidential data leak. Is there anything in the proposal that would prevent that from happening? If so, I did not recognize it as such.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Claude,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a standard and independent topic, thus normal ACLs apply to it like any other topic. This should not introduce any security issues; obviously, the right ACL would need to be provided to write to the DLQ if configured.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Damien
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr <claude.war...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any ignorance on my part.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> When a dead letter queue is established, is there a process to ensure that it at least is defined with the same ACL as the original queue? Without such a guarantee at the start, it seems that managing dead letter queues will be fraught with security issues.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> To continue our effort to improve Kafka Streams error handling, we propose a new KIP to add out-of-the-box support for a Dead Letter Queue. The goal of this KIP is to provide a default implementation that should be suitable for most applications and allow users to override it if they have specific requirements.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In order to build a suitable payload, some additional changes are included in this KIP:
> >>>>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, when available, the source node raw key/value byte[] (see the sketch below);
> >>>>>>>>>>>>>>>>> 2. expose the ProcessingContext to the ProductionExceptionHandler; it is currently not available in the handle parameters.
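> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As a sketch, change 1 could look roughly like this (method names are illustrative and to be settled in the KIP; the values would be null when unavailable, e.g. in punctuators):
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> public interface ProcessingContext {
> >>>>>>>>>>>>>>>>>     // ... existing metadata accessors ...
> >>>>>>>>>>>>>>>>>     byte[] rawSourceKey();
> >>>>>>>>>>>>>>>>>     byte[] rawSourceValue();
> >>>>>>>>>>>>>>>>> }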
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to the ProductionExceptionHandler, we considered two choices:
> >>>>>>>>>>>>>>>>> 1. exposing the ProcessingContext as a parameter in the handle() method. That's the cleanest way IMHO, but we would need to deprecate the old method.
> >>>>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an attribute in the interface. This way, no method is deprecated, but we would not be consistent with the other ExceptionHandlers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In the KIP, we chose solution 1 (new handle signature with the old one deprecated), but we could use other opinions on this part. More information is available directly on the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KIP link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Feedback and suggestions are welcome,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Damien, Sebastien and Loic
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> LinkedIn: http://www.linkedin.com/in/claudewarren