Hi Bruno,

We focused our effort (well, mostly Seb and Loic :)) on KIP-1033, which is why not much progress has been made on this one yet. Regarding your points:
B1: It is up to the user to specify the DLQ topic name and to implement any differentiation. I tend to think that having one DLQ per application ID is the wisest choice, but I have encountered applications that preferred sharing a DLQ topic between multiple applications, e.g. to reduce the number of partitions or to ease monitoring.

B2: Good catch, it should be ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG; we removed the "DEFAULT" prefix during the discussion, and it looks like I forgot to update all occurrences in the KIP.

B3: The trigger for sending a record to the DLQ is either ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG being set OR the user implementing a custom exception handler that returns DLQ records. ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG only overrides the behavior of the default handlers, so custom exception handlers completely ignore this parameter. I think it is a good trade-off: it provides a production-ready default implementation while leaving sufficient flexibility for complex use cases. This behavior definitely needs to be documented, but I think it is safe to push the responsibility for DLQ records to the user if they implement custom handlers.

Cheers,
Damien

On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote: > > Hi, > > since there was not too much activity in this thread recently, I was > wondering what the status of this discussion is. > > I cannot find the examples in the KIP Sébastien mentioned in the last > message to this thread. I can also not find the corresponding definition > of the following method call in the KIP: > > FAIL.withDeadLetterQueueRecord(record, "dlq-topic") > > I have also some comments: > > B1 > Did you consider to prefix the dead letter queue topic names with the > application ID to distinguish the topics between Streams apps? Or is the > user responsible for the differentiation? If the user is responsible, we > risk that faulty records of different Streams apps end up in the same > dead letter queue. > > B2 > Is the name of the dead letter queue topic config > DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or > ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used. > > B3 > What is exactly the trigger to send a record to the dead letter queue? > Is setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or is it adding a > record to the return value of the exception handler? > What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do > not add a record to the return value of the handler? What happens if I > do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to > the return value of the handler? > > Best, > Bruno > > On 4/22/24 10:19 PM, Sebastien Viale wrote: > > Hi, > > > > Thanks for your remarks > > > > L1. I would say "who can do the most can do the least", even though most > > people will fail and stop, we found it interesting to offer the possibility > > to fail-and-send-to-DLQ > > > > L2: We did not consider extending the TimestampExtractor because we > > estimate it out of scope for this KIP. Perhaps it will be possible to > > include it in an ExceptionHandler later. > > > > L3: we will include an example in the KIP, but as we mentioned earlier, the > > DLQ topic can be different in each custom Exception Handler: > > > > When providing custom handlers, users would have the possibility to return: > > * FAIL > > * CONTINUE > > * FAIL.withDeadLetterQueueRecord(record, "dlq-topic") > > * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic") > > > > cheers ! 
> > Sébastien > > > > > > ________________________________ > > De : Lucas Brutschy <lbruts...@confluent.io.INVALID> > > Envoyé : lundi 22 avril 2024 14:36 > > À : dev@kafka.apache.org <dev@kafka.apache.org> > > Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams > > > > Warning External sender Do not click on any links or open any attachments > > unless you trust the sender and know the content is safe. > > > > Hi! > > > > Thanks for the KIP, great stuff. > > > > L1. I was a bit confused that the default configuration (once you set > > a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood > > correctly. Is this something that will be a common use-case, and is it > > a configuration that we want to encourage? It expected that you either > > want to fail or skip-and-send-to-DLQ. > > > > L2. Have you considered extending the `TimestampExtractor` interface > > so that it can also produce to DLQ? AFAIK it's not covered by any of > > the existing exception handlers, but it can cause similar failures > > (potentially custom logic, depends on validity input record). There > > could also be a default implementation as a subclass of > > `ExtractRecordMetadataTimestamp`. > > > > L3. It would be nice to include an example of how to produce to > > multiple topics in the KIP, as I can imagine that this will be a > > common use-case. I wasn't sure how much code would be involved to make > > it work. If a lot of code is required, we may want to consider > > exposing some utils that make it easier. > > > > Cheers, > > Lucas > > > > This email was screened for spam and malicious content but exercise caution > > anyway. > > > > > > > > On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> > > wrote: > >> > >> Hi everyone, > >> > >> Following all the discussion on this KIP and KIP-1033, we introduced a > >> new container class containing only processing context metadata: > >> ProcessingMetadata. This new container class is actually part of > >> KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I > >> think it's the wisest implementation wise. > >> > >> I also clarified the interface of the enums: > >> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], > >> byte[]>> deadLetterQueueRecords) . Very likely most users would just > >> send one DLQ record, but there might be specific use-cases and what > >> can do more can do less, so I added an Iterable. > >> > >> I took some time to think about the impact of storing the > >> ProcessingMetadata on the ProductionExceptionHandler. I think storing > >> the topic/offset/partition should be fine, but I am concerned about > >> storing the rawSourceKey/Value. I think it could impact some specific > >> use-cases, for example, a high-throughput Kafka Streams application > >> "counting" messages could have huge source input messages, and very > >> small sink messages, here, I assume storing the rawSourceKey/Value > >> could significantly require more memory than the actual Kafka Producer > >> buffer. > >> > >> I think the safest approach is actually to only store the fixed-size > >> metadata for the ProductionExceptionHandler.handle: > >> topic/partition/offset/processorNodeId/taskId, it might be confusing > >> for the user, but 1) it is still better than nowaday where there are > >> no context information at all, 2) it would be clearly stated in the > >> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the > >> punctuate case). . 
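(To make the point above concrete: the fixed-size ProcessingMetadata container I have in mind would look roughly like the sketch below. This is only an illustration, the accessor names are not final and the exact shape will be settled as part of KIP-1033.)

import org.apache.kafka.streams.processor.TaskId;

// Sketch only: fixed-size metadata retained for in-flight records.
// The raw source key/value are intentionally not stored here, to avoid
// holding large payloads in memory until the producer callback fires.
public class ProcessingMetadata {
    private final String sourceTopic;   // null when triggered from a punctuator
    private final int partition;        // -1 when unknown
    private final long offset;          // -1 when unknown
    private final String processorNodeId;
    private final TaskId taskId;

    public ProcessingMetadata(final String sourceTopic, final int partition, final long offset,
                              final String processorNodeId, final TaskId taskId) {
        this.sourceTopic = sourceTopic;
        this.partition = partition;
        this.offset = offset;
        this.processorNodeId = processorNodeId;
        this.taskId = taskId;
    }

    public String sourceTopic() { return sourceTopic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
    public String processorNodeId() { return processorNodeId; }
    public TaskId taskId() { return taskId; }
}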
> >> > >> Do you think it would be a suitable design Sophie? > >> > >> Cheers, > >> Damien > >> > >> On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> > >> wrote: > >>> > >>> Hi Sophie, > >>> > >>> Thanks for your feedback. > >>> Completing the Damien's comments here for points S1 and S5B. > >>> > >>> S1: > >>>> I'm confused -- are you saying that we're introducing a new kind of > >>>> ProducerRecord class for this? > >>> > >>> I am wondering if it makes sense to alter the ProducerRecord from Clients > >>> API with a "deadLetterQueueTopicName" attribute dedicated to Kafka > >>> Streams DLQ. > >>> Adding "deadLetterQueueTopicName" as an additional parameter to > >>> "withDeadLetterQueueRecord" is a good option, and may allow users to send > >>> records to different DLQ topics depending on conditions: > >>> @Override > >>> public ProductionExceptionHandlerResponse handle(final ProcessingContext > >>> context, > >>> ProducerRecord<byte[], byte[]> record, > >>> Exception exception) { > >>> if (condition1) { > >>> return ProductionExceptionHandlerResponse.CONTINUE > >>> .withDeadLetterQueueRecord(record, "dlq-topic-a"); > >>> } > >>> if (condition2) { > >>> return ProductionExceptionHandlerResponse.CONTINUE > >>> .withDeadLetterQueueRecord(record, "dlq-topic-b"); > >>> } > >>> return ProductionExceptionHandlerResponse.CONTINUE > >>> .withDeadLetterQueueRecord(record, "dlq-topic-c"); > >>> } > >>> > >>> S5B: > >>>> I was having a bit of trouble understanding what the behavior would be > >>>> if someone configured a "errors.deadletterqueue.topic.name" but didn't > >>>> implement the handlers. > >>> > >>> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler > >>> and DefaultProductionExceptionHandler should be able to tell if records > >>> should be sent to DLQ or not. > >>> The "errors.deadletterqueue.topic.name" takes place to: > >>> > >>> * Specifying if the provided handlers should or should not send records > >>> to DLQ. > >>> * If the value is empty, the handlers should not send records to DLQ. > >>> * If the value is not empty, the handlers should send records to DLQ. > >>> * Define the name of the DLQ topic that should be used by the provided > >>> handlers. > >>> > >>> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided > >>> handlers should return either: > >>> > >>> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue) > >>> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue). > >>> If "errors.deadletterqueue.topic.name" is defined but neither > >>> DeserializationExceptionHandler nor ProductionExceptionHandler classes > >>> are defined in the configuration, then nothing should happen as sending > >>> to DLQ is based on handlers’ response. > >>> When providing custom handlers, users would have the possibility to > >>> return: > >>> > >>> * FAIL > >>> * CONTINUE > >>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic") > >>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic") > >>> > >>> A DLQ topic name is currently required using the two last response types. 
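(To illustrate what Loic describes above: a bundled handler driven by errors.deadletterqueue.topic.name would behave roughly like the sketch below. Illustration only: the class name is made up, it reuses the current DeserializationExceptionHandler signature, and withDeadLetterQueueRecord is the API proposed by this KIP, whose exact signature is still under discussion in this thread.)

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DlqAwareLogAndContinueExceptionHandler implements DeserializationExceptionHandler {
    private String dlqTopic; // null or empty means "do not send to the DLQ"

    @Override
    public void configure(final Map<String, ?> configs) {
        final Object topic = configs.get("errors.deadletterqueue.topic.name");
        dlqTopic = topic == null ? null : topic.toString();
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Config not set: behave like today's LogAndContinueExceptionHandler
        // and simply skip the faulty record (logging omitted in this sketch).
        if (dlqTopic == null || dlqTopic.isEmpty()) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        // Config set: still skip, but also return a DLQ record for Streams to produce.
        return DeserializationHandlerResponse.CONTINUE
            .withDeadLetterQueueRecord(record, dlqTopic);
    }
}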
> >>> I am wondering if it could benefit users to ease the use of the default > >>> DLQ topic "errors.deadletterqueue.topic.name" when implementing custom > >>> handlers, with such kind of implementation: > >>> > >>> * FAIL.withDefaultDeadLetterQueueRecord(record) > >>> * CONTINUE.withDefaultDeadLetterQueueRecord(record) > >>> > >>> Regards, > >>> Loïc > >>> > >>> De : Damien Gasparina <d.gaspar...@gmail.com> > >>> Envoyé : dimanche 14 avril 2024 20:24 > >>> À : dev@kafka.apache.org > >>> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams > >>> > >>> Warning External sender Do not click on any links or open any attachments > >>> unless you trust the sender and know the content is safe. > >>> > >>> Hi Sophie, > >>> > >>> Thanks a lot for your feedback and your detailed comments. > >>> > >>> S1. > >>>> I'm confused -- are you saying that we're introducing a new kind of > >>> ProducerRecord class for this? > >>> > >>> Sorry for the poor wording, that's not what I meant. While writing the > >>> KIP, I was hesitating between 1. leveraging the Kafka Producer > >>> ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in > >>> a separate parameter, 3. a new custom interface (e.g. > >>> DeadLetterQueueRecord). > >>> As the KafkaProducer ProducerRecord is not used in the Kafka Streams > >>> API (except ProductionExceptionHandler) and I would like to avoid a > >>> new interface if not strictly required, I leaned toward option 2. > >>> Thinking about it, maybe option 1. would be best, but I assume it > >>> could create confusion with KafkaStreams ProducerRecord. Let me sleep > >>> on it. > >>> > >>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and > >>> your point in S4, it seems more and more likely that we will create a > >>> new container class containing only the metadata for the exception > >>> handlers. To be consistent, I think we should use this new > >>> implementation in all exception handlers. > >>> The only issue I could think off is that the new interface would > >>> expose less data than the current ProcessorContext in the > >>> DeserializationException(e.g. stateDir(), metrics(), getStateStore()), > >>> thus it could be hard for some users to migrate to the new interface. > >>> I do expect that only a few users would be impacted as the javadoc is > >>> very clear: `Note, that the passed in {@link ProcessorContext} only > >>> allows access to metadata like the task ID.` > >>> > >>> S3. I completely agree with you, it is something that might not be > >>> trivial and should be thoroughly covered by unit tests during the > >>> implementation. > >>> > >>> S4. Good point, I did not notice that the ProductionExceptionHandler > >>> is also invoked in the producer.send() callback. > >>> Capturing the ProcessingContext for each in-flight message is probably > >>> not possible. I think there is no other way to write a custom > >>> container class holding only the metadata that are essentials, I am > >>> thinking of storing the following attributes: source topic, partition, > >>> offset, rawKey, rawValue and taskId. > >>> Those metadata should be relatively small, but I assume that there > >>> could be a high number of in-flight messages, especially with at least > >>> once processing guarantee. Do you think it would be fine memory wise? > >>> > >>> S5. 
As many exceptions are only accessible in exception handlers, and > >>> we wanted to 1) allow users to customize the DLQ records and 2) have a > >>> suitable DLQ out of the box implementation, we felt it natural to rely > >>> on exception handlers, that's also why we created KIP-1033. > >>> Piggybacking on the enum response was the cleanest way we could think > >>> off, but we are completely open to suggestions. > >>> > >>> S5a. Completely agree with you on this point, for this DLQ approach to > >>> be complete, the ProcessingExceptionHandler introduced in KIP-1033 is > >>> required. KIP-1033 is definitely our first priority. We decided to > >>> kick-off the KIP-1034 discussion as we expected the discussions to be > >>> dynamic and could potentially impact some choices of KIP-1033. > >>> > >>> S5b. In this KIP, we wanted to 1. provide as much flexibility to the > >>> user as possible; 2. provide a good default implementation > >>> for the DLQ without having to write custom exception handlers. > >>> For the default implementation, we introduced a new configuration: > >>> errors.deadletterqueue.topic.name. > >>> > >>> If this configuration is set, it changes the behavior of the provided > >>> exception handlers to return a DLQ record containing the raw key/value > >>> + headers + exception metadata in headers. > >>> If the out of the box implementation is not suitable for a user, e.g. > >>> the payload needs to be masked in the DLQ, it could implement their > >>> own exception handlers. The errors.deadletterqueue.topic.name would > >>> only impact Kafka Streams bundled exception handlers (e.g. > >>> org.apache.kafka.streams.errors;.LogAndContinueExceptionHandler) > >>> > >>> Let me update the KIP to make it clear and also provide examples. > >>> > >>> S6/S7. Good point, mea culpa for the camel case, it must have been a > >>> sugar rush :) > >>> > >>> Thanks again for your detailed comments and pointing out S4 > >>> (production exception & Processing Context)! > >>> > >>> Cheers, > >>> Damien > >>> This email was screened for spam and malicious content but exercise > >>> caution anyway. > >>> > >>> > >>> > >>> > >>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman > >>> <sop...@responsive.dev<mailto:sop...@responsive.dev>> wrote: > >>>> > >>>> Thanks for the KIP, this will make a lot of people very happy. > >>>> > >>>> Wanted to chime in on a few points that have been raised so far and add > >>>> some of my own (numbering with an S to distinguish my points from the > >>>> previous ones) > >>>> > >>>> S1. > >>>> > >>>>> 1.a I really meant ProducerRecord, that's the class used to forward to > >>>>> downstream processors in the PAPI. The only information missing in > >>>>> this class is the topic name. I also considered relying on the Kafka > >>>>> Producer ProducerRecord, but I assume it would not be consistent with > >>>>> the KafkaStreams API. > >>>> > >>>> I'm confused -- are you saying that we're introducing a new kind of > >>>> ProducerRecord class for this? Why not just use the existing one, ie the > >>>> o.a.k.clients.producer.ProducerRecord class? This is what the > >>>> ProductionExceptionHandler uses, so it's definitely "consistent". In > >>>> other > >>>> words, we can remove the "String deadLetterQueueTopicName" > >>>> > >>>> S2. > >>>> I think this would be a good opportunity to also deprecate the existing > >>>> #handle method of the DeserializationExceptionHandler, and replace it > >>>> with > >>>> one that uses a ProcessingContext instead of the ProcessorContext. 
Partly > >>>> for the same reasons about guarding access to the #forward methods, > >>>> partly > >>>> because this method needs to be migrated to the new PAPI interface > >>>> anyways, and ProcessingContext is part of the new one. > >>>> > >>>> S3. > >>>> Regarding 2a. -- I'm inclined to agree that records which a Punctuator > >>>> failed to produce should also be sent to the DLQ via the > >>>> ProductionExceptionHandler. Users will just need to be careful about > >>>> accessing certain fields of the ProcessingContext that aren't available > >>>> in > >>>> the punctuator, and need to check the Optional returned by the > >>>> ProcessingContext#recordMetadata API. > >>>> Also, from an implementation standpoint, it will be really hard to > >>>> distinguish between a record created by a punctuator vs a processor from > >>>> within the RecordCollector, which is the class that actually handles > >>>> sending records to the Streams Producer and invoking the > >>>> ProductionExceptionHandler. This is because the RecordCollector is at the > >>>> "end" of the topology graph and doesn't have any context about which of > >>>> the > >>>> upstream processors actually attempted to forward a record. > >>>> > >>>> This in itself is at least theoretically solvable, but it leads into my > >>>> first major new point: > >>>> > >>>> S4: > >>>> I'm deeply worried about passing the ProcessingContext in as a means of > >>>> forwarding metadata. The problem is that the processing/processor context > >>>> is a mutable class and is inherently meaningless outside the context of a > >>>> specific task. And when I said earlier that the RecordCollector sits at > >>>> the "end" of the topology, I meant that it's literally outside the task's > >>>> subtopology and is used/shared by all tasks on that StreamThread. So to > >>>> begin with, there's no guarantee what will actually be returned for > >>>> essential methods such as the new #rawSourceKey/Value or the existing > >>>> #recordMetadata > >>>> > >>>> For serialization exceptions it'll probably be correct, but for general > >>>> send errors it almost definitely won't be. In short, this is because we > >>>> send records to the producer after the sink node, but don't check for > >>>> send > >>>> errors right away since obviously it takes some time for the producer to > >>>> actually send. In other words, sending/producing records is actually done > >>>> asynchronously with processing, and we simply check for errors on any > >>>> previously-sent records > >>>> during the send on a new record in a sink node. This means the context we > >>>> would be passing in to a (non-serialization) exception would pretty much > >>>> always correspond not the the record that experienced the error, but the > >>>> random record that happened to be being sent when we checked and saw the > >>>> error for the failed record. > >>>> > >>>> This discrepancy, in addition to the whole "sourceRawKey/Value and > >>>> recordMetadata are null for punctuators" issue, seems like an > >>>> insurmountable inconsistency that is more likely to cause users confusion > >>>> or problems than be helpful. > >>>> We could create a new metadata object and copy over the relevant info > >>>> from > >>>> the ProcessingContext, but I worry that has the potential to explode > >>>> memory > >>>> since we'd need to hold on to it for all in-flight records up until they > >>>> are either successfully sent or failed and passed in to the > >>>> ProductionExceptionHandler. 
But if the metadata is relatively small, it's > >>>> probably fine. Especially if it's just the raw source key/value. Are > >>>> there any other parts of the ProcessingContext you think should be made > >>>> available? > >>>> > >>>> Note that this only applies to the ProductionExceptionHandler, as the > >>>> DeserializationExceptionHandler (and the newly proposed > >>>> ProcessingExceptionHandler) would both be invoked immediately and > >>>> therefore > >>>> with the failed record's context. However, I'm also a bit uncomfortable > >>>> with adding the rawSourceKey/rawSourceValue to the ProcessingContext. So > >>>> I'd propose to just wrap those (and any other metadata you might want) > >>>> in a > >>>> container class and pass that in instead of the ProcessingContext, to all > >>>> of the exception handlers. > >>>> > >>>> S5: > >>>> For some reason I'm finding the proposed API a little bit awkward, > >>>> although > >>>> it's entirely possible that the problem is with me, not the proposal :) > >>>> Specifically I'm struggling with the approach of piggybacking on the > >>>> exception handlers and their response enums to dictate how records are > >>>> forwarded to the DLQ. I think this comes down to two things, though > >>>> again, > >>>> these aren't necessarily problems with the API and probably just need to > >>>> be > >>>> hashed out: > >>>> > >>>> S5a. > >>>> When I envision a DLQ, to me, the most common use case would be to > >>>> forward > >>>> input records that failed somewhere along the processing graph. But it > >>>> seems like all the focus here is on the two far ends of the subtopology > >>>> -- > >>>> the input/consumer, and the output/producer. I get that > >>>> the ProcessingExceptionHandler is really the missing piece here, and it's > >>>> hard to say anything specific since it's not yet accepted, but maybe a > >>>> somewhat more concrete example would help. FWIW I think/hope to get that > >>>> KIP accepted and implementation ASAP, so I'm not worried about the "what > >>>> if > >>>> it doesn't happen" case -- more just want to know what it will look like > >>>> when it does. Imo it's fine to build KIPs on top of future ones, it feels > >>>> clear that this part will just have to wait for that KIP to actually be > >>>> added. > >>>> > >>>> S5b: > >>>> Why do users have to define the entire ProducerRecord -- shouldn't > >>>> Streams > >>>> handle all this for them? Or will we just automatically send every record > >>>> on failure to the default global DLQ, and users only have to implement > >>>> the > >>>> handlers if they want to change the headers or send to a different > >>>> topic? I > >>>> was having a bit of trouble understanding what the behavior would be if > >>>> someone configured a "errors.deadletterqueue.topic.name" but didn't > >>>> implement the handlers. Apologies if it's somewhere in the KIP and I > >>>> happened to miss it! > >>>> > >>>> Either way, I really think an example would help me to better imagine > >>>> what > >>>> this will look like in practice, and evaluate whether it actually > >>>> involves > >>>> as much overhead as I'm worried it will. Can you add a section that > >>>> includes a basic implementation of all the features here? Nothing too > >>>> complicated, just the most bare-bones code needed to actually implement > >>>> forwarding to a dead-letter-queue via the handlers. 
> >>>> > >>>> Lastly, two super small things: > >>>> > >>>> S6: > >>>> We use camel case in Streams, so it should be rawSourceKey/Value rather > >>>> than raw_source_key/value > >>>> > >>>> S7: > >>>> Can you add javadocs for the #withDeadLetterQueueRecord? For example, it > >>>> seems to me that if the topic to be sent to here is different than the > >>>> default/global DLQ, then the user will need to make sure to have created > >>>> this themselves up front. > >>>> > >>>> That's it from me...sorry for the long response, it's just because I'm > >>>> excited for this feature and have been waiting on a KIP for this for > >>>> years. > >>>> > >>>> Cheers, > >>>> Sophie > >>>> > >>>> > >>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina > >>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> > >>>> wrote: > >>>> > >>>>> Hi Andrew, > >>>>> > >>>>> Thanks a lot for your review, plenty of good points! > >>>>> > >>>>> 11. Typo fixed, good cach. > >>>>> > >>>>> 12. I do agree with you and Nick also mentioned it, I updated the KIP > >>>>> to mention that context headers should be forwarded. > >>>>> > >>>>> 13. Good catch, to be consistent with KIP-298, and without a strong > >>>>> opinion from my side, I updated the KIP with your prefix proposal. > >>>>> > >>>>> 14. I am not sure about this point, a big difference between KIP-298 > >>>>> and this one is that the handlers can easily be overridden, something > >>>>> that is not doable in Kafka Connect. > >>>>> If someone would like a different behavior, e.g. to mask the payload > >>>>> or include further headers, I think we should encourage them to write > >>>>> their own exception handlers to build the DLQ Record the way they > >>>>> expect. > >>>>> > >>>>> 15. Yeah, that's a good point, I was not fully convinced about putting > >>>>> a String in it, I do assume that "null" is also a valid value. I do > >>>>> assume that the Stacktrace and the Exception in this case are the key > >>>>> metadata for the user to troubleshoot the problem. > >>>>> I updated the KIP to mention that the value should be null if > >>>>> triggered in a punctuate. > >>>>> > >>>>> 16. I added a session to mention that Kafka Streams would not try to > >>>>> automatically create the topic and the topic should either be > >>>>> automatically created, or pre-created. > >>>>> > >>>>> 17. If a DLQ record can not be sent, the exception should go to the > >>>>> uncaughtExceptionHandler. Let me clearly state it in the KIP. > >>>>> > >>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina > >>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> > >>>>> wrote: > >>>>>> > >>>>>> Hi Nick, > >>>>>> > >>>>>> 1. Good point, that's less impactful than a custom interface, I just > >>>>>> updated the KIP with the new signature. > >>>>>> > >>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to > >>>>>> downstream processors in the PAPI. The only information missing in > >>>>>> this class is the topic name. I also considered relying on the Kafka > >>>>>> Producer ProducerRecord, but I assume it would not be consistent with > >>>>>> the KafkaStreams API. > >>>>>> > >>>>>> 2. Agreed > >>>>>> > >>>>>> 2.a I do think exceptions occurring during punctuate should be > >>>>>> included in the DLQ. > >>>>>> Even if building a suitable payload is almost impossible, even with > >>>>>> custom code; those exceptions are still fatal for Kafka Streams by > >>>>>> default and are something that can not be ignored safely. 
> >>>>>> I do assume that most users would want to be informed if an error > >>>>>> happened during a punctuate, even if only the metadata (e.g. > >>>>>> stacktrace, exception) is provided. > >>>>>> I am only concerned flooding the DLQ topic as, if a scheduled > >>>>>> operation failed, very likely it will fails during the next > >>>>>> invocation, but > >>>>>> > >>>>>> 4. Good point, I clarified the wording in the KIP to make it explicit. > >>>>>> > >>>>>> 5. Good point, I will clearly mention that it is out of scope as part > >>>>>> of the KIP and might not be as trivial as people could expect. I will > >>>>>> update the KIP once I do have some spare time. > >>>>>> > >>>>>> 6. Oh yeah, I didn't think about it, but forwarding input headers > >>>>>> would definitely make sense. Confluent Schema Registry ID is actually > >>>>>> part of the payload, but many correlation ID and technical metadata > >>>>>> are passed through headers, it makes sense to forward them, specially > >>>>>> as it is the default behavior of Kafka Streams, > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford > >>>>>> <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>> > >>>>> wrote: > >>>>>>> > >>>>>>> Hi Damien and Sebastien, > >>>>>>> > >>>>>>> 1. > >>>>>>> I think you can just add a `String topic` argument to the existing > >>>>>>> `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> > >>>>>>> deadLetterQueueRecord)` method, and then the implementation of the > >>>>>>> exception handler could choose the topic to send records to using > >>>>> whatever > >>>>>>> logic the user desires. You could perhaps provide a built-in > >>>>> implementation > >>>>>>> that leverages your new config to send all records to an untyped DLQ > >>>>> topic? > >>>>>>> > >>>>>>> 1a. > >>>>>>> BTW you have a typo: in your DeserializationExceptionHandler, the type > >>>>> of > >>>>>>> your `deadLetterQueueRecord` argument is `ProducerRecord`, when it > >>>>> should > >>>>>>> probably be `ConsumerRecord`. > >>>>>>> > >>>>>>> 2. > >>>>>>> Agreed. I think it's a good idea to provide an implementation that > >>>>> sends to > >>>>>>> a single DLQ by default, but it's important to enable users to > >>>>> customize > >>>>>>> this with their own exception handlers. > >>>>>>> > >>>>>>> 2a. > >>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should be sent > >>>>> to a > >>>>>>> DLQ topic like it's a bad record. To me, a DLQ should only contain > >>>>> records > >>>>>>> that failed to process. I'm not even sure how a user would > >>>>>>> re-process/action one of these other errors; it seems like the purview > >>>>> of > >>>>>>> error logging to me? > >>>>>>> > >>>>>>> 4. > >>>>>>> My point here was that I think it would be useful for the KIP to > >>>>> contain an > >>>>>>> explanation of the behavior both with KIP-1033 and without it. i.e. > >>>>> clarify > >>>>>>> if/how records that throw an exception in a processor are handled. At > >>>>> the > >>>>>>> moment, I'm assuming that without KIP-1033, processing exceptions > >>>>> would not > >>>>>>> cause records to be sent to the DLQ, but with KIP-1033, they would. If > >>>>> this > >>>>>>> assumption is correct, I think it should be made explicit in the KIP. > >>>>>>> > >>>>>>> 5. > >>>>>>> Understood. You may want to make this explicit in the documentation > >>>>>>> for > >>>>>>> users, so they understand the consequences of re-processing data sent > >>>>> to > >>>>>>> their DLQ. 
The main reason I raised this point is it's something > >>>>>>> that's > >>>>>>> tripped me up in numerous KIPs that that committers frequently remind > >>>>> me > >>>>>>> of; so I wanted to get ahead of it for once! :D > >>>>>>> > >>>>>>> And one new point: > >>>>>>> 6. > >>>>>>> The DLQ record schema appears to discard all custom headers set on the > >>>>>>> source record. Is there a way these can be included? In particular, > >>>>>>> I'm > >>>>>>> concerned with "schema pointer" headers (like those set by Schema > >>>>>>> Registry), that may need to be propagated, especially if the records > >>>>> are > >>>>>>> fed back into the source topics for re-processing by the user. > >>>>>>> > >>>>>>> Regards, > >>>>>>> Nick > >>>>>>> > >>>>>>> > >>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina > >>>>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Nick, > >>>>>>>> > >>>>>>>> Thanks a lot for your review and your useful comments! > >>>>>>>> > >>>>>>>> 1. It is a good point, as you mentioned, I think it would make sense > >>>>>>>> in some use cases to have potentially multiple DLQ topics, so we > >>>>>>>> should provide an API to let users do it. > >>>>>>>> Thinking out-loud here, maybe it is a better approach to create a new > >>>>>>>> Record class containing the topic name, e.g. DeadLetterQueueRecord > >>>>> and > >>>>>>>> changing the signature to > >>>>>>>> withDeadLetterQueueRecords(Iteratable<DeadLetterQueueRecord> > >>>>>>>> deadLetterQueueRecords) instead of > >>>>>>>> withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> > >>>>>>>> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord > >>>>> would > >>>>>>>> be something like "class DeadLetterQueueRecord extends > >>>>>>>> org.apache.kafka.streams.processor.api;.ProducerRecords { String > >>>>>>>> topic; /* + getter/setter + */ } " > >>>>>>>> > >>>>>>>> 2. I think the root question here is: should we have one DLQ topic or > >>>>>>>> multiple DLQ topics by default. This question highly depends on the > >>>>>>>> context, but implementing a default implementation to handle multiple > >>>>>>>> DLQ topics would be opinionated, e.g. how to manage errors in a > >>>>>>>> punctuate? > >>>>>>>> I think it makes sense to have the default implementation writing all > >>>>>>>> faulty records to a single DLQ, that's at least the approach I used > >>>>> in > >>>>>>>> past applications: one DLQ per Kafka Streams application. Of course > >>>>>>>> the message format could change in the DLQ e.g. due to the source > >>>>>>>> topic, but those DLQ records will be very likely troubleshooted, and > >>>>>>>> maybe replay, manually anyway. > >>>>>>>> If a user needs to have multiple DLQ topics or want to enforce a > >>>>>>>> specific schema, it's still possible, but they would need to > >>>>> implement > >>>>>>>> custom Exception Handlers. > >>>>>>>> Coming back to 1. I do agree that it would make sense to have the > >>>>> user > >>>>>>>> set the DLQ topic name in the handlers for more flexibility. > >>>>>>>> > >>>>>>>> 3. Good point, sorry it was a typo, the ProcessingContext makes much > >>>>>>>> more sense here indeed. > >>>>>>>> > >>>>>>>> 4. I do assume that we could implement KIP-1033 (Processing exception > >>>>>>>> handler) independently from KIP-1034. 
I do hope that KIP-1033 would > >>>>> be > >>>>>>>> adopted and implemented before KIP-1034, but if that's not the case, > >>>>>>>> we could implement KIP-1034 indepantly and update KIP-1033 to include > >>>>>>>> the DLQ record afterward (in the same KIP or in a new one if not > >>>>>>>> possible). > >>>>>>>> > >>>>>>>> 5. I think we should be clear that this KIP only covers the DLQ > >>>>> record > >>>>>>>> produced. > >>>>>>>> Everything related to replay messages or recovery plan should be > >>>>>>>> considered out-of-scope as it is use-case and error specific. > >>>>>>>> > >>>>>>>> Let me know if that's not clear, there are definitely points that > >>>>>>>> highly debatable. > >>>>>>>> > >>>>>>>> Cheers, > >>>>>>>> Damien > >>>>>>>> > >>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford > >>>>>>>> <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>> > >>>>> wrote: > >>>>>>>>> > >>>>>>>>> Oh, and one more thing: > >>>>>>>>> > >>>>>>>>> 5. > >>>>>>>>> Whenever you take a record out of the stream, and then potentially > >>>>>>>>> re-introduce it at a later date, you introduce the potential for > >>>>> record > >>>>>>>>> ordering issues. For example, that record could have been destined > >>>>> for a > >>>>>>>>> Window that has been closed by the time it's re-processed. I'd > >>>>> like to > >>>>>>>> see > >>>>>>>>> a section that considers these consequences, and perhaps make > >>>>> those risks > >>>>>>>>> clear to users. For the record, this is exactly what sunk KIP-990, > >>>>> which > >>>>>>>>> was an alternative approach to error handling that introduced the > >>>>> same > >>>>>>>>> issues. > >>>>>>>>> > >>>>>>>>> Cheers, > >>>>>>>>> > >>>>>>>>> Nick > >>>>>>>>> > >>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com > >>> <mailto:nick.telf...@gmail.com%0b>> > > > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Damien, > >>>>>>>>>> > >>>>>>>>>> Thanks for the KIP! Dead-letter queues are something that I > >>>>> think a > >>>>>>>> lot of > >>>>>>>>>> users would like. > >>>>>>>>>> > >>>>>>>>>> I think there are a few points with this KIP that concern me: > >>>>>>>>>> > >>>>>>>>>> 1. > >>>>>>>>>> It looks like you can only define a single, global DLQ for the > >>>>> entire > >>>>>>>>>> Kafka Streams application? What about applications that would > >>>>> like to > >>>>>>>>>> define different DLQs for different data flows? This is > >>>>> especially > >>>>>>>>>> important when dealing with multiple source topics that have > >>>>> different > >>>>>>>>>> record schemas. > >>>>>>>>>> > >>>>>>>>>> 2. > >>>>>>>>>> Your DLQ payload value can either be the record value that > >>>>> failed, or > >>>>>>>> an > >>>>>>>>>> error string (such as "error during punctuate"). This is likely > >>>>> to > >>>>>>>> cause > >>>>>>>>>> problems when users try to process the records from the DLQ, as > >>>>> they > >>>>>>>> can't > >>>>>>>>>> guarantee the format of every record value will be the same. > >>>>> This is > >>>>>>>> very > >>>>>>>>>> loosely related to point 1. above. > >>>>>>>>>> > >>>>>>>>>> 3. > >>>>>>>>>> You provide a ProcessorContext to both exception handlers, but > >>>>> state > >>>>>>>> they > >>>>>>>>>> cannot be used to forward records. In that case, I believe you > >>>>> should > >>>>>>>> use > >>>>>>>>>> ProcessingContext instead, which statically guarantees that it > >>>>> can't be > >>>>>>>>>> used to forward records. > >>>>>>>>>> > >>>>>>>>>> 4. 
> >>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's > >>>>> the > >>>>>>>> plan > >>>>>>>>>> if KIP-1033 is not adopted, or if KIP-1034 lands before 1033? > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> > >>>>>>>>>> Nick > >>>>>>>>>> > >>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina < > >>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> In a general way, if the user does not configure the right ACL, > >>>>> that > >>>>>>>>>>> would be a security issue, but that's true for any topic. > >>>>>>>>>>> > >>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without > >>>>> writing > >>>>>>>>>>> custom Java code in Kafka Streams, not at the topic level. > >>>>>>>>>>> A lot of applications are already implementing this pattern, > >>>>> but the > >>>>>>>>>>> required code to do it is quite painful and error prone, for > >>>>> example > >>>>>>>>>>> most apps I have seen created a new KafkaProducer to send > >>>>> records to > >>>>>>>>>>> their DLQ. > >>>>>>>>>>> > >>>>>>>>>>> As it would be disabled by default for backward compatibility, > >>>>> I doubt > >>>>>>>>>>> it would generate any security concern. > >>>>>>>>>>> If a user explicitly configures a Deal Letter Queue, it would > >>>>> be up to > >>>>>>>>>>> him to configure the relevant ACLs to ensure that the right > >>>>> principal > >>>>>>>>>>> can access it. > >>>>>>>>>>> It is already the case for all internal, input and output Kafka > >>>>>>>>>>> Streams topics (e.g. repartition, changelog topics) that also > >>>>> could > >>>>>>>>>>> contain confidential data, so I do not think we should > >>>>> implement a > >>>>>>>>>>> different behavior for this one. > >>>>>>>>>>> > >>>>>>>>>>> In this KIP, we configured the default DLQ record to have the > >>>>> initial > >>>>>>>>>>> record key/value as we assume that it is the expected and wanted > >>>>>>>>>>> behavior for most applications. > >>>>>>>>>>> If a user does not want to have the key/value in the DLQ record > >>>>> for > >>>>>>>>>>> any reason, they could still implement exception handlers to > >>>>> build > >>>>>>>>>>> their own DLQ record. > >>>>>>>>>>> > >>>>>>>>>>> Regarding ACL, maybe something smarter could be done in Kafka > >>>>> Streams, > >>>>>>>>>>> but this is out of scope for this KIP. > >>>>>>>>>>> > >>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren > >>>>>>>>>>> <cla...@xenei.com<mailto:cla...@xenei.com>> > >>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>> My concern is that someone would create a dead letter queue > >>>>> on a > >>>>>>>>>>> sensitive > >>>>>>>>>>>> topic and not get the ACL correct from the start. Thus > >>>>> causing > >>>>>>>>>>> potential > >>>>>>>>>>>> confidential data leak. Is there anything in the proposal > >>>>> that > >>>>>>>> would > >>>>>>>>>>>> prevent that from happening? If so I did not recognize it as > >>>>> such. > >>>>>>>>>>>> > >>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina < > >>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> > >>>>>>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Claude, > >>>>>>>>>>>>> > >>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a > >>>>> standard > >>>>>>>> and > >>>>>>>>>>>>> independant topic, thus normal ACL applies to it like any > >>>>> other > >>>>>>>> topic. 
> >>>>>>>>>>>>> This should not introduce any security issues, obviously, > >>>>> the > >>>>>>>> right > >>>>>>>>>>>>> ACL would need to be provided to write to the DLQ if > >>>>> configured. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>> Damien > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr > >>>>>>>>>>>>> <claude.war...@aiven.io.invalid<mailto:claude.war...@aiven.io.invalid>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any > >>>>> ignorance > >>>>>>>> on my > >>>>>>>>>>> part. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> When a dead letter queue is established is there a > >>>>> process to > >>>>>>>>>>> ensure that > >>>>>>>>>>>>>> it at least is defined with the same ACL as the original > >>>>> queue? > >>>>>>>>>>> Without > >>>>>>>>>>>>>> such a guarantee at the start it seems that managing dead > >>>>> letter > >>>>>>>>>>> queues > >>>>>>>>>>>>>> will be fraught with security issues. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina < > >>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> To continue on our effort to improve Kafka Streams error > >>>>>>>>>>> handling, we > >>>>>>>>>>>>>>> propose a new KIP to add out of the box support for Dead > >>>>>>>> Letter > >>>>>>>>>>> Queue. > >>>>>>>>>>>>>>> The goal of this KIP is to provide a default > >>>>> implementation > >>>>>>>> that > >>>>>>>>>>>>>>> should be suitable for most applications and allow > >>>>> users to > >>>>>>>>>>> override > >>>>>>>>>>>>>>> it if they have specific requirements. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> In order to build a suitable payload, some additional > >>>>> changes > >>>>>>>> are > >>>>>>>>>>>>>>> included in this KIP: > >>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, when > >>>>> available, the > >>>>>>>>>>> source > >>>>>>>>>>>>>>> node raw key/value byte[] > >>>>>>>>>>>>>>> 2. expose the ProcessingContext to the > >>>>>>>>>>> ProductionExceptionHandler, > >>>>>>>>>>>>>>> it is currently not available in the handle parameters. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to > >>>>> the > >>>>>>>>>>>>>>> ProductionExceptionHandler, we considered two choices: > >>>>>>>>>>>>>>> 1. exposing the ProcessingContext as a parameter in > >>>>> the > >>>>>>>> handle() > >>>>>>>>>>>>>>> method. That's the cleanest way IMHO, but we would need > >>>>> to > >>>>>>>>>>> deprecate > >>>>>>>>>>>>>>> the old method. > >>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an attribute in > >>>>> the > >>>>>>>>>>> interface. > >>>>>>>>>>>>>>> This way, no method is deprecated, but we would not be > >>>>>>>> consistent > >>>>>>>>>>> with > >>>>>>>>>>>>>>> the other ExceptionHandler. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> In the KIP, we chose the 1. solution (new handle > >>>>> signature > >>>>>>>> with > >>>>>>>>>>> old > >>>>>>>>>>>>>>> one deprecated), but we could use other opinions on > >>>>> this part. > >>>>>>>>>>>>>>> More information is available directly on the KIP. 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> KIP link: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Feedbacks and suggestions are welcome, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>> Damien, Sebastien and Loic > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -- > >>>>>>>>>>>> LinkedIn: > >>>>>>>>>>>> http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren><http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>> > >>