Hi, I have updated the KIP with the correct names!
Thanks, Bruno, for the clarification regarding Result#from(XxxExceptionHandlerResponse). We can also make it private, as there is no reason for a user to access it. The @deprecated is no longer needed. regards Sébastien ________________________________ De : Bruno Cadonna <cado...@apache.org> Envoyé : jeudi 19 décembre 2024 09:31 À : dev@kafka.apache.org <dev@kafka.apache.org> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams Warning External sender Do not click on any links or open any attachments unless you trust the sender and know the content is safe. Hi, Putting Result#from(XxxExceptionHandlerResponse) into the interface and deprecating it was kind of my proposal. My thought was to avoid importing internal classes into the public interface and to have everything that is needed to understand the default implementation of the new methods in one place. Result#from(XxxExceptionHandlerResponse) will be removed with all other deprecated methods in this interface, so it does not really make a big difference adding Result#from(XxxExceptionHandlerResponse) to the interface and immediately deprecating it. The value of having everything in one place seems greater to me than avoiding an additional deprecated method. However, I am fine either way. Best, Bruno > Do we need the deprecated methods > `Result#from(XxxExceptionHandlerResponse)`? It seems we could add these > helpers to some internal class; it seems to be more of an implemenation > datail -- or is there a reason to make it public? This email was screened for spam and malicious content but exercise caution anyway. On 19.12.24 04:05, Matthias J. Sax wrote: > I think the KIP missed a few updates: > > For example: > >> Adding a Response nested class,that contains a >> ProductionExceptionHandlerResponse indicating whether to continue >> processing, fail, or retry and the list of records to be sent to the >> dead letter queue topic > > Should be: > > Adding a Response nested class,that contains a Resutl indicating whether > to resume processing, fail, or retry and the list of records to be sent > to the dead letter queue topic > > > Also: > >> Deprecating the handler() and handlerSerializationException() methods >> and adding two new methods: handlerError() and >> handleSerializationException(). The return type is the >> ProductionExceptionResponse > > Should be: > > Deprecating the handler() and handlerSerializationException() methods > and adding two new methods: handlerError() and > handleSerializationException(). The return type is the Response > > (Similar as the two examples above for the other hanlders.) > > > > Question: > > Do we need the deprecated methods > `Result#from(XxxExceptionHandlerResponse)`? It seems we could add these > helpers to some internal class; it seems to be more of an implemenation > datail -- or is there a reason to make it public? > > > Nit: Some code snippest are not well formated > > > > -Matthias > > > On 12/16/24 12:33 AM, Sebastien Viale wrote: >> BC9: >> Thinking about the two options, I prefer the last solution. >> Introducing a new enum Result allows us to use RESUME instead of >> CONTINUE cleanly, without causing source compatibility issues. >> >> It also appears to be the most forward-compatible and maintainable >> approach, as it avoids the complexity of handling two equivalent states >> ( CONTINUE and RESUME) in parallel within the existing codebase. >> Additionally, naming the enum Result simplifies the terminology and >> creates greater homogeneity across all exception handlers. >> >> This approach ensures greater clarity and consistency moving forward, >> while minimizing potential confusion or edge cases. >> >> Thanks >> Sébastien >> >> >> >> ________________________________ >> De : Matthias J. Sax <mj...@apache.org> >> Envoyé : samedi 14 décembre 2024 05:29 >> À : dev@kafka.apache.org <dev@kafka.apache.org> >> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams >> >> Warning External sender Do not click on any links or open any >> attachments unless you trust the sender and know the content is safe. >> >> BC9: I did read up and play aroud a little bit. Enums do have a static >> number which the compiler assings to them, it's call the `ordinal`. And >> yes, switch statements and `==` are base on this orginal. >> >> Inside KS code base, we assing a custom `int id` variable though, that >> we also guarantee to never change. I did mix both up on my end. >> >> We can for sure not just rename `CONTINUE` to `RESUME` as it would not >> be source-compatible. However, I still don't see why we could not add >> `RESUME` in parallel to `CONTINUE` and deprecate `CONTINUE`? Of course, >> we might need to update some code inside KS to support CONTIUNE and >> RESUME as semantic equivalents, but it seems it should work? >> >> However, if we follow Bruno's proposal to introduce a new enum `Result` >> anyway, we could just use `RESUME` instead of `CONTINUE` w/o inroducing >> any issues? >> >> -Matthias >> >> This email was screened for spam and malicious content but exercise >> caution anyway. >> >> >> >> >> On 12/13/24 3:46 PM, Matthias J. Sax wrote: >>> About BC8, thanks for clarifying. I did indeed misunderstand you. >>> >>> I see now what you are saying; I like your proposal. >>> >>> >>> -Matthias >>> >>> >>> On 12/13/24 1:21 AM, Bruno Cadonna wrote: >>>> Hi Matthias, >>>> >>>> I believe there is a misunderstanding regarding BC8. >>>> >>>> BC8 >>>> I propose to rename the enum type from >>>> "ProductionExceptionHandler.ProductionExceptionHandlerResponse" to >>>> "ProductionExceptionHandler.Result". >>>> >>>> Currently in the KIP, the handler returns an object of type >>>> ProductionExceptionHandler.Response. That return object has a method >>>> response() that returns an enum of type >>>> ProductionExceptionHandler.ProductionExceptionHandlerResponse. My >>>> proposal was to create a new enum type >>>> ProductionExceptionHandler.Result that has the same values as >>>> ProductionExceptionHandler.ProductionExceptionHandlerResponse, so that >>>> we can use "ProductionExceptionHandler.Result.FAIL" instead of >>>> "ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL" >>>> in the code. >>>> >>>> >>>> BC9 >>>> I do not think giving the same ID to both enum values work. Enum >>>> values are static, so there is one instance of each value in the JVM. >>>> That is also the reason why we can use "==" when we compare enum >>>> values. >>>> Two different values with the same ID still have different references. >>>> Would be good to validate this. >>>> >>>> >>>> Best, >>>> Bruno >>>> >>>> On 12.12.24 23:03, Matthias J. Sax wrote: >>>>> BC6: Agreed. >>>>> >>>>> BC7: Like it. >>>>> >>>>> BC8: I personally think, `XxxHandler.Response` is fine. I don't see >>>>> how `Result` is any better than `Response`? But I guess both would >>>>> work. >>>>> >>>>> BC9: Good idea. I am wondering if we should rename the enum, too, >>>>> though? We just need to give both the old and new one the same id, so >>>>> they mean the same thing. (At least this is how I understand how >>>>> enums work? Please correct me if I am wrong.) >>>>> >>>>>> I considered using "resume" instead of "continue," but I interpreted >>>>>> it as "let's continue after stopping." >>>>> >>>>> There is for sure some truth to it, but I think it's ok to use >>>>> "resume". Should be close enough semantically. One can interpret the >>>>> call to the handler itself as "stopping/pausing" processing. >>>>> >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> On 12/11/24 5:59 AM, Sebastien Viale wrote: >>>>>> Hi, >>>>>> >>>>>> BC6 >>>>>> I agree that there is no risk with this parameter. MEDIUM level is a >>>>>> good choice. >>>>>> BC7 >>>>>> response is indeed less confusing and easier to "read" >>>>>> BC8 >>>>>> For the same reason as BC7, it might be a good idea. Since you >>>>>> marked your comment as nit-picking, I will wait for other feedback >>>>>> before updating the KIP. >>>>>> BC9 >>>>>> I wasn’t entirely happy with my naming either. I considered using >>>>>> "resume" instead of "continue," but I interpreted it as "let's >>>>>> continue after stopping." If this is acceptable, I completely agree. >>>>>> >>>>>> cheers >>>>>> >>>>>> ________________________________ >>>>>> De : Bruno Cadonna <cado...@apache.org> >>>>>> Envoyé : mercredi 11 décembre 2024 11:49 >>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> >>>>>> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka >>>>>> Streams >>>>>> >>>>>> Warning External sender Do not click on any links or open any >>>>>> attachments unless you trust the sender and know the content is safe. >>>>>> >>>>>> Hi, >>>>>> >>>>>> Another suggestion came to my mind. >>>>>> >>>>>> BC9 >>>>>> You could use "resume" instead of "continue". Thus, you would not >>>>>> need >>>>>> to use continueProcessing(), failProcessing(), and retryProcessing() >>>>>> but >>>>>> simply resume(), fail(), retry(). >>>>>> Regarding the enum value CONTINUE, we do not need to change it to >>>>>> RESUME >>>>>> if you decide to go for resume(), IMO. Using resume() is just a >>>>>> compromise between avoiding the Java keyword continue and having >>>>>> shorter >>>>>> names. >>>>>> >>>>>> Best, >>>>>> Bruno >>>>>> >>>>>> This email was screened for spam and malicious content but exercise >>>>>> caution anyway. >>>>>> >>>>>> >>>>>> >>>>>> On 11.12.24 11:38, Bruno Cadonna wrote: >>>>>>> Hi Sébastien, >>>>>>> >>>>>>> Thanks for the updates! >>>>>>> >>>>>>> BC6 >>>>>>> Why is the importance of ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG >>>>>>> HIGH? >>>>>>> Is it really something a user should change before going to >>>>>>> production? >>>>>>> The only risk that I see is that they will not use a dead letter >>>>>>> queue >>>>>>> which I think does not put the app or the operation of the app at >>>>>>> risk. >>>>>>> Compare it with the config in KIP-1111. Users not setting that >>>>>>> config >>>>>>> risk to not be able to upgrade their topologies. With >>>>>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG I do not see an equivalent >>>>>>> risk. Thus, I propose to set the importance to MEDIUM. >>>>>>> >>>>>>> >>>>>>> BC7 >>>>>>> I propose to call the new response class simply Response. Since >>>>>>> it is >>>>>>> specified in the ProductionExceptionHandler interface most of the >>>>>>> times >>>>>>> it will be used like ProductionExceptionHandler.Response which is a >>>>>>> bit >>>>>>> easier to parse with the eyes than >>>>>>> ProductionExceptionHandler.ProductionExceptionResponse. >>>>>>> >>>>>>> In addition, ProductionExceptionResponse does not make sense to me >>>>>>> since >>>>>>> it is the response of the handler and not of the exception. >>>>>>> >>>>>>> Similar applies to the other handler interfaces. >>>>>>> >>>>>>> >>>>>>> BC8 >>>>>>> The following comment is a bit of a nit-picking. So feel free not to >>>>>>> consider it. >>>>>>> The return type of response() is >>>>>>> ProductionExceptionHandler.ProductionExceptionHandlerResponse. This >>>>>>> was >>>>>>> also used before this KIP. Since the KIP changes quite some >>>>>>> stuff, you >>>>>>> could consider to create a complete new enum with a more readable >>>>>>> name. >>>>>>> Something like Result, which would then be used like >>>>>>> ProductionExceptionHandler.Result. >>>>>>> >>>>>>> >>>>>>> >>>>>>> Best, >>>>>>> Bruno >>>>>>> >>>>>>> >>>>>>> >>>>>>> On 04.09.24 14:46, Damien Gasparina wrote: >>>>>>>> Hi Bruno, >>>>>>>> >>>>>>>>> BC1 >>>>>>>> Make sense, let's keep this one outside of this KIP. I do think it >>>>>>>> would make sense to have a more generic tool to reset topics. This >>>>>>>> kind of stuff could be helpful, even outside of Kafka Streams (e.g. >>>>>>>> DLQ in a Consumer application, while testing, etc...). Funnily, >>>>>>>> I had >>>>>>>> to reset a topic a few minutes ago. >>>>>>>> >>>>>>>> > BC4 >>>>>>>> Good points, somehow I missed it, let me update the KIP to include >>>>>>>> the >>>>>>>> Getter. >>>>>>>> >>>>>>>>> BC5 >>>>>>>> Currently, the ErrorHandlerContext and the ProcessingContext are >>>>>>>> only >>>>>>>> containing information from the triggering message. >>>>>>>> I do think that keeping the current behavior is the best, on top of >>>>>>>> that, I would be afraid that identifying all the raw records >>>>>>>> could be >>>>>>>> very memory consuming and hard to implement (e.g. a >>>>>>>> .groupByKey().count()) . >>>>>>>> Good point for the documentation, let me update the KIP to document >>>>>>>> this behavior. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Damien >>>>>>>> >>>>>>>> On Wed, 4 Sept 2024 at 14:22, Bruno Cadonna <cado...@apache.org> >>>>>>>> wrote: >>>>>>>>> >>>>>>>>> Hi Damien, >>>>>>>>> >>>>>>>>> Thanks for the swift reply! >>>>>>>>> >>>>>>>>> BC1 >>>>>>>>> If you are not convinced, we can add that functionality to the >>>>>>>>> StreamsResetter in the future if requested. We do not need to >>>>>>>>> increase >>>>>>>>> the scope. Mine was just a question. >>>>>>>>> >>>>>>>>> >>>>>>>>> BC4 >>>>>>>>> I understand that `andAddToDeadLetterQueue()` adds the records >>>>>>>>> to an >>>>>>>>> internal field of the response. How is this field accessed? Is >>>>>>>>> it a >>>>>>>>> public field? What is the type of the field? From the KIP is >>>>>>>>> seems it >>>>>>>>> will be a public field of type Iterable<Producer<byte[], >>>>>>>>> byte[]>>. But >>>>>>>>> it is not stated explicitly. To be more felxible, I propose to >>>>>>>>> add a >>>>>>>>> public getter `deadLetterQueueRecords()` instead of a public >>>>>>>>> field for >>>>>>>>> the records to add to the DLQ. >>>>>>>>> >>>>>>>>> >>>>>>>>> BC5 >>>>>>>>> Regarding source raw records, what is the source raw record for a >>>>>>>>> result >>>>>>>>> that was computed by multiple input records like joins? >>>>>>>>> Does it still make sense to replay such messages? >>>>>>>>> Do you need to allow multiple source raw records in the error >>>>>>>>> handler >>>>>>>>> context? Or do you plan to just store the source raw record that >>>>>>>>> triggered the result? >>>>>>>>> I think all of this should be described in the KIP. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Bruno >>>>>>>>> >>>>>>>>> >>>>>>>>> On 9/3/24 5:24 PM, Damien Gasparina wrote: >>>>>>>>>> Hi Bruno, >>>>>>>>>> >>>>>>>>>>> BC1 >>>>>>>>>> I see your point. I wonder if having a generic and separate tool >>>>>>>>>> outside of StreamsReset to reset a topic would make sense. >>>>>>>>>> Some community projects have this feature (empty topics) and >>>>>>>>>> that's >>>>>>>>>> true that it could be quite useful, e.g. AKHQ. >>>>>>>>>> >>>>>>>>>>> BC3 >>>>>>>>>> Good point, this is not really a builder class, let me update >>>>>>>>>> the KIP >>>>>>>>>> with the proposed syntax. >>>>>>>>>> >>>>>>>>>> On Tue, 3 Sept 2024 at 16:00, Bruno Cadonna <cado...@apache.org> >>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi Damien, >>>>>>>>>>> >>>>>>>>>>> BC1 >>>>>>>>>>> >>>>>>>>>>> > In my opinion DLQ topics should be viewed as a sink topic >>>>>>>>>>> and >>>>>>>>>>> > AFAIK, this tool does not clean up sink topics. >>>>>>>>>>> >>>>>>>>>>> Maybe, but one could also argue DLQ topics are part of the >>>>>>>>>>> runtime >>>>>>>>>>> because is collects errors occurred during the runtime that >>>>>>>>>>> might be >>>>>>>>>>> specific to given execution of the Streams app. One might want >>>>>>>>>>> want to >>>>>>>>>>> reset the errors when starting from scratch. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> BC2 >>>>>>>>>>> You are right the configs are passed at creation time. My bad! >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> BC3 >>>>>>>>>>> I think the `with`-syntax fits well when building objects that >>>>>>>>>>> contain >>>>>>>>>>> kind of configs like Materialized that you usually pass into an >>>>>>>>>>> API. >>>>>>>>>>> However, the handler returns the response. The response >>>>>>>>>>> instructs >>>>>>>>>>> something. It says CONTINUE processing or FAIL the processing. >>>>>>>>>>> With >>>>>>>>>>> your >>>>>>>>>>> KIP the response gets an additional instruction, namely >>>>>>>>>>> `andAddToDeadLetterQueue`. I would not sacrifice better >>>>>>>>>>> readability >>>>>>>>>>> for >>>>>>>>>>> consistency in this case. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Bruno >>>>>>>>>>> >>>>>>>>>>> On 9/3/24 3:18 PM, Damien Gasparina wrote: >>>>>>>>>>>> Hi Bruno, >>>>>>>>>>>> >>>>>>>>>>>> Thanks a lot for your comments! >>>>>>>>>>>> >>>>>>>>>>>>> BC1 >>>>>>>>>>>>> Do you also plan to add an argument to the StreamsResetter >>>>>>>>>>>>> tool to >>>>>>>>>>>>> delete the default DLQ topic when a Streams app is reset? >>>>>>>>>>>> Good point, I did not think about the StreamsResetter tool. >>>>>>>>>>>> Thinking >>>>>>>>>>>> out loud, I am not sure if it is a good idea to add an >>>>>>>>>>>> option to >>>>>>>>>>>> clean >>>>>>>>>>>> them up. In my opinion DLQ topics should be viewed as a sink >>>>>>>>>>>> topic >>>>>>>>>>>> and >>>>>>>>>>>> AFAIK, this tool does not clean up sink topics. >>>>>>>>>>>> >>>>>>>>>>>>> BC2 >>>>>>>>>>>> In case of a custom exception handlers, they can get the >>>>>>>>>>>> errors.deadletterqueue.topic.name configuration by overriding >>>>>>>>>>>> `void >>>>>>>>>>>> configure(Map<String, ?> configs);`. As it is the location >>>>>>>>>>>> where all >>>>>>>>>>>> the configuration can be accessed, I think it's the best way >>>>>>>>>>>> no? I >>>>>>>>>>>> will think about it a bit further, it might be useful/ >>>>>>>>>>>> convenient for >>>>>>>>>>>> users. >>>>>>>>>>>> >>>>>>>>>>>>> BC3 >>>>>>>>>>>> The "with" syntax is used in many locations in the Kafka >>>>>>>>>>>> Streams >>>>>>>>>>>> public classes, that's why I used it, e.g. Materialized.with(), >>>>>>>>>>>> Consumed.withKeySerde(...).withValueSerde(...), >>>>>>>>>>>> Grouped.withName(...). >>>>>>>>>>>> I do agree that .andAddToDeadLetterQueue is more intuitive, >>>>>>>>>>>> but I >>>>>>>>>>>> would argue that being consistent is better in this situation. >>>>>>>>>>>> What do >>>>>>>>>>>> you think? >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Damien >>>>>>>>>>>> >>>>>>>>>>>> On Tue, 3 Sept 2024 at 14:59, Bruno Cadonna >>>>>>>>>>>> <cado...@apache.org> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the updates! >>>>>>>>>>>>> >>>>>>>>>>>>> I have a couple of comments. >>>>>>>>>>>>> >>>>>>>>>>>>> BC1 >>>>>>>>>>>>> Do you also plan to add an argument to the StreamsResetter >>>>>>>>>>>>> tool to >>>>>>>>>>>>> delete the default DLQ topic when a Streams app is reset? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> BC2 >>>>>>>>>>>>> Would it make sense to add errors.deadletterqueue.topic.name >>>>>>>>>>>>> to the >>>>>>>>>>>>> ErrorHandlerContext in case that a custom exception handler >>>>>>>>>>>>> wants to >>>>>>>>>>>>> write to the configured DLQ topic? >>>>>>>>>>>>> For example if only one of the handler needs to be customized >>>>>>>>>>>>> but >>>>>>>>>>>>> all >>>>>>>>>>>>> handlers should write to the configured DLQ topic. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> BC3 >>>>>>>>>>>>> What do you think about renaming >>>>>>>>>>>>> withDeadLetterQueueRecords() to >>>>>>>>>>>>> andAddToDeadLetterQueue()? >>>>>>>>>>>>> A customized handler would look like >>>>>>>>>>>>> >>>>>>>>>>>>> public ProcessingHandlerResponse handle( >>>>>>>>>>>>> final ErrorHandlerContext context, >>>>>>>>>>>>> final Record<?, ?> record, >>>>>>>>>>>>> final Exception exception >>>>>>>>>>>>> ) { >>>>>>>>>>>>> return ProcessingHandlerResponse.CONTINUE >>>>>>>>>>>>> .andAddToDeadLetterQueue( >>>>>>>>>>>>> Collections.singletonList(new >>>>>>>>>>>>> ProducerRecord<>( >>>>>>>>>>>>> "app-dlq", >>>>>>>>>>>>> >>>>>>>>>>>>> "Hello".getBytes(StandardCharsets.UTF_8), >>>>>>>>>>>>> "World".getBytes(StandardCharsets.UTF_8) >>>>>>>>>>>>> )) >>>>>>>>>>>>> ); >>>>>>>>>>>>> } >>>>>>>>>>>>> >>>>>>>>>>>>> I think the code becomes more readable. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> Bruno >>>>>>>>>>>>> >>>>>>>>>>>>> On 8/30/24 3:37 PM, Damien Gasparina wrote: >>>>>>>>>>>>>> Hi everyone, >>>>>>>>>>>>>> >>>>>>>>>>>>>> We just updated KIP-1034, we changed the following: >>>>>>>>>>>>>> - We included the ProcessingExceptionHandler >>>>>>>>>>>>>> (KIP-1033) >>>>>>>>>>>>>> directly in the KIP; >>>>>>>>>>>>>> - We provided examples to clarify the new >>>>>>>>>>>>>> configuration, >>>>>>>>>>>>>> and how it >>>>>>>>>>>>>> could be leveraged. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I think we can resume the conversation on this KIP. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Damien Sebastien and Loic >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, 27 Aug 2024 at 15:37, Sebastien Viale >>>>>>>>>>>>>> <sebastien.vi...@michelin.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Bruno, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We have planned a meeting for next friday to discuss it with >>>>>>>>>>>>>>> Loic and Damien. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> We will be able to restart discussions about it soon. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> regards >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> ________________________________ >>>>>>>>>>>>>>> De : Bruno Cadonna <cado...@apache.org> >>>>>>>>>>>>>>> Envoyé : lundi 26 août 2024 11:32 >>>>>>>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> >>>>>>>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in >>>>>>>>>>>>>>> Kafka Streams >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Warning External sender Do not click on any links or open >>>>>>>>>>>>>>> any >>>>>>>>>>>>>>> attachments unless you trust the sender and know the >>>>>>>>>>>>>>> content is >>>>>>>>>>>>>>> safe. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Loïc, Sebastien, and Damien, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Now that KIP-1033 is going to be released in 3.9, what is >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> plan to >>>>>>>>>>>>>>> progress with this KIP? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Is the KIP up-to-date, so that we can restart discussion? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> Bruno >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This email was screened for spam and malicious content but >>>>>>>>>>>>>>> exercise caution anyway. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 6/13/24 6:16 PM, Damien Gasparina wrote: >>>>>>>>>>>>>>>> Hi Bruno, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> We focused our effort (well, mostly Seb and Loic :)) on >>>>>>>>>>>>>>>> KIP-1033, >>>>>>>>>>>>>>>> that's why not much progress has been made on this one yet. >>>>>>>>>>>>>>>> Regarding your points: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> B1: It is up to the user to specify the DLQ topic name >>>>>>>>>>>>>>>> and to >>>>>>>>>>>>>>>> implement a potential differentiation. I tend to think that >>>>>>>>>>>>>>>> having one >>>>>>>>>>>>>>>> DLQ per application ID is the wisest, but I encountered >>>>>>>>>>>>>>>> cases and >>>>>>>>>>>>>>>> applications that preferred having a shared DLQ topic >>>>>>>>>>>>>>>> between >>>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>> applications, e.g. to reduce the number of partitions, or >>>>>>>>>>>>>>>> to ease >>>>>>>>>>>>>>>> monitoring >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> B2 : Goot catch, it should be >>>>>>>>>>>>>>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, we removed the >>>>>>>>>>>>>>>> "DEFAULT" >>>>>>>>>>>>>>>> prefix during the discussion, looks like I forgot to >>>>>>>>>>>>>>>> update all >>>>>>>>>>>>>>>> occurrences in the KIP. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> B3 :The trigger for sending to the DLQ would be if >>>>>>>>>>>>>>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the >>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>> implemented a custom exception handler that returns DLQ >>>>>>>>>>>>>>>> records. >>>>>>>>>>>>>>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just >>>>>>>>>>>>>>>> override the >>>>>>>>>>>>>>>> behavior of the default handler, thus custom exception >>>>>>>>>>>>>>>> handlers will >>>>>>>>>>>>>>>> completely ignore this parameter. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think it's a good trade-off between providing a >>>>>>>>>>>>>>>> production- >>>>>>>>>>>>>>>> ready >>>>>>>>>>>>>>>> default implementation, yet providing sufficient >>>>>>>>>>>>>>>> flexibility for >>>>>>>>>>>>>>>> complex use-cases. >>>>>>>>>>>>>>>> This behavior definitely needs to be documented, but I >>>>>>>>>>>>>>>> guess >>>>>>>>>>>>>>>> it's safe >>>>>>>>>>>>>>>> to push the responsibility of the DLQ records to the user >>>>>>>>>>>>>>>> if they >>>>>>>>>>>>>>>> implement custom handlers. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Damien >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna >>>>>>>>>>>>>>>> <cado...@apache.org> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> since there was not too much activity in this thread >>>>>>>>>>>>>>>>> recently, I was >>>>>>>>>>>>>>>>> wondering what the status of this discussion is. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I cannot find the examples in the KIP Sébastien >>>>>>>>>>>>>>>>> mentioned in >>>>>>>>>>>>>>>>> the last >>>>>>>>>>>>>>>>> message to this thread. I can also not find the >>>>>>>>>>>>>>>>> corresponding >>>>>>>>>>>>>>>>> definition >>>>>>>>>>>>>>>>> of the following method call in the KIP: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> FAIL.withDeadLetterQueueRecord(record, "dlq-topic") >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I have also some comments: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> B1 >>>>>>>>>>>>>>>>> Did you consider to prefix the dead letter queue topic >>>>>>>>>>>>>>>>> names >>>>>>>>>>>>>>>>> with the >>>>>>>>>>>>>>>>> application ID to distinguish the topics between Streams >>>>>>>>>>>>>>>>> apps? Or is the >>>>>>>>>>>>>>>>> user responsible for the differentiation? If the user is >>>>>>>>>>>>>>>>> responsible, we >>>>>>>>>>>>>>>>> risk that faulty records of different Streams apps end >>>>>>>>>>>>>>>>> up in >>>>>>>>>>>>>>>>> the same >>>>>>>>>>>>>>>>> dead letter queue. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> B2 >>>>>>>>>>>>>>>>> Is the name of the dead letter queue topic config >>>>>>>>>>>>>>>>> DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or >>>>>>>>>>>>>>>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both >>>>>>>>>>>>>>>>> names are used. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> B3 >>>>>>>>>>>>>>>>> What is exactly the trigger to send a record to the dead >>>>>>>>>>>>>>>>> letter queue? >>>>>>>>>>>>>>>>> Is setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or >>>>>>>>>>>>>>>>> is it >>>>>>>>>>>>>>>>> adding a >>>>>>>>>>>>>>>>> record to the return value of the exception handler? >>>>>>>>>>>>>>>>> What happens if I set >>>>>>>>>>>>>>>>> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do >>>>>>>>>>>>>>>>> not add a record to the return value of the handler? What >>>>>>>>>>>>>>>>> happens if I >>>>>>>>>>>>>>>>> do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but >>>>>>>>>>>>>>>>> add a >>>>>>>>>>>>>>>>> record to >>>>>>>>>>>>>>>>> the return value of the handler? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>> Bruno >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 4/22/24 10:19 PM, Sebastien Viale wrote: >>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for your remarks >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> L1. I would say "who can do the most can do the least", >>>>>>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>>> though most people will fail and stop, we found it >>>>>>>>>>>>>>>>>> interesting to offer the possibility to fail-and-send- >>>>>>>>>>>>>>>>>> to-DLQ >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> L2: We did not consider extending the TimestampExtractor >>>>>>>>>>>>>>>>>> because we estimate it out of scope for this KIP. >>>>>>>>>>>>>>>>>> Perhaps it >>>>>>>>>>>>>>>>>> will be possible to include it in an ExceptionHandler >>>>>>>>>>>>>>>>>> later. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> L3: we will include an example in the KIP, but as we >>>>>>>>>>>>>>>>>> mentioned earlier, the DLQ topic can be different in each >>>>>>>>>>>>>>>>>> custom Exception Handler: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> When providing custom handlers, users would have the >>>>>>>>>>>>>>>>>> possibility to return: >>>>>>>>>>>>>>>>>> * FAIL >>>>>>>>>>>>>>>>>> * CONTINUE >>>>>>>>>>>>>>>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic") >>>>>>>>>>>>>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic") >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> cheers ! >>>>>>>>>>>>>>>>>> Sébastien >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ________________________________ >>>>>>>>>>>>>>>>>> De : Lucas Brutschy <lbruts...@confluent.io.INVALID> >>>>>>>>>>>>>>>>>> Envoyé : lundi 22 avril 2024 14:36 >>>>>>>>>>>>>>>>>> À : dev@kafka.apache.org <dev@kafka.apache.org> >>>>>>>>>>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter >>>>>>>>>>>>>>>>>> queue in >>>>>>>>>>>>>>>>>> Kafka Streams >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Warning External sender Do not click on any links or open >>>>>>>>>>>>>>>>>> any attachments unless you trust the sender and know the >>>>>>>>>>>>>>>>>> content is safe. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi! >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the KIP, great stuff. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> L1. I was a bit confused that the default configuration >>>>>>>>>>>>>>>>>> (once you set >>>>>>>>>>>>>>>>>> a DLQ topic) is going to be fail-and-send-to-DLQ, if I >>>>>>>>>>>>>>>>>> understood >>>>>>>>>>>>>>>>>> correctly. Is this something that will be a common use- >>>>>>>>>>>>>>>>>> case, >>>>>>>>>>>>>>>>>> and is it >>>>>>>>>>>>>>>>>> a configuration that we want to encourage? It expected >>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>> you either >>>>>>>>>>>>>>>>>> want to fail or skip-and-send-to-DLQ. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> L2. Have you considered extending the >>>>>>>>>>>>>>>>>> `TimestampExtractor` >>>>>>>>>>>>>>>>>> interface >>>>>>>>>>>>>>>>>> so that it can also produce to DLQ? AFAIK it's not >>>>>>>>>>>>>>>>>> covered >>>>>>>>>>>>>>>>>> by any of >>>>>>>>>>>>>>>>>> the existing exception handlers, but it can cause similar >>>>>>>>>>>>>>>>>> failures >>>>>>>>>>>>>>>>>> (potentially custom logic, depends on validity input >>>>>>>>>>>>>>>>>> record). There >>>>>>>>>>>>>>>>>> could also be a default implementation as a subclass of >>>>>>>>>>>>>>>>>> `ExtractRecordMetadataTimestamp`. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> L3. It would be nice to include an example of how to >>>>>>>>>>>>>>>>>> produce to >>>>>>>>>>>>>>>>>> multiple topics in the KIP, as I can imagine that this >>>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>> be a >>>>>>>>>>>>>>>>>> common use-case. I wasn't sure how much code would be >>>>>>>>>>>>>>>>>> involved to make >>>>>>>>>>>>>>>>>> it work. If a lot of code is required, we may want to >>>>>>>>>>>>>>>>>> consider >>>>>>>>>>>>>>>>>> exposing some utils that make it easier. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Lucas >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> This email was screened for spam and malicious content >>>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>> exercise caution anyway. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Sun, Apr 21, 2024 at 7:58?PM Damien Gasparina >>>>>>>>>>>>>>>>>> <d.gaspar...@gmail.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi everyone, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Following all the discussion on this KIP and >>>>>>>>>>>>>>>>>>> KIP-1033, we >>>>>>>>>>>>>>>>>>> introduced a >>>>>>>>>>>>>>>>>>> new container class containing only processing context >>>>>>>>>>>>>>>>>>> metadata: >>>>>>>>>>>>>>>>>>> ProcessingMetadata. This new container class is actually >>>>>>>>>>>>>>>>>>> part of >>>>>>>>>>>>>>>>>>> KIP-1033, thus, I added a hard dependency for this >>>>>>>>>>>>>>>>>>> KIP on >>>>>>>>>>>>>>>>>>> KIP-1033, I >>>>>>>>>>>>>>>>>>> think it's the wisest implementation wise. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I also clarified the interface of the enums: >>>>>>>>>>>>>>>>>>> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[], >>>>>>>>>>>>>>>>>>> byte[]>> deadLetterQueueRecords) . Very likely most >>>>>>>>>>>>>>>>>>> users >>>>>>>>>>>>>>>>>>> would just >>>>>>>>>>>>>>>>>>> send one DLQ record, but there might be specific use- >>>>>>>>>>>>>>>>>>> cases >>>>>>>>>>>>>>>>>>> and what >>>>>>>>>>>>>>>>>>> can do more can do less, so I added an Iterable. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I took some time to think about the impact of storing >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> ProcessingMetadata on the ProductionExceptionHandler. I >>>>>>>>>>>>>>>>>>> think storing >>>>>>>>>>>>>>>>>>> the topic/offset/partition should be fine, but I am >>>>>>>>>>>>>>>>>>> concerned about >>>>>>>>>>>>>>>>>>> storing the rawSourceKey/Value. I think it could impact >>>>>>>>>>>>>>>>>>> some specific >>>>>>>>>>>>>>>>>>> use-cases, for example, a high-throughput Kafka Streams >>>>>>>>>>>>>>>>>>> application >>>>>>>>>>>>>>>>>>> "counting" messages could have huge source input >>>>>>>>>>>>>>>>>>> messages, >>>>>>>>>>>>>>>>>>> and very >>>>>>>>>>>>>>>>>>> small sink messages, here, I assume storing the >>>>>>>>>>>>>>>>>>> rawSourceKey/Value >>>>>>>>>>>>>>>>>>> could significantly require more memory than the actual >>>>>>>>>>>>>>>>>>> Kafka Producer >>>>>>>>>>>>>>>>>>> buffer. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I think the safest approach is actually to only store >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> fixed-size >>>>>>>>>>>>>>>>>>> metadata for the ProductionExceptionHandler.handle: >>>>>>>>>>>>>>>>>>> topic/partition/offset/processorNodeId/taskId, it >>>>>>>>>>>>>>>>>>> might be >>>>>>>>>>>>>>>>>>> confusing >>>>>>>>>>>>>>>>>>> for the user, but 1) it is still better than nowaday >>>>>>>>>>>>>>>>>>> where >>>>>>>>>>>>>>>>>>> there are >>>>>>>>>>>>>>>>>>> no context information at all, 2) it would be clearly >>>>>>>>>>>>>>>>>>> stated in the >>>>>>>>>>>>>>>>>>> javadoc, 3) the rawSourceKey/Value are already nullable >>>>>>>>>>>>>>>>>>> (e.g. the >>>>>>>>>>>>>>>>>>> punctuate case). . >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Do you think it would be a suitable design Sophie? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Damien >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Sun, 14 Apr 2024 at 21:30, Loic Greffier >>>>>>>>>>>>>>>>>>> <loic.greff...@michelin.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Sophie, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for your feedback. >>>>>>>>>>>>>>>>>>>> Completing the Damien's comments here for points S1 >>>>>>>>>>>>>>>>>>>> and S5B. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S1: >>>>>>>>>>>>>>>>>>>>> I'm confused -- are you saying that we're >>>>>>>>>>>>>>>>>>>>> introducing a >>>>>>>>>>>>>>>>>>>>> new kind of ProducerRecord class for this? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I am wondering if it makes sense to alter the >>>>>>>>>>>>>>>>>>>> ProducerRecord from Clients API with a >>>>>>>>>>>>>>>>>>>> "deadLetterQueueTopicName" attribute dedicated to Kafka >>>>>>>>>>>>>>>>>>>> Streams DLQ. >>>>>>>>>>>>>>>>>>>> Adding "deadLetterQueueTopicName" as an additional >>>>>>>>>>>>>>>>>>>> parameter to "withDeadLetterQueueRecord" is a good >>>>>>>>>>>>>>>>>>>> option, >>>>>>>>>>>>>>>>>>>> and may allow users to send records to different DLQ >>>>>>>>>>>>>>>>>>>> topics depending on conditions: >>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>> public ProductionExceptionHandlerResponse handle(final >>>>>>>>>>>>>>>>>>>> ProcessingContext context, >>>>>>>>>>>>>>>>>>>> ProducerRecord<byte[], byte[]> record, >>>>>>>>>>>>>>>>>>>> Exception exception) { >>>>>>>>>>>>>>>>>>>> if (condition1) { >>>>>>>>>>>>>>>>>>>> return ProductionExceptionHandlerResponse.CONTINUE >>>>>>>>>>>>>>>>>>>> .withDeadLetterQueueRecord(record, "dlq-topic-a"); >>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>> if (condition2) { >>>>>>>>>>>>>>>>>>>> return ProductionExceptionHandlerResponse.CONTINUE >>>>>>>>>>>>>>>>>>>> .withDeadLetterQueueRecord(record, "dlq-topic-b"); >>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>> return ProductionExceptionHandlerResponse.CONTINUE >>>>>>>>>>>>>>>>>>>> .withDeadLetterQueueRecord(record, "dlq-topic-c"); >>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S5B: >>>>>>>>>>>>>>>>>>>>> I was having a bit of trouble understanding what the >>>>>>>>>>>>>>>>>>>>> behavior would be if someone configured a >>>>>>>>>>>>>>>>>>>>> "errors.deadletterqueue.topic.name" but didn't >>>>>>>>>>>>>>>>>>>>> implement >>>>>>>>>>>>>>>>>>>>> the handlers. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The provided LogAndContinueExceptionHandler, >>>>>>>>>>>>>>>>>>>> LogAndFailExceptionHandler and >>>>>>>>>>>>>>>>>>>> DefaultProductionExceptionHandler should be able to >>>>>>>>>>>>>>>>>>>> tell >>>>>>>>>>>>>>>>>>>> if records should be sent to DLQ or not. >>>>>>>>>>>>>>>>>>>> The "errors.deadletterqueue.topic.name" takes place to: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> * Specifying if the provided handlers should or should >>>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>> send records to DLQ. >>>>>>>>>>>>>>>>>>>> * If the value is empty, the handlers should not send >>>>>>>>>>>>>>>>>>>> records to DLQ. >>>>>>>>>>>>>>>>>>>> * If the value is not empty, the handlers should send >>>>>>>>>>>>>>>>>>>> records to DLQ. >>>>>>>>>>>>>>>>>>>> * Define the name of the DLQ topic that should be >>>>>>>>>>>>>>>>>>>> used by >>>>>>>>>>>>>>>>>>>> the provided handlers. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thus, if "errors.deadletterqueue.topic.name" is >>>>>>>>>>>>>>>>>>>> defined, >>>>>>>>>>>>>>>>>>>> the provided handlers should return either: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, >>>>>>>>>>>>>>>>>>>> defaultDeadLetterQueue) >>>>>>>>>>>>>>>>>>>> * FAIL.withDeadLetterQueueRecord(record, >>>>>>>>>>>>>>>>>>>> defaultDeadLetterQueue). >>>>>>>>>>>>>>>>>>>> If "errors.deadletterqueue.topic.name" is defined but >>>>>>>>>>>>>>>>>>>> neither DeserializationExceptionHandler nor >>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler classes are defined in the >>>>>>>>>>>>>>>>>>>> configuration, then nothing should happen as sending to >>>>>>>>>>>>>>>>>>>> DLQ is based on handlers’ response. >>>>>>>>>>>>>>>>>>>> When providing custom handlers, users would have the >>>>>>>>>>>>>>>>>>>> possibility to return: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> * FAIL >>>>>>>>>>>>>>>>>>>> * CONTINUE >>>>>>>>>>>>>>>>>>>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic") >>>>>>>>>>>>>>>>>>>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq- >>>>>>>>>>>>>>>>>>>> topic") >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> A DLQ topic name is currently required using the two >>>>>>>>>>>>>>>>>>>> last >>>>>>>>>>>>>>>>>>>> response types. >>>>>>>>>>>>>>>>>>>> I am wondering if it could benefit users to ease the >>>>>>>>>>>>>>>>>>>> use >>>>>>>>>>>>>>>>>>>> of the default DLQ topic >>>>>>>>>>>>>>>>>>>> "errors.deadletterqueue.topic.name" when implementing >>>>>>>>>>>>>>>>>>>> custom handlers, with such kind of implementation: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> * FAIL.withDefaultDeadLetterQueueRecord(record) >>>>>>>>>>>>>>>>>>>> * CONTINUE.withDefaultDeadLetterQueueRecord(record) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>> Loïc >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> De : Damien Gasparina <d.gaspar...@gmail.com> >>>>>>>>>>>>>>>>>>>> Envoyé : dimanche 14 avril 2024 20:24 >>>>>>>>>>>>>>>>>>>> À : dev@kafka.apache.org >>>>>>>>>>>>>>>>>>>> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter >>>>>>>>>>>>>>>>>>>> queue in >>>>>>>>>>>>>>>>>>>> Kafka Streams >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Warning External sender Do not click on any links or >>>>>>>>>>>>>>>>>>>> open >>>>>>>>>>>>>>>>>>>> any attachments unless you trust the sender and know >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> content is safe. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Sophie, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback and your detailed >>>>>>>>>>>>>>>>>>>> comments. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S1. >>>>>>>>>>>>>>>>>>>>> I'm confused -- are you saying that we're >>>>>>>>>>>>>>>>>>>>> introducing a >>>>>>>>>>>>>>>>>>>>> new kind of >>>>>>>>>>>>>>>>>>>> ProducerRecord class for this? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Sorry for the poor wording, that's not what I meant. >>>>>>>>>>>>>>>>>>>> While >>>>>>>>>>>>>>>>>>>> writing the >>>>>>>>>>>>>>>>>>>> KIP, I was hesitating between 1. leveraging the Kafka >>>>>>>>>>>>>>>>>>>> Producer >>>>>>>>>>>>>>>>>>>> ProducerRecord, 2. the Kafka Streams ProducerRecord + a >>>>>>>>>>>>>>>>>>>> topic name in >>>>>>>>>>>>>>>>>>>> a separate parameter, 3. a new custom interface (e.g. >>>>>>>>>>>>>>>>>>>> DeadLetterQueueRecord). >>>>>>>>>>>>>>>>>>>> As the KafkaProducer ProducerRecord is not used in the >>>>>>>>>>>>>>>>>>>> Kafka Streams >>>>>>>>>>>>>>>>>>>> API (except ProductionExceptionHandler) and I would >>>>>>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>>>>> to avoid a >>>>>>>>>>>>>>>>>>>> new interface if not strictly required, I leaned toward >>>>>>>>>>>>>>>>>>>> option 2. >>>>>>>>>>>>>>>>>>>> Thinking about it, maybe option 1. would be best, but I >>>>>>>>>>>>>>>>>>>> assume it >>>>>>>>>>>>>>>>>>>> could create confusion with KafkaStreams >>>>>>>>>>>>>>>>>>>> ProducerRecord. >>>>>>>>>>>>>>>>>>>> Let me sleep >>>>>>>>>>>>>>>>>>>> on it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S2. I agree. Following the discussion in KIP-1033 and >>>>>>>>>>>>>>>>>>>> KIP-1034 and >>>>>>>>>>>>>>>>>>>> your point in S4, it seems more and more likely that we >>>>>>>>>>>>>>>>>>>> will create a >>>>>>>>>>>>>>>>>>>> new container class containing only the metadata for >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> exception >>>>>>>>>>>>>>>>>>>> handlers. To be consistent, I think we should use this >>>>>>>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>>>>> implementation in all exception handlers. >>>>>>>>>>>>>>>>>>>> The only issue I could think off is that the new >>>>>>>>>>>>>>>>>>>> interface >>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>> expose less data than the current ProcessorContext >>>>>>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>>>>> DeserializationException(e.g. stateDir(), metrics(), >>>>>>>>>>>>>>>>>>>> getStateStore()), >>>>>>>>>>>>>>>>>>>> thus it could be hard for some users to migrate to the >>>>>>>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>>>>> interface. >>>>>>>>>>>>>>>>>>>> I do expect that only a few users would be impacted as >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> javadoc is >>>>>>>>>>>>>>>>>>>> very clear: `Note, that the passed in {@link >>>>>>>>>>>>>>>>>>>> ProcessorContext} only >>>>>>>>>>>>>>>>>>>> allows access to metadata like the task ID.` >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S3. I completely agree with you, it is something that >>>>>>>>>>>>>>>>>>>> might not be >>>>>>>>>>>>>>>>>>>> trivial and should be thoroughly covered by unit tests >>>>>>>>>>>>>>>>>>>> during the >>>>>>>>>>>>>>>>>>>> implementation. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S4. Good point, I did not notice that the >>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler >>>>>>>>>>>>>>>>>>>> is also invoked in the producer.send() callback. >>>>>>>>>>>>>>>>>>>> Capturing the ProcessingContext for each in-flight >>>>>>>>>>>>>>>>>>>> message >>>>>>>>>>>>>>>>>>>> is probably >>>>>>>>>>>>>>>>>>>> not possible. I think there is no other way to write a >>>>>>>>>>>>>>>>>>>> custom >>>>>>>>>>>>>>>>>>>> container class holding only the metadata that are >>>>>>>>>>>>>>>>>>>> essentials, I am >>>>>>>>>>>>>>>>>>>> thinking of storing the following attributes: source >>>>>>>>>>>>>>>>>>>> topic, partition, >>>>>>>>>>>>>>>>>>>> offset, rawKey, rawValue and taskId. >>>>>>>>>>>>>>>>>>>> Those metadata should be relatively small, but I assume >>>>>>>>>>>>>>>>>>>> that there >>>>>>>>>>>>>>>>>>>> could be a high number of in-flight messages, >>>>>>>>>>>>>>>>>>>> especially >>>>>>>>>>>>>>>>>>>> with at least >>>>>>>>>>>>>>>>>>>> once processing guarantee. Do you think it would be >>>>>>>>>>>>>>>>>>>> fine >>>>>>>>>>>>>>>>>>>> memory wise? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S5. As many exceptions are only accessible in exception >>>>>>>>>>>>>>>>>>>> handlers, and >>>>>>>>>>>>>>>>>>>> we wanted to 1) allow users to customize the DLQ >>>>>>>>>>>>>>>>>>>> records >>>>>>>>>>>>>>>>>>>> and 2) have a >>>>>>>>>>>>>>>>>>>> suitable DLQ out of the box implementation, we felt it >>>>>>>>>>>>>>>>>>>> natural to rely >>>>>>>>>>>>>>>>>>>> on exception handlers, that's also why we created >>>>>>>>>>>>>>>>>>>> KIP-1033. >>>>>>>>>>>>>>>>>>>> Piggybacking on the enum response was the cleanest >>>>>>>>>>>>>>>>>>>> way we >>>>>>>>>>>>>>>>>>>> could think >>>>>>>>>>>>>>>>>>>> off, but we are completely open to suggestions. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S5a. Completely agree with you on this point, for this >>>>>>>>>>>>>>>>>>>> DLQ >>>>>>>>>>>>>>>>>>>> approach to >>>>>>>>>>>>>>>>>>>> be complete, the ProcessingExceptionHandler >>>>>>>>>>>>>>>>>>>> introduced in >>>>>>>>>>>>>>>>>>>> KIP-1033 is >>>>>>>>>>>>>>>>>>>> required. KIP-1033 is definitely our first priority. We >>>>>>>>>>>>>>>>>>>> decided to >>>>>>>>>>>>>>>>>>>> kick-off the KIP-1034 discussion as we expected the >>>>>>>>>>>>>>>>>>>> discussions to be >>>>>>>>>>>>>>>>>>>> dynamic and could potentially impact some choices of >>>>>>>>>>>>>>>>>>>> KIP-1033. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S5b. In this KIP, we wanted to 1. provide as much >>>>>>>>>>>>>>>>>>>> flexibility to the >>>>>>>>>>>>>>>>>>>> user as possible; 2. provide a good default >>>>>>>>>>>>>>>>>>>> implementation >>>>>>>>>>>>>>>>>>>> for the DLQ without having to write custom exception >>>>>>>>>>>>>>>>>>>> handlers. >>>>>>>>>>>>>>>>>>>> For the default implementation, we introduced a new >>>>>>>>>>>>>>>>>>>> configuration: >>>>>>>>>>>>>>>>>>>> errors.deadletterqueue.topic.name. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> If this configuration is set, it changes the >>>>>>>>>>>>>>>>>>>> behavior of >>>>>>>>>>>>>>>>>>>> the provided >>>>>>>>>>>>>>>>>>>> exception handlers to return a DLQ record containing >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> raw key/value >>>>>>>>>>>>>>>>>>>> + headers + exception metadata in headers. >>>>>>>>>>>>>>>>>>>> If the out of the box implementation is not suitable >>>>>>>>>>>>>>>>>>>> for a >>>>>>>>>>>>>>>>>>>> user, e.g. >>>>>>>>>>>>>>>>>>>> the payload needs to be masked in the DLQ, it could >>>>>>>>>>>>>>>>>>>> implement their >>>>>>>>>>>>>>>>>>>> own exception handlers. The >>>>>>>>>>>>>>>>>>>> errors.deadletterqueue.topic.name would >>>>>>>>>>>>>>>>>>>> only impact Kafka Streams bundled exception handlers >>>>>>>>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>>>>>>>>> org.apache.kafka.streams.errors;.LogAndContinueExceptionHandler) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Let me update the KIP to make it clear and also provide >>>>>>>>>>>>>>>>>>>> examples. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> S6/S7. Good point, mea culpa for the camel case, it >>>>>>>>>>>>>>>>>>>> must >>>>>>>>>>>>>>>>>>>> have been a >>>>>>>>>>>>>>>>>>>> sugar rush :) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks again for your detailed comments and pointing >>>>>>>>>>>>>>>>>>>> out S4 >>>>>>>>>>>>>>>>>>>> (production exception & Processing Context)! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>> Damien >>>>>>>>>>>>>>>>>>>> This email was screened for spam and malicious content >>>>>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>>>> exercise caution anyway. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman >>>>>>>>>>>>>>>>>>>> <sop...@responsive.dev<mailto:sop...@responsive.dev>> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, this will make a lot of people >>>>>>>>>>>>>>>>>>>>> very >>>>>>>>>>>>>>>>>>>>> happy. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Wanted to chime in on a few points that have been >>>>>>>>>>>>>>>>>>>>> raised >>>>>>>>>>>>>>>>>>>>> so far and add >>>>>>>>>>>>>>>>>>>>> some of my own (numbering with an S to distinguish my >>>>>>>>>>>>>>>>>>>>> points from the >>>>>>>>>>>>>>>>>>>>> previous ones) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S1. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 1.a I really meant ProducerRecord, that's the class >>>>>>>>>>>>>>>>>>>>>> used >>>>>>>>>>>>>>>>>>>>>> to forward to >>>>>>>>>>>>>>>>>>>>>> downstream processors in the PAPI. The only >>>>>>>>>>>>>>>>>>>>>> information >>>>>>>>>>>>>>>>>>>>>> missing in >>>>>>>>>>>>>>>>>>>>>> this class is the topic name. I also considered >>>>>>>>>>>>>>>>>>>>>> relying >>>>>>>>>>>>>>>>>>>>>> on the Kafka >>>>>>>>>>>>>>>>>>>>>> Producer ProducerRecord, but I assume it would not be >>>>>>>>>>>>>>>>>>>>>> consistent with >>>>>>>>>>>>>>>>>>>>>> the KafkaStreams API. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I'm confused -- are you saying that we're >>>>>>>>>>>>>>>>>>>>> introducing a >>>>>>>>>>>>>>>>>>>>> new kind of >>>>>>>>>>>>>>>>>>>>> ProducerRecord class for this? Why not just use the >>>>>>>>>>>>>>>>>>>>> existing one, ie the >>>>>>>>>>>>>>>>>>>>> o.a.k.clients.producer.ProducerRecord class? This is >>>>>>>>>>>>>>>>>>>>> what >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler uses, so it's definitely >>>>>>>>>>>>>>>>>>>>> "consistent". In other >>>>>>>>>>>>>>>>>>>>> words, we can remove the "String >>>>>>>>>>>>>>>>>>>>> deadLetterQueueTopicName" >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S2. >>>>>>>>>>>>>>>>>>>>> I think this would be a good opportunity to also >>>>>>>>>>>>>>>>>>>>> deprecate the existing >>>>>>>>>>>>>>>>>>>>> #handle method of the DeserializationExceptionHandler, >>>>>>>>>>>>>>>>>>>>> and replace it with >>>>>>>>>>>>>>>>>>>>> one that uses a ProcessingContext instead of the >>>>>>>>>>>>>>>>>>>>> ProcessorContext. Partly >>>>>>>>>>>>>>>>>>>>> for the same reasons about guarding access to the >>>>>>>>>>>>>>>>>>>>> #forward methods, partly >>>>>>>>>>>>>>>>>>>>> because this method needs to be migrated to the new >>>>>>>>>>>>>>>>>>>>> PAPI >>>>>>>>>>>>>>>>>>>>> interface >>>>>>>>>>>>>>>>>>>>> anyways, and ProcessingContext is part of the new one. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S3. >>>>>>>>>>>>>>>>>>>>> Regarding 2a. -- I'm inclined to agree that records >>>>>>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>> a Punctuator >>>>>>>>>>>>>>>>>>>>> failed to produce should also be sent to the DLQ >>>>>>>>>>>>>>>>>>>>> via the >>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler. Users will just need to be >>>>>>>>>>>>>>>>>>>>> careful about >>>>>>>>>>>>>>>>>>>>> accessing certain fields of the ProcessingContext that >>>>>>>>>>>>>>>>>>>>> aren't available in >>>>>>>>>>>>>>>>>>>>> the punctuator, and need to check the Optional >>>>>>>>>>>>>>>>>>>>> returned >>>>>>>>>>>>>>>>>>>>> by the >>>>>>>>>>>>>>>>>>>>> ProcessingContext#recordMetadata API. >>>>>>>>>>>>>>>>>>>>> Also, from an implementation standpoint, it will be >>>>>>>>>>>>>>>>>>>>> really hard to >>>>>>>>>>>>>>>>>>>>> distinguish between a record created by a punctuator >>>>>>>>>>>>>>>>>>>>> vs a >>>>>>>>>>>>>>>>>>>>> processor from >>>>>>>>>>>>>>>>>>>>> within the RecordCollector, which is the class that >>>>>>>>>>>>>>>>>>>>> actually handles >>>>>>>>>>>>>>>>>>>>> sending records to the Streams Producer and >>>>>>>>>>>>>>>>>>>>> invoking the >>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler. This is because the >>>>>>>>>>>>>>>>>>>>> RecordCollector is at the >>>>>>>>>>>>>>>>>>>>> "end" of the topology graph and doesn't have any >>>>>>>>>>>>>>>>>>>>> context >>>>>>>>>>>>>>>>>>>>> about which of the >>>>>>>>>>>>>>>>>>>>> upstream processors actually attempted to forward a >>>>>>>>>>>>>>>>>>>>> record. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> This in itself is at least theoretically solvable, >>>>>>>>>>>>>>>>>>>>> but it >>>>>>>>>>>>>>>>>>>>> leads into my >>>>>>>>>>>>>>>>>>>>> first major new point: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S4: >>>>>>>>>>>>>>>>>>>>> I'm deeply worried about passing the >>>>>>>>>>>>>>>>>>>>> ProcessingContext in >>>>>>>>>>>>>>>>>>>>> as a means of >>>>>>>>>>>>>>>>>>>>> forwarding metadata. The problem is that the >>>>>>>>>>>>>>>>>>>>> processing/ >>>>>>>>>>>>>>>>>>>>> processor context >>>>>>>>>>>>>>>>>>>>> is a mutable class and is inherently meaningless >>>>>>>>>>>>>>>>>>>>> outside >>>>>>>>>>>>>>>>>>>>> the context of a >>>>>>>>>>>>>>>>>>>>> specific task. And when I said earlier that the >>>>>>>>>>>>>>>>>>>>> RecordCollector sits at >>>>>>>>>>>>>>>>>>>>> the "end" of the topology, I meant that it's literally >>>>>>>>>>>>>>>>>>>>> outside the task's >>>>>>>>>>>>>>>>>>>>> subtopology and is used/shared by all tasks on that >>>>>>>>>>>>>>>>>>>>> StreamThread. So to >>>>>>>>>>>>>>>>>>>>> begin with, there's no guarantee what will actually be >>>>>>>>>>>>>>>>>>>>> returned for >>>>>>>>>>>>>>>>>>>>> essential methods such as the new #rawSourceKey/ >>>>>>>>>>>>>>>>>>>>> Value or >>>>>>>>>>>>>>>>>>>>> the existing >>>>>>>>>>>>>>>>>>>>> #recordMetadata >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> For serialization exceptions it'll probably be >>>>>>>>>>>>>>>>>>>>> correct, >>>>>>>>>>>>>>>>>>>>> but for general >>>>>>>>>>>>>>>>>>>>> send errors it almost definitely won't be. In short, >>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>> is because we >>>>>>>>>>>>>>>>>>>>> send records to the producer after the sink node, but >>>>>>>>>>>>>>>>>>>>> don't check for send >>>>>>>>>>>>>>>>>>>>> errors right away since obviously it takes some >>>>>>>>>>>>>>>>>>>>> time for >>>>>>>>>>>>>>>>>>>>> the producer to >>>>>>>>>>>>>>>>>>>>> actually send. In other words, sending/producing >>>>>>>>>>>>>>>>>>>>> records >>>>>>>>>>>>>>>>>>>>> is actually done >>>>>>>>>>>>>>>>>>>>> asynchronously with processing, and we simply check >>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>> errors on any >>>>>>>>>>>>>>>>>>>>> previously-sent records >>>>>>>>>>>>>>>>>>>>> during the send on a new record in a sink node. This >>>>>>>>>>>>>>>>>>>>> means the context we >>>>>>>>>>>>>>>>>>>>> would be passing in to a (non-serialization) exception >>>>>>>>>>>>>>>>>>>>> would pretty much >>>>>>>>>>>>>>>>>>>>> always correspond not the the record that experienced >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> error, but the >>>>>>>>>>>>>>>>>>>>> random record that happened to be being sent when we >>>>>>>>>>>>>>>>>>>>> checked and saw the >>>>>>>>>>>>>>>>>>>>> error for the failed record. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> This discrepancy, in addition to the whole >>>>>>>>>>>>>>>>>>>>> "sourceRawKey/ >>>>>>>>>>>>>>>>>>>>> Value and >>>>>>>>>>>>>>>>>>>>> recordMetadata are null for punctuators" issue, seems >>>>>>>>>>>>>>>>>>>>> like an >>>>>>>>>>>>>>>>>>>>> insurmountable inconsistency that is more likely to >>>>>>>>>>>>>>>>>>>>> cause >>>>>>>>>>>>>>>>>>>>> users confusion >>>>>>>>>>>>>>>>>>>>> or problems than be helpful. >>>>>>>>>>>>>>>>>>>>> We could create a new metadata object and copy over >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> relevant info from >>>>>>>>>>>>>>>>>>>>> the ProcessingContext, but I worry that has the >>>>>>>>>>>>>>>>>>>>> potential >>>>>>>>>>>>>>>>>>>>> to explode memory >>>>>>>>>>>>>>>>>>>>> since we'd need to hold on to it for all in-flight >>>>>>>>>>>>>>>>>>>>> records up until they >>>>>>>>>>>>>>>>>>>>> are either successfully sent or failed and passed in >>>>>>>>>>>>>>>>>>>>> to the >>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler. But if the metadata is >>>>>>>>>>>>>>>>>>>>> relatively small, it's >>>>>>>>>>>>>>>>>>>>> probably fine. Especially if it's just the raw source >>>>>>>>>>>>>>>>>>>>> key/value. Are >>>>>>>>>>>>>>>>>>>>> there any other parts of the ProcessingContext you >>>>>>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>>>>> should be made >>>>>>>>>>>>>>>>>>>>> available? >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Note that this only applies to the >>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler, as the >>>>>>>>>>>>>>>>>>>>> DeserializationExceptionHandler (and the newly >>>>>>>>>>>>>>>>>>>>> proposed >>>>>>>>>>>>>>>>>>>>> ProcessingExceptionHandler) would both be invoked >>>>>>>>>>>>>>>>>>>>> immediately and therefore >>>>>>>>>>>>>>>>>>>>> with the failed record's context. However, I'm also a >>>>>>>>>>>>>>>>>>>>> bit >>>>>>>>>>>>>>>>>>>>> uncomfortable >>>>>>>>>>>>>>>>>>>>> with adding the rawSourceKey/rawSourceValue to the >>>>>>>>>>>>>>>>>>>>> ProcessingContext. So >>>>>>>>>>>>>>>>>>>>> I'd propose to just wrap those (and any other metadata >>>>>>>>>>>>>>>>>>>>> you might want) in a >>>>>>>>>>>>>>>>>>>>> container class and pass that in instead of the >>>>>>>>>>>>>>>>>>>>> ProcessingContext, to all >>>>>>>>>>>>>>>>>>>>> of the exception handlers. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S5: >>>>>>>>>>>>>>>>>>>>> For some reason I'm finding the proposed API a little >>>>>>>>>>>>>>>>>>>>> bit >>>>>>>>>>>>>>>>>>>>> awkward, although >>>>>>>>>>>>>>>>>>>>> it's entirely possible that the problem is with me, >>>>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>>> the proposal :) >>>>>>>>>>>>>>>>>>>>> Specifically I'm struggling with the approach of >>>>>>>>>>>>>>>>>>>>> piggybacking on the >>>>>>>>>>>>>>>>>>>>> exception handlers and their response enums to dictate >>>>>>>>>>>>>>>>>>>>> how records are >>>>>>>>>>>>>>>>>>>>> forwarded to the DLQ. I think this comes down to two >>>>>>>>>>>>>>>>>>>>> things, though again, >>>>>>>>>>>>>>>>>>>>> these aren't necessarily problems with the API and >>>>>>>>>>>>>>>>>>>>> probably just need to be >>>>>>>>>>>>>>>>>>>>> hashed out: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S5a. >>>>>>>>>>>>>>>>>>>>> When I envision a DLQ, to me, the most common use case >>>>>>>>>>>>>>>>>>>>> would be to forward >>>>>>>>>>>>>>>>>>>>> input records that failed somewhere along the >>>>>>>>>>>>>>>>>>>>> processing >>>>>>>>>>>>>>>>>>>>> graph. But it >>>>>>>>>>>>>>>>>>>>> seems like all the focus here is on the two far >>>>>>>>>>>>>>>>>>>>> ends of >>>>>>>>>>>>>>>>>>>>> the subtopology -- >>>>>>>>>>>>>>>>>>>>> the input/consumer, and the output/producer. I get >>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>> the ProcessingExceptionHandler is really the missing >>>>>>>>>>>>>>>>>>>>> piece here, and it's >>>>>>>>>>>>>>>>>>>>> hard to say anything specific since it's not yet >>>>>>>>>>>>>>>>>>>>> accepted, but maybe a >>>>>>>>>>>>>>>>>>>>> somewhat more concrete example would help. FWIW I >>>>>>>>>>>>>>>>>>>>> think/ >>>>>>>>>>>>>>>>>>>>> hope to get that >>>>>>>>>>>>>>>>>>>>> KIP accepted and implementation ASAP, so I'm not >>>>>>>>>>>>>>>>>>>>> worried >>>>>>>>>>>>>>>>>>>>> about the "what if >>>>>>>>>>>>>>>>>>>>> it doesn't happen" case -- more just want to know >>>>>>>>>>>>>>>>>>>>> what it >>>>>>>>>>>>>>>>>>>>> will look like >>>>>>>>>>>>>>>>>>>>> when it does. Imo it's fine to build KIPs on top of >>>>>>>>>>>>>>>>>>>>> future ones, it feels >>>>>>>>>>>>>>>>>>>>> clear that this part will just have to wait for >>>>>>>>>>>>>>>>>>>>> that KIP >>>>>>>>>>>>>>>>>>>>> to actually be >>>>>>>>>>>>>>>>>>>>> added. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S5b: >>>>>>>>>>>>>>>>>>>>> Why do users have to define the entire >>>>>>>>>>>>>>>>>>>>> ProducerRecord -- >>>>>>>>>>>>>>>>>>>>> shouldn't Streams >>>>>>>>>>>>>>>>>>>>> handle all this for them? Or will we just >>>>>>>>>>>>>>>>>>>>> automatically >>>>>>>>>>>>>>>>>>>>> send every record >>>>>>>>>>>>>>>>>>>>> on failure to the default global DLQ, and users only >>>>>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>>> to implement the >>>>>>>>>>>>>>>>>>>>> handlers if they want to change the headers or send >>>>>>>>>>>>>>>>>>>>> to a >>>>>>>>>>>>>>>>>>>>> different topic? I >>>>>>>>>>>>>>>>>>>>> was having a bit of trouble understanding what the >>>>>>>>>>>>>>>>>>>>> behavior would be if >>>>>>>>>>>>>>>>>>>>> someone configured a >>>>>>>>>>>>>>>>>>>>> "errors.deadletterqueue.topic.name" >>>>>>>>>>>>>>>>>>>>> but didn't >>>>>>>>>>>>>>>>>>>>> implement the handlers. Apologies if it's somewhere in >>>>>>>>>>>>>>>>>>>>> the KIP and I >>>>>>>>>>>>>>>>>>>>> happened to miss it! >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Either way, I really think an example would help me to >>>>>>>>>>>>>>>>>>>>> better imagine what >>>>>>>>>>>>>>>>>>>>> this will look like in practice, and evaluate >>>>>>>>>>>>>>>>>>>>> whether it >>>>>>>>>>>>>>>>>>>>> actually involves >>>>>>>>>>>>>>>>>>>>> as much overhead as I'm worried it will. Can you add a >>>>>>>>>>>>>>>>>>>>> section that >>>>>>>>>>>>>>>>>>>>> includes a basic implementation of all the features >>>>>>>>>>>>>>>>>>>>> here? >>>>>>>>>>>>>>>>>>>>> Nothing too >>>>>>>>>>>>>>>>>>>>> complicated, just the most bare-bones code needed to >>>>>>>>>>>>>>>>>>>>> actually implement >>>>>>>>>>>>>>>>>>>>> forwarding to a dead-letter-queue via the handlers. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Lastly, two super small things: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S6: >>>>>>>>>>>>>>>>>>>>> We use camel case in Streams, so it should be >>>>>>>>>>>>>>>>>>>>> rawSourceKey/Value rather >>>>>>>>>>>>>>>>>>>>> than raw_source_key/value >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> S7: >>>>>>>>>>>>>>>>>>>>> Can you add javadocs for the >>>>>>>>>>>>>>>>>>>>> #withDeadLetterQueueRecord? >>>>>>>>>>>>>>>>>>>>> For example, it >>>>>>>>>>>>>>>>>>>>> seems to me that if the topic to be sent to here is >>>>>>>>>>>>>>>>>>>>> different than the >>>>>>>>>>>>>>>>>>>>> default/global DLQ, then the user will need to make >>>>>>>>>>>>>>>>>>>>> sure >>>>>>>>>>>>>>>>>>>>> to have created >>>>>>>>>>>>>>>>>>>>> this themselves up front. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> That's it from me...sorry for the long response, it's >>>>>>>>>>>>>>>>>>>>> just because I'm >>>>>>>>>>>>>>>>>>>>> excited for this feature and have been waiting on a >>>>>>>>>>>>>>>>>>>>> KIP >>>>>>>>>>>>>>>>>>>>> for this for years. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>> Sophie >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 11:10?AM Damien Gasparina >>>>>>>>>>>>>>>>>>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> >>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi Andrew, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks a lot for your review, plenty of good points! >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 11. Typo fixed, good cach. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 12. I do agree with you and Nick also mentioned it, I >>>>>>>>>>>>>>>>>>>>>> updated the KIP >>>>>>>>>>>>>>>>>>>>>> to mention that context headers should be forwarded. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 13. Good catch, to be consistent with KIP-298, and >>>>>>>>>>>>>>>>>>>>>> without a strong >>>>>>>>>>>>>>>>>>>>>> opinion from my side, I updated the KIP with your >>>>>>>>>>>>>>>>>>>>>> prefix >>>>>>>>>>>>>>>>>>>>>> proposal. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 14. I am not sure about this point, a big difference >>>>>>>>>>>>>>>>>>>>>> between KIP-298 >>>>>>>>>>>>>>>>>>>>>> and this one is that the handlers can easily be >>>>>>>>>>>>>>>>>>>>>> overridden, something >>>>>>>>>>>>>>>>>>>>>> that is not doable in Kafka Connect. >>>>>>>>>>>>>>>>>>>>>> If someone would like a different behavior, e.g. to >>>>>>>>>>>>>>>>>>>>>> mask >>>>>>>>>>>>>>>>>>>>>> the payload >>>>>>>>>>>>>>>>>>>>>> or include further headers, I think we should >>>>>>>>>>>>>>>>>>>>>> encourage >>>>>>>>>>>>>>>>>>>>>> them to write >>>>>>>>>>>>>>>>>>>>>> their own exception handlers to build the DLQ Record >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> way they >>>>>>>>>>>>>>>>>>>>>> expect. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 15. Yeah, that's a good point, I was not fully >>>>>>>>>>>>>>>>>>>>>> convinced >>>>>>>>>>>>>>>>>>>>>> about putting >>>>>>>>>>>>>>>>>>>>>> a String in it, I do assume that "null" is also a >>>>>>>>>>>>>>>>>>>>>> valid >>>>>>>>>>>>>>>>>>>>>> value. I do >>>>>>>>>>>>>>>>>>>>>> assume that the Stacktrace and the Exception in this >>>>>>>>>>>>>>>>>>>>>> case are the key >>>>>>>>>>>>>>>>>>>>>> metadata for the user to troubleshoot the problem. >>>>>>>>>>>>>>>>>>>>>> I updated the KIP to mention that the value should be >>>>>>>>>>>>>>>>>>>>>> null if >>>>>>>>>>>>>>>>>>>>>> triggered in a punctuate. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 16. I added a session to mention that Kafka Streams >>>>>>>>>>>>>>>>>>>>>> would not try to >>>>>>>>>>>>>>>>>>>>>> automatically create the topic and the topic should >>>>>>>>>>>>>>>>>>>>>> either be >>>>>>>>>>>>>>>>>>>>>> automatically created, or pre-created. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 17. If a DLQ record can not be sent, the exception >>>>>>>>>>>>>>>>>>>>>> should go to the >>>>>>>>>>>>>>>>>>>>>> uncaughtExceptionHandler. Let me clearly state it in >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> KIP. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina >>>>>>>>>>>>>>>>>>>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> >>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi Nick, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 1. Good point, that's less impactful than a custom >>>>>>>>>>>>>>>>>>>>>>> interface, I just >>>>>>>>>>>>>>>>>>>>>>> updated the KIP with the new signature. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 1.a I really meant ProducerRecord, that's the class >>>>>>>>>>>>>>>>>>>>>>> used to forward to >>>>>>>>>>>>>>>>>>>>>>> downstream processors in the PAPI. The only >>>>>>>>>>>>>>>>>>>>>>> information >>>>>>>>>>>>>>>>>>>>>>> missing in >>>>>>>>>>>>>>>>>>>>>>> this class is the topic name. I also considered >>>>>>>>>>>>>>>>>>>>>>> relying >>>>>>>>>>>>>>>>>>>>>>> on the Kafka >>>>>>>>>>>>>>>>>>>>>>> Producer ProducerRecord, but I assume it would >>>>>>>>>>>>>>>>>>>>>>> not be >>>>>>>>>>>>>>>>>>>>>>> consistent with >>>>>>>>>>>>>>>>>>>>>>> the KafkaStreams API. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 2. Agreed >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 2.a I do think exceptions occurring during punctuate >>>>>>>>>>>>>>>>>>>>>>> should be >>>>>>>>>>>>>>>>>>>>>>> included in the DLQ. >>>>>>>>>>>>>>>>>>>>>>> Even if building a suitable payload is almost >>>>>>>>>>>>>>>>>>>>>>> impossible, even with >>>>>>>>>>>>>>>>>>>>>>> custom code; those exceptions are still fatal for >>>>>>>>>>>>>>>>>>>>>>> Kafka >>>>>>>>>>>>>>>>>>>>>>> Streams by >>>>>>>>>>>>>>>>>>>>>>> default and are something that can not be ignored >>>>>>>>>>>>>>>>>>>>>>> safely. >>>>>>>>>>>>>>>>>>>>>>> I do assume that most users would want to be >>>>>>>>>>>>>>>>>>>>>>> informed >>>>>>>>>>>>>>>>>>>>>>> if an error >>>>>>>>>>>>>>>>>>>>>>> happened during a punctuate, even if only the >>>>>>>>>>>>>>>>>>>>>>> metadata >>>>>>>>>>>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>>>>>>>>>>>> stacktrace, exception) is provided. >>>>>>>>>>>>>>>>>>>>>>> I am only concerned flooding the DLQ topic as, if a >>>>>>>>>>>>>>>>>>>>>>> scheduled >>>>>>>>>>>>>>>>>>>>>>> operation failed, very likely it will fails >>>>>>>>>>>>>>>>>>>>>>> during the >>>>>>>>>>>>>>>>>>>>>>> next >>>>>>>>>>>>>>>>>>>>>>> invocation, but >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 4. Good point, I clarified the wording in the KIP to >>>>>>>>>>>>>>>>>>>>>>> make it explicit. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 5. Good point, I will clearly mention that it is >>>>>>>>>>>>>>>>>>>>>>> out of >>>>>>>>>>>>>>>>>>>>>>> scope as part >>>>>>>>>>>>>>>>>>>>>>> of the KIP and might not be as trivial as people >>>>>>>>>>>>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>>>>>> expect. I will >>>>>>>>>>>>>>>>>>>>>>> update the KIP once I do have some spare time. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 6. Oh yeah, I didn't think about it, but forwarding >>>>>>>>>>>>>>>>>>>>>>> input headers >>>>>>>>>>>>>>>>>>>>>>> would definitely make sense. Confluent Schema >>>>>>>>>>>>>>>>>>>>>>> Registry >>>>>>>>>>>>>>>>>>>>>>> ID is actually >>>>>>>>>>>>>>>>>>>>>>> part of the payload, but many correlation ID and >>>>>>>>>>>>>>>>>>>>>>> technical metadata >>>>>>>>>>>>>>>>>>>>>>> are passed through headers, it makes sense to >>>>>>>>>>>>>>>>>>>>>>> forward >>>>>>>>>>>>>>>>>>>>>>> them, specially >>>>>>>>>>>>>>>>>>>>>>> as it is the default behavior of Kafka Streams, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford >>>>>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>> >>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi Damien and Sebastien, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>>>>>> I think you can just add a `String topic` >>>>>>>>>>>>>>>>>>>>>>>> argument to >>>>>>>>>>>>>>>>>>>>>>>> the existing >>>>>>>>>>>>>>>>>>>>>>>> `withDeadLetterQueueRecord(ProducerRecord<byte[], >>>>>>>>>>>>>>>>>>>>>>>> byte[]> >>>>>>>>>>>>>>>>>>>>>>>> deadLetterQueueRecord)` method, and then the >>>>>>>>>>>>>>>>>>>>>>>> implementation of the >>>>>>>>>>>>>>>>>>>>>>>> exception handler could choose the topic to send >>>>>>>>>>>>>>>>>>>>>>>> records to using >>>>>>>>>>>>>>>>>>>>>> whatever >>>>>>>>>>>>>>>>>>>>>>>> logic the user desires. You could perhaps provide a >>>>>>>>>>>>>>>>>>>>>>>> built-in >>>>>>>>>>>>>>>>>>>>>> implementation >>>>>>>>>>>>>>>>>>>>>>>> that leverages your new config to send all >>>>>>>>>>>>>>>>>>>>>>>> records to >>>>>>>>>>>>>>>>>>>>>>>> an untyped DLQ >>>>>>>>>>>>>>>>>>>>>> topic? >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 1a. >>>>>>>>>>>>>>>>>>>>>>>> BTW you have a typo: in your >>>>>>>>>>>>>>>>>>>>>>>> DeserializationExceptionHandler, the type >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>>>> your `deadLetterQueueRecord` argument is >>>>>>>>>>>>>>>>>>>>>>>> `ProducerRecord`, when it >>>>>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>>>> probably be `ConsumerRecord`. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 2. >>>>>>>>>>>>>>>>>>>>>>>> Agreed. I think it's a good idea to provide an >>>>>>>>>>>>>>>>>>>>>>>> implementation that >>>>>>>>>>>>>>>>>>>>>> sends to >>>>>>>>>>>>>>>>>>>>>>>> a single DLQ by default, but it's important to >>>>>>>>>>>>>>>>>>>>>>>> enable >>>>>>>>>>>>>>>>>>>>>>>> users to >>>>>>>>>>>>>>>>>>>>>> customize >>>>>>>>>>>>>>>>>>>>>>>> this with their own exception handlers. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 2a. >>>>>>>>>>>>>>>>>>>>>>>> I'm not convinced that "errors" (e.g. failed >>>>>>>>>>>>>>>>>>>>>>>> punctuate) should be sent >>>>>>>>>>>>>>>>>>>>>> to a >>>>>>>>>>>>>>>>>>>>>>>> DLQ topic like it's a bad record. To me, a DLQ >>>>>>>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>>>> only contain >>>>>>>>>>>>>>>>>>>>>> records >>>>>>>>>>>>>>>>>>>>>>>> that failed to process. I'm not even sure how a >>>>>>>>>>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>> re-process/action one of these other errors; it >>>>>>>>>>>>>>>>>>>>>>>> seems >>>>>>>>>>>>>>>>>>>>>>>> like the purview >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>>>> error logging to me? >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 4. >>>>>>>>>>>>>>>>>>>>>>>> My point here was that I think it would be >>>>>>>>>>>>>>>>>>>>>>>> useful for >>>>>>>>>>>>>>>>>>>>>>>> the KIP to >>>>>>>>>>>>>>>>>>>>>> contain an >>>>>>>>>>>>>>>>>>>>>>>> explanation of the behavior both with KIP-1033 and >>>>>>>>>>>>>>>>>>>>>>>> without it. i.e. >>>>>>>>>>>>>>>>>>>>>> clarify >>>>>>>>>>>>>>>>>>>>>>>> if/how records that throw an exception in a >>>>>>>>>>>>>>>>>>>>>>>> processor >>>>>>>>>>>>>>>>>>>>>>>> are handled. At >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>> moment, I'm assuming that without KIP-1033, >>>>>>>>>>>>>>>>>>>>>>>> processing >>>>>>>>>>>>>>>>>>>>>>>> exceptions >>>>>>>>>>>>>>>>>>>>>> would not >>>>>>>>>>>>>>>>>>>>>>>> cause records to be sent to the DLQ, but with >>>>>>>>>>>>>>>>>>>>>>>> KIP-1033, they would. If >>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>>> assumption is correct, I think it should be made >>>>>>>>>>>>>>>>>>>>>>>> explicit in the KIP. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 5. >>>>>>>>>>>>>>>>>>>>>>>> Understood. You may want to make this explicit >>>>>>>>>>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>>>>>>>>> documentation for >>>>>>>>>>>>>>>>>>>>>>>> users, so they understand the consequences of re- >>>>>>>>>>>>>>>>>>>>>>>> processing data sent >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>> their DLQ. The main reason I raised this point is >>>>>>>>>>>>>>>>>>>>>>>> it's >>>>>>>>>>>>>>>>>>>>>>>> something that's >>>>>>>>>>>>>>>>>>>>>>>> tripped me up in numerous KIPs that that committers >>>>>>>>>>>>>>>>>>>>>>>> frequently remind >>>>>>>>>>>>>>>>>>>>>> me >>>>>>>>>>>>>>>>>>>>>>>> of; so I wanted to get ahead of it for once! :D >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> And one new point: >>>>>>>>>>>>>>>>>>>>>>>> 6. >>>>>>>>>>>>>>>>>>>>>>>> The DLQ record schema appears to discard all custom >>>>>>>>>>>>>>>>>>>>>>>> headers set on the >>>>>>>>>>>>>>>>>>>>>>>> source record. Is there a way these can be >>>>>>>>>>>>>>>>>>>>>>>> included? >>>>>>>>>>>>>>>>>>>>>>>> In particular, I'm >>>>>>>>>>>>>>>>>>>>>>>> concerned with "schema pointer" headers (like those >>>>>>>>>>>>>>>>>>>>>>>> set by Schema >>>>>>>>>>>>>>>>>>>>>>>> Registry), that may need to be propagated, >>>>>>>>>>>>>>>>>>>>>>>> especially >>>>>>>>>>>>>>>>>>>>>>>> if the records >>>>>>>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>>>>>>>> fed back into the source topics for re- >>>>>>>>>>>>>>>>>>>>>>>> processing by >>>>>>>>>>>>>>>>>>>>>>>> the user. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>> Nick >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina >>>>>>>>>>>>>>>>>>>>>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> >>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick, >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for your review and your useful >>>>>>>>>>>>>>>>>>>>>>>>> comments! >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 1. It is a good point, as you mentioned, I >>>>>>>>>>>>>>>>>>>>>>>>> think it >>>>>>>>>>>>>>>>>>>>>>>>> would make sense >>>>>>>>>>>>>>>>>>>>>>>>> in some use cases to have potentially multiple DLQ >>>>>>>>>>>>>>>>>>>>>>>>> topics, so we >>>>>>>>>>>>>>>>>>>>>>>>> should provide an API to let users do it. >>>>>>>>>>>>>>>>>>>>>>>>> Thinking out-loud here, maybe it is a better >>>>>>>>>>>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>>>>>>>>>>>> to create a new >>>>>>>>>>>>>>>>>>>>>>>>> Record class containing the topic name, e.g. >>>>>>>>>>>>>>>>>>>>>>>>> DeadLetterQueueRecord >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>> changing the signature to >>>>>>>>>>>>>>>>>>>>>>>>> withDeadLetterQueueRecords(Iteratable<DeadLetterQueueRecord> >>>>>>>>>>>>>>>>>>>>>>>>> deadLetterQueueRecords) instead of >>>>>>>>>>>>>>>>>>>>>>>>> withDeadLetterQueueRecord(ProducerRecord<byte[], >>>>>>>>>>>>>>>>>>>>>>>>> byte[]> >>>>>>>>>>>>>>>>>>>>>>>>> deadLetterQueueRecord). What do you think? >>>>>>>>>>>>>>>>>>>>>>>>> DeadLetterQueueRecord >>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>>> be something like "class DeadLetterQueueRecord >>>>>>>>>>>>>>>>>>>>>>>>> extends >>>>>>>>>>>>>>>>>>>>>>>>> org.apache.kafka.streams.processor.api;.ProducerRecords >>>>>>>>>>>>>>>>>>>>>>>>> { String >>>>>>>>>>>>>>>>>>>>>>>>> topic; /* + getter/setter + */ } " >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 2. I think the root question here is: should we >>>>>>>>>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>>>>>>> one DLQ topic or >>>>>>>>>>>>>>>>>>>>>>>>> multiple DLQ topics by default. This question >>>>>>>>>>>>>>>>>>>>>>>>> highly >>>>>>>>>>>>>>>>>>>>>>>>> depends on the >>>>>>>>>>>>>>>>>>>>>>>>> context, but implementing a default >>>>>>>>>>>>>>>>>>>>>>>>> implementation to >>>>>>>>>>>>>>>>>>>>>>>>> handle multiple >>>>>>>>>>>>>>>>>>>>>>>>> DLQ topics would be opinionated, e.g. how to >>>>>>>>>>>>>>>>>>>>>>>>> manage >>>>>>>>>>>>>>>>>>>>>>>>> errors in a >>>>>>>>>>>>>>>>>>>>>>>>> punctuate? >>>>>>>>>>>>>>>>>>>>>>>>> I think it makes sense to have the default >>>>>>>>>>>>>>>>>>>>>>>>> implementation writing all >>>>>>>>>>>>>>>>>>>>>>>>> faulty records to a single DLQ, that's at least >>>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>> approach I used >>>>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>> past applications: one DLQ per Kafka Streams >>>>>>>>>>>>>>>>>>>>>>>>> application. Of course >>>>>>>>>>>>>>>>>>>>>>>>> the message format could change in the DLQ e.g. >>>>>>>>>>>>>>>>>>>>>>>>> due >>>>>>>>>>>>>>>>>>>>>>>>> to the source >>>>>>>>>>>>>>>>>>>>>>>>> topic, but those DLQ records will be very likely >>>>>>>>>>>>>>>>>>>>>>>>> troubleshooted, and >>>>>>>>>>>>>>>>>>>>>>>>> maybe replay, manually anyway. >>>>>>>>>>>>>>>>>>>>>>>>> If a user needs to have multiple DLQ topics or >>>>>>>>>>>>>>>>>>>>>>>>> want >>>>>>>>>>>>>>>>>>>>>>>>> to enforce a >>>>>>>>>>>>>>>>>>>>>>>>> specific schema, it's still possible, but they >>>>>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>>> need to >>>>>>>>>>>>>>>>>>>>>> implement >>>>>>>>>>>>>>>>>>>>>>>>> custom Exception Handlers. >>>>>>>>>>>>>>>>>>>>>>>>> Coming back to 1. I do agree that it would make >>>>>>>>>>>>>>>>>>>>>>>>> sense >>>>>>>>>>>>>>>>>>>>>>>>> to have the >>>>>>>>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>>>>>>>>>>> set the DLQ topic name in the handlers for more >>>>>>>>>>>>>>>>>>>>>>>>> flexibility. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 3. Good point, sorry it was a typo, the >>>>>>>>>>>>>>>>>>>>>>>>> ProcessingContext makes much >>>>>>>>>>>>>>>>>>>>>>>>> more sense here indeed. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 4. I do assume that we could implement KIP-1033 >>>>>>>>>>>>>>>>>>>>>>>>> (Processing exception >>>>>>>>>>>>>>>>>>>>>>>>> handler) independently from KIP-1034. I do hope >>>>>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>> KIP-1033 would >>>>>>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>>>>> adopted and implemented before KIP-1034, but if >>>>>>>>>>>>>>>>>>>>>>>>> that's not the case, >>>>>>>>>>>>>>>>>>>>>>>>> we could implement KIP-1034 indepantly and update >>>>>>>>>>>>>>>>>>>>>>>>> KIP-1033 to include >>>>>>>>>>>>>>>>>>>>>>>>> the DLQ record afterward (in the same KIP or in a >>>>>>>>>>>>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>>>>>>>>>> one if not >>>>>>>>>>>>>>>>>>>>>>>>> possible). >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 5. I think we should be clear that this KIP only >>>>>>>>>>>>>>>>>>>>>>>>> covers the DLQ >>>>>>>>>>>>>>>>>>>>>> record >>>>>>>>>>>>>>>>>>>>>>>>> produced. >>>>>>>>>>>>>>>>>>>>>>>>> Everything related to replay messages or recovery >>>>>>>>>>>>>>>>>>>>>>>>> plan should be >>>>>>>>>>>>>>>>>>>>>>>>> considered out-of-scope as it is use-case and >>>>>>>>>>>>>>>>>>>>>>>>> error >>>>>>>>>>>>>>>>>>>>>>>>> specific. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Let me know if that's not clear, there are >>>>>>>>>>>>>>>>>>>>>>>>> definitely >>>>>>>>>>>>>>>>>>>>>>>>> points that >>>>>>>>>>>>>>>>>>>>>>>>> highly debatable. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>> Damien >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford >>>>>>>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>> >>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Oh, and one more thing: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> 5. >>>>>>>>>>>>>>>>>>>>>>>>>> Whenever you take a record out of the stream, and >>>>>>>>>>>>>>>>>>>>>>>>>> then potentially >>>>>>>>>>>>>>>>>>>>>>>>>> re-introduce it at a later date, you introduce >>>>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>> potential for >>>>>>>>>>>>>>>>>>>>>> record >>>>>>>>>>>>>>>>>>>>>>>>>> ordering issues. For example, that record could >>>>>>>>>>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>>>>>>>> been destined >>>>>>>>>>>>>>>>>>>>>> for a >>>>>>>>>>>>>>>>>>>>>>>>>> Window that has been closed by the time it's re- >>>>>>>>>>>>>>>>>>>>>>>>>> processed. I'd >>>>>>>>>>>>>>>>>>>>>> like to >>>>>>>>>>>>>>>>>>>>>>>>> see >>>>>>>>>>>>>>>>>>>>>>>>>> a section that considers these consequences, and >>>>>>>>>>>>>>>>>>>>>>>>>> perhaps make >>>>>>>>>>>>>>>>>>>>>> those risks >>>>>>>>>>>>>>>>>>>>>>>>>> clear to users. For the record, this is exactly >>>>>>>>>>>>>>>>>>>>>>>>>> what >>>>>>>>>>>>>>>>>>>>>>>>>> sunk KIP-990, >>>>>>>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>>>>>>> was an alternative approach to error handling >>>>>>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>> introduced the >>>>>>>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>>>>>>> issues. >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Nick >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford >>>>>>>>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com >>>>>>>>>>>>>>>>>>>> <mailto:nick.telf...@gmail.com%0b>> > > >>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Damien, >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! Dead-letter queues are >>>>>>>>>>>>>>>>>>>>>>>>>>> something that I >>>>>>>>>>>>>>>>>>>>>> think a >>>>>>>>>>>>>>>>>>>>>>>>> lot of >>>>>>>>>>>>>>>>>>>>>>>>>>> users would like. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I think there are a few points with this KIP >>>>>>>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>>> concern me: >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>>>>>>>>> It looks like you can only define a single, >>>>>>>>>>>>>>>>>>>>>>>>>>> global >>>>>>>>>>>>>>>>>>>>>>>>>>> DLQ for the >>>>>>>>>>>>>>>>>>>>>> entire >>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka Streams application? What about >>>>>>>>>>>>>>>>>>>>>>>>>>> applications >>>>>>>>>>>>>>>>>>>>>>>>>>> that would >>>>>>>>>>>>>>>>>>>>>> like to >>>>>>>>>>>>>>>>>>>>>>>>>>> define different DLQs for different data flows? >>>>>>>>>>>>>>>>>>>>>>>>>>> This is >>>>>>>>>>>>>>>>>>>>>> especially >>>>>>>>>>>>>>>>>>>>>>>>>>> important when dealing with multiple source >>>>>>>>>>>>>>>>>>>>>>>>>>> topics >>>>>>>>>>>>>>>>>>>>>>>>>>> that have >>>>>>>>>>>>>>>>>>>>>> different >>>>>>>>>>>>>>>>>>>>>>>>>>> record schemas. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> 2. >>>>>>>>>>>>>>>>>>>>>>>>>>> Your DLQ payload value can either be the record >>>>>>>>>>>>>>>>>>>>>>>>>>> value that >>>>>>>>>>>>>>>>>>>>>> failed, or >>>>>>>>>>>>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>>>>>>>>>>>> error string (such as "error during punctuate"). >>>>>>>>>>>>>>>>>>>>>>>>>>> This is likely >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>>> cause >>>>>>>>>>>>>>>>>>>>>>>>>>> problems when users try to process the records >>>>>>>>>>>>>>>>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>>>>>>>>>>> the DLQ, as >>>>>>>>>>>>>>>>>>>>>> they >>>>>>>>>>>>>>>>>>>>>>>>> can't >>>>>>>>>>>>>>>>>>>>>>>>>>> guarantee the format of every record value >>>>>>>>>>>>>>>>>>>>>>>>>>> will be >>>>>>>>>>>>>>>>>>>>>>>>>>> the same. >>>>>>>>>>>>>>>>>>>>>> This is >>>>>>>>>>>>>>>>>>>>>>>>> very >>>>>>>>>>>>>>>>>>>>>>>>>>> loosely related to point 1. above. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> 3. >>>>>>>>>>>>>>>>>>>>>>>>>>> You provide a ProcessorContext to both exception >>>>>>>>>>>>>>>>>>>>>>>>>>> handlers, but >>>>>>>>>>>>>>>>>>>>>> state >>>>>>>>>>>>>>>>>>>>>>>>> they >>>>>>>>>>>>>>>>>>>>>>>>>>> cannot be used to forward records. In that >>>>>>>>>>>>>>>>>>>>>>>>>>> case, I >>>>>>>>>>>>>>>>>>>>>>>>>>> believe you >>>>>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>>>>> use >>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessingContext instead, which statically >>>>>>>>>>>>>>>>>>>>>>>>>>> guarantees that it >>>>>>>>>>>>>>>>>>>>>> can't be >>>>>>>>>>>>>>>>>>>>>>>>>>> used to forward records. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> 4. >>>>>>>>>>>>>>>>>>>>>>>>>>> You mention the KIP-1033 >>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessingExceptionHandler, but what's >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>> plan >>>>>>>>>>>>>>>>>>>>>>>>>>> if KIP-1033 is not adopted, or if KIP-1034 lands >>>>>>>>>>>>>>>>>>>>>>>>>>> before 1033? >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Nick >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina < >>>>>>>>>>>>>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> In a general way, if the user does not >>>>>>>>>>>>>>>>>>>>>>>>>>>> configure >>>>>>>>>>>>>>>>>>>>>>>>>>>> the right ACL, >>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>>>> would be a security issue, but that's true for >>>>>>>>>>>>>>>>>>>>>>>>>>>> any >>>>>>>>>>>>>>>>>>>>>>>>>>>> topic. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> This KIP allows users to configure a Dead >>>>>>>>>>>>>>>>>>>>>>>>>>>> Letter >>>>>>>>>>>>>>>>>>>>>>>>>>>> Queue without >>>>>>>>>>>>>>>>>>>>>> writing >>>>>>>>>>>>>>>>>>>>>>>>>>>> custom Java code in Kafka Streams, not at the >>>>>>>>>>>>>>>>>>>>>>>>>>>> topic level. >>>>>>>>>>>>>>>>>>>>>>>>>>>> A lot of applications are already implementing >>>>>>>>>>>>>>>>>>>>>>>>>>>> this pattern, >>>>>>>>>>>>>>>>>>>>>> but the >>>>>>>>>>>>>>>>>>>>>>>>>>>> required code to do it is quite painful and >>>>>>>>>>>>>>>>>>>>>>>>>>>> error >>>>>>>>>>>>>>>>>>>>>>>>>>>> prone, for >>>>>>>>>>>>>>>>>>>>>> example >>>>>>>>>>>>>>>>>>>>>>>>>>>> most apps I have seen created a new >>>>>>>>>>>>>>>>>>>>>>>>>>>> KafkaProducer >>>>>>>>>>>>>>>>>>>>>>>>>>>> to send >>>>>>>>>>>>>>>>>>>>>> records to >>>>>>>>>>>>>>>>>>>>>>>>>>>> their DLQ. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> As it would be disabled by default for backward >>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility, >>>>>>>>>>>>>>>>>>>>>> I doubt >>>>>>>>>>>>>>>>>>>>>>>>>>>> it would generate any security concern. >>>>>>>>>>>>>>>>>>>>>>>>>>>> If a user explicitly configures a Deal Letter >>>>>>>>>>>>>>>>>>>>>>>>>>>> Queue, it would >>>>>>>>>>>>>>>>>>>>>> be up to >>>>>>>>>>>>>>>>>>>>>>>>>>>> him to configure the relevant ACLs to ensure >>>>>>>>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>>>> the right >>>>>>>>>>>>>>>>>>>>>> principal >>>>>>>>>>>>>>>>>>>>>>>>>>>> can access it. >>>>>>>>>>>>>>>>>>>>>>>>>>>> It is already the case for all internal, input >>>>>>>>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>>>>> output Kafka >>>>>>>>>>>>>>>>>>>>>>>>>>>> Streams topics (e.g. repartition, changelog >>>>>>>>>>>>>>>>>>>>>>>>>>>> topics) that also >>>>>>>>>>>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>>>>>>>>>>> contain confidential data, so I do not think we >>>>>>>>>>>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>> implement a >>>>>>>>>>>>>>>>>>>>>>>>>>>> different behavior for this one. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> In this KIP, we configured the default DLQ >>>>>>>>>>>>>>>>>>>>>>>>>>>> record >>>>>>>>>>>>>>>>>>>>>>>>>>>> to have the >>>>>>>>>>>>>>>>>>>>>> initial >>>>>>>>>>>>>>>>>>>>>>>>>>>> record key/value as we assume that it is the >>>>>>>>>>>>>>>>>>>>>>>>>>>> expected and wanted >>>>>>>>>>>>>>>>>>>>>>>>>>>> behavior for most applications. >>>>>>>>>>>>>>>>>>>>>>>>>>>> If a user does not want to have the key/ >>>>>>>>>>>>>>>>>>>>>>>>>>>> value in >>>>>>>>>>>>>>>>>>>>>>>>>>>> the DLQ record >>>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>>>>>>>> any reason, they could still implement >>>>>>>>>>>>>>>>>>>>>>>>>>>> exception >>>>>>>>>>>>>>>>>>>>>>>>>>>> handlers to >>>>>>>>>>>>>>>>>>>>>> build >>>>>>>>>>>>>>>>>>>>>>>>>>>> their own DLQ record. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding ACL, maybe something smarter could be >>>>>>>>>>>>>>>>>>>>>>>>>>>> done in Kafka >>>>>>>>>>>>>>>>>>>>>> Streams, >>>>>>>>>>>>>>>>>>>>>>>>>>>> but this is out of scope for this KIP. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren >>>>>>>>>>>>>>>>>>>>>>>>>>>> <cla...@xenei.com<mailto:cla...@xenei.com>> >>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> My concern is that someone would create a dead >>>>>>>>>>>>>>>>>>>>>>>>>>>>> letter queue >>>>>>>>>>>>>>>>>>>>>> on a >>>>>>>>>>>>>>>>>>>>>>>>>>>> sensitive >>>>>>>>>>>>>>>>>>>>>>>>>>>>> topic and not get the ACL correct from the >>>>>>>>>>>>>>>>>>>>>>>>>>>>> start. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thus >>>>>>>>>>>>>>>>>>>>>> causing >>>>>>>>>>>>>>>>>>>>>>>>>>>> potential >>>>>>>>>>>>>>>>>>>>>>>>>>>>> confidential data leak. Is there anything >>>>>>>>>>>>>>>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal >>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>>>>>>>> prevent that from happening? If so I did not >>>>>>>>>>>>>>>>>>>>>>>>>>>>> recognize it as >>>>>>>>>>>>>>>>>>>>>> such. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45?AM Damien >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gasparina < >>>>>>>>>>>>>>>>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Claude, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In this KIP, the Dead Letter Queue is >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialized by a >>>>>>>>>>>>>>>>>>>>>> standard >>>>>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> independant topic, thus normal ACL applies >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to it >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> like any >>>>>>>>>>>>>>>>>>>>>> other >>>>>>>>>>>>>>>>>>>>>>>>> topic. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This should not introduce any security >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> obviously, >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>> right >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ACL would need to be provided to write to the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DLQ if >>>>>>>>>>>>>>>>>>>>>> configured. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Damien >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Warren, Jr >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <claude.war...@aiven.io.invalid<mailto:claude.war...@aiven.io.invalid>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am new to the Kafka codebase so please >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> excuse >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any >>>>>>>>>>>>>>>>>>>>>> ignorance >>>>>>>>>>>>>>>>>>>>>>>>> on my >>>>>>>>>>>>>>>>>>>>>>>>>>>> part. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> When a dead letter queue is established is >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there a >>>>>>>>>>>>>>>>>>>>>> process to >>>>>>>>>>>>>>>>>>>>>>>>>>>> ensure that >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it at least is defined with the same ACL as >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> original >>>>>>>>>>>>>>>>>>>>>> queue? >>>>>>>>>>>>>>>>>>>>>>>>>>>> Without >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> such a guarantee at the start it seems that >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> managing dead >>>>>>>>>>>>>>>>>>>>>> letter >>>>>>>>>>>>>>>>>>>>>>>>>>>> queues >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will be fraught with security issues. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34?AM Damien >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gasparina < >>>>>>>>>>>>>>>>>>>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To continue on our effort to improve Kafka >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Streams error >>>>>>>>>>>>>>>>>>>>>>>>>>>> handling, we >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> propose a new KIP to add out of the box >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support for Dead >>>>>>>>>>>>>>>>>>>>>>>>> Letter >>>>>>>>>>>>>>>>>>>>>>>>>>>> Queue. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The goal of this KIP is to provide a >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> default >>>>>>>>>>>>>>>>>>>>>> implementation >>>>>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be suitable for most applications >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>>>>>>>>> users to >>>>>>>>>>>>>>>>>>>>>>>>>>>> override >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it if they have specific requirements. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In order to build a suitable payload, some >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> additional >>>>>>>>>>>>>>>>>>>>>> changes >>>>>>>>>>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> included in this KIP: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> when >>>>>>>>>>>>>>>>>>>>>> available, the >>>>>>>>>>>>>>>>>>>>>>>>>>>> source >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> node raw key/value byte[] >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. expose the ProcessingContext to the >>>>>>>>>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it is currently not available in the handle >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameters. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding point 2., to expose the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessingContext to >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ProductionExceptionHandler, we >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> considered two >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> choices: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. exposing the ProcessingContext as a >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameter in >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>> handle() >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> method. That's the cleanest way IMHO, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but we >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would need >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>>>>>> deprecate >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the old method. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> attribute in >>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>> interface. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This way, no method is deprecated, but we >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would not be >>>>>>>>>>>>>>>>>>>>>>>>> consistent >>>>>>>>>>>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the other ExceptionHandler. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the KIP, we chose the 1. solution (new >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> handle >>>>>>>>>>>>>>>>>>>>>> signature >>>>>>>>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>>>>>>>> old >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> one deprecated), but we could use other >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> opinions on >>>>>>>>>>>>>>>>>>>>>> this part. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> More information is available directly >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP link: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/<https://cwiki.apache.org/confluence/display/KAFKA> >>>>>>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA<https://cwiki.apache.org/confluence/display/KAFKA>> >>>>>>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/<https://cwiki.apache.org/confluence/display> >>>>>>>>>>>>>>>>>>>>>> KAFKA<https://cwiki.apache.org/confluence/display/<https://cwiki.apache.org/confluence/display/> >>>>>>>>>>>>>>>>>>>>>> KAFKA>> >>>>>>>>>>>>>>>>>>>>>> KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA<http://cwiki.apache.org/confluence/display/KAFKA>><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA<http://cwiki.apache.org/confluence/display/KAFKA><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA<http://cwiki.apache.org/confluence/display/KAFKA>>> >>>>>>>>>>>>>>>>>>>>>> KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/>><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/>>> >>>>>>>>>>>>>>>>>>>>>> KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/>><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/><http:// >>>>>>>>>>>>>>>>>>>>>> cwiki.apache.org/confluence/display/KAFKA/<http://cwiki.apache.org/confluence/display/KAFKA/>>> >>>>>>>>>>>>>>>>>>>>>> KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>>>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Feedbacks and suggestions are welcome, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Damien, Sebastien and Loic >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>>>>>>>>>>>> LinkedIn: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> http://www.linkedin.com/in/<http://www.linkedin.com/in> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in<http://www.linkedin.com/in>><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in<http://www.linkedin.com/in><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in<http://www.linkedin.com/in>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren<http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren><http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren<http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren>><http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren<http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren><http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren<http://www.linkedin.com/in/<http://www.linkedin.com/in/> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <http://www.linkedin.com/in/<http://www.linkedin.com/in/><http:// >>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.linkedin.com/in/<http://www.linkedin.com/in/>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> claudewarren>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >