Hi Bruno,

> BC1
I see your point. I wonder if having a generic and separate tool outside of StreamsResetter to reset a topic would make sense. Some community projects have this feature (emptying topics), e.g. AKHQ, and it's true that it could be quite useful.
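For reference, emptying a topic is mostly a thin wrapper around the admin client; a sketch of what such a tool could do (all names illustrative, error handling omitted):

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public static void emptyTopic(final Admin admin, final String topic) throws Exception {
    // find the partitions of the topic
    final TopicDescription description = admin.describeTopics(List.of(topic))
        .topicNameValues().get(topic).get();
    // fetch the current end offset of each partition
    final Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
    for (final TopicPartitionInfo p : description.partitions()) {
        latest.put(new TopicPartition(topic, p.partition()), OffsetSpec.latest());
    }
    // delete everything below the end offsets, i.e. empty the topic
    final Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
    admin.listOffsets(latest).all().get().forEach((tp, info) ->
        toDelete.put(tp, RecordsToDelete.beforeOffset(info.offset())));
    admin.deleteRecords(toDelete).all().get();
}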
> BC3
Good point, this is not really a builder class, let me update the KIP with the proposed syntax.

On Tue, 3 Sept 2024 at 16:00, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Damien,
>
> BC1
> > In my opinion DLQ topics should be viewed as a sink topic and AFAIK, this tool does not clean up sink topics.
> Maybe, but one could also argue DLQ topics are part of the runtime, because such a topic collects errors that occurred during the runtime and that might be specific to a given execution of the Streams app. One might want to reset the errors when starting from scratch.
>
> BC2
> You are right, the configs are passed at creation time. My bad!
>
> BC3
> I think the `with`-syntax fits well when building objects that contain some kind of config, like Materialized, that you usually pass into an API. However, the handler returns the response. The response instructs something. It says CONTINUE processing or FAIL the processing. With your KIP the response gets an additional instruction, namely `andAddToDeadLetterQueue`. I would not sacrifice better readability for consistency in this case.
>
> Best,
> Bruno
>
> On 9/3/24 3:18 PM, Damien Gasparina wrote:
> > Hi Bruno,
> >
> > Thanks a lot for your comments!
> >
> >> BC1
> >> Do you also plan to add an argument to the StreamsResetter tool to delete the default DLQ topic when a Streams app is reset?
> > Good point, I did not think about the StreamsResetter tool. Thinking out loud, I am not sure if it is a good idea to add an option to clean them up. In my opinion DLQ topics should be viewed as a sink topic and, AFAIK, this tool does not clean up sink topics.
> >
> >> BC2
> > In case of custom exception handlers, they can get the errors.deadletterqueue.topic.name configuration by overriding `void configure(Map<String, ?> configs);`. As it is the location where all the configuration can be accessed, I think it's the best way, no? I will think about it a bit further, it might be useful/convenient for users.
> >
> >> BC3
> > The "with" syntax is used in many locations in the Kafka Streams public classes, that's why I used it, e.g. Materialized.with(), Consumed.withKeySerde(...).withValueSerde(...), Grouped.withName(...). I do agree that .andAddToDeadLetterQueue is more intuitive, but I would argue that being consistent is better in this situation. What do you think?
> >
> > Cheers,
> > Damien
> >
> > On Tue, 3 Sept 2024 at 14:59, Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >> Hi,
> >>
> >> Thanks for the updates!
> >>
> >> I have a couple of comments.
> >>
> >> BC1
> >> Do you also plan to add an argument to the StreamsResetter tool to delete the default DLQ topic when a Streams app is reset?
> >>
> >> BC2
> >> Would it make sense to add errors.deadletterqueue.topic.name to the ErrorHandlerContext in case a custom exception handler wants to write to the configured DLQ topic? For example, if only one of the handlers needs to be customized but all handlers should write to the configured DLQ topic.
> >>
> >> BC3
> >> What do you think about renaming withDeadLetterQueueRecords() to andAddToDeadLetterQueue()?
> >> A customized handler would look like:
> >>
> >> public ProcessingHandlerResponse handle(
> >>     final ErrorHandlerContext context,
> >>     final Record<?, ?> record,
> >>     final Exception exception
> >> ) {
> >>     return ProcessingHandlerResponse.CONTINUE
> >>         .andAddToDeadLetterQueue(
> >>             Collections.singletonList(new ProducerRecord<>(
> >>                 "app-dlq",
> >>                 "Hello".getBytes(StandardCharsets.UTF_8),
> >>                 "World".getBytes(StandardCharsets.UTF_8)
> >>             ))
> >>         );
> >> }
> >>
> >> I think the code becomes more readable.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 8/30/24 3:37 PM, Damien Gasparina wrote:
> >>> Hi everyone,
> >>>
> >>> We just updated KIP-1034, we changed the following:
> >>> - We included the ProcessingExceptionHandler (KIP-1033) directly in the KIP;
> >>> - We provided examples to clarify the new configuration and how it could be leveraged.
> >>>
> >>> I think we can resume the conversation on this KIP.
> >>>
> >>> Cheers,
> >>> Damien, Sebastien and Loic
> >>>
> >>> On Tue, 27 Aug 2024 at 15:37, Sebastien Viale <sebastien.vi...@michelin.com> wrote:
> >>>>
> >>>> Hi Bruno,
> >>>>
> >>>> We have planned a meeting for next Friday to discuss it with Loic and Damien. We will be able to restart discussions about it soon.
> >>>>
> >>>> regards
> >>>>
> >>>> ________________________________
> >>>> From: Bruno Cadonna <cado...@apache.org>
> >>>> Sent: Monday, 26 August 2024 11:32
> >>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>
> >>>> Hi Loïc, Sebastien, and Damien,
> >>>>
> >>>> Now that KIP-1033 is going to be released in 3.9, what is the plan to progress with this KIP? Is the KIP up-to-date, so that we can restart discussion?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 6/13/24 6:16 PM, Damien Gasparina wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> We focused our effort (well, mostly Seb and Loic :)) on KIP-1033, that's why not much progress has been made on this one yet. Regarding your points:
> >>>>>
> >>>>> B1: It is up to the user to specify the DLQ topic name and to implement a potential differentiation. I tend to think that having one DLQ per application ID is the wisest, but I encountered cases and applications that preferred having a shared DLQ topic between multiple applications, e.g. to reduce the number of partitions, or to ease monitoring.
> >>>>>
> >>>>> B2: Good catch, it should be ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, we removed the "DEFAULT" prefix during the discussion; looks like I forgot to update all occurrences in the KIP.
> >>>>>
> >>>>> B3: The trigger for sending to the DLQ would be if ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user implemented a custom exception handler that returns DLQ records. ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just override the behavior of the default handler, thus custom exception handlers will completely ignore this parameter.
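> >>>>>
> >>>>> To make that concrete, a custom handler that wants to honor the config anyway could read it itself; a sketch against the proposed API (the config key, withDeadLetterQueueRecords and the rawSourceKey/Value accessors are taken from the KIP draft, so all names are tentative):
> >>>>>
> >>>>> public class MyDlqProcessingExceptionHandler implements ProcessingExceptionHandler {
> >>>>>     private String dlqTopic;
> >>>>>
> >>>>>     @Override
> >>>>>     public void configure(final Map<String, ?> configs) {
> >>>>>         // tentative config key from the KIP draft
> >>>>>         dlqTopic = (String) configs.get("errors.deadletterqueue.topic.name");
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
> >>>>>                                             final Record<?, ?> record,
> >>>>>                                             final Exception exception) {
> >>>>>         if (dlqTopic == null) {
> >>>>>             return ProcessingHandlerResponse.CONTINUE;
> >>>>>         }
> >>>>>         // forward the raw source key/value to the configured DLQ and keep processing
> >>>>>         return ProcessingHandlerResponse.CONTINUE
> >>>>>             .withDeadLetterQueueRecords(Collections.singletonList(
> >>>>>                 new ProducerRecord<>(dlqTopic, context.rawSourceKey(), context.rawSourceValue())));
> >>>>>     }
> >>>>> }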
> >>>>>
> >>>>> I think it's a good trade-off between providing a production-ready default implementation, yet providing sufficient flexibility for complex use-cases. This behavior definitely needs to be documented, but I guess it's safe to push the responsibility for the DLQ records to the user if they implement custom handlers.
> >>>>>
> >>>>> Cheers,
> >>>>> Damien
> >>>>>
> >>>>> On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Since there was not too much activity in this thread recently, I was wondering what the status of this discussion is.
> >>>>>>
> >>>>>> I cannot find the examples in the KIP Sébastien mentioned in the last message to this thread. I can also not find the corresponding definition of the following method call in the KIP:
> >>>>>>
> >>>>>> FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>
> >>>>>> I also have some comments:
> >>>>>>
> >>>>>> B1
> >>>>>> Did you consider prefixing the dead letter queue topic names with the application ID to distinguish the topics between Streams apps? Or is the user responsible for the differentiation? If the user is responsible, we risk that faulty records of different Streams apps end up in the same dead letter queue.
> >>>>>>
> >>>>>> B2
> >>>>>> Is the name of the dead letter queue topic config DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.
> >>>>>>
> >>>>>> B3
> >>>>>> What exactly is the trigger to send a record to the dead letter queue? Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, or is it adding a record to the return value of the exception handler? What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do not add a record to the return value of the handler? What happens if I do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to the return value of the handler?
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 4/22/24 10:19 PM, Sebastien Viale wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Thanks for your remarks.
> >>>>>>>
> >>>>>>> L1. I would say "who can do the most can do the least"; even though most people will fail and stop, we found it interesting to offer the possibility to fail-and-send-to-DLQ.
> >>>>>>>
> >>>>>>> L2. We did not consider extending the TimestampExtractor because we consider it out of scope for this KIP. Perhaps it will be possible to include it in an ExceptionHandler later.
> >>>>>>>
> >>>>>>> L3. We will include an example in the KIP, but as we mentioned earlier, the DLQ topic can be different in each custom Exception Handler. When providing custom handlers, users would have the possibility to return:
> >>>>>>> * FAIL
> >>>>>>> * CONTINUE
> >>>>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>>
> >>>>>>> cheers !
> >>>>>>> Sébastien
> >>>>>>>
> >>>>>>> ________________________________
> >>>>>>> From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
> >>>>>>> Sent: Monday, 22 April 2024 14:36
> >>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>>>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>>>>
> >>>>>>> Hi!
> >>>>>>>
> >>>>>>> Thanks for the KIP, great stuff.
> >>>>>>>
> >>>>>>> L1. I was a bit confused that the default configuration (once you set a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood correctly. Is this something that will be a common use-case, and is it a configuration that we want to encourage? I'd expect that you either want to fail or skip-and-send-to-DLQ.
> >>>>>>>
> >>>>>>> L2. Have you considered extending the `TimestampExtractor` interface so that it can also produce to the DLQ? AFAIK it's not covered by any of the existing exception handlers, but it can cause similar failures (potentially custom logic, depending on the validity of the input record). There could also be a default implementation as a subclass of `ExtractRecordMetadataTimestamp`.
> >>>>>>>
> >>>>>>> L3. It would be nice to include an example of how to produce to multiple topics in the KIP, as I can imagine that this will be a common use-case. I wasn't sure how much code would be involved to make it work. If a lot of code is required, we may want to consider exposing some utils that make it easier.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Lucas
> >>>>>>>
> >>>>>>> On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> Following all the discussion on this KIP and KIP-1033, we introduced a new container class containing only processing context metadata: ProcessingMetadata. This new container class is actually part of KIP-1033; thus, I added a hard dependency for this KIP on KIP-1033, I think it's the wisest choice implementation-wise.
> >>>>>>>>
> >>>>>>>> I also clarified the interface of the enums: withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just send one DLQ record, but there might be specific use-cases, and what can do more can do less, so I added an Iterable.
> >>>>>>>>
> >>>>>>>> I took some time to think about the impact of storing the ProcessingMetadata on the ProductionExceptionHandler. I think storing the topic/offset/partition should be fine, but I am concerned about storing the rawSourceKey/Value. I think it could impact some specific use-cases; for example, a high-throughput Kafka Streams application "counting" messages could have huge source input messages and very small sink messages. Here, I assume storing the rawSourceKey/Value could require significantly more memory than the actual Kafka Producer buffer.
> >>>>>>>>
> >>>>>>>> I think the safest approach is actually to only store the fixed-size metadata for the ProductionExceptionHandler.handle: topic/partition/offset/processorNodeId/taskId. It might be confusing for the user, but 1) it is still better than today, where there is no context information at all, 2) it would be clearly stated in the javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the punctuate case).
> >>>>>>>>
> >>>>>>>> Do you think it would be a suitable design, Sophie?
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Damien
> >>>>>>>>
> >>>>>>>> On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Sophie,
> >>>>>>>>>
> >>>>>>>>> Thanks for your feedback. Completing Damien's comments here for points S1 and S5B.
> >>>>>>>>>
> >>>>>>>>> S1:
> >>>>>>>>>> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this?
> >>>>>>>>>
> >>>>>>>>> I am wondering if it makes sense to alter the ProducerRecord from the Clients API with a "deadLetterQueueTopicName" attribute dedicated to the Kafka Streams DLQ. Adding "deadLetterQueueTopicName" as an additional parameter to "withDeadLetterQueueRecord" is a good option, and may allow users to send records to different DLQ topics depending on conditions:
> >>>>>>>>>
> >>>>>>>>> @Override
> >>>>>>>>> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >>>>>>>>>                                                  final ProducerRecord<byte[], byte[]> record,
> >>>>>>>>>                                                  final Exception exception) {
> >>>>>>>>>     if (condition1) {
> >>>>>>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >>>>>>>>>     }
> >>>>>>>>>     if (condition2) {
> >>>>>>>>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>>>>>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >>>>>>>>>     }
> >>>>>>>>>     return ProductionExceptionHandlerResponse.CONTINUE
> >>>>>>>>>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> S5B:
> >>>>>>>>>> I was having a bit of trouble understanding what the behavior would be if someone configured a "errors.deadletterqueue.topic.name" but didn't implement the handlers.
> >>>>>>>>>
> >>>>>>>>> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and DefaultProductionExceptionHandler should be able to tell if records should be sent to the DLQ or not. The "errors.deadletterqueue.topic.name" configuration serves to:
> >>>>>>>>>
> >>>>>>>>> * Specify whether the provided handlers should send records to the DLQ:
> >>>>>>>>>   * If the value is empty, the handlers should not send records to the DLQ.
> >>>>>>>>>   * If the value is not empty, the handlers should send records to the DLQ.
> >>>>>>>>> * Define the name of the DLQ topic that should be used by the provided handlers.
> >>>>>>>>>
> >>>>>>>>> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided handlers should return either:
> >>>>>>>>>
> >>>>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>>>>>>>> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
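> >>>>>>>>>
> >>>>>>>>> For illustration, a provided handler could then behave roughly like this (a sketch against the proposed API, not the actual implementation; `dlqTopic` holds the configured value):
> >>>>>>>>>
> >>>>>>>>> public DeserializationHandlerResponse handle(final ProcessingContext context,
> >>>>>>>>>                                              final ConsumerRecord<byte[], byte[]> record,
> >>>>>>>>>                                              final Exception exception) {
> >>>>>>>>>     log.warn("Exception caught during deserialization", exception);
> >>>>>>>>>     if (dlqTopic == null || dlqTopic.isEmpty()) {
> >>>>>>>>>         // no DLQ configured: keep today's log-and-continue behavior
> >>>>>>>>>         return DeserializationHandlerResponse.CONTINUE;
> >>>>>>>>>     }
> >>>>>>>>>     return DeserializationHandlerResponse.CONTINUE
> >>>>>>>>>         .withDeadLetterQueueRecord(record, dlqTopic);
> >>>>>>>>> }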
> >>>>>>>>>
> >>>>>>>>> If "errors.deadletterqueue.topic.name" is defined but neither DeserializationExceptionHandler nor ProductionExceptionHandler classes are defined in the configuration, then nothing should happen, as sending to the DLQ is based on the handlers' response. When providing custom handlers, users would have the possibility to return:
> >>>>>>>>>
> >>>>>>>>> * FAIL
> >>>>>>>>> * CONTINUE
> >>>>>>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>>>>>>>
> >>>>>>>>> A DLQ topic name is currently required for the last two response types. I am wondering if it could benefit users to ease the use of the default DLQ topic "errors.deadletterqueue.topic.name" when implementing custom handlers, with this kind of implementation:
> >>>>>>>>>
> >>>>>>>>> * FAIL.withDefaultDeadLetterQueueRecord(record)
> >>>>>>>>> * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Loïc
> >>>>>>>>>
> >>>>>>>>> From: Damien Gasparina <d.gaspar...@gmail.com>
> >>>>>>>>> Sent: Sunday, 14 April 2024 20:24
> >>>>>>>>> To: dev@kafka.apache.org
> >>>>>>>>> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>>>>>>>
> >>>>>>>>> Hi Sophie,
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for your feedback and your detailed comments.
> >>>>>>>>>
> >>>>>>>>> S1.
> >>>>>>>>>> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this?
> >>>>>>>>>
> >>>>>>>>> Sorry for the poor wording, that's not what I meant. While writing the KIP, I was hesitating between 1. leveraging the Kafka Producer ProducerRecord, 2. the Kafka Streams ProducerRecord plus a topic name in a separate parameter, 3. a new custom interface (e.g. DeadLetterQueueRecord). As the KafkaProducer ProducerRecord is not used in the Kafka Streams API (except the ProductionExceptionHandler) and I would like to avoid a new interface if not strictly required, I leaned toward option 2. Thinking about it, maybe option 1 would be best, but I assume it could create confusion with the KafkaStreams ProducerRecord. Let me sleep on it.
> >>>>>>>>>
> >>>>>>>>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and your point in S4, it seems more and more likely that we will create a new container class containing only the metadata for the exception handlers. To be consistent, I think we should use this new implementation in all exception handlers. The only issue I could think of is that the new interface would expose less data than the current ProcessorContext in the DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()), thus it could be hard for some users to migrate to the new interface.
> >>>>>>>>> I do expect that only a few users would be impacted, as the javadoc is very clear: `Note, that the passed in {@link ProcessorContext} only allows access to metadata like the task ID.`
> >>>>>>>>>
> >>>>>>>>> S3. I completely agree with you, it is something that might not be trivial and should be thoroughly covered by unit tests during the implementation.
> >>>>>>>>>
> >>>>>>>>> S4. Good point, I did not notice that the ProductionExceptionHandler is also invoked in the producer.send() callback. Capturing the ProcessingContext for each in-flight message is probably not possible. I think there is no other way than to write a custom container class holding only the metadata that is essential; I am thinking of storing the following attributes: source topic, partition, offset, rawKey, rawValue and taskId. Those metadata should be relatively small, but I assume that there could be a high number of in-flight messages, especially with the at-least-once processing guarantee. Do you think it would be fine memory-wise?
> >>>>>>>>>
> >>>>>>>>> S5. As many exceptions are only accessible in exception handlers, and we wanted to 1) allow users to customize the DLQ records and 2) have a suitable DLQ out-of-the-box implementation, we felt it natural to rely on exception handlers; that's also why we created KIP-1033. Piggybacking on the enum response was the cleanest way we could think of, but we are completely open to suggestions.
> >>>>>>>>>
> >>>>>>>>> S5a. Completely agree with you on this point; for this DLQ approach to be complete, the ProcessingExceptionHandler introduced in KIP-1033 is required. KIP-1033 is definitely our first priority. We decided to kick off the KIP-1034 discussion as we expected the discussions to be dynamic and they could potentially impact some choices of KIP-1033.
> >>>>>>>>>
> >>>>>>>>> S5b. In this KIP, we wanted to 1. provide as much flexibility to the user as possible; 2. provide a good default implementation for the DLQ without having to write custom exception handlers. For the default implementation, we introduced a new configuration: errors.deadletterqueue.topic.name.
> >>>>>>>>>
> >>>>>>>>> If this configuration is set, it changes the behavior of the provided exception handlers to return a DLQ record containing the raw key/value + headers + exception metadata in headers. If the out-of-the-box implementation is not suitable for a user, e.g. the payload needs to be masked in the DLQ, they could implement their own exception handlers. The errors.deadletterqueue.topic.name would only impact Kafka Streams bundled exception handlers (e.g. org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).
> >>>>>>>>>
> >>>>>>>>> Let me update the KIP to make it clear and also provide examples.
> >>>>>>>>>
> >>>>>>>>> S6/S7.
> >>>>>>>>> Good point, mea culpa for the camel case, it must have been a sugar rush :)
> >>>>>>>>>
> >>>>>>>>> Thanks again for your detailed comments and for pointing out S4 (production exception & ProcessingContext)!
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Damien
> >>>>>>>>>
> >>>>>>>>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP, this will make a lot of people very happy.
> >>>>>>>>>>
> >>>>>>>>>> Wanted to chime in on a few points that have been raised so far and add some of my own (numbering with an S to distinguish my points from the previous ones).
> >>>>>>>>>>
> >>>>>>>>>> S1.
> >>>>>>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka Producer ProducerRecord, but I assume it would not be consistent with the KafkaStreams API.
> >>>>>>>>>>
> >>>>>>>>>> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this? Why not just use the existing one, i.e. the o.a.k.clients.producer.ProducerRecord class? This is what the ProductionExceptionHandler uses, so it's definitely "consistent". In other words, we can remove the "String deadLetterQueueTopicName".
> >>>>>>>>>>
> >>>>>>>>>> S2.
> >>>>>>>>>> I think this would be a good opportunity to also deprecate the existing #handle method of the DeserializationExceptionHandler, and replace it with one that uses a ProcessingContext instead of the ProcessorContext. Partly for the same reasons about guarding access to the #forward methods, partly because this method needs to be migrated to the new PAPI interface anyways, and ProcessingContext is part of the new one.
> >>>>>>>>>>
> >>>>>>>>>> S3.
> >>>>>>>>>> Regarding 2a. -- I'm inclined to agree that records which a Punctuator failed to produce should also be sent to the DLQ via the ProductionExceptionHandler. Users will just need to be careful about accessing certain fields of the ProcessingContext that aren't available in the punctuator, and need to check the Optional returned by the ProcessingContext#recordMetadata API.
> >>>>>>>>>> Also, from an implementation standpoint, it will be really hard to distinguish between a record created by a punctuator vs a processor from within the RecordCollector, which is the class that actually handles sending records to the Streams Producer and invoking the ProductionExceptionHandler. This is because the RecordCollector is at the "end" of the topology graph and doesn't have any context about which of the upstream processors actually attempted to forward a record.
> >>>>>>>>>>
> >>>>>>>>>> This in itself is at least theoretically solvable, but it leads into my first major new point:
> >>>>>>>>>>
> >>>>>>>>>> S4:
> >>>>>>>>>> I'm deeply worried about passing the ProcessingContext in as a means of forwarding metadata. The problem is that the processing/processor context is a mutable class and is inherently meaningless outside the context of a specific task. And when I said earlier that the RecordCollector sits at the "end" of the topology, I meant that it's literally outside the task's subtopology and is used/shared by all tasks on that StreamThread. So to begin with, there's no guarantee what will actually be returned for essential methods such as the new #rawSourceKey/Value or the existing #recordMetadata.
> >>>>>>>>>>
> >>>>>>>>>> For serialization exceptions it'll probably be correct, but for general send errors it almost definitely won't be. In short, this is because we send records to the producer after the sink node, but don't check for send errors right away, since obviously it takes some time for the producer to actually send. In other words, sending/producing records is actually done asynchronously with processing, and we simply check for errors on any previously-sent records during the send of a new record in a sink node. This means the context we would be passing in for a (non-serialization) exception would pretty much always correspond not to the record that experienced the error, but to the random record that happened to be being sent when we checked and saw the error for the failed record.
> >>>>>>>>>>
> >>>>>>>>>> This discrepancy, in addition to the whole "sourceRawKey/Value and recordMetadata are null for punctuators" issue, seems like an insurmountable inconsistency that is more likely to cause users confusion or problems than be helpful.
> >>>>>>>>>> We could create a new metadata object and copy over the relevant info from the ProcessingContext, but I worry that has the potential to explode memory, since we'd need to hold on to it for all in-flight records up until they are either successfully sent, or failed and passed in to the ProductionExceptionHandler. But if the metadata is relatively small, it's probably fine. Especially if it's just the raw source key/value. Are there any other parts of the ProcessingContext you think should be made available?
> >>>>>>>>>>
> >>>>>>>>>> Note that this only applies to the ProductionExceptionHandler, as the DeserializationExceptionHandler (and the newly proposed ProcessingExceptionHandler) would both be invoked immediately and therefore with the failed record's context. However, I'm also a bit uncomfortable with adding the rawSourceKey/rawSourceValue to the ProcessingContext.
> >>>>>>>>>> So I'd propose to just wrap those (and any other metadata you might want) in a container class and pass that in instead of the ProcessingContext, to all of the exception handlers.
> >>>>>>>>>>
> >>>>>>>>>> S5:
> >>>>>>>>>> For some reason I'm finding the proposed API a little bit awkward, although it's entirely possible that the problem is with me, not the proposal :) Specifically, I'm struggling with the approach of piggybacking on the exception handlers and their response enums to dictate how records are forwarded to the DLQ. I think this comes down to two things, though again, these aren't necessarily problems with the API and probably just need to be hashed out:
> >>>>>>>>>>
> >>>>>>>>>> S5a.
> >>>>>>>>>> When I envision a DLQ, to me, the most common use case would be to forward input records that failed somewhere along the processing graph. But it seems like all the focus here is on the two far ends of the subtopology -- the input/consumer, and the output/producer. I get that the ProcessingExceptionHandler is really the missing piece here, and it's hard to say anything specific since it's not yet accepted, but maybe a somewhat more concrete example would help. FWIW I think/hope to get that KIP accepted and implemented ASAP, so I'm not worried about the "what if it doesn't happen" case -- I more just want to know what it will look like when it does. Imo it's fine to build KIPs on top of future ones; it just seems clear that this part will have to wait for that KIP to actually be added.
> >>>>>>>>>>
> >>>>>>>>>> S5b:
> >>>>>>>>>> Why do users have to define the entire ProducerRecord -- shouldn't Streams handle all this for them? Or will we just automatically send every record on failure to the default global DLQ, and users only have to implement the handlers if they want to change the headers or send to a different topic? I was having a bit of trouble understanding what the behavior would be if someone configured an "errors.deadletterqueue.topic.name" but didn't implement the handlers. Apologies if it's somewhere in the KIP and I happened to miss it!
> >>>>>>>>>>
> >>>>>>>>>> Either way, I really think an example would help me to better imagine what this will look like in practice, and evaluate whether it actually involves as much overhead as I'm worried it will. Can you add a section that includes a basic implementation of all the features here? Nothing too complicated, just the most bare-bones code needed to actually implement forwarding to a dead-letter-queue via the handlers.
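> >>>>>>>>>>
> >>>>>>>>>> For concreteness, I'm picturing something roughly this minimal (a sketch against the proposed API, so the config key and all signatures here are tentative):
> >>>>>>>>>>
> >>>>>>>>>> public class DlqDeserializationExceptionHandler implements DeserializationExceptionHandler {
> >>>>>>>>>>     private String dlqTopic;
> >>>>>>>>>>
> >>>>>>>>>>     @Override
> >>>>>>>>>>     public void configure(final Map<String, ?> configs) {
> >>>>>>>>>>         // tentative config key from the KIP draft
> >>>>>>>>>>         dlqTopic = (String) configs.get("errors.deadletterqueue.topic.name");
> >>>>>>>>>>     }
> >>>>>>>>>>
> >>>>>>>>>>     @Override
> >>>>>>>>>>     public DeserializationHandlerResponse handle(final ProcessingContext context,
> >>>>>>>>>>                                                  final ConsumerRecord<byte[], byte[]> record,
> >>>>>>>>>>                                                  final Exception exception) {
> >>>>>>>>>>         // skip the bad record and forward it as-is to the DLQ
> >>>>>>>>>>         return DeserializationHandlerResponse.CONTINUE
> >>>>>>>>>>             .withDeadLetterQueueRecord(record, dlqTopic);
> >>>>>>>>>>     }
> >>>>>>>>>> }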
> >>>>>>>>>>
> >>>>>>>>>> Lastly, two super small things:
> >>>>>>>>>>
> >>>>>>>>>> S6:
> >>>>>>>>>> We use camel case in Streams, so it should be rawSourceKey/Value rather than raw_source_key/value.
> >>>>>>>>>>
> >>>>>>>>>> S7:
> >>>>>>>>>> Can you add javadocs for #withDeadLetterQueueRecord? For example, it seems to me that if the topic to be sent to here is different than the default/global DLQ, then the user will need to make sure to have created this themselves up front.
> >>>>>>>>>>
> >>>>>>>>>> That's it from me... sorry for the long response, it's just because I'm excited for this feature and have been waiting on a KIP for this for years.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Sophie
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Andrew,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks a lot for your review, plenty of good points!
> >>>>>>>>>>>
> >>>>>>>>>>> 11. Typo fixed, good catch.
> >>>>>>>>>>>
> >>>>>>>>>>> 12. I do agree with you, and Nick also mentioned it; I updated the KIP to mention that context headers should be forwarded.
> >>>>>>>>>>>
> >>>>>>>>>>> 13. Good catch; to be consistent with KIP-298, and without a strong opinion from my side, I updated the KIP with your prefix proposal.
> >>>>>>>>>>>
> >>>>>>>>>>> 14. I am not sure about this point; a big difference between KIP-298 and this one is that the handlers can easily be overridden, something that is not doable in Kafka Connect. If someone would like a different behavior, e.g. to mask the payload or include further headers, I think we should encourage them to write their own exception handlers to build the DLQ record the way they expect.
> >>>>>>>>>>>
> >>>>>>>>>>> 15. Yeah, that's a good point, I was not fully convinced about putting a String in it; I do assume that "null" is also a valid value. I do assume that the stacktrace and the exception in this case are the key metadata for the user to troubleshoot the problem. I updated the KIP to mention that the value should be null if triggered in a punctuate.
> >>>>>>>>>>>
> >>>>>>>>>>> 16. I added a section to mention that Kafka Streams would not try to create the topic itself; the topic should either be automatically created by the broker or pre-created by the user.
> >>>>>>>>>>>
> >>>>>>>>>>> 17. If a DLQ record can not be sent, the exception should go to the uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Good point, that's less impactful than a custom interface, I just updated the KIP with the new signature.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka Producer ProducerRecord, but I assume it would not be consistent with the KafkaStreams API.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Agreed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2.a I do think exceptions occurring during punctuate should be included in the DLQ. Even if building a suitable payload is almost impossible, even with custom code, those exceptions are still fatal for Kafka Streams by default and are something that cannot be ignored safely. I do assume that most users would want to be informed if an error happened during a punctuate, even if only the metadata (e.g. stacktrace, exception) is provided. I am only concerned about flooding the DLQ topic as, if a scheduled operation fails, it will very likely fail again during the next invocation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4. Good point, I clarified the wording in the KIP to make it explicit.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 5. Good point, I will clearly mention that it is out of scope as part of the KIP and might not be as trivial as people could expect. I will update the KIP once I have some spare time.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 6. Oh yeah, I didn't think about it, but forwarding input headers would definitely make sense. The Confluent Schema Registry ID is actually part of the payload, but many correlation IDs and technical metadata are passed through headers; it makes sense to forward them, especially as it is the default behavior of Kafka Streams.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Damien and Sebastien,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1.
> >>>>>>>>>>>>> I think you can just add a `String topic` argument to the existing `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> deadLetterQueueRecord)` method, and then the implementation of the exception handler could choose the topic to send records to using whatever logic the user desires. You could perhaps provide a built-in implementation that leverages your new config to send all records to an untyped DLQ topic?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1a.
> >>>>>>>>>>>>> BTW you have a typo: in your DeserializationExceptionHandler, the type of your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should probably be `ConsumerRecord`.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2.
> >>>>>>>>>>>>> Agreed.
I think it's a good idea to provide an implementation > >>>>>>>>>>>>> that > >>>>>>>>>>> sends to > >>>>>>>>>>>>> a single DLQ by default, but it's important to enable users to > >>>>>>>>>>> customize > >>>>>>>>>>>>> this with their own exception handlers. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2a. > >>>>>>>>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should > >>>>>>>>>>>>> be sent > >>>>>>>>>>> to a > >>>>>>>>>>>>> DLQ topic like it's a bad record. To me, a DLQ should only > >>>>>>>>>>>>> contain > >>>>>>>>>>> records > >>>>>>>>>>>>> that failed to process. I'm not even sure how a user would > >>>>>>>>>>>>> re-process/action one of these other errors; it seems like the > >>>>>>>>>>>>> purview > >>>>>>>>>>> of > >>>>>>>>>>>>> error logging to me? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 4. > >>>>>>>>>>>>> My point here was that I think it would be useful for the KIP to > >>>>>>>>>>> contain an > >>>>>>>>>>>>> explanation of the behavior both with KIP-1033 and without it. > >>>>>>>>>>>>> i.e. > >>>>>>>>>>> clarify > >>>>>>>>>>>>> if/how records that throw an exception in a processor are > >>>>>>>>>>>>> handled. At > >>>>>>>>>>> the > >>>>>>>>>>>>> moment, I'm assuming that without KIP-1033, processing > >>>>>>>>>>>>> exceptions > >>>>>>>>>>> would not > >>>>>>>>>>>>> cause records to be sent to the DLQ, but with KIP-1033, they > >>>>>>>>>>>>> would. If > >>>>>>>>>>> this > >>>>>>>>>>>>> assumption is correct, I think it should be made explicit in > >>>>>>>>>>>>> the KIP. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 5. > >>>>>>>>>>>>> Understood. You may want to make this explicit in the > >>>>>>>>>>>>> documentation for > >>>>>>>>>>>>> users, so they understand the consequences of re-processing > >>>>>>>>>>>>> data sent > >>>>>>>>>>> to > >>>>>>>>>>>>> their DLQ. The main reason I raised this point is it's > >>>>>>>>>>>>> something that's > >>>>>>>>>>>>> tripped me up in numerous KIPs that that committers frequently > >>>>>>>>>>>>> remind > >>>>>>>>>>> me > >>>>>>>>>>>>> of; so I wanted to get ahead of it for once! :D > >>>>>>>>>>>>> > >>>>>>>>>>>>> And one new point: > >>>>>>>>>>>>> 6. > >>>>>>>>>>>>> The DLQ record schema appears to discard all custom headers set > >>>>>>>>>>>>> on the > >>>>>>>>>>>>> source record. Is there a way these can be included? In > >>>>>>>>>>>>> particular, I'm > >>>>>>>>>>>>> concerned with "schema pointer" headers (like those set by > >>>>>>>>>>>>> Schema > >>>>>>>>>>>>> Registry), that may need to be propagated, especially if the > >>>>>>>>>>>>> records > >>>>>>>>>>> are > >>>>>>>>>>>>> fed back into the source topics for re-processing by the user. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Regards, > >>>>>>>>>>>>> Nick > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina > >>>>>>>>>>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks a lot for your review and your useful comments! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. It is a good point, as you mentioned, I think it would make > >>>>>>>>>>>>>> sense > >>>>>>>>>>>>>> in some use cases to have potentially multiple DLQ topics, so > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>> should provide an API to let users do it. > >>>>>>>>>>>>>> Thinking out-loud here, maybe it is a better approach to > >>>>>>>>>>>>>> create a new > >>>>>>>>>>>>>> Record class containing the topic name, e.g. 
> >>>>>>>>>>>>>> DeadLetterQueueRecord, and changing the signature to withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord> deadLetterQueueRecords) instead of withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would be something like "class DeadLetterQueueRecord extends org.apache.kafka.streams.processor.api.ProducerRecord { String topic; /* + getter/setter */ }".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. I think the root question here is: should we have one DLQ topic or multiple DLQ topics by default? This question highly depends on the context, but implementing a default implementation to handle multiple DLQ topics would be opinionated, e.g. how to manage errors in a punctuate? I think it makes sense to have the default implementation writing all faulty records to a single DLQ; that's at least the approach I used in past applications: one DLQ per Kafka Streams application. Of course the message format could change in the DLQ, e.g. due to the source topic, but those DLQ records will very likely be troubleshot, and maybe replayed, manually anyway. If a user needs to have multiple DLQ topics or wants to enforce a specific schema, it's still possible, but they would need to implement custom exception handlers. Coming back to 1., I do agree that it would make sense to have the user set the DLQ topic name in the handlers for more flexibility.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. Good point, sorry, it was a typo; the ProcessingContext makes much more sense here indeed.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. I do assume that we could implement KIP-1033 (processing exception handler) independently from KIP-1034. I do hope that KIP-1033 would be adopted and implemented before KIP-1034, but if that's not the case, we could implement KIP-1034 independently and update KIP-1033 to include the DLQ record afterward (in the same KIP or in a new one if not possible).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. I think we should be clear that this KIP only covers the DLQ record produced. Everything related to replaying messages or recovery plans should be considered out of scope, as it is use-case and error specific.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Let me know if that's not clear; there are definitely points that are highly debatable.
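> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (Spelled out, the container sketched in 1. would be roughly the following; this is only a sketch assuming a ProducerRecord-like base class, and all names are tentative:)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> public class DeadLetterQueueRecord extends ProducerRecord<byte[], byte[]> {
> >>>>>>>>>>>>>>     // the DLQ topic this record should be sent to, distinct from the original sink topic
> >>>>>>>>>>>>>>     private final String deadLetterQueueTopic;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     public DeadLetterQueueRecord(final ProducerRecord<byte[], byte[]> record,
> >>>>>>>>>>>>>>                                  final String deadLetterQueueTopic) {
> >>>>>>>>>>>>>>         super(record.topic(), record.partition(), record.timestamp(),
> >>>>>>>>>>>>>>               record.key(), record.value(), record.headers());
> >>>>>>>>>>>>>>         this.deadLetterQueueTopic = deadLetterQueueTopic;
> >>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     public String deadLetterQueueTopic() {
> >>>>>>>>>>>>>>         return deadLetterQueueTopic;
> >>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>> }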
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Damien
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Oh, and one more thing:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 5.
> >>>>>>>>>>>>>>> Whenever you take a record out of the stream, and then potentially re-introduce it at a later date, you introduce the potential for record ordering issues. For example, that record could have been destined for a Window that has been closed by the time it's re-processed. I'd like to see a section that considers these consequences, and perhaps makes those risks clear to users. For the record, this is exactly what sunk KIP-990, which was an alternative approach to error handling that introduced the same issues.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Damien,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP! Dead-letter queues are something that I think a lot of users would like.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think there are a few points with this KIP that concern me:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>> It looks like you can only define a single, global DLQ for the entire Kafka Streams application? What about applications that would like to define different DLQs for different data flows? This is especially important when dealing with multiple source topics that have different record schemas.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>> Your DLQ payload value can either be the record value that failed, or an error string (such as "error during punctuate"). This is likely to cause problems when users try to process the records from the DLQ, as they can't guarantee the format of every record value will be the same. This is very loosely related to point 1. above.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>> You provide a ProcessorContext to both exception handlers, but state they cannot be used to forward records. In that case, I believe you should use ProcessingContext instead, which statically guarantees that it can't be used to forward records.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In a general way, if the user does not configure the right ACL, that would be a security issue, but that's true for any topic.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without writing custom Java code in Kafka Streams, not at the topic level. A lot of applications are already implementing this pattern, but the required code to do it is quite painful and error prone; for example, most apps I have seen created a new KafkaProducer to send records to their DLQ.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As it would be disabled by default for backward compatibility, I doubt it would generate any security concern. If a user explicitly configures a Dead Letter Queue, it would be up to them to configure the relevant ACLs to ensure that the right principal can access it. It is already the case for all internal, input and output Kafka Streams topics (e.g. repartition, changelog topics) that could also contain confidential data, so I do not think we should implement a different behavior for this one.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In this KIP, we configured the default DLQ record to have the initial record key/value, as we assume that it is the expected and wanted behavior for most applications. If a user does not want to have the key/value in the DLQ record for any reason, they could still implement exception handlers to build their own DLQ record.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding ACLs, maybe something smarter could be done in Kafka Streams, but this is out of scope for this KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> My concern is that someone would create a dead letter queue on a sensitive topic and not get the ACL correct from the start, thus causing a potential confidential data leak. Is there anything in the proposal that would prevent that from happening?
If so I did not recognize it > >>>>>>>>>>>>>>>>>> as > >>>>>>>>>>> such. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina < > >>>>>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi Claude, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a > >>>>>>>>>>> standard > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> independant topic, thus normal ACL applies to it like any > >>>>>>>>>>> other > >>>>>>>>>>>>>> topic. > >>>>>>>>>>>>>>>>>>> This should not introduce any security issues, obviously, > >>>>>>>>>>> the > >>>>>>>>>>>>>> right > >>>>>>>>>>>>>>>>>>> ACL would need to be provided to write to the DLQ if > >>>>>>>>>>> configured. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>>>> Damien > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr > >>>>>>>>>>>>>>>>>>> <claude.war...@aiven.io.invalid<mailto:claude.war...@aiven.io.invalid>> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any > >>>>>>>>>>> ignorance > >>>>>>>>>>>>>> on my > >>>>>>>>>>>>>>>>> part. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> When a dead letter queue is established is there a > >>>>>>>>>>> process to > >>>>>>>>>>>>>>>>> ensure that > >>>>>>>>>>>>>>>>>>>> it at least is defined with the same ACL as the original > >>>>>>>>>>> queue? > >>>>>>>>>>>>>>>>> Without > >>>>>>>>>>>>>>>>>>>> such a guarantee at the start it seems that managing dead > >>>>>>>>>>> letter > >>>>>>>>>>>>>>>>> queues > >>>>>>>>>>>>>>>>>>>> will be fraught with security issues. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina < > >>>>>>>>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> To continue on our effort to improve Kafka Streams error > >>>>>>>>>>>>>>>>> handling, we > >>>>>>>>>>>>>>>>>>>>> propose a new KIP to add out of the box support for Dead > >>>>>>>>>>>>>> Letter > >>>>>>>>>>>>>>>>> Queue. > >>>>>>>>>>>>>>>>>>>>> The goal of this KIP is to provide a default > >>>>>>>>>>> implementation > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>> should be suitable for most applications and allow > >>>>>>>>>>> users to > >>>>>>>>>>>>>>>>> override > >>>>>>>>>>>>>>>>>>>>> it if they have specific requirements. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> In order to build a suitable payload, some additional > >>>>>>>>>>> changes > >>>>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>>>> included in this KIP: > >>>>>>>>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, when > >>>>>>>>>>> available, the > >>>>>>>>>>>>>>>>> source > >>>>>>>>>>>>>>>>>>>>> node raw key/value byte[] > >>>>>>>>>>>>>>>>>>>>> 2. expose the ProcessingContext to the > >>>>>>>>>>>>>>>>> ProductionExceptionHandler, > >>>>>>>>>>>>>>>>>>>>> it is currently not available in the handle parameters. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler, we considered two choices: > >>>>>>>>>>>>>>>>>>>>> 1. 
exposing the ProcessingContext as a parameter in > >>>>>>>>>>> the > >>>>>>>>>>>>>> handle() > >>>>>>>>>>>>>>>>>>>>> method. That's the cleanest way IMHO, but we would need > >>>>>>>>>>> to > >>>>>>>>>>>>>>>>> deprecate > >>>>>>>>>>>>>>>>>>>>> the old method. > >>>>>>>>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an attribute in > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> interface. > >>>>>>>>>>>>>>>>>>>>> This way, no method is deprecated, but we would not be > >>>>>>>>>>>>>> consistent > >>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>> the other ExceptionHandler. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> In the KIP, we chose the 1. solution (new handle > >>>>>>>>>>> signature > >>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>> old > >>>>>>>>>>>>>>>>>>>>> one deprecated), but we could use other opinions on > >>>>>>>>>>> this part. > >>>>>>>>>>>>>>>>>>>>> More information is available directly on the KIP. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> KIP link: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Feedbacks and suggestions are welcome, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>>>>>> Damien, Sebastien and Loic > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>> LinkedIn: > >>>>>>>>>>>>>>>>>> http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren><http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren>><http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren><http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>