The question about `processValues()` is really a hard one. Guozhang's point is a very good one. Maybe we need to be pragmatic and accept the runtime check (even if I deeply hate this solution compared to a compile-time check).
Other possibilities to address this issue might just become too ugly? It seems it would require adding a new `ValueProcessorContext` that offers a `#forward(ValueRecord)` method (with `ValueRecord` being a `Record` with an immutable key). Not sure if we would be willing to go down this route? Personally, I would be ok with it, as I strongly prefer compile-time checks and I am happy to extend the API surface area to achieve it -- however, I won't be surprised if others don't like this idea...
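
To make the compile-time idea above more concrete, here is a minimal sketch of how it could look. It is not taken from the KIP or from Kafka Streams: `ValueRecord` and `ValueProcessorContext` are only named in this email, and their shape below (as well as `CollectingContext` and `ValueRecordSketch`) is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: a record whose key can be read but never replaced.
final class ValueRecord<K, V> {
    private final K key;
    private final V value;

    // Assumption: in a real design only the framework would construct these.
    ValueRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }

    // Only the value can be swapped; there is deliberately no withKey().
    <VOut> ValueRecord<K, VOut> withValue(VOut newValue) {
        return new ValueRecord<>(key, newValue);
    }
}

// Hypothetical context: forward() accepts nothing but a ValueRecord,
// so a changed key cannot be expressed through this API.
interface ValueProcessorContext<K, VOut> {
    void forward(ValueRecord<K, VOut> record);
}

// Test double that simply remembers what was forwarded.
final class CollectingContext<K, VOut> implements ValueProcessorContext<K, VOut> {
    final List<ValueRecord<K, VOut>> forwarded = new ArrayList<>();

    @Override
    public void forward(ValueRecord<K, VOut> record) {
        forwarded.add(record);
    }
}

final class ValueRecordSketch {
    // A value-only processing step: upper-cases the value, key untouched.
    static void upperCase(ValueRecord<String, String> record,
                          ValueProcessorContext<String, String> context) {
        context.forward(record.withValue(record.value().toUpperCase()));
    }

    public static void main(String[] args) {
        CollectingContext<String, String> context = new CollectingContext<>();
        upperCase(new ValueRecord<>("k1", "hello"), context);
        System.out.println(context.forwarded.get(0).key() + " -> " + context.forwarded.get(0).value());
    }
}
```

The trade-off is the one described above: the key guarantee moves from a runtime check to the type system, at the cost of two additional public types.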
-Matthias

On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:
Thanks, Guozhang.

> Compared with reference checks and runtime exceptions for those who mistakenly change the key, I think that enforcing everyone to `setValue` may incur more costs..

This is a fair point. I agree that this may incur more costs than key checking.

Will hold for more feedback, but if we agree I will update the KIP during the week.

Cheers,
Jorge.

On Sun, 27 Feb 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:

Hello folks,

Regarding the outstanding question, I'm actually leaning a bit towards the second option, since `withKey()` itself always creates a new Record object. This has a few implications:

* We would have to discard the previous Record object to be GC'ed with the new object --- note that in practice, processing a value does not mean you'd have to replace the whole value with `withValue`; maybe you just need to manipulate some fields of the value object if it is JSON, etc.
* It may become an obstacle for further runtime optimizations, e.g. skipping serdes and interpreting processing as direct byte manipulations.

Compared with reference checks and runtime exceptions for those who mistakenly change the key, I think that enforcing everyone to `setValue` may incur more costs..

Guozhang

On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Hi all,

Appreciate very much all the great feedback received so far.

> After applying that interface change, I don't see any syntax errors in our tests (which use those methods), and the StreamBuilderTest still passes for me.

This is awesome John, thank you for your efforts here.

> Jorge, do you mind clarifying these points in the Compatibility section of your KIP?

+1. I have clarified the impact of changing the return type in the KIP.

> I think the other outstanding question for you is whether the output key type for processValues should be K or Void.
> One thing I realized belatedly was that if we do set it to Void, then users will actually have to override the key when forwarding, like `record.withKey(null)`, whereas if we keep it as K, all users have to do is not touch the key at all.

This is a tricky one. On one hand, with a Void type for the key output, we force the users to cast to Void and change the key to null, though this can be documented on the API, so the users are aware of the peculiarity of forwarding within `processValues`. On the other hand, keeping the key type as output doesn't _require_ any change of keys, but this could lead to key-checking runtime exceptions.

I am slightly inclined towards the first option, changing the type to `Void`. This will impose a bit of pain on the users to gain some type-safety and avoid runtime exceptions. We can justify this requirement as a way to prove that the key hasn't changed.

Btw, thanks for this idea Matthias!

On Fri, 25 Feb 2022 at 17:10, John Roesler <vvcep...@apache.org> wrote:

Oh, one more thing Jorge,

I think the other outstanding question for you is whether the output key type for processValues should be K or Void. I get the impression that none of us feel too strongly about it, so I think the ball is in your court to consider everyone's points and make a call (with justification).

One thing I realized belatedly was that if we do set it to Void, then users will actually have to override the key when forwarding, like `record.withKey(null)`, whereas if we keep it as K, all users have to do is not touch the key at all.

Thanks,
-John

On Fri, 2022-02-25 at 11:07 -0600, John Roesler wrote:

Hello all,

I'll chime in again in the interest of trying to do a better job of keeping KIPs moving forward...

Matthias raised some very good questions about whether the change is really source compatible.
I just checked out the code and made the interface change that Jorge specified in the KIP:

Modified methods:
KStream<KOut,VOut> KStream#process(ProcessorSupplier<K,V,KOut,VOut> processorSupplier, String... stateStoreNames)
KStream<KOut,VOut> KStream#process(ProcessorSupplier<K,V,KOut,VOut> processorSupplier, Named named, String... stateStoreNames)

After applying that interface change, I don't see any syntax errors in our tests (which use those methods), and the StreamBuilderTest still passes for me.

The reason is that the existing API already takes a ProcessorSupplier<K, V, Void, Void> and is currently a `void` return. After this interface change, all existing usages will just bind Void to KOut and Void to VOut. In other words, KOut, which is short for `KOut extends Object`, is an upper bound on Void, so all existing processor suppliers are still valid arguments. Because the current methods are void returns, no existing code could be assigning the result to any variable, so moving from a void return to a typed return is always compatible.

Jorge, do you mind clarifying these points in the Compatibility section of your KIP?

Thanks,
-John

On Wed, 2022-02-23 at 15:07 -0800, Matthias J. Sax wrote:

For this KIP, I also see the value. I was just trying to take a step back and ask if it's a good short term solution. If we believe it is, I am fine with it. (I am more worried about the header's KIP...)

Btw: I am still wondering if we can change existing `process()` as proposed in the KIP? Is the proposed change source compatible? (It's for sure not binary compatible, but this seems fine -- I don't think we guarantee binary compatibility).

Btw: would be good to clarify what is changed for process() -- should be return type change from `void` to `KStream<KOut, VOut>` as well as change of `ProcessorSupplier` generic types (output types change from `Void` to `KOut` and `VOut`)?
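
The source-compatibility argument John makes above can be reproduced in a small self-contained sketch. The types below are simplified stand-ins, not the real Kafka Streams interfaces: `SketchStream`, `SketchProcessorSupplier` and `SourceCompatSketch` are hypothetical names chosen for illustration.

```java
// Hypothetical stand-in for ProcessorSupplier<KIn, VIn, KOut, VOut>.
interface SketchProcessorSupplier<KIn, VIn, KOut, VOut> { }

// Hypothetical stand-in for KStream<K, V>.
final class SketchStream<K, V> {
    // Signature after the change: typed return instead of void,
    // with KOut and VOut as free type parameters.
    <KOut, VOut> SketchStream<KOut, VOut> process(
            SketchProcessorSupplier<K, V, KOut, VOut> supplier,
            String... stateStoreNames) {
        return new SketchStream<>();
    }
}

final class SourceCompatSketch {
    static SketchStream<Void, Void> existingCall(
            SketchStream<String, Long> stream,
            SketchProcessorSupplier<String, Long, Void, Void> legacySupplier) {
        // Pre-change style call: the result is ignored, exactly as it had
        // to be while the method returned void. This still compiles.
        stream.process(legacySupplier, "my-store");

        // The same supplier binds Void to both KOut and VOut.
        return stream.process(legacySupplier);
    }

    public static void main(String[] args) {
        SketchProcessorSupplier<String, Long, Void, Void> legacy =
                new SketchProcessorSupplier<String, Long, Void, Void>() { };
        System.out.println(existingCall(new SketchStream<>(), legacy) != null);
    }
}
```

This only covers source compatibility. The return type is part of the JVM method descriptor, so already-compiled callers would still need to be recompiled, which matches the binary-compatibility caveat raised in the thread.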
-Matthias

On 2/23/22 11:32 AM, Guozhang Wang wrote:

Hi folks,

I agree with John that this KIP by itself could be a good improvement, and I feel it aligns well with the eventual DSL 2.0 proposal so we do not need to hold it until later.

Regarding the last point (i.e. whether we should do enforcement with a new interface), here's my 2c: in the past we introduced public `ValueTransformer/etc` for two purposes, 1) to enforce the key is not modifiable, 2) to indicate inside the library's topology builder itself that since the key is not modified, the direct downstream does not need to inject a repartition stage. I think we are more or less on the same page that for purpose 1), doing runtime check could be sufficient; as for the purpose of 2), as for this KIP itself I think it is similar to what we have (i.e. just based on the function name "processValue" itself) and hence is not sacrificed either. I do not know if `KStream#processValue(ProcessorSupplier<K, V, Void, VOut> processorSupplier)` will work, or work better, maybe Jorge could do some digging and get back to us.

On Fri, Feb 18, 2022 at 8:24 AM John Roesler <vvcep...@apache.org> wrote:

Hello all,

While I sympathize with Matthias’s desire to wipe the slate clean and redesign the dsl with full knowledge of everything we’ve learned in the past few years, that would also be a pretty intense project on its own. It seems better to leave that project for someone who is motivated to take it on.

Reading between the lines, it seems like Jorge’s motivation is more along the lines of removing a few specific pain points. I appreciate Matthias extending the offer, but if Jorge doesn’t want to redesign the dsl right now, we’re better off just accepting the work he’s willing to do.

Specifically, this KIP is quite a nice improvement. Looking at the KStream interface, roughly half of it is devoted to various flavors of “transform”, which makes it really hard on users to figure out which they are supposed to use for what purpose.
This KIP lets us drop all that complexity in favor of just two methods, thanks to the fact that we now have the ability for processors to specify their forwarding type.

By the way, I really like Matthias’s suggestion to set the KOut generic bound to Void for processValues. Then, instead of doing an equality check on the key during forward, you’d just set the key back to the one saved before processing (with setRecordKey). This is both more efficient (because we don’t have the equality check) and more foolproof for users (because it’s enforced by the compiler instead of the runtime).

Thanks, all!
-John

On Fri, Feb 18, 2022, at 00:43, Jorge Esteban Quilcate Otoya wrote:

On Fri, 18 Feb 2022 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:

>> It probably deserves its own thread to start discussing ideas.
>
> Yes. My question was: if we think it's time to do a DSL 2.0, should we drop this KIP and just fix via DSL 2.0 instead?

Good question. Would love to hear what others think about this.

I've stated my position about this here:

>> For this KIP specifically, I think about it as a continuation from KIP-478. Therefore, it could make sense to have it as part of the current version of the DSL.

I'd even add that if this KIP is adopted, I would not be that disappointed if KIP-634 is dropped in favor of a DSL v2.0, as the access to headers provided by KIP-478 via the Record API is much better than the previous `.context().headers()`.

But happy to reconsider if there is an agreement to focus efforts towards a DSL 2.0.

>> You're right. I'm not proposing the method signature.
>
> What signature do you propose? I don't see an update on the KIP.

My bad. I have clarified this in the KIP's public interfaces now:

```
New methods:
- KStream<K,VOut> KStream#processValues(ProcessorSupplier<K,V,K,VOut> processorSupplier, String...
stateStoreNames)
- KStream<K,VOut> KStream#processValues(ProcessorSupplier<K,V,K,VOut> processorSupplier, Named named, String... stateStoreNames)

Modified methods:
- KStream<KOut,VOut> KStream#process(ProcessorSupplier<K,V,KOut,VOut> processorSupplier, String... stateStoreNames)
- KStream<KOut,VOut> KStream#process(ProcessorSupplier<K,V,KOut,VOut> processorSupplier, Named named, String... stateStoreNames)
```

>> Not sure if I understand how this would look like. Do you mean checking it on the Record itself or somewhere else?
>
> @Guozhang: I am not worried about the runtime overhead. I am worried about user experience. It's not clear from the method signature that you are not allowed to change the key, which seems to be bad API design. Even if I understand the desire to keep the API surface area small -- I would rather have a compile time enforcement than a runtime check.
>
> For example, we have `map()` and `mapValues()` and `mapValues()` returns a `Value V` (enforces that the key is not changed) instead of a `KeyValue<KIn,VOut>` and we use a runtime check to check that the key is not changed.
>
> Naively, could we enforce something similar by setting the output key type as `Void`.
>
> KStream#processValue(ProcessorSupplier<K, V, Void, VOut> processorSupplier)
>
> Not sure if this would work or not?
>
> Or it might be worth to add a new interface, `ValueProcessorSupplier`, that ensures that the key is not modified?

This is an important discussion, even more so with a DSL v2.0.

At the moment, the DSL just flags whether partitioning is required based on the DSL operation. As mentioned, `mapValues()` enforces that only the value has changed through the DSL, though the only _guarantee_ we have is that Kafka Streams "owns" the implementation, and we can flag this properly.

With a hypothetical v2.0 based on Record API, this will be harder to enforce with the current APIs. e.g.
with `mapValues(Record<K,V> record)`, nothing would stop users from using `record.withKey("needs_partitioning")`.

The approach defined on this KIP is similar to what we have at the moment on `ValueTransformer*` where it validates at runtime that the users are not calling `forward` with `ForwardingDisabledProcessorContext`.

`ValueProcessorSupplier` is not meant to be a public API. Only to be used internally on `processValues` implementation.

At first, `KStream#processValue(ProcessorSupplier<K, V, Void, VOut> processorSupplier)` won't work as it will require the `Processor` implementation to actually change the key. Will take a deeper look to validate if this could solve this issue.

-Matthias

On 2/17/22 10:56 AM, Guozhang Wang wrote:

Regarding the last question Matthias had, I wonder if it's similar to my first email's point 2) above? I think the rationale is that, since reference checks are relatively very cheap, it is worthwhile to pay for these extra runtime checks and in return to have a single consolidated ProcessorSupplier programming interface (i.e. we would eventually deprecate ValueTransformerWithKeySupplier).

On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Thank you Matthias, this is great feedback. Adding my comments below.

On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP. In alignment to my reply to KIP-634, I am wondering if we are heading into the right direction, or if we should consider to re-design the DSL from scratch?

I'm very excited about the idea of a DSL v2.0. It probably deserves its own thread to start discussing ideas.

For this KIP specifically, I think about it as a continuation from KIP-478. Therefore, it could make sense to have it as part of the current version of the DSL.

> Even if we don't do a DSL 2.0 right now, I have some concerns about this KIP:
>
> (1) I am not sure if the proposed change is backward compatible?
> We currently have:
>
> void KStream#process(ProcessorSupplier, String...)
>
> The newly proposed method:
>
> KStream KStream#process(ProcessorSupplier)
>
> seems to be an incompatible change?
>
> The KIP states:
>
>> Modified method KStream#process should be compatible with previous version, that at the moment is fixed to a Void return type.
>
> Why is it backward compatible? Having both old and new #process() seems not to be compatible to me? Or are you proposing to _change_ the method signature (if yes, the `String...` parameter to add a state store seems to be missing)? For this case, it seems that existing programs would at least need to be recompiled -- it would only be a source compatible change, but not a binary compatible change?

You're right. I'm not proposing the method signature.

Totally agree about compatibility issue. I was only considering source compatibility and was ignorant that changing from void to a specific type would break binary compatibility. I will update the KIP to reflect this:

Modifications to method KStream#process are source compatible with previous version, though not binary compatible. Therefore will require users to recompile their applications with the latest version.

> I am also wondering if/how this change related to KIP-401:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>
> From a high level it might not conflict, but I wanted to double check?

Wasn't aware of this KIP, thanks for sharing! I don't think there is conflict between KIPs, as far as I understand.

> For `KStream#processValues()`, my main concern is the added runtime check if the key was modified or not -- it seems to provide bad user experience -- enforcing that the key is not modified on an API level, would seem to be much better.
>
> Last, what is the purpose of `setRecordKey()` and `clearRecordKey()`? I am not sure if I understand their purpose?

Both methods set/clear the context (current key) to be used when checking keys on forward(record) implementation.

> enforcing that the key is not modified on an API level, would seem to be much better.
Not sure if I understand how this would look like. Do you mean checking it on the Record itself or somewhere else?

-Matthias

On 2/15/22 11:53 AM, John Roesler wrote:

My apologies, this feedback was intended for KIP-634.
-John

On Tue, Feb 15, 2022, at 13:15, John Roesler wrote:

Thanks for the update, Jorge,

I've just looked over the KIP again. Just one more small concern:

5) We can't just change the type of Record#headers() to a new fully qualified type. That would be a source-incompatible breaking change for users. Our options are:

* Deprecate the existing method and create a new one with the new type
* If the existing Headers is "not great but ok", then maybe we leave it alone.

Thanks,
-John

On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote:

No specific comments, but I just wanted to mention I like the direction of the KIP. My team is a big user of "transform" methods because of the ability to chain them, and I have always found the terminology challenging to explain alongside "process". It felt like one concept with two names. So moving towards a single API that is powerful enough to handle both use cases seems absolutely correct to me.

Paul

On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Got it. Thanks John, this makes sense.

I've updated the KIP to include the deprecation of:

- KStream#transform
- KStream#transformValues
- KStream#flatTransform
- KStream#flatTransformValues

On Fri, 11 Feb 2022 at 15:16, John Roesler <vvcep...@apache.org> wrote:

Thanks, Jorge!

I think it’ll be better to keep this KIP focused on KStream methods only. I suspect that the KTable methods may be more complicated than just that proposed replacement, but it’ll also be easier to consider that question in isolation.
The nice thing about just deprecating the KStream methods and not the Transform* interfaces is that you can keep your proposal just scoped to KStream and not have any consequences for the rest of the DSL.

Thanks again,
John

On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya wrote:

Thanks, John.

> 4) I agree that we shouldn't deprecate the Transformer* classes, but do you think we should deprecate the KStream#transform* methods? I'm curious if there's any remaining reason to have those methods, or if your KIP completely obviates them.

Good catch. I considered that deprecating `Transformer*` and `transform*` would go hand in hand -- maybe it happened similarly with old `Processor` and `process`? Though deprecating only `transform*` operations could be a better signal for users than not deprecating anything at all, and pave the way to its deprecation.

Should this deprecation also consider including `KTable#transformValues`? The approach proposed on the KIP: `ktable.toStream().processValues().toTable()` seems fair to me, though I will have to test it further.

I'm happy to update the KIP if there's some consensus around this. Will add the deprecation notes these days and wait for any additional feedback on this topic before wrapping up the KIP.

On Fri, 11 Feb 2022 at 04:03, John Roesler <vvcep...@apache.org> wrote:

Thanks for the update, Jorge!

I just read over the KIP again, and I'm in support. One more question came up for me, though:

4) I agree that we shouldn't deprecate the Transformer* classes, but do you think we should deprecate the KStream#transform* methods? I'm curious if there's any remaining reason to have those methods, or if your KIP completely obviates them.

Thanks,
-John

On Thu, 2022-02-10 at 21:32 +0000, Jorge Esteban Quilcate Otoya wrote:

Thank you both for your feedback!
I have added the following note on punctuation:

```
NOTE: The key validation can be defined when processing the message. Though, with punctuations it won't be possible to define the key for validation before forwarding, therefore it won't be possible to forward from punctuation. This is similar behavior to how `ValueTransformer`s behave at the moment.
```

Also made it explicit that we are going to apply referential equality for key validation.

I hope this is covering all your feedback, let me know if I'm missing anything.

Cheers,
Jorge.

On Wed, 9 Feb 2022 at 22:19, Guozhang Wang <wangg...@gmail.com> wrote:

I'm +1 on John's point 3) for punctuations.

And I think if people are on the same page that a reference equality check per record is not a huge overhead, I think doing that enforcement is better than documentations and hand-wavy undefined behaviors.

Guozhang

On Wed, Feb 9, 2022 at 11:27 AM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP Jorge,

I'm in support of your proposal.

1) I do agree with Guozhang's point (1). I think it's cleaner and better to keep the enforcement internal to the framework than to introduce a public API or context wrapper for processors to use explicitly.

2) I tend to agree with you on this one; I think the equality check ought to be fast enough in practice.

3) I think this is implicit, but should be explicit in the KIP: For the `processValues` API, because the framework sets the key on the context before calling `process` and then unsets it afterwards, there will always be no key set during task punctuation. Therefore, while processors may still register punctuators, they will not be able to forward anything from them.

This is functionally equivalent to the existing transformers, by the way, that are also forbidden to forward anything during punctuation.

For what it's worth, I think this is the best tradeoff. The only alternative I see is not to place any restriction on forwarded keys at all and just document that if users don't maintain proper partitioning, they'll get undefined behavior.
That might be more powerful, but it's also a usability problem.

Thanks,
-John

On Wed, 2022-02-09 at 11:34 +0000, Jorge Esteban Quilcate Otoya wrote:

Thanks Guozhang.

> Does `ValueProcessorContext` have to be a public API? It seems to me that this can be completely abstracted away from user interfaces as an internal class

Totally agree. No intention to add these as public APIs. Will update the KIP to reflect this.

> in the past the rationale for enforcing it at the interface layer rather than do runtime checks is that it is more efficient. I'm not sure how much overhead it may incur to check if the key did not change: if it is just a reference equality check maybe it's okay. What's your take on this?

Agree, reference equality should cover this validation and the overhead impact should not be meaningful. Will update the KIP to reflect this as well.

On Tue, 8 Feb 2022 at 19:05, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Jorge,

Thanks for bringing this KIP! I think this is a nice idea to consider using a single overloaded function name for #process, just a couple quick questions after reading the proposal:

1) Does `ValueProcessorContext` have to be a public API? It seems to me that this can be completely abstracted away from user interfaces as an internal class, and we call the `setKey` before calling user-instantiated `process` function, and then in its overridden `forward` it can just check if the key changes or not.

2) Related to 1) above, in the past the rationale for enforcing it at the interface layer rather than do runtime checks is that it is more efficient. I'm not sure how much overhead it may incur to check if the key did not change: if it is just a reference equality check maybe it's okay. What's your take on this?
Guozhang

On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

Hi Dev team,

I'd like to start a new discussion thread on Kafka Streams KIP-820:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API

This KIP is aimed to extend the current `KStream#process` API to return output values that could be chained across the topology, as well as introducing a new `KStream#processValues` to use processor while validating keys haven't changed and repartition is not required.

Looking forward to your feedback.

Regards,
Jorge.

--
-- Guozhang
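
For readers following the thread, the runtime approach discussed throughout (the framework sets the key before `process`, `forward` checks it by reference, and the key is cleared afterwards so that forwarding from a punctuator fails) could be sketched roughly as below. This is not the actual Kafka Streams implementation: `KeyedRecord` and `KeyCheckingContext` are hypothetical names, and only `setRecordKey`, `clearRecordKey` and `forward` come from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the Record API.
final class KeyedRecord<K, V> {
    final K key;
    final V value;

    KeyedRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    // Like the withKey() mentioned in the thread, this creates a new object.
    <NK> KeyedRecord<NK, V> withKey(NK newKey) {
        return new KeyedRecord<>(newKey, value);
    }

    <NV> KeyedRecord<K, NV> withValue(NV newValue) {
        return new KeyedRecord<>(key, newValue);
    }
}

// Hypothetical internal context that enforces "key must not change".
final class KeyCheckingContext<K, V> {
    private K expectedKey;
    private boolean keySet;
    final List<KeyedRecord<K, V>> forwarded = new ArrayList<>();

    // Called by the framework right before process().
    void setRecordKey(K key) {
        expectedKey = key;
        keySet = true;
    }

    // Called by the framework right after process().
    void clearRecordKey() {
        expectedKey = null;
        keySet = false;
    }

    void forward(KeyedRecord<K, V> record) {
        if (!keySet) {
            // No key is set outside process(), e.g. during punctuation.
            throw new IllegalStateException("forwarding is only allowed while processing a record");
        }
        if (record.key != expectedKey) {
            // Reference equality, not equals(): cheap, but an equal copy is rejected too.
            throw new IllegalArgumentException("key must not be changed in processValues");
        }
        forwarded.add(record);
    }
}

final class KeyCheckSketch {
    public static void main(String[] args) {
        KeyCheckingContext<String, Integer> context = new KeyCheckingContext<>();
        KeyedRecord<String, Integer> input = new KeyedRecord<>("a", 1);

        context.setRecordKey(input.key);
        context.forward(input.withValue(2)); // same key reference: accepted
        context.clearRecordKey();

        System.out.println(context.forwarded.size());
    }
}
```

Note that with a reference check, forwarding a record whose key is equal but is a different object is rejected as well, which is part of why the thread weighs this against compile-time alternatives.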