On Fri, 18 Feb 2022 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:
> > It probably deserves its own thread to start discussing ideas.
>
> Yes. My question was: if we think it's time to do a DSL 2.0, should we
> drop this KIP and just fix via DSL 2.0 instead?
>

Good question. Would love to hear what others think about this.

I've stated my position about this here:

> For this KIP specifically, I think about it as a continuation from
> KIP-478. Therefore, it could make sense to have it as part of the
> current version of the DSL.

I'd even add that if this KIP is adopted, I would not be that disappointed
if KIP-634 is dropped in favor of a DSL v2.0, as the access to headers
provided by KIP-478 via the Record API is much better than the previous
`.context().headers()`.

But I'm happy to reconsider if there is an agreement to focus efforts
towards a DSL 2.0.

> > You're right. I'm not proposing the method signature.
>
> What signature do you propose? I don't see an update on the KIP.
>

My bad. I have clarified this in the KIP's public interfaces now:

```
New methods:

- KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
- KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)

Modified methods:

- KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
- KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
```

> > Not sure if I understand how this would look like. Do you mean
> > checking it on the Record itself or somewhere else?
>
> @Guozhang: I am not worried about the runtime overhead. I am worried
> about user experience. It's not clear from the method signature that
> you are not allowed to change the key, which seems to be bad API design.
> Even if I understand the desire to keep the API surface area small -- I
> would rather have a compile time enforcement than a runtime check.
>
> For example, we have `map()` and `mapValues()` and `mapValues()` returns
> a `Value V` (enforces that the key is not changed) instead of a
> `KeyValue<KIn,VOut>` and we use a runtime check to check that the key is
> not changed.
>
> Naively, could we enforce something similar by setting the output key
> type as `Void`?
>
> KStream#processValue(ProcessorSupplier<K, V, Void, VOut>
> processorSupplier)
>
> Not sure if this would work or not?
>
> Or it might be worth adding a new interface, `ValueProcessorSupplier`,
> that ensures that the key is not modified?
>

This is an important discussion, even more so with a DSL v2.0.

At the moment, the DSL just flags whether repartitioning is required based
on the DSL operation. As mentioned, `mapValues()` enforces through the DSL
that only the value changes, though the only _guarantee_ we have is that
Kafka Streams "owns" the implementation, and we can flag this properly.

With a hypothetical v2.0 based on the Record API, this will be harder to
enforce with the current APIs. E.g. with `mapValues(Record<K, V> record)`,
nothing would stop users from using `record.withKey("needs_partitioning")`.

The approach defined in this KIP is similar to what we have at the moment
on `ValueTransformer*`, which validates at runtime, via
`ForwardingDisabledProcessorContext`, that users are not calling `forward`.

`ValueProcessorSupplier` is not meant to be a public API. It is only to be
used internally in the `processValues` implementation.

At first glance, `KStream#processValue(ProcessorSupplier<K, V, Void, VOut>
processorSupplier)` won't work, as it would require the `Processor`
implementation to actually change the key. I will take a deeper look to
validate whether this could solve the issue.
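To make the runtime check discussed in this thread a bit more concrete, here is a minimal, self-contained sketch of a reference-equality key check on forward. It does not use the Kafka Streams classes: `Rec` and `KeyCheckingContext` are hypothetical stand-ins written only for illustration. The `setRecordKey`/`clearRecordKey` names are taken from this thread, but their behaviour here is an assumption, not the actual KIP implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT the Kafka Streams classes.
final class KeyCheckSketch {

    /** Minimal immutable record holding a key and a value. */
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }

        /** Returns a copy with a new value and the same key reference. */
        <NewV> Rec<K, NewV> withValue(NewV newValue) {
            return new Rec<>(key, newValue);
        }

        /** Returns a copy with a new key; forwarding it should be rejected. */
        <NewK> Rec<NewK, V> withKey(NewK newKey) {
            return new Rec<>(newKey, value);
        }
    }

    /** Context wrapper that remembers the current key and validates it on forward. */
    static final class KeyCheckingContext<K, V> {
        private final List<Rec<K, V>> forwarded = new ArrayList<>();
        private K currentKey;
        private boolean keySet;

        /** Called (by the framework, in this sketch) before the user's process(). */
        void setRecordKey(K key) {
            currentKey = key;
            keySet = true;
        }

        /** Called after process() returns, so punctuations see no key. */
        void clearRecordKey() {
            currentKey = null;
            keySet = false;
        }

        void forward(Rec<K, V> record) {
            if (!keySet) {
                throw new IllegalStateException("no current key set; cannot forward");
            }
            // Reference equality (not equals()): cheap, and catches any new key object.
            if (record.key != currentKey) {
                throw new IllegalStateException("key was modified");
            }
            forwarded.add(record);
        }

        List<Rec<K, V>> forwarded() {
            return forwarded;
        }
    }
}
```

In this sketch, forwarding `record.withValue(...)` passes because the key reference is unchanged, while forwarding `record.withKey(...)` or forwarding after `clearRecordKey()` (as would happen from a punctuation) throws. The cost per record is a single reference comparison, which is the trade-off being weighed against compile-time enforcement.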
> > > -Matthias > > > On 2/17/22 10:56 AM, Guozhang Wang wrote: > > Regarding the last question Matthias had, I wonder if it's similar to my > > first email's point 2) above? I think the rationale is that, since > > reference checks are relatively very cheap, it is worthwhile to pay this > > extra runtime checks and in return to have a single consolidated > > ProcessorSupplier programming interface (i.e. we would eventually > > deprecate ValueTransformerWithKeySupplier). > > > > On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya < > > quilcate.jo...@gmail.com> wrote: > > > >> Thank you Matthias, this is great feedback. > >> > >> Adding my comments below. > >> > >> On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax <mj...@apache.org> wrote: > >> > >>> Thanks for the KIP. > >>> > >>> In alignment to my reply to KIP-634, I am wondering if we are heading > >>> into the right direction, or if we should consider to re-design the DSL > >>> from scratch? > >>> > >>> > >> I'm very excited about the idea of a DLS v2.0. It probably deserves its > own > >> thread to start discussing ideas. > >> > >> For this KIP specifically, I think about it as a continuation from > KIP-478. > >> Therefore, it could make sense to have it as part of the current > version of > >> the DSL. > >> > >> > >>> > >>> Even if we don't do a DSL 2.0 right now, I have some concerns about > this > >>> KIP: > >>> > >>> (1) I am not sure if the propose changed is backward compatible? We > >>> currently have: > >>> > >>> void KStream#process(ProcessorSupplier, String...) > >>> > >>> The newly proposed method: > >>> > >>> KStream KStream#process(ProcessorSupplier) > >>> > >>> seems to be an incompatible change? > >>> > >>> The KIP states: > >>> > >>>> Modified method KStream#process should be compatible with previous > >>> version, that at the moment is fixed to a Void return type. > >>> > >>> Why is it backward compatible? Having both old and new #process() seems > >>> not to be compatible to me? 
Or are you proposing to _change_ the method > >>> signature (if yes, the `String...` parameter to add a state store seems > >>> to be missing)? For this case, it seems that existing programs would at > >>> least need to be recompiled -- it would only be a source compatible > >>> change, but not a binary compatible change? > >>> > >>> > >> You're right. I'm not proposing the method signature. > >> Totally agree about compatibility issue. I was only considering source > >> compatibility and was ignorant that changing from void to a specific > type > >> would break binary compatibility. > >> I will update the KIP to reflect this: > >> > >>> Modifications to method KStream#process are source compatible with > >> previous version, though not binary compatible. Therefore will require > >> users to recompile their applications with the latest version. > >> > >> > >>> I am also wondering if/how this change related to KIP-401: > >>> > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756 > >>> > >>> From a high level it might not conflict, but I wanted to double > check? > >>> > >>> > >> Wasn't aware of this KIP, thanks for sharing! I don't think there is > >> conflict between KIPs, as far as I understand. > >> > >> > >>> > >>> For `KStream#processValues()`, my main concern is the added runtime > >>> check if the key was modified or not -- it seems to provide bad user > >>> experience -- enforcing that the key is not modified on an API level, > >>> would seem to be much better. > >>> > >>> Last, what is the purpose of `setRecordKey()` and `clearRecordKey()`? I > >>> am not sure if I understand their purpose? > >>> > >>> > >> Both methods set/clear the context (current key) to be used when > checking > >> keys on forward(record) implementation. > >> > >>> enforcing that the key is not modified on an API level, would seem to > be > >> much better. > >> > >> Not sure if I understand how this would look like. 
Do you mean checking > it > >> on the Record itself or somewhere else? > >> > >> > >> > >>> -Matthias > >>> > >>> > >>> On 2/15/22 11:53 AM, John Roesler wrote: > >>>> My apologies, this feedback was intended for KIP-634. > >>>> -John > >>>> > >>>> On Tue, Feb 15, 2022, at 13:15, John Roesler wrote: > >>>>> Thanks for the update, Jorge, > >>>>> > >>>>> I've just looked over the KIP again. Just one more small > >>>>> concern: > >>>>> > >>>>> 5) We can't just change the type of Record#headers() to a > >>>>> new fully qualified type. That would be a source- > >>>>> incompatible breaking change for users. > >>>>> > >>>>> Out options are: > >>>>> * Deprecate the existing method and create a new one with > >>>>> the new type > >>>>> * If the existing Headers is "not great but ok", then maybe > >>>>> we leave it alone. > >>>>> > >>>>> Thanks, > >>>>> -John > >>>>> > >>>>> On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote: > >>>>>> No specific comments, but I just wanted to mention I like the > >>> direction of > >>>>>> the KIP. My team is a big user of "transform" methods because of > the > >>>>>> ability to chain them, and I have always found the terminology > >>> challenging > >>>>>> to explain alongside "process". It felt like one concept with two > >>> names. > >>>>>> So moving towards a single API that is powerful enough to handle > both > >>> use > >>>>>> cases seems absolutely correct to me. > >>>>>> > >>>>>> Paul > >>>>>> > >>>>>> On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya < > >>>>>> quilcate.jo...@gmail.com> wrote: > >>>>>> > >>>>>>> Got it. Thanks John, this make sense. > >>>>>>> > >>>>>>> I've updated the KIP to include the deprecation of: > >>>>>>> > >>>>>>> - KStream#transform > >>>>>>> - KStream#transformValues > >>>>>>> - KStream#flatTransform > >>>>>>> - KStream#flatTransformValues > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On Fri, 11 Feb 2022 at 15:16, John Roesler <vvcep...@apache.org> > >>> wrote: > >>>>>>> > >>>>>>>> Thanks, Jorge! 
> >>>>>>>> > >>>>>>>> I think it’ll be better to keep this KIP focused on KStream > methods > >>> only. > >>>>>>>> I suspect that the KTable methods may be more complicated than > just > >>> that > >>>>>>>> proposed replacement, but it’ll also be easier to consider that > >>> question > >>>>>>> in > >>>>>>>> isolation. > >>>>>>>> > >>>>>>>> The nice thing about just deprecating the KStream methods and not > >> the > >>>>>>>> Transform* interfaces is that you can keep your proposal just > >> scoped > >>> to > >>>>>>>> KStream and not have any consequences for the rest of the DSL. > >>>>>>>> > >>>>>>>> Thanks again, > >>>>>>>> John > >>>>>>>> > >>>>>>>> On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya > wrote: > >>>>>>>>> Thanks, John. > >>>>>>>>> > >>>>>>>>>> 4) I agree that we shouldn't deprecate the Transformer* > >>>>>>>>> classes, but do you think we should deprecate the > >>>>>>>>> KStream#transform* methods? I'm curious if there's any > >>>>>>>>> remaining reason to have those methods, or if your KIP > >>>>>>>>> completely obviates them. > >>>>>>>>> > >>>>>>>>> Good catch. > >>>>>>>>> I considered that deprecating `Transformer*` and `transform*` > >> would > >>> go > >>>>>>>> hand > >>>>>>>>> in hand — maybe it happened similarly with old `Processor` and > >>>>>>> `process`? > >>>>>>>>> Though deprecating only `transform*` operations could be a better > >>>>>>> signal > >>>>>>>>> for users than non deprecating anything at all and pave the way > to > >>> it's > >>>>>>>>> deprecation. > >>>>>>>>> > >>>>>>>>> Should this deprecation also consider including > >>>>>>> `KTable#transformValues`? > >>>>>>>>> The approach proposed on the KIP: > >>>>>>>>> `ktable.toStream().processValues().toTable()` seems fair to me, > >>> though > >>>>>>> I > >>>>>>>>> will have to test it further. > >>>>>>>>> > >>>>>>>>> I'm happy to update the KIP if there's some consensus around > this. 
> >>>>>>>>> Will add the deprecation notes these days and wait for any > >>> additional > >>>>>>>>> feedback on this topic before wrapping up the KIP. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Fri, 11 Feb 2022 at 04:03, John Roesler <vvcep...@apache.org> > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Thanks for the update, Jorge! > >>>>>>>>>> > >>>>>>>>>> I just read over the KIP again, and I'm in support. One more > >>>>>>>>>> question came up for me, though: > >>>>>>>>>> > >>>>>>>>>> 4) I agree that we shouldn't deprecate the Transformer* > >>>>>>>>>> classes, but do you think we should deprecate the > >>>>>>>>>> KStream#transform* methods? I'm curious if there's any > >>>>>>>>>> remaining reason to have those methods, or if your KIP > >>>>>>>>>> completely obviates them. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> -John > >>>>>>>>>> > >>>>>>>>>> On Thu, 2022-02-10 at 21:32 +0000, Jorge Esteban Quilcate > >>>>>>>>>> Otoya wrote: > >>>>>>>>>>> Thank you both for your feedback! > >>>>>>>>>>> > >>>>>>>>>>> I have added the following note on punctuation: > >>>>>>>>>>> > >>>>>>>>>>> ``` > >>>>>>>>>>> NOTE: The key validation can be defined when processing the > >>> message. > >>>>>>>>>>> Though, with punctuations it won't be possible to define the > key > >>> for > >>>>>>>>>>> validation before forwarding, therefore it won't be possible to > >>>>>>>> forward > >>>>>>>>>>> from punctuation. > >>>>>>>>>>> This is similar behavior to how `ValueTransformer`s behave at > >> the > >>>>>>>> moment. > >>>>>>>>>>> ``` > >>>>>>>>>>> > >>>>>>>>>>> Also make it explicit also that we are going to apply > >> referencial > >>>>>>>>>> equality > >>>>>>>>>>> for key validation. > >>>>>>>>>>> > >>>>>>>>>>> I hope this is covering all your feedback, let me know if I'm > >>>>>>> missing > >>>>>>>>>>> anything. > >>>>>>>>>>> > >>>>>>>>>>> Cheers, > >>>>>>>>>>> Jorge. 
> >>>>>>>>>>> > >>>>>>>>>>> On Wed, 9 Feb 2022 at 22:19, Guozhang Wang <wangg...@gmail.com > > > >>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> I'm +1 on John's point 3) for punctuations. > >>>>>>>>>>>> > >>>>>>>>>>>> And I think if people are on the same page that a reference > >>>>>>> equality > >>>>>>>>>> check > >>>>>>>>>>>> per record is not a huge overhead, I think doing that > >> enforcement > >>>>>>> is > >>>>>>>>>> better > >>>>>>>>>>>> than documentations and hand-wavy undefined behaviors. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Guozhang > >>>>>>>>>>>> > >>>>>>>>>>>> On Wed, Feb 9, 2022 at 11:27 AM John Roesler < > >>> vvcep...@apache.org > >>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the KIP Jorge, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I'm in support of your proposal. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1) > >>>>>>>>>>>>> I do agree with Guozhang's point (1). I think the cleanest > >>>>>>>>>>>>> approach. I think it's cleaner and better to keep the > >>>>>>>>>>>>> enforcement internal to the framework than to introduce a > >>>>>>>>>>>>> public API or context wrapper for processors to use > >>>>>>>>>>>>> explicitly. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2) I tend to agree with you on this one; I think the > >>>>>>>>>>>>> equality check ought to be fast enough in practice. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 3) I think this is implicit, but should be explicit in the > >>>>>>>>>>>>> KIP: For the `processValues` API, because the framework sets > >>>>>>>>>>>>> the key on the context before calling `process` and then > >>>>>>>>>>>>> unsets it afterwards, there will always be no key set during > >>>>>>>>>>>>> task puctuation. Therefore, while processors may still > >>>>>>>>>>>>> register punctuators, they will not be able to forward > >>>>>>>>>>>>> anything from them. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> This is functionally equivalent to the existing > >>>>>>>>>>>>> transformers, by the way, that are also forbidden to forward > >>>>>>>>>>>>> anything during punctuation. > >>>>>>>>>>>>> > >>>>>>>>>>>>> For what it's worth, I think this is the best tradeoff. > >>>>>>>>>>>>> > >>>>>>>>>>>>> The only alternative I see is not to place any restriction > >>>>>>>>>>>>> on forwarded keys at all and just document that if users > >>>>>>>>>>>>> don't maintain proper partitioning, they'll get undefined > >>>>>>>>>>>>> behavior. That might be more powerful, but it's also a > >>>>>>>>>>>>> usability problem. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> -John > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Wed, 2022-02-09 at 11:34 +0000, Jorge Esteban Quilcate > >>>>>>>>>>>>> Otoya wrote: > >>>>>>>>>>>>>> Thanks Guozhang. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Does `ValueProcessorContext` have to be a public API? It > >>>>>>> seems > >>>>>>>>>> to me > >>>>>>>>>>>>>> that this can be completely abstracted away from user > >>>>>>> interfaces > >>>>>>>>>> as an > >>>>>>>>>>>>>> internal class > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Totally agree. No intention to add these as public APIs. > Will > >>>>>>>>>> update > >>>>>>>>>>>> the > >>>>>>>>>>>>>> KIP to reflect this. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> in the past the rationale for enforcing it at the > >>>>>>>>>>>>>> interface layer rather than do runtime checks is that it is > >>>>>>> more > >>>>>>>>>>>>> efficient. > >>>>>>>>>>>>>>> I'm not sure how much overhead it may incur to check if the > >>>>>>>> key > >>>>>>>>>> did > >>>>>>>>>>>> not > >>>>>>>>>>>>>> change: if it is just a reference equality check maybe it's > >>>>>>>> okay. > >>>>>>>>>>>> What's > >>>>>>>>>>>>>> your take on this? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Agree, reference equality should cover this validation and > >> the > >>>>>>>>>> overhead > >>>>>>>>>>>>>> impact should not be meaningful. 
> >>>>>>>>>>>>>> Will update the KIP to reflect this as well. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Tue, 8 Feb 2022 at 19:05, Guozhang Wang < > >>>>>>> wangg...@gmail.com> > >>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hello Jorge, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for bringing this KIP! I think this is a nice idea > to > >>>>>>>>>> consider > >>>>>>>>>>>>> using > >>>>>>>>>>>>>>> a single overloaded function name for #process, just a > >>>>>>> couple > >>>>>>>>>> quick > >>>>>>>>>>>>>>> questions after reading the proposal: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 1) Does `ValueProcessorContext` have to be a public API? It > >>>>>>>>>> seems to > >>>>>>>>>>>> me > >>>>>>>>>>>>>>> that this can be completely abstracted away from user > >>>>>>>> interfaces > >>>>>>>>>> as > >>>>>>>>>>>> an > >>>>>>>>>>>>>>> internal class, and we call the `setKey` before calling > >>>>>>>>>>>>> user-instantiated > >>>>>>>>>>>>>>> `process` function, and then in its overridden `forward` it > >>>>>>>> can > >>>>>>>>>> just > >>>>>>>>>>>>> check > >>>>>>>>>>>>>>> if the key changes or not. > >>>>>>>>>>>>>>> 2) Related to 1) above, in the past the rationale for > >>>>>>>> enforcing > >>>>>>>>>> it at > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> interface layer rather than do runtime checks is that it is > >>>>>>>> more > >>>>>>>>>>>>> efficient. > >>>>>>>>>>>>>>> I'm not sure how much overhead it may incur to check if the > >>>>>>>> key > >>>>>>>>>> did > >>>>>>>>>>>> not > >>>>>>>>>>>>>>> change: if it is just a reference equality check maybe it's > >>>>>>>> okay. > >>>>>>>>>>>>> What's > >>>>>>>>>>>>>>> your take on this? 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya > >>>>>>> < > >>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Dev team, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I'd like to start a new discussion thread on Kafka Streams > >>>>>>>>>> KIP-820: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> This KIP is aimed to extend the current `KStream#process` > >>>>>>>> API > >>>>>>>>>> to > >>>>>>>>>>>>> return > >>>>>>>>>>>>>>>> output values that could be chained across the topology, > >>>>>>> as > >>>>>>>>>> well as > >>>>>>>>>>>>>>>> introducing a new `KStream#processValues` to use processor > >>>>>>>>>> while > >>>>>>>>>>>>>>> validating > >>>>>>>>>>>>>>>> keys haven't change and repartition is not required. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Looking forward to your feedback. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Jorge. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -- > >>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>> > >> > > > > >