Hi Bruno,

Thanks a lot for your comments!

> BC1
> Do you also plan to add an argument to the StreamsResetter tool to
> delete the default DLQ topic when a Streams app is reset?

Good point, I did not think about the StreamsResetter tool. Thinking out
loud, I am not sure it is a good idea to add an option to clean them up.
In my opinion, DLQ topics should be viewed as sink topics, and AFAIK this
tool does not clean up sink topics.

> BC2

In the case of custom exception handlers, they can get the
errors.deadletterqueue.topic.name configuration by overriding `void
configure(Map<String, ?> configs);`. As that is where all of the
configuration can be accessed, I think it's the best way, no? I will
think about it a bit further, it might be useful/convenient for users.

> BC3

The "with" syntax is used in many locations in the Kafka Streams public
classes, that's why I used it, e.g. Materialized.with(),
Consumed.withKeySerde(...).withValueSerde(...), Grouped.withName(...).
I do agree that .andAddToDeadLetterQueue() is more intuitive, but I would
argue that being consistent is better in this situation. What do you
think?

Cheers,
Damien

On Tue, 3 Sept 2024 at 14:59, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi,
>
> Thanks for the updates!
>
> I have a couple of comments.
>
> BC1
> Do you also plan to add an argument to the StreamsResetter tool to
> delete the default DLQ topic when a Streams app is reset?
>
>
> BC2
> Would it make sense to add errors.deadletterqueue.topic.name to the
> ErrorHandlerContext in case that a custom exception handler wants to
> write to the configured DLQ topic?
> For example, if only one of the handlers needs to be customized but all
> handlers should write to the configured DLQ topic.
>
>
> BC3
> What do you think about renaming withDeadLetterQueueRecords() to
> andAddToDeadLetterQueue()?
> A customized handler would look like
>
> public ProcessingHandlerResponse handle(
>     final ErrorHandlerContext context,
>     final Record<?, ?> record,
>     final Exception exception
> ) {
>     return ProcessingHandlerResponse.CONTINUE
>         .andAddToDeadLetterQueue(
>             Collections.singletonList(new ProducerRecord<>(
>                 "app-dlq",
>                 "Hello".getBytes(StandardCharsets.UTF_8),
>                 "World".getBytes(StandardCharsets.UTF_8)
>             ))
>         );
> }
>
> I think the code becomes more readable.
>
>
> Best,
> Bruno
>
> On 8/30/24 3:37 PM, Damien Gasparina wrote:
> > Hi everyone,
> >
> > We just updated KIP-1034, we changed the following:
> >   - We included the ProcessingExceptionHandler (KIP-1033) directly in
> >     the KIP;
> >   - We provided examples to clarify the new configuration, and how it
> >     could be leveraged.
> >
> > I think we can resume the conversation on this KIP.
> >
> > Cheers,
> > Damien, Sebastien and Loic
> >
> >
> > On Tue, 27 Aug 2024 at 15:37, Sebastien Viale
> > <sebastien.vi...@michelin.com> wrote:
> >>
> >> Hi Bruno,
> >>
> >> We have planned a meeting for next Friday to discuss it with Loic and
> >> Damien.
> >>
> >> We will be able to restart discussions about it soon.
> >>
> >> Regards
> >>
> >> ________________________________
> >> From: Bruno Cadonna <cado...@apache.org>
> >> Sent: Monday, 26 August 2024 11:32
> >> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>
> >> Hi Loïc, Sebastien, and Damien,
> >>
> >> Now that KIP-1033 is going to be released in 3.9, what is the plan to
> >> progress with this KIP?
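To make the configure()-based approach from BC2 concrete, here is a minimal
sketch of a custom handler picking up the DLQ topic at startup. It assumes
the ProcessingExceptionHandler shape from KIP-1033 shown above and the
withDeadLetterQueueRecords() method proposed in this KIP; the class name and
placeholder payload are made up:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class DlqAwareProcessingExceptionHandler implements ProcessingExceptionHandler {

    private String dlqTopic;

    @Override
    public void configure(final Map<String, ?> configs) {
        // The full Streams configuration is passed in here, so the handler
        // can read the DLQ topic name without a dedicated context accessor.
        dlqTopic = (String) configs.get("errors.deadletterqueue.topic.name");
    }

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        if (dlqTopic == null || dlqTopic.isEmpty()) {
            return ProcessingHandlerResponse.CONTINUE; // no DLQ configured
        }
        // Placeholder payload; a real handler would serialize the failed record.
        return ProcessingHandlerResponse.CONTINUE
            .withDeadLetterQueueRecords(Collections.singletonList(
                new ProducerRecord<>(dlqTopic,
                    null,
                    String.valueOf(exception).getBytes(StandardCharsets.UTF_8))));
    }
}

Whether a dedicated accessor on the ErrorHandlerContext would still be more
convenient, as Bruno suggests in BC2, remains the open question.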
> >>
> >> Is the KIP up-to-date, so that we can restart discussion?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 6/13/24 6:16 PM, Damien Gasparina wrote:
> >>> Hi Bruno,
> >>>
> >>> We focused our effort (well, mostly Seb and Loic :)) on KIP-1033,
> >>> that's why not much progress has been made on this one yet.
> >>> Regarding your points:
> >>>
> >>> B1: It is up to the user to specify the DLQ topic name and to
> >>> implement a potential differentiation. I tend to think that having one
> >>> DLQ per application ID is the wisest, but I encountered cases and
> >>> applications that preferred having a shared DLQ topic between multiple
> >>> applications, e.g. to reduce the number of partitions, or to ease
> >>> monitoring.
> >>>
> >>> B2: Good catch, it should be
> >>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG; we removed the "DEFAULT"
> >>> prefix during the discussion, looks like I forgot to update all
> >>> occurrences in the KIP.
> >>>
> >>> B3: The trigger for sending to the DLQ would be if
> >>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user
> >>> implemented a custom exception handler that returns DLQ records.
> >>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just override the
> >>> behavior of the default handlers, thus custom exception handlers will
> >>> completely ignore this parameter.
> >>>
> >>> I think it's a good trade-off between providing a production-ready
> >>> default implementation, yet providing sufficient flexibility for
> >>> complex use-cases.
> >>> This behavior definitely needs to be documented, but I guess it's safe
> >>> to push the responsibility for the DLQ records to the user if they
> >>> implement custom handlers.
> >>>
> >>> Cheers,
> >>> Damien
> >>>
> >>>
> >>> On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> since there was not too much activity in this thread recently, I was
> >>>> wondering what the status of this discussion is.
> >>>>
> >>>> I cannot find the examples in the KIP Sébastien mentioned in the last
> >>>> message to this thread. I can also not find the corresponding
> >>>> definition of the following method call in the KIP:
> >>>>
> >>>> FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>
> >>>> I also have some comments:
> >>>>
> >>>> B1
> >>>> Did you consider to prefix the dead letter queue topic names with the
> >>>> application ID to distinguish the topics between Streams apps? Or is
> >>>> the user responsible for the differentiation? If the user is
> >>>> responsible, we risk that faulty records of different Streams apps end
> >>>> up in the same dead letter queue.
> >>>>
> >>>> B2
> >>>> Is the name of the dead letter queue topic config
> >>>> DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or
> >>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are
> >>>> used.
> >>>>
> >>>> B3
> >>>> What is exactly the trigger to send a record to the dead letter queue?
> >>>> Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or is it adding
> >>>> a record to the return value of the exception handler?
> >>>> What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do
> >>>> not add a record to the return value of the handler? What happens if I
> >>>> do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record
> >>>> to the return value of the handler?
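A sketch of the B3 semantics described above, as the bundled
LogAndContinueExceptionHandler could implement them. This is illustrative
code for the described behavior, not the actual implementation, and
withDeadLetterQueueRecords() is the enum method proposed in this KIP:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogAndContinueWithDlqSketch implements DeserializationExceptionHandler {

    private String dlqTopic; // from errors.deadletterqueue.topic.name, may be null

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqTopic = (String) configs.get("errors.deadletterqueue.topic.name");
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Config unset: behave exactly as today, no DLQ record is produced.
        if (dlqTopic == null || dlqTopic.isEmpty()) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        // Config set: same decision, plus a DLQ record built from the raw input.
        return DeserializationHandlerResponse.CONTINUE
            .withDeadLetterQueueRecords(Collections.singletonList(
                new ProducerRecord<>(dlqTopic, record.key(), record.value())));
    }
}

A custom handler that returns DLQ records triggers the send regardless of
the config, which covers Bruno's second "what happens" case.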
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 4/22/24 10:19 PM, Sebastien Viale wrote:
> >>>>> Hi,
> >>>>>
> >>>>> Thanks for your remarks
> >>>>>
> >>>>> L1. I would say "who can do the most can do the least"; even though
> >>>>> most people will fail and stop, we found it interesting to offer the
> >>>>> possibility to fail-and-send-to-DLQ
> >>>>>
> >>>>> L2: We did not consider extending the TimestampExtractor because we
> >>>>> consider it out of scope for this KIP. Perhaps it will be possible
> >>>>> to include it in an ExceptionHandler later.
> >>>>>
> >>>>> L3: we will include an example in the KIP, but as we mentioned
> >>>>> earlier, the DLQ topic can be different in each custom Exception
> >>>>> Handler:
> >>>>>
> >>>>> When providing custom handlers, users would have the possibility to
> >>>>> return:
> >>>>>   * FAIL
> >>>>>   * CONTINUE
> >>>>>   * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>   * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>
> >>>>> cheers!
> >>>>> Sébastien
> >>>>>
> >>>>> ________________________________
> >>>>> From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
> >>>>> Sent: Monday, 22 April 2024 14:36
> >>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka
> >>>>> Streams
> >>>>>
> >>>>> Hi!
> >>>>>
> >>>>> Thanks for the KIP, great stuff.
> >>>>>
> >>>>> L1. I was a bit confused that the default configuration (once you set
> >>>>> a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
> >>>>> correctly. Is this something that will be a common use-case, and is
> >>>>> it a configuration that we want to encourage? I expected that you
> >>>>> either want to fail or skip-and-send-to-DLQ.
> >>>>>
> >>>>> L2. Have you considered extending the `TimestampExtractor` interface
> >>>>> so that it can also produce to DLQ? AFAIK it's not covered by any of
> >>>>> the existing exception handlers, but it can cause similar failures
> >>>>> (potentially custom logic that depends on the validity of the input
> >>>>> record). There could also be a default implementation as a subclass
> >>>>> of `ExtractRecordMetadataTimestamp`.
> >>>>>
> >>>>> L3. It would be nice to include an example of how to produce to
> >>>>> multiple topics in the KIP, as I can imagine that this will be a
> >>>>> common use-case. I wasn't sure how much code would be involved to
> >>>>> make it work. If a lot of code is required, we may want to consider
> >>>>> exposing some utils that make it easier.
> >>>>>
> >>>>> Cheers,
> >>>>> Lucas
> >>>>>
> >>>>> On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina
> >>>>> <d.gaspar...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> Following all the discussion on this KIP and KIP-1033, we introduced
> >>>>>> a new container class containing only processing context metadata:
> >>>>>> ProcessingMetadata. This new container class is actually part of
> >>>>>> KIP-1033; thus, I added a hard dependency for this KIP on KIP-1033.
> >>>>>> I think it's the wisest choice implementation-wise.
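For reference, the ProcessingMetadata container mentioned above could be as
small as the following sketch (hypothetical class and accessor names, using
the fixed-size fields enumerated later in this thread). Since every field is
bounded, the per-record footprint stays on the order of ~100 bytes, which is
what makes holding it for in-flight records tractable:

import org.apache.kafka.streams.processor.TaskId;

// Hypothetical container for fixed-size error-handler metadata; unlike
// rawSourceKey/Value, no field grows with the payload size.
public final class ProcessingMetadata {

    private final String topic;            // source topic
    private final int partition;
    private final long offset;
    private final String processorNodeId;
    private final TaskId taskId;

    public ProcessingMetadata(final String topic,
                              final int partition,
                              final long offset,
                              final String processorNodeId,
                              final TaskId taskId) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.processorNodeId = processorNodeId;
        this.taskId = taskId;
    }

    public String topic() { return topic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
    public String processorNodeId() { return processorNodeId; }
    public TaskId taskId() { return taskId; }
}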
> >>>>>>
> >>>>>> I also clarified the interface of the enums:
> >>>>>> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords).
> >>>>>> Very likely most users would just send one DLQ record, but there
> >>>>>> might be specific use-cases and what can do more can do less, so I
> >>>>>> added an Iterable.
> >>>>>>
> >>>>>> I took some time to think about the impact of storing the
> >>>>>> ProcessingMetadata on the ProductionExceptionHandler. I think
> >>>>>> storing the topic/offset/partition should be fine, but I am
> >>>>>> concerned about storing the rawSourceKey/Value. I think it could
> >>>>>> impact some specific use-cases; for example, a high-throughput Kafka
> >>>>>> Streams application "counting" messages could have huge source input
> >>>>>> messages and very small sink messages. Here, I assume storing the
> >>>>>> rawSourceKey/Value could require significantly more memory than the
> >>>>>> actual Kafka Producer buffer.
> >>>>>>
> >>>>>> I think the safest approach is actually to only store the fixed-size
> >>>>>> metadata for ProductionExceptionHandler.handle:
> >>>>>> topic/partition/offset/processorNodeId/taskId. It might be confusing
> >>>>>> for the user, but 1) it is still better than today, where there is
> >>>>>> no context information at all, 2) it would be clearly stated in the
> >>>>>> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> >>>>>> punctuate case).
> >>>>>>
> >>>>>> Do you think it would be a suitable design, Sophie?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Damien
> >>>>>>
> >>>>>> On Sun, 14 Apr 2024 at 21:30, Loic Greffier
> >>>>>> <loic.greff...@michelin.com> wrote:
> >>>>>>>
> >>>>>>> Hi Sophie,
> >>>>>>>
> >>>>>>> Thanks for your feedback.
> >>>>>>> Completing Damien's comments here for points S1 and S5B.
> >>>>>>>
> >>>>>>> S1:
> >>>>>>>> I'm confused -- are you saying that we're introducing a new kind
> >>>>>>>> of ProducerRecord class for this?
> >>>>>>>
> >>>>>>> I am wondering if it makes sense to alter the ProducerRecord from
> >>>>>>> the Clients API with a "deadLetterQueueTopicName" attribute
> >>>>>>> dedicated to the Kafka Streams DLQ.
> >>>>>>> Adding "deadLetterQueueTopicName" as an additional parameter to
> >>>>>>> "withDeadLetterQueueRecord" is a good option, and may allow users
> >>>>>>> to send records to different DLQ topics depending on conditions:
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >>>>>>>                                                  ProducerRecord<byte[], byte[]> record,
> >>>>>>>                                                  Exception exception) {
> >>>>>>>     if (condition1) {
> >>>>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >>>>>>>     }
> >>>>>>>     if (condition2) {
> >>>>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >>>>>>>     }
> >>>>>>>     return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>>>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> >>>>>>> }
> >>>>>>>
> >>>>>>> S5B:
> >>>>>>>> I was having a bit of trouble understanding what the behavior
> >>>>>>>> would be if someone configured a "errors.deadletterqueue.topic.name"
> >>>>>>>> but didn't implement the handlers.
> >>>>>>>
> >>>>>>> The provided LogAndContinueExceptionHandler,
> >>>>>>> LogAndFailExceptionHandler and DefaultProductionExceptionHandler
> >>>>>>> should be able to tell if records should be sent to the DLQ or not.
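The Iterable-based signature above also answers Lucas's L3 (producing to
multiple topics): one response can carry several DLQ records, each with its
own destination topic. A sketch, with made-up topic names and the KIP-1033
handler shape assumed:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class FanOutDlqHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        final byte[] value = String.valueOf(record.value())
            .getBytes(StandardCharsets.UTF_8);
        final byte[] error = String.valueOf(exception)
            .getBytes(StandardCharsets.UTF_8);
        // Two DLQ records in one response: the payload for replay, and a
        // lightweight audit event for monitoring.
        return ProcessingHandlerResponse.CONTINUE
            .withDeadLetterQueueRecords(List.of(
                new ProducerRecord<>("my-app-dlq", null, value),
                new ProducerRecord<>("my-app-dlq-audit", null, error)));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}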
> >>>>>>> The "errors.deadletterqueue.topic.name" config serves to:
> >>>>>>>
> >>>>>>>   * Specify whether the provided handlers should send records to
> >>>>>>>     the DLQ:
> >>>>>>>     * if the value is empty, the handlers should not send records
> >>>>>>>       to the DLQ;
> >>>>>>>     * if the value is not empty, the handlers should send records
> >>>>>>>       to the DLQ.
> >>>>>>>   * Define the name of the DLQ topic that should be used by the
> >>>>>>>     provided handlers.
> >>>>>>>
> >>>>>>> Thus, if "errors.deadletterqueue.topic.name" is defined, the
> >>>>>>> provided handlers should return either:
> >>>>>>>
> >>>>>>>   * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>>>>   * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>>>>
> >>>>>>> If "errors.deadletterqueue.topic.name" is defined but neither
> >>>>>>> DeserializationExceptionHandler nor ProductionExceptionHandler
> >>>>>>> classes are defined in the configuration, then nothing should
> >>>>>>> happen, as sending to the DLQ is based on the handlers' response.
> >>>>>>> When providing custom handlers, users would have the possibility to
> >>>>>>> return:
> >>>>>>>
> >>>>>>>   * FAIL
> >>>>>>>   * CONTINUE
> >>>>>>>   * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>>   * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>>
> >>>>>>> A DLQ topic name is currently required for the last two response
> >>>>>>> types.
> >>>>>>> I am wondering if it could benefit users to ease the use of the
> >>>>>>> default DLQ topic "errors.deadletterqueue.topic.name" when
> >>>>>>> implementing custom handlers, with something like:
> >>>>>>>
> >>>>>>>   * FAIL.withDefaultDeadLetterQueueRecord(record)
> >>>>>>>   * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Loïc
> >>>>>>>
> >>>>>>> From: Damien Gasparina <d.gaspar...@gmail.com>
> >>>>>>> Sent: Sunday, 14 April 2024 20:24
> >>>>>>> To: dev@kafka.apache.org
> >>>>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka
> >>>>>>> Streams
> >>>>>>>
> >>>>>>> Hi Sophie,
> >>>>>>>
> >>>>>>> Thanks a lot for your feedback and your detailed comments.
> >>>>>>>
> >>>>>>> S1.
> >>>>>>>> I'm confused -- are you saying that we're introducing a new kind
> >>>>>>>> of ProducerRecord class for this?
> >>>>>>>
> >>>>>>> Sorry for the poor wording, that's not what I meant. While writing
> >>>>>>> the KIP, I was hesitating between 1. leveraging the Kafka Producer
> >>>>>>> ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name
> >>>>>>> in a separate parameter, 3. a new custom interface (e.g.
> >>>>>>> DeadLetterQueueRecord).
> >>>>>>> As the KafkaProducer ProducerRecord is not used in the Kafka
> >>>>>>> Streams API (except ProductionExceptionHandler) and I would like
> >>>>>>> to avoid a new interface if not strictly required, I leaned toward
> >>>>>>> option 2. Thinking about it, maybe option 1. would be best, but I
> >>>>>>> assume it could create confusion with the KafkaStreams
> >>>>>>> ProducerRecord. Let me sleep on it.
> >>>>>>>
> >>>>>>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and
> >>>>>>> your point in S4, it seems more and more likely that we will create
> >>>>>>> a new container class containing only the metadata for the
> >>>>>>> exception handlers.
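Loïc's withDefaultDeadLetterQueueRecord() proposal above would let a custom
handler reuse the topic configured in errors.deadletterqueue.topic.name
without hard-coding it. A sketch of how it could read; the method itself is
only a proposal at this point, and the masking logic is made up:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class MaskAndForwardHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Mask the payload, then hand the record to whatever topic
        // errors.deadletterqueue.topic.name points at; the handler never
        // needs to know the topic name itself.
        final ProducerRecord<byte[], byte[]> masked = new ProducerRecord<>(
            record.topic(), record.key(), "***".getBytes(StandardCharsets.UTF_8));
        return ProductionExceptionHandlerResponse.CONTINUE
            .withDefaultDeadLetterQueueRecord(masked);
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}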
> >>>>>>> To be consistent, I think we should use this new implementation
> >>>>>>> in all exception handlers.
> >>>>>>> The only issue I could think of is that the new interface would
> >>>>>>> expose less data than the current ProcessorContext in the
> >>>>>>> DeserializationExceptionHandler (e.g. stateDir(), metrics(),
> >>>>>>> getStateStore()), thus it could be hard for some users to migrate
> >>>>>>> to the new interface.
> >>>>>>> I do expect that only a few users would be impacted, as the javadoc
> >>>>>>> is very clear: `Note, that the passed in {@link ProcessorContext}
> >>>>>>> only allows access to metadata like the task ID.`
> >>>>>>>
> >>>>>>> S3. I completely agree with you, it is something that might not be
> >>>>>>> trivial and should be thoroughly covered by unit tests during the
> >>>>>>> implementation.
> >>>>>>>
> >>>>>>> S4. Good point, I did not notice that the ProductionExceptionHandler
> >>>>>>> is also invoked in the producer.send() callback.
> >>>>>>> Capturing the ProcessingContext for each in-flight message is
> >>>>>>> probably not possible. I think there is no other way than to write
> >>>>>>> a custom container class holding only the metadata that are
> >>>>>>> essential; I am thinking of storing the following attributes:
> >>>>>>> source topic, partition, offset, rawKey, rawValue and taskId.
> >>>>>>> Those metadata should be relatively small, but I assume that there
> >>>>>>> could be a high number of in-flight messages, especially with the
> >>>>>>> at-least-once processing guarantee. Do you think it would be fine
> >>>>>>> memory-wise?
> >>>>>>>
> >>>>>>> S5. As many exceptions are only accessible in exception handlers,
> >>>>>>> and we wanted to 1) allow users to customize the DLQ records and
> >>>>>>> 2) have a suitable DLQ out-of-the-box implementation, we felt it
> >>>>>>> natural to rely on exception handlers; that's also why we created
> >>>>>>> KIP-1033. Piggybacking on the enum response was the cleanest way we
> >>>>>>> could think of, but we are completely open to suggestions.
> >>>>>>>
> >>>>>>> S5a. Completely agree with you on this point; for this DLQ approach
> >>>>>>> to be complete, the ProcessingExceptionHandler introduced in
> >>>>>>> KIP-1033 is required. KIP-1033 is definitely our first priority. We
> >>>>>>> decided to kick off the KIP-1034 discussion as we expected the
> >>>>>>> discussions to be dynamic and they could potentially impact some
> >>>>>>> choices of KIP-1033.
> >>>>>>>
> >>>>>>> S5b. In this KIP, we wanted to 1. provide as much flexibility to
> >>>>>>> the user as possible; 2. provide a good default implementation
> >>>>>>> for the DLQ without having to write custom exception handlers.
> >>>>>>> For the default implementation, we introduced a new configuration:
> >>>>>>> errors.deadletterqueue.topic.name.
> >>>>>>>
> >>>>>>> If this configuration is set, it changes the behavior of the
> >>>>>>> provided exception handlers to return a DLQ record containing the
> >>>>>>> raw key/value + headers + exception metadata in headers.
> >>>>>>> If the out-of-the-box implementation is not suitable for a user,
> >>>>>>> e.g. the payload needs to be masked in the DLQ, they could
> >>>>>>> implement their own exception handlers. The
> >>>>>>> errors.deadletterqueue.topic.name would only impact the Kafka
> >>>>>>> Streams bundled exception handlers (e.g.
> >>>>>>> org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).
> >>>>>>>
> >>>>>>> Let me update the KIP to make it clear and also provide examples.
> >>>>>>>
> >>>>>>> S6/S7. Good point, mea culpa for the camel case, it must have been
> >>>>>>> a sugar rush :)
> >>>>>>>
> >>>>>>> Thanks again for your detailed comments and pointing out S4
> >>>>>>> (production exception & ProcessingContext)!
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Damien
> >>>>>>>
> >>>>>>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman
> >>>>>>> <sop...@responsive.dev> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the KIP, this will make a lot of people very happy.
> >>>>>>>>
> >>>>>>>> Wanted to chime in on a few points that have been raised so far
> >>>>>>>> and add some of my own (numbering with an S to distinguish my
> >>>>>>>> points from the previous ones)
> >>>>>>>>
> >>>>>>>> S1.
> >>>>>>>>
> >>>>>>>>> 1.a I really meant ProducerRecord, that's the class used to
> >>>>>>>>> forward to downstream processors in the PAPI. The only
> >>>>>>>>> information missing in this class is the topic name. I also
> >>>>>>>>> considered relying on the Kafka Producer ProducerRecord, but I
> >>>>>>>>> assume it would not be consistent with the KafkaStreams API.
> >>>>>>>>
> >>>>>>>> I'm confused -- are you saying that we're introducing a new kind
> >>>>>>>> of ProducerRecord class for this? Why not just use the existing
> >>>>>>>> one, i.e. the o.a.k.clients.producer.ProducerRecord class? This is
> >>>>>>>> what the ProductionExceptionHandler uses, so it's definitely
> >>>>>>>> "consistent". In other words, we can remove the
> >>>>>>>> "String deadLetterQueueTopicName"
> >>>>>>>>
> >>>>>>>> S2.
> >>>>>>>> I think this would be a good opportunity to also deprecate the
> >>>>>>>> existing #handle method of the DeserializationExceptionHandler,
> >>>>>>>> and replace it with one that uses a ProcessingContext instead of
> >>>>>>>> the ProcessorContext. Partly for the same reasons about guarding
> >>>>>>>> access to the #forward methods, partly because this method needs
> >>>>>>>> to be migrated to the new PAPI interface anyways, and
> >>>>>>>> ProcessingContext is part of the new one.
> >>>>>>>>
> >>>>>>>> S3.
> >>>>>>>> Regarding 2a. -- I'm inclined to agree that records which a
> >>>>>>>> Punctuator failed to produce should also be sent to the DLQ via
> >>>>>>>> the ProductionExceptionHandler. Users will just need to be careful
> >>>>>>>> about accessing certain fields of the ProcessingContext that
> >>>>>>>> aren't available in the punctuator, and need to check the Optional
> >>>>>>>> returned by the ProcessingContext#recordMetadata API.
> >>>>>>>> Also, from an implementation standpoint, it will be really hard to
> >>>>>>>> distinguish between a record created by a punctuator vs a
> >>>>>>>> processor from within the RecordCollector, which is the class that
> >>>>>>>> actually handles sending records to the Streams Producer and
> >>>>>>>> invoking the ProductionExceptionHandler. This is because the
> >>>>>>>> RecordCollector is at the "end" of the topology graph and doesn't
> >>>>>>>> have any context about which of the upstream processors actually
> >>>>>>>> attempted to forward a record.
> >>>>>>>>
> >>>>>>>> This in itself is at least theoretically solvable, but it leads
> >>>>>>>> into my first major new point:
> >>>>>>>>
> >>>>>>>> S4:
> >>>>>>>> I'm deeply worried about passing the ProcessingContext in as a
> >>>>>>>> means of forwarding metadata. The problem is that the
> >>>>>>>> processing/processor context is a mutable class and is inherently
> >>>>>>>> meaningless outside the context of a specific task. And when I
> >>>>>>>> said earlier that the RecordCollector sits at the "end" of the
> >>>>>>>> topology, I meant that it's literally outside the task's
> >>>>>>>> subtopology and is used/shared by all tasks on that StreamThread.
> >>>>>>>> So to begin with, there's no guarantee what will actually be
> >>>>>>>> returned for essential methods such as the new #rawSourceKey/Value
> >>>>>>>> or the existing #recordMetadata.
> >>>>>>>>
> >>>>>>>> For serialization exceptions it'll probably be correct, but for
> >>>>>>>> general send errors it almost definitely won't be. In short, this
> >>>>>>>> is because we send records to the producer after the sink node,
> >>>>>>>> but don't check for send errors right away since obviously it
> >>>>>>>> takes some time for the producer to actually send. In other words,
> >>>>>>>> sending/producing records is actually done asynchronously with
> >>>>>>>> processing, and we simply check for errors on any previously-sent
> >>>>>>>> records during the send on a new record in a sink node. This means
> >>>>>>>> the context we would be passing in to a (non-serialization)
> >>>>>>>> exception would pretty much always correspond not to the record
> >>>>>>>> that experienced the error, but the random record that happened to
> >>>>>>>> be being sent when we checked and saw the error for the failed
> >>>>>>>> record.
> >>>>>>>>
> >>>>>>>> This discrepancy, in addition to the whole "sourceRawKey/Value and
> >>>>>>>> recordMetadata are null for punctuators" issue, seems like an
> >>>>>>>> insurmountable inconsistency that is more likely to cause users
> >>>>>>>> confusion or problems than be helpful.
> >>>>>>>> We could create a new metadata object and copy over the relevant
> >>>>>>>> info from the ProcessingContext, but I worry that has the
> >>>>>>>> potential to explode memory since we'd need to hold on to it for
> >>>>>>>> all in-flight records up until they are either successfully sent
> >>>>>>>> or failed and passed in to the ProductionExceptionHandler. But if
> >>>>>>>> the metadata is relatively small, it's probably fine. Especially
> >>>>>>>> if it's just the raw source key/value. Are there any other parts
> >>>>>>>> of the ProcessingContext you think should be made available?
> >>>>>>>>
> >>>>>>>> Note that this only applies to the ProductionExceptionHandler, as
> >>>>>>>> the DeserializationExceptionHandler (and the newly proposed
> >>>>>>>> ProcessingExceptionHandler) would both be invoked immediately and
> >>>>>>>> therefore with the failed record's context. However, I'm also a
> >>>>>>>> bit uncomfortable with adding the rawSourceKey/rawSourceValue to
> >>>>>>>> the ProcessingContext.
> >>>>>>>> So I'd propose to just wrap those (and any other metadata you
> >>>>>>>> might want) in a container class and pass that in instead of the
> >>>>>>>> ProcessingContext, to all of the exception handlers.
> >>>>>>>>
> >>>>>>>> S5:
> >>>>>>>> For some reason I'm finding the proposed API a little bit awkward,
> >>>>>>>> although it's entirely possible that the problem is with me, not
> >>>>>>>> the proposal :)
> >>>>>>>> Specifically I'm struggling with the approach of piggybacking on
> >>>>>>>> the exception handlers and their response enums to dictate how
> >>>>>>>> records are forwarded to the DLQ. I think this comes down to two
> >>>>>>>> things, though again, these aren't necessarily problems with the
> >>>>>>>> API and probably just need to be hashed out:
> >>>>>>>>
> >>>>>>>> S5a.
> >>>>>>>> When I envision a DLQ, to me, the most common use case would be to
> >>>>>>>> forward input records that failed somewhere along the processing
> >>>>>>>> graph. But it seems like all the focus here is on the two far ends
> >>>>>>>> of the subtopology -- the input/consumer, and the output/producer.
> >>>>>>>> I get that the ProcessingExceptionHandler is really the missing
> >>>>>>>> piece here, and it's hard to say anything specific since it's not
> >>>>>>>> yet accepted, but maybe a somewhat more concrete example would
> >>>>>>>> help. FWIW I think/hope to get that KIP accepted and implemented
> >>>>>>>> ASAP, so I'm not worried about the "what if it doesn't happen"
> >>>>>>>> case -- more just want to know what it will look like when it
> >>>>>>>> does. Imo it's fine to build KIPs on top of future ones, it feels
> >>>>>>>> clear that this part will just have to wait for that KIP to
> >>>>>>>> actually be added.
> >>>>>>>>
> >>>>>>>> S5b:
> >>>>>>>> Why do users have to define the entire ProducerRecord -- shouldn't
> >>>>>>>> Streams handle all this for them? Or will we just automatically
> >>>>>>>> send every record on failure to the default global DLQ, and users
> >>>>>>>> only have to implement the handlers if they want to change the
> >>>>>>>> headers or send to a different topic? I was having a bit of
> >>>>>>>> trouble understanding what the behavior would be if someone
> >>>>>>>> configured an "errors.deadletterqueue.topic.name" but didn't
> >>>>>>>> implement the handlers. Apologies if it's somewhere in the KIP and
> >>>>>>>> I happened to miss it!
> >>>>>>>>
> >>>>>>>> Either way, I really think an example would help me to better
> >>>>>>>> imagine what this will look like in practice, and evaluate whether
> >>>>>>>> it actually involves as much overhead as I'm worried it will. Can
> >>>>>>>> you add a section that includes a basic implementation of all the
> >>>>>>>> features here? Nothing too complicated, just the most bare-bones
> >>>>>>>> code needed to actually implement forwarding to a
> >>>>>>>> dead-letter-queue via the handlers.
> >>>>>>>>
> >>>>>>>> Lastly, two super small things:
> >>>>>>>>
> >>>>>>>> S6:
> >>>>>>>> We use camel case in Streams, so it should be rawSourceKey/Value
> >>>>>>>> rather than raw_source_key/value
> >>>>>>>>
> >>>>>>>> S7:
> >>>>>>>> Can you add javadocs for the #withDeadLetterQueueRecord?
> >>>>>>>> For example, it seems to me that if the topic to be sent to here
> >>>>>>>> is different than the default/global DLQ, then the user will need
> >>>>>>>> to make sure to have created this themselves up front.
> >>>>>>>>
> >>>>>>>> That's it from me... sorry for the long response, it's just
> >>>>>>>> because I'm excited for this feature and have been waiting on a
> >>>>>>>> KIP for this for years.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Sophie
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina
> >>>>>>>> <d.gaspar...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Andrew,
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for your review, plenty of good points!
> >>>>>>>>>
> >>>>>>>>> 11. Typo fixed, good catch.
> >>>>>>>>>
> >>>>>>>>> 12. I do agree with you, and Nick also mentioned it; I updated
> >>>>>>>>> the KIP to mention that context headers should be forwarded.
> >>>>>>>>>
> >>>>>>>>> 13. Good catch; to be consistent with KIP-298, and without a
> >>>>>>>>> strong opinion from my side, I updated the KIP with your prefix
> >>>>>>>>> proposal.
> >>>>>>>>>
> >>>>>>>>> 14. I am not sure about this point; a big difference between
> >>>>>>>>> KIP-298 and this one is that the handlers can easily be
> >>>>>>>>> overridden, something that is not doable in Kafka Connect.
> >>>>>>>>> If someone would like a different behavior, e.g. to mask the
> >>>>>>>>> payload or include further headers, I think we should encourage
> >>>>>>>>> them to write their own exception handlers to build the DLQ
> >>>>>>>>> record the way they expect.
> >>>>>>>>>
> >>>>>>>>> 15. Yeah, that's a good point; I was not fully convinced about
> >>>>>>>>> putting a String in it, and I do assume that "null" is also a
> >>>>>>>>> valid value. I do assume that the stacktrace and the exception in
> >>>>>>>>> this case are the key metadata for the user to troubleshoot the
> >>>>>>>>> problem.
> >>>>>>>>> I updated the KIP to mention that the value should be null if
> >>>>>>>>> triggered in a punctuate.
> >>>>>>>>>
> >>>>>>>>> 16. I added a section to mention that Kafka Streams would not try
> >>>>>>>>> to automatically create the topic, and the topic should either be
> >>>>>>>>> automatically created, or pre-created.
> >>>>>>>>>
> >>>>>>>>> 17. If a DLQ record cannot be sent, the exception should go to
> >>>>>>>>> the uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >>>>>>>>>
> >>>>>>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina
> >>>>>>>>> <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Nick,
> >>>>>>>>>>
> >>>>>>>>>> 1. Good point, that's less impactful than a custom interface, I
> >>>>>>>>>> just updated the KIP with the new signature.
> >>>>>>>>>>
> >>>>>>>>>> 1.a I really meant ProducerRecord, that's the class used to
> >>>>>>>>>> forward to downstream processors in the PAPI. The only
> >>>>>>>>>> information missing in this class is the topic name. I also
> >>>>>>>>>> considered relying on the Kafka Producer ProducerRecord, but I
> >>>>>>>>>> assume it would not be consistent with the KafkaStreams API.
> >>>>>>>>>>
> >>>>>>>>>> 2. Agreed
> >>>>>>>>>>
> >>>>>>>>>> 2.a I do think exceptions occurring during punctuate should be
> >>>>>>>>>> included in the DLQ.
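Sketching the javadoc Sophie asks for in S7, folding in points 16 and 17
from the message above; the wording is a suggestion, not the KIP's:

/**
 * Adds one or more records to produce to a dead letter queue topic, in
 * addition to this response's FAIL or CONTINUE semantics.
 * <p>
 * Kafka Streams does not create the target topic: it must be pre-created,
 * or broker-side auto-creation must be enabled (point 16 above). If
 * producing a DLQ record fails, the exception is handed to the uncaught
 * exception handler (point 17 above).
 *
 * @param deadLetterQueueRecords the records to produce to the DLQ
 * @return this response, with the DLQ records attached
 */
ProcessingHandlerResponse withDeadLetterQueueRecords(
    final Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords);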
> >>>>>>>>>> Even if building a suitable payload is almost impossible, even
> >>>>>>>>>> with custom code, those exceptions are still fatal for Kafka
> >>>>>>>>>> Streams by default and are something that cannot be ignored
> >>>>>>>>>> safely.
> >>>>>>>>>> I do assume that most users would want to be informed if an
> >>>>>>>>>> error happened during a punctuate, even if only the metadata
> >>>>>>>>>> (e.g. stacktrace, exception) is provided.
> >>>>>>>>>> I am only concerned about flooding the DLQ topic as, if a
> >>>>>>>>>> scheduled operation failed, very likely it will fail during the
> >>>>>>>>>> next invocation, but
> >>>>>>>>>>
> >>>>>>>>>> 4. Good point, I clarified the wording in the KIP to make it
> >>>>>>>>>> explicit.
> >>>>>>>>>>
> >>>>>>>>>> 5. Good point, I will clearly mention that it is out of scope as
> >>>>>>>>>> part of the KIP and might not be as trivial as people could
> >>>>>>>>>> expect. I will update the KIP once I have some spare time.
> >>>>>>>>>>
> >>>>>>>>>> 6. Oh yeah, I didn't think about it, but forwarding input
> >>>>>>>>>> headers would definitely make sense. The Confluent Schema
> >>>>>>>>>> Registry ID is actually part of the payload, but many
> >>>>>>>>>> correlation IDs and technical metadata are passed through
> >>>>>>>>>> headers; it makes sense to forward them, especially as it is the
> >>>>>>>>>> default behavior of Kafka Streams.
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford
> >>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Damien and Sebastien,
> >>>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> I think you can just add a `String topic` argument to the
> >>>>>>>>>>> existing `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>>>>> deadLetterQueueRecord)` method, and then the implementation of
> >>>>>>>>>>> the exception handler could choose the topic to send records to
> >>>>>>>>>>> using whatever logic the user desires. You could perhaps
> >>>>>>>>>>> provide a built-in implementation that leverages your new
> >>>>>>>>>>> config to send all records to an untyped DLQ topic?
> >>>>>>>>>>>
> >>>>>>>>>>> 1a.
> >>>>>>>>>>> BTW you have a typo: in your DeserializationExceptionHandler,
> >>>>>>>>>>> the type of your `deadLetterQueueRecord` argument is
> >>>>>>>>>>> `ProducerRecord`, when it should probably be `ConsumerRecord`.
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Agreed. I think it's a good idea to provide an implementation
> >>>>>>>>>>> that sends to a single DLQ by default, but it's important to
> >>>>>>>>>>> enable users to customize this with their own exception
> >>>>>>>>>>> handlers.
> >>>>>>>>>>>
> >>>>>>>>>>> 2a.
> >>>>>>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should
> >>>>>>>>>>> be sent to a DLQ topic like it's a bad record. To me, a DLQ
> >>>>>>>>>>> should only contain records that failed to process. I'm not
> >>>>>>>>>>> even sure how a user would re-process/action one of these other
> >>>>>>>>>>> errors; it seems like the purview of error logging to me?
> >>>>>>>>>>>
> >>>>>>>>>>> 4.
> >>>>>>>>>>> My point here was that I think it would be useful for the KIP
> >>>>>>>>>>> to contain an explanation of the behavior both with KIP-1033
> >>>>>>>>>>> and without it, i.e. clarify if/how records that throw an
> >>>>>>>>>>> exception in a processor are handled. At the moment, I'm
> >>>>>>>>>>> assuming that without KIP-1033, processing exceptions would not
> >>>>>>>>>>> cause records to be sent to the DLQ, but with KIP-1033, they
> >>>>>>>>>>> would. If this assumption is correct, I think it should be made
> >>>>>>>>>>> explicit in the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> 5.
> >>>>>>>>>>> Understood. You may want to make this explicit in the
> >>>>>>>>>>> documentation for users, so they understand the consequences of
> >>>>>>>>>>> re-processing data sent to their DLQ. The main reason I raised
> >>>>>>>>>>> this point is it's something that's tripped me up in numerous
> >>>>>>>>>>> KIPs and that committers frequently remind me of; so I wanted
> >>>>>>>>>>> to get ahead of it for once! :D
> >>>>>>>>>>>
> >>>>>>>>>>> And one new point:
> >>>>>>>>>>> 6.
> >>>>>>>>>>> The DLQ record schema appears to discard all custom headers set
> >>>>>>>>>>> on the source record. Is there a way these can be included? In
> >>>>>>>>>>> particular, I'm concerned with "schema pointer" headers (like
> >>>>>>>>>>> those set by Schema Registry), that may need to be propagated,
> >>>>>>>>>>> especially if the records are fed back into the source topics
> >>>>>>>>>>> for re-processing by the user.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina
> >>>>>>>>>>> <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks a lot for your review and your useful comments!
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. It is a good point; as you mentioned, I think it would make
> >>>>>>>>>>>> sense in some use cases to have potentially multiple DLQ
> >>>>>>>>>>>> topics, so we should provide an API to let users do it.
> >>>>>>>>>>>> Thinking out loud here, maybe it is a better approach to
> >>>>>>>>>>>> create a new Record class containing the topic name, e.g.
> >>>>>>>>>>>> DeadLetterQueueRecord, and changing the signature to
> >>>>>>>>>>>> withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> >>>>>>>>>>>> deadLetterQueueRecords) instead of
> >>>>>>>>>>>> withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>>>>>> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord
> >>>>>>>>>>>> would be something like "class DeadLetterQueueRecord extends
> >>>>>>>>>>>> org.apache.kafka.streams.processor.api.ProducerRecord { String
> >>>>>>>>>>>> topic; /* + getter/setter */ }"
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. I think the root question here is: should we have one DLQ
> >>>>>>>>>>>> topic or multiple DLQ topics by default. This question highly
> >>>>>>>>>>>> depends on the context, but implementing a default
> >>>>>>>>>>>> implementation to handle multiple DLQ topics would be
> >>>>>>>>>>>> opinionated, e.g. how to manage errors in a punctuate?
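Cleaned up, the DeadLetterQueueRecord idea quoted above could look like the
following sketch. It is shown as a wrapper rather than a subclass, to keep
the sketch independent of whether the PAPI Record class is extensible; the
class is hypothetical either way:

import org.apache.kafka.streams.processor.api.Record;

// Hypothetical shape for DeadLetterQueueRecord: a PAPI Record plus the
// destination topic name, so one handler response can target several
// DLQ topics.
public final class DeadLetterQueueRecord<K, V> {

    private final String topic;
    private final Record<K, V> record;

    public DeadLetterQueueRecord(final String topic, final Record<K, V> record) {
        this.topic = topic;
        this.record = record;
    }

    public String topic() { return topic; }
    public Record<K, V> record() { return record; }
}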
> >>>>>>>>>>>> I think it makes sense to have the default implementation
> >>>>>>>>>>>> writing all faulty records to a single DLQ; that's at least
> >>>>>>>>>>>> the approach I used in past applications: one DLQ per Kafka
> >>>>>>>>>>>> Streams application. Of course the message format could change
> >>>>>>>>>>>> in the DLQ, e.g. due to the source topic, but those DLQ
> >>>>>>>>>>>> records will very likely be troubleshot, and maybe replayed,
> >>>>>>>>>>>> manually anyway.
> >>>>>>>>>>>> If a user needs to have multiple DLQ topics or wants to
> >>>>>>>>>>>> enforce a specific schema, it's still possible, but they would
> >>>>>>>>>>>> need to implement custom Exception Handlers.
> >>>>>>>>>>>> Coming back to 1., I do agree that it would make sense to have
> >>>>>>>>>>>> the user set the DLQ topic name in the handlers for more
> >>>>>>>>>>>> flexibility.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. Good point, sorry, it was a typo; the ProcessingContext
> >>>>>>>>>>>> makes much more sense here indeed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4. I do assume that we could implement KIP-1033 (Processing
> >>>>>>>>>>>> exception handler) independently from KIP-1034. I do hope that
> >>>>>>>>>>>> KIP-1033 would be adopted and implemented before KIP-1034, but
> >>>>>>>>>>>> if that's not the case, we could implement KIP-1034
> >>>>>>>>>>>> independently and update KIP-1033 to include the DLQ record
> >>>>>>>>>>>> afterward (in the same KIP or in a new one if not possible).
> >>>>>>>>>>>>
> >>>>>>>>>>>> 5. I think we should be clear that this KIP only covers the
> >>>>>>>>>>>> DLQ record produced.
> >>>>>>>>>>>> Everything related to replaying messages or recovery plans
> >>>>>>>>>>>> should be considered out-of-scope, as it is use-case and error
> >>>>>>>>>>>> specific.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me know if that's not clear; there are definitely points
> >>>>>>>>>>>> that are highly debatable.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Damien
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford
> >>>>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Oh, and one more thing:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 5.
> >>>>>>>>>>>>> Whenever you take a record out of the stream, and then
> >>>>>>>>>>>>> potentially re-introduce it at a later date, you introduce
> >>>>>>>>>>>>> the potential for record ordering issues. For example, that
> >>>>>>>>>>>>> record could have been destined for a Window that has been
> >>>>>>>>>>>>> closed by the time it's re-processed. I'd like to see a
> >>>>>>>>>>>>> section that considers these consequences, and perhaps makes
> >>>>>>>>>>>>> those risks clear to users. For the record, this is exactly
> >>>>>>>>>>>>> what sunk KIP-990, which was an alternative approach to error
> >>>>>>>>>>>>> handling that introduced the same issues.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford
> >>>>>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Damien,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the KIP!
Dead-letter queues are something that I > >>>>>>>>> think a > >>>>>>>>>>>> lot of > >>>>>>>>>>>>>> users would like. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I think there are a few points with this KIP that concern me: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. > >>>>>>>>>>>>>> It looks like you can only define a single, global DLQ for the > >>>>>>>>> entire > >>>>>>>>>>>>>> Kafka Streams application? What about applications that would > >>>>>>>>> like to > >>>>>>>>>>>>>> define different DLQs for different data flows? This is > >>>>>>>>> especially > >>>>>>>>>>>>>> important when dealing with multiple source topics that have > >>>>>>>>> different > >>>>>>>>>>>>>> record schemas. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 2. > >>>>>>>>>>>>>> Your DLQ payload value can either be the record value that > >>>>>>>>> failed, or > >>>>>>>>>>>> an > >>>>>>>>>>>>>> error string (such as "error during punctuate"). This is likely > >>>>>>>>> to > >>>>>>>>>>>> cause > >>>>>>>>>>>>>> problems when users try to process the records from the DLQ, as > >>>>>>>>> they > >>>>>>>>>>>> can't > >>>>>>>>>>>>>> guarantee the format of every record value will be the same. > >>>>>>>>> This is > >>>>>>>>>>>> very > >>>>>>>>>>>>>> loosely related to point 1. above. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 3. > >>>>>>>>>>>>>> You provide a ProcessorContext to both exception handlers, but > >>>>>>>>> state > >>>>>>>>>>>> they > >>>>>>>>>>>>>> cannot be used to forward records. In that case, I believe you > >>>>>>>>> should > >>>>>>>>>>>> use > >>>>>>>>>>>>>> ProcessingContext instead, which statically guarantees that it > >>>>>>>>> can't be > >>>>>>>>>>>>>> used to forward records. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 4. > >>>>>>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's > >>>>>>>>> the > >>>>>>>>>>>> plan > >>>>>>>>>>>>>> if KIP-1033 is not adopted, or if KIP-1034 lands before 1033? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina < > >>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> In a general way, if the user does not configure the right > >>>>>>>>>>>>>>> ACL, > >>>>>>>>> that > >>>>>>>>>>>>>>> would be a security issue, but that's true for any topic. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without > >>>>>>>>> writing > >>>>>>>>>>>>>>> custom Java code in Kafka Streams, not at the topic level. > >>>>>>>>>>>>>>> A lot of applications are already implementing this pattern, > >>>>>>>>> but the > >>>>>>>>>>>>>>> required code to do it is quite painful and error prone, for > >>>>>>>>> example > >>>>>>>>>>>>>>> most apps I have seen created a new KafkaProducer to send > >>>>>>>>> records to > >>>>>>>>>>>>>>> their DLQ. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> As it would be disabled by default for backward compatibility, > >>>>>>>>> I doubt > >>>>>>>>>>>>>>> it would generate any security concern. > >>>>>>>>>>>>>>> If a user explicitly configures a Deal Letter Queue, it would > >>>>>>>>> be up to > >>>>>>>>>>>>>>> him to configure the relevant ACLs to ensure that the right > >>>>>>>>> principal > >>>>>>>>>>>>>>> can access it. > >>>>>>>>>>>>>>> It is already the case for all internal, input and output > >>>>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>> Streams topics (e.g. 
repartition, changelog topics) that also > >>>>>>>>> could > >>>>>>>>>>>>>>> contain confidential data, so I do not think we should > >>>>>>>>> implement a > >>>>>>>>>>>>>>> different behavior for this one. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> In this KIP, we configured the default DLQ record to have the > >>>>>>>>> initial > >>>>>>>>>>>>>>> record key/value as we assume that it is the expected and > >>>>>>>>>>>>>>> wanted > >>>>>>>>>>>>>>> behavior for most applications. > >>>>>>>>>>>>>>> If a user does not want to have the key/value in the DLQ > >>>>>>>>>>>>>>> record > >>>>>>>>> for > >>>>>>>>>>>>>>> any reason, they could still implement exception handlers to > >>>>>>>>> build > >>>>>>>>>>>>>>> their own DLQ record. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regarding ACL, maybe something smarter could be done in Kafka > >>>>>>>>> Streams, > >>>>>>>>>>>>>>> but this is out of scope for this KIP. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren > >>>>>>>>>>>>>>> <cla...@xenei.com<mailto:cla...@xenei.com>> > >>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> My concern is that someone would create a dead letter queue > >>>>>>>>> on a > >>>>>>>>>>>>>>> sensitive > >>>>>>>>>>>>>>>> topic and not get the ACL correct from the start. Thus > >>>>>>>>> causing > >>>>>>>>>>>>>>> potential > >>>>>>>>>>>>>>>> confidential data leak. Is there anything in the proposal > >>>>>>>>> that > >>>>>>>>>>>> would > >>>>>>>>>>>>>>>> prevent that from happening? If so I did not recognize it as > >>>>>>>>> such. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina < > >>>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Claude, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a > >>>>>>>>> standard > >>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> independant topic, thus normal ACL applies to it like any > >>>>>>>>> other > >>>>>>>>>>>> topic. > >>>>>>>>>>>>>>>>> This should not introduce any security issues, obviously, > >>>>>>>>> the > >>>>>>>>>>>> right > >>>>>>>>>>>>>>>>> ACL would need to be provided to write to the DLQ if > >>>>>>>>> configured. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>> Damien > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr > >>>>>>>>>>>>>>>>> <claude.war...@aiven.io.invalid<mailto:claude.war...@aiven.io.invalid>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any > >>>>>>>>> ignorance > >>>>>>>>>>>> on my > >>>>>>>>>>>>>>> part. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> When a dead letter queue is established is there a > >>>>>>>>> process to > >>>>>>>>>>>>>>> ensure that > >>>>>>>>>>>>>>>>>> it at least is defined with the same ACL as the original > >>>>>>>>> queue? > >>>>>>>>>>>>>>> Without > >>>>>>>>>>>>>>>>>> such a guarantee at the start it seems that managing dead > >>>>>>>>> letter > >>>>>>>>>>>>>>> queues > >>>>>>>>>>>>>>>>>> will be fraught with security issues. 
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina
> >>>>>>>>>>>>>>>>>> <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> To continue on our effort to improve Kafka Streams
> >>>>>>>>>>>>>>>>>>> error handling, we propose a new KIP to add out of the
> >>>>>>>>>>>>>>>>>>> box support for Dead Letter Queue.
> >>>>>>>>>>>>>>>>>>> The goal of this KIP is to provide a default
> >>>>>>>>>>>>>>>>>>> implementation that should be suitable for most
> >>>>>>>>>>>>>>>>>>> applications and allow users to override it if they
> >>>>>>>>>>>>>>>>>>> have specific requirements.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> In order to build a suitable payload, some additional
> >>>>>>>>>>>>>>>>>>> changes are included in this KIP:
> >>>>>>>>>>>>>>>>>>>   1. extend the ProcessingContext to hold, when
> >>>>>>>>>>>>>>>>>>>      available, the source node raw key/value byte[]
> >>>>>>>>>>>>>>>>>>>   2. expose the ProcessingContext to the
> >>>>>>>>>>>>>>>>>>>      ProductionExceptionHandler; it is currently not
> >>>>>>>>>>>>>>>>>>>      available in the handle parameters.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to
> >>>>>>>>>>>>>>>>>>> the ProductionExceptionHandler, we considered two
> >>>>>>>>>>>>>>>>>>> choices:
> >>>>>>>>>>>>>>>>>>>   1. exposing the ProcessingContext as a parameter in
> >>>>>>>>>>>>>>>>>>>      the handle() method. That's the cleanest way IMHO,
> >>>>>>>>>>>>>>>>>>>      but we would need to deprecate the old method.
> >>>>>>>>>>>>>>>>>>>   2. exposing the ProcessingContext as an attribute in
> >>>>>>>>>>>>>>>>>>>      the interface. This way, no method is deprecated,
> >>>>>>>>>>>>>>>>>>>      but we would not be consistent with the other
> >>>>>>>>>>>>>>>>>>>      ExceptionHandlers.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> In the KIP, we chose solution 1. (new handle signature,
> >>>>>>>>>>>>>>>>>>> with the old one deprecated), but we could use other
> >>>>>>>>>>>>>>>>>>> opinions on this part.
> >>>>>>>>>>>>>>>>>>> More information is available directly on the KIP.
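In code, option 1. above (the one chosen in the KIP) would look roughly like
this sketch; the exact context type was still under discussion at this
point:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.api.ProcessingContext;

public interface ProductionExceptionHandler extends Configurable {

    // Existing signature, deprecated under option 1.
    @Deprecated
    ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                              Exception exception);

    // Option 1: the context becomes a parameter of the new method; a default
    // implementation keeps existing handlers source-compatible.
    default ProductionExceptionHandlerResponse handle(final ProcessingContext context,
                                                      final ProducerRecord<byte[], byte[]> record,
                                                      final Exception exception) {
        return handle(record, exception);
    }

    // Trimmed response enum, kept here only to make the sketch self-contained.
    enum ProductionExceptionHandlerResponse {
        CONTINUE,
        FAIL
    }
}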
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> KIP link:
> >>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Feedback and suggestions are welcome,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>> Damien, Sebastien and Loic
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> LinkedIn: http://www.linkedin.com/in/claudewarren