On Fri, 18 Feb 2022 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:

> > It probably deserves its own thread to start discussing ideas.
>
> Yes. My question was: if we think it's time to do a DSL 2.0, should we
> drop this KIP and just fix via DSL 2.0 instead?
>
>
Good question. Would love to hear what others think about this.

I've stated my position about this here:

> For this KIP specifically, I think about it as a continuation from
> KIP-478. Therefore, it could make sense to have it as part of the current
> version of the DSL.

I'd even add that if this KIP is adopted, I would not be that disappointed
if KIP-634 is dropped in favor of a DSL v2.0, as the access to headers
provided by KIP-478 via the Record API is much better than the previous
`.context().headers()`.

But happy to reconsider if there is an agreement to focus efforts towards a
DSL 2.0.


> > You're right. I'm not proposing the method signature.
>
> What signature do you propose? I don't see an update on the KIP.
>
My bad. I have clarified this in the KIP's public interfaces now:

```

New methods:

   - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
   - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)

Modified methods:

   - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
   - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)

```
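
As discussed earlier in the thread, changing the return type of `process` from `void` to `KStream` is source compatible but not binary compatible. A minimal sketch of why, assuming stand-in types (`Supplier` and `Stream` here are hypothetical placeholders, not the Kafka Streams interfaces): the JVM links a call site by the full method descriptor, which includes the return type, so the two signatures are different methods as far as already-compiled code is concerned.

```java
import java.lang.invoke.MethodType;

class DescriptorSketch {
    // Hypothetical stand-ins for ProcessorSupplier and KStream.
    interface Supplier {}
    interface Stream {}

    // Descriptor of the old shape: void process(Supplier, String...)
    static String oldDescriptor() {
        return MethodType.methodType(void.class, Supplier.class, String[].class)
                .toMethodDescriptorString();
    }

    // Descriptor of the new shape: Stream process(Supplier, String...)
    static String newDescriptor() {
        return MethodType.methodType(Stream.class, Supplier.class, String[].class)
                .toMethodDescriptorString();
    }

    public static void main(String[] args) {
        // The parameter lists are identical; only the return part differs.
        System.out.println("old: " + oldDescriptor());
        System.out.println("new: " + newDescriptor());
        System.out.println("same descriptor? " + oldDescriptor().equals(newDescriptor()));
    }
}
```

Code compiled against the old descriptor would look for a method returning `void` and fail to link against the new one, which is why a recompile is needed even though the source does not change.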


>
> > Not sure if I understand what this would look like. Do you mean checking it
> > on the Record itself or somewhere else?
>
> @Guozhang: I am not worried about the runtime overhead. I am worried
> about user experience. It's not clear from the method signature that
> you are not allowed to change the key, which seems to be bad API design.
> Even if I understand the desire to keep the API surface area small -- I
> would rather have a compile time enforcement than a runtime check.
>
> For example, we have `map()` and `mapValues()`, and `mapValues()` returns
> a value `V` (which enforces that the key is not changed) instead of
> returning a `KeyValue<KIn,VOut>` and using a runtime check to verify that
> the key is not changed.
>
> Naively, could we enforce something similar by setting the output key
> type as `Void`?
>
>    KStream#processValues(ProcessorSupplier<K, V, Void, VOut> processorSupplier)
>
> Not sure if this would work or not?
>
> Or it might be worth adding a new interface, `ValueProcessorSupplier`,
> that ensures that the key is not modified?
>
>
This is an important discussion, even more so with a DSL v2.0.

At the moment, the DSL just flags whether repartitioning is required based on
the DSL operation. As mentioned, `mapValues()` enforces through the DSL that
only the value changes, though the only _guarantee_ we have is that Kafka
Streams "owns" the implementation, and we can flag this properly.

With a hypothetical v2.0 based on the Record API, this will be harder to
enforce with the current APIs. For example, with `mapValues(Record<K, V> record)`,
nothing would stop users from using `record.withKey("needs_partitioning")`.
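
To make the difference concrete, here is a minimal sketch, assuming a simplified record type (`SketchRecord`, `mapValues` and `mapValuesWithRecord` are hypothetical names for illustration, not Kafka Streams APIs). With a value-only mapper the user code never gets a chance to produce a key; with a record-based mapper the types line up even when the key is replaced.

```java
import java.util.function.Function;

// Hypothetical, simplified record with immutable key and value.
final class SketchRecord<K, V> {
    private final K key;
    private final V value;

    SketchRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }
    V value() { return value; }

    <NewK> SketchRecord<NewK, V> withKey(NewK newKey) {
        return new SketchRecord<>(newKey, value);
    }

    <NewV> SketchRecord<K, NewV> withValue(NewV newValue) {
        return new SketchRecord<>(key, newValue);
    }
}

final class MapValuesSketch {
    // Value-only mapper: the key is carried over by the framework side.
    static <K, V, VOut> SketchRecord<K, VOut> mapValues(
            SketchRecord<K, V> in, Function<V, VOut> mapper) {
        return in.withValue(mapper.apply(in.value()));
    }

    // Record-based mapper: the user returns a whole record, key included.
    static <K, V, VOut> SketchRecord<K, VOut> mapValuesWithRecord(
            SketchRecord<K, V> in,
            Function<SketchRecord<K, V>, SketchRecord<K, VOut>> mapper) {
        return mapper.apply(in);
    }

    public static void main(String[] args) {
        SketchRecord<String, Integer> in = new SketchRecord<>("user-1", 41);

        SketchRecord<String, Integer> safe = mapValues(in, v -> v + 1);
        System.out.println(safe.key() + " -> " + safe.value());

        // Compiles fine, yet silently changes the key.
        SketchRecord<String, Integer> changed = mapValuesWithRecord(
                in, r -> r.withKey("needs_partitioning").withValue(r.value() + 1));
        System.out.println(changed.key() + " -> " + changed.value());
    }
}
```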

The approach defined in this KIP is similar to what we have at the moment
for `ValueTransformer*`, which validates at runtime that users are not
calling `forward`, by means of `ForwardingDisabledProcessorContext`.
`ValueProcessorSupplier` is not meant to be a public API; it is only to be used
internally in the `processValues` implementation.
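
A rough sketch of the runtime check discussed in this thread, under stated assumptions: `Rec` and `KeyCheckingContext` are hypothetical simplifications, not the actual internal classes. The framework sets the current key before calling `process`, `forward` compares keys by reference, and the key is cleared afterwards, so forwarding from a punctuation (no key set) is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified record.
final class Rec<K, V> {
    final K key;
    final V value;

    Rec(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical internal context wrapper that enforces an unchanged key.
final class KeyCheckingContext<K, VOut> {
    private K currentKey;
    private boolean keySet;
    private final List<Rec<K, VOut>> forwarded = new ArrayList<>();

    // Called by the framework before invoking the user's process().
    void setRecordKey(K key) {
        currentKey = key;
        keySet = true;
    }

    // Called by the framework after process() returns.
    void clearRecordKey() {
        currentKey = null;
        keySet = false;
    }

    void forward(Rec<K, VOut> record) {
        if (!keySet) {
            throw new IllegalStateException("no key set (e.g. forwarding from a punctuation)");
        }
        // Reference equality: cheap, and enough to detect a replaced key object.
        if (record.key != currentKey) {
            throw new IllegalArgumentException("key must not be changed");
        }
        forwarded.add(record);
    }

    List<Rec<K, VOut>> forwarded() {
        return forwarded;
    }
}

final class KeyCheckDemo {
    public static void main(String[] args) {
        KeyCheckingContext<String, Integer> ctx = new KeyCheckingContext<>();
        String key = new String("a");

        ctx.setRecordKey(key);
        ctx.forward(new Rec<>(key, 1)); // same key object: accepted
        try {
            ctx.forward(new Rec<>(new String("a"), 2)); // equal but different object
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        ctx.clearRecordKey();
        System.out.println("forwarded records: " + ctx.forwarded().size());
    }
}
```

Note the trade-off of reference equality: a key that is equal but a different object is rejected as well, which keeps the check cheap at the cost of being stricter than `equals`.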

At first glance, `KStream#processValues(ProcessorSupplier<K, V, Void, VOut>
processorSupplier)` won't work, as it would require the `Processor`
implementation to actually change the key. I will take a deeper look to
validate whether this could solve the issue.


>
>
> -Matthias
>
>
> On 2/17/22 10:56 AM, Guozhang Wang wrote:
> > Regarding the last question Matthias had, I wonder if it's similar to my
> > first email's point 2) above? I think the rationale is that, since
> > reference checks are relatively very cheap, it is worthwhile to pay this
> > extra runtime checks and in return to have a single consolidated
> > ProcessorSupplier programming interface (i.e. we would eventually
> > deprecate ValueTransformerWithKeySupplier).
> >
> > On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> >> Thank you Matthias, this is great feedback.
> >>
> >> Adding my comments below.
> >>
> >> On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >>> Thanks for the KIP.
> >>>
> >>> In alignment to my reply to KIP-634, I am wondering if we are heading
> >>> into the right direction, or if we should consider to re-design the DSL
> >>> from scratch?
> >>>
> >>>
> >> I'm very excited about the idea of a DSL v2.0. It probably deserves its own
> >> thread to start discussing ideas.
> >>
> >> For this KIP specifically, I think about it as a continuation from KIP-478.
> >> Therefore, it could make sense to have it as part of the current version of
> >> the DSL.
> >>
> >>
> >>>
> >>> Even if we don't do a DSL 2.0 right now, I have some concerns about this
> >>> KIP:
> >>>
> >>> (1) I am not sure if the proposed change is backward compatible? We
> >>> currently have:
> >>>
> >>>     void KStream#process(ProcessorSupplier, String...)
> >>>
> >>> The newly proposed method:
> >>>
> >>>     KStream KStream#process(ProcessorSupplier)
> >>>
> >>> seems to be an incompatible change?
> >>>
> >>> The KIP states:
> >>>
> >>>> Modified method KStream#process should be compatible with previous
> >>> version, that at the moment is fixed to a Void return type.
> >>>
> >>> Why is it backward compatible? Having both old and new #process() seems
> >>> not to be compatible to me? Or are you proposing to _change_ the method
> >>> signature (if yes, the `String...` parameter to add a state store seems
> >>> to be missing)? For this case, it seems that existing programs would at
> >>> least need to be recompiled -- it would only be a source compatible
> >>> change, but not a binary compatible change?
> >>>
> >>>
> >> You're right. I'm not proposing the method signature.
> >> Totally agree about the compatibility issue. I was only considering source
> >> compatibility and was unaware that changing from void to a specific type
> >> would break binary compatibility.
> >> I will update the KIP to reflect this:
> >>
> >>> Modifications to method KStream#process are source compatible with
> >> previous version, though not binary compatible. Therefore will require
> >> users to recompile their applications with the latest version.
> >>
> >>
> >>> I am also wondering if/how this change related to KIP-401:
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >>>
> >>>   From a high level it might not conflict, but I wanted to double
> check?
> >>>
> >>>
> >> Wasn't aware of this KIP, thanks for sharing! I don't think there is
> >> conflict between KIPs, as far as I understand.
> >>
> >>
> >>>
> >>> For `KStream#processValues()`, my main concern is the added runtime
> >>> check if the key was modified or not -- it seems to provide bad user
> >>> experience -- enforcing that the key is not modified on an API level,
> >>> would seem to be much better.
> >>>
> >>> Last, what is the purpose of `setRecordKey()` and `clearRecordKey()`? I
> >>> am not sure if I understand their purpose?
> >>>
> >>>
> >> Both methods set/clear the context (current key) to be used when checking
> >> keys on forward(record) implementation.
> >>
> >>> enforcing that the key is not modified on an API level, would seem to be
> >>> much better.
> >>
> >> Not sure if I understand how this would look like. Do you mean checking it
> >> on the Record itself or somewhere else?
> >>
> >>
> >>
> >>> -Matthias
> >>>
> >>>
> >>> On 2/15/22 11:53 AM, John Roesler wrote:
> >>>> My apologies, this feedback was intended for KIP-634.
> >>>> -John
> >>>>
> >>>> On Tue, Feb 15, 2022, at 13:15, John Roesler wrote:
> >>>>> Thanks for the update, Jorge,
> >>>>>
> >>>>> I've just looked over the KIP again. Just one more small
> >>>>> concern:
> >>>>>
> >>>>> 5) We can't just change the type of Record#headers() to a
> >>>>> new fully qualified type. That would be a source-
> >>>>> incompatible breaking change for users.
> >>>>>
> >>>>> Our options are:
> >>>>> * Deprecate the existing method and create a new one with
> >>>>> the new type
> >>>>> * If the existing Headers is "not great but ok", then maybe
> >>>>> we leave it alone.
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote:
> >>>>>> No specific comments, but I just wanted to mention I like the
> >>> direction of
> >>>>>> the KIP.  My team is a big user of "transform" methods because of
> the
> >>>>>> ability to chain them, and I have always found the terminology
> >>> challenging
> >>>>>> to explain alongside "process".  It felt like one concept with two
> >>> names.
> >>>>>> So moving towards a single API that is powerful enough to handle
> both
> >>> use
> >>>>>> cases seems absolutely correct to me.
> >>>>>>
> >>>>>> Paul
> >>>>>>
> >>>>>> On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya <
> >>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Got it. Thanks John, this makes sense.
> >>>>>>>
> >>>>>>> I've updated the KIP to include the deprecation of:
> >>>>>>>
> >>>>>>>      - KStream#transform
> >>>>>>>      - KStream#transformValues
> >>>>>>>      - KStream#flatTransform
> >>>>>>>      - KStream#flatTransformValues
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, 11 Feb 2022 at 15:16, John Roesler <vvcep...@apache.org>
> >>> wrote:
> >>>>>>>
> >>>>>>>> Thanks, Jorge!
> >>>>>>>>
> >>>>>>>> I think it’ll be better to keep this KIP focused on KStream
> methods
> >>> only.
> >>>>>>>> I suspect that the KTable methods may be more complicated than
> just
> >>> that
> >>>>>>>> proposed replacement, but it’ll also be easier to consider that
> >>> question
> >>>>>>> in
> >>>>>>>> isolation.
> >>>>>>>>
> >>>>>>>> The nice thing about just deprecating the KStream methods and not
> >> the
> >>>>>>>> Transform* interfaces is that you can keep your proposal just
> >> scoped
> >>> to
> >>>>>>>> KStream and not have any consequences for the rest of the DSL.
> >>>>>>>>
> >>>>>>>> Thanks again,
> >>>>>>>> John
> >>>>>>>>
> >>>>>>>> On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya
> wrote:
> >>>>>>>>> Thanks, John.
> >>>>>>>>>
> >>>>>>>>>> 4) I agree that we shouldn't deprecate the Transformer*
> >>>>>>>>> classes, but do you think we should deprecate the
> >>>>>>>>> KStream#transform* methods? I'm curious if there's any
> >>>>>>>>> remaining reason to have those methods, or if your KIP
> >>>>>>>>> completely obviates them.
> >>>>>>>>>
> >>>>>>>>> Good catch.
> >>>>>>>>> I considered that deprecating `Transformer*` and `transform*` would go hand
> >>>>>>>>> in hand; maybe it happened similarly with old `Processor` and `process`?
> >>>>>>>>> Though deprecating only `transform*` operations could be a better signal
> >>>>>>>>> for users than not deprecating anything at all, and pave the way to its
> >>>>>>>>> deprecation.
> >>>>>>>>>
> >>>>>>>>> Should this deprecation also consider including
> >>>>>>> `KTable#transformValues`?
> >>>>>>>>> The approach proposed on the KIP:
> >>>>>>>>> `ktable.toStream().processValues().toTable()` seems fair to me,
> >>> though
> >>>>>>> I
> >>>>>>>>> will have to test it further.
> >>>>>>>>>
> >>>>>>>>> I'm happy to update the KIP if there's some consensus around
> this.
> >>>>>>>>> Will add the deprecation notes these days and wait for any
> >>> additional
> >>>>>>>>> feedback on this topic before wrapping up the KIP.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, 11 Feb 2022 at 04:03, John Roesler <vvcep...@apache.org>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the update, Jorge!
> >>>>>>>>>>
> >>>>>>>>>> I just read over the KIP again, and I'm in support. One more
> >>>>>>>>>> question came up for me, though:
> >>>>>>>>>>
> >>>>>>>>>> 4) I agree that we shouldn't deprecate the Transformer*
> >>>>>>>>>> classes, but do you think we should deprecate the
> >>>>>>>>>> KStream#transform* methods? I'm curious if there's any
> >>>>>>>>>> remaining reason to have those methods, or if your KIP
> >>>>>>>>>> completely obviates them.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Thu, 2022-02-10 at 21:32 +0000, Jorge Esteban Quilcate
> >>>>>>>>>> Otoya wrote:
> >>>>>>>>>>> Thank you both for your feedback!
> >>>>>>>>>>>
> >>>>>>>>>>> I have added the following note on punctuation:
> >>>>>>>>>>>
> >>>>>>>>>>> ```
> >>>>>>>>>>> NOTE: The key validation can be defined when processing the
> >>> message.
> >>>>>>>>>>> Though, with punctuations it won't be possible to define the
> key
> >>> for
> >>>>>>>>>>> validation before forwarding, therefore it won't be possible to
> >>>>>>>> forward
> >>>>>>>>>>> from punctuation.
> >>>>>>>>>>> This is similar behavior to how `ValueTransformer`s behave at
> >> the
> >>>>>>>> moment.
> >>>>>>>>>>> ```
> >>>>>>>>>>>
> >>>>>>>>>>> I also made it explicit that we are going to apply referential equality
> >>>>>>>>>>> for key validation.
> >>>>>>>>>>>
> >>>>>>>>>>> I hope this is covering all your feedback, let me know if I'm
> >>>>>>> missing
> >>>>>>>>>>> anything.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jorge.
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, 9 Feb 2022 at 22:19, Guozhang Wang <wangg...@gmail.com
> >
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I'm +1 on John's point 3) for punctuations.
> >>>>>>>>>>>>
> >>>>>>>>>>>> And I think if people are on the same page that a reference
> >>>>>>> equality
> >>>>>>>>>> check
> >>>>>>>>>>>> per record is not a huge overhead, I think doing that
> >> enforcement
> >>>>>>> is
> >>>>>>>>>> better
> >>>>>>>>>>>> than documentations and hand-wavy undefined behaviors.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Feb 9, 2022 at 11:27 AM John Roesler <
> >>> vvcep...@apache.org
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP Jorge,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I'm in support of your proposal.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1)
> >>>>>>>>>>>>> I do agree with Guozhang's point (1). I think it's cleaner
> >>>>>>>>>>>>> and better to keep the
> >>>>>>>>>>>>> enforcement internal to the framework than to introduce a
> >>>>>>>>>>>>> public API or context wrapper for processors to use
> >>>>>>>>>>>>> explicitly.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) I tend to agree with you on this one; I think the
> >>>>>>>>>>>>> equality check ought to be fast enough in practice.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3) I think this is implicit, but should be explicit in the
> >>>>>>>>>>>>> KIP: For the `processValues` API, because the framework sets
> >>>>>>>>>>>>> the key on the context before calling `process` and then
> >>>>>>>>>>>>> unsets it afterwards, there will always be no key set during
> >>>>>>>>>>>>> task punctuation. Therefore, while processors may still
> >>>>>>>>>>>>> register punctuators, they will not be able to forward
> >>>>>>>>>>>>> anything from them.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This is functionally equivalent to the existing
> >>>>>>>>>>>>> transformers, by the way, that are also forbidden to forward
> >>>>>>>>>>>>> anything during punctuation.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For what it's worth, I think this is the best tradeoff.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The only alternative I see is not to place any restriction
> >>>>>>>>>>>>> on forwarded keys at all and just document that if users
> >>>>>>>>>>>>> don't maintain proper partitioning, they'll get undefined
> >>>>>>>>>>>>> behavior. That might be more powerful, but it's also a
> >>>>>>>>>>>>> usability problem.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> -John
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 2022-02-09 at 11:34 +0000, Jorge Esteban Quilcate
> >>>>>>>>>>>>> Otoya wrote:
> >>>>>>>>>>>>>> Thanks Guozhang.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Does `ValueProcessorContext` have to be a public API? It
> >>>>>>> seems
> >>>>>>>>>> to me
> >>>>>>>>>>>>>> that this can be completely abstracted away from user
> >>>>>>> interfaces
> >>>>>>>>>> as an
> >>>>>>>>>>>>>> internal class
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Totally agree. No intention to add these as public APIs.
> Will
> >>>>>>>>>> update
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> KIP to reflect this.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> in the past the rationale for enforcing it at the
> >>>>>>>>>>>>>> interface layer rather than do runtime checks is that it is
> >>>>>>> more
> >>>>>>>>>>>>> efficient.
> >>>>>>>>>>>>>>> I'm not sure how much overhead it may incur to check if the
> >>>>>>>> key
> >>>>>>>>>> did
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>> change: if it is just a reference equality check maybe it's
> >>>>>>>> okay.
> >>>>>>>>>>>> What's
> >>>>>>>>>>>>>> your take on this?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Agree, reference equality should cover this validation and
> >> the
> >>>>>>>>>> overhead
> >>>>>>>>>>>>>> impact should not be meaningful.
> >>>>>>>>>>>>>> Will update the KIP to reflect this as well.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, 8 Feb 2022 at 19:05, Guozhang Wang <
> >>>>>>> wangg...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello Jorge,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for bringing this KIP! I think this is a nice idea
> to
> >>>>>>>>>> consider
> >>>>>>>>>>>>> using
> >>>>>>>>>>>>>>> a single overloaded function name for #process, just a
> >>>>>>> couple
> >>>>>>>>>> quick
> >>>>>>>>>>>>>>> questions after reading the proposal:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) Does `ValueProcessorContext` have to be a public API? It
> >>>>>>>>>> seems to
> >>>>>>>>>>>> me
> >>>>>>>>>>>>>>> that this can be completely abstracted away from user
> >>>>>>>> interfaces
> >>>>>>>>>> as
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>>> internal class, and we call the `setKey` before calling
> >>>>>>>>>>>>> user-instantiated
> >>>>>>>>>>>>>>> `process` function, and then in its overridden `forward` it
> >>>>>>>> can
> >>>>>>>>>> just
> >>>>>>>>>>>>> check
> >>>>>>>>>>>>>>> if the key changes or not.
> >>>>>>>>>>>>>>> 2) Related to 1) above, in the past the rationale for
> >>>>>>>> enforcing
> >>>>>>>>>> it at
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> interface layer rather than do runtime checks is that it is
> >>>>>>>> more
> >>>>>>>>>>>>> efficient.
> >>>>>>>>>>>>>>> I'm not sure how much overhead it may incur to check if the
> >>>>>>>> key
> >>>>>>>>>> did
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>> change: if it is just a reference equality check maybe it's
> >>>>>>>> okay.
> >>>>>>>>>>>>> What's
> >>>>>>>>>>>>>>> your take on this?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya
> >>>>>>> <
> >>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to start a new discussion thread on Kafka Streams
> >>>>>>>>>> KIP-820:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This KIP aims to extend the current `KStream#process` API to return
> >>>>>>>>>>>>>>>> output values that can be chained across the topology, as well as
> >>>>>>>>>>>>>>>> to introduce a new `KStream#processValues` that uses processors while
> >>>>>>>>>>>>>>>> validating that keys haven't changed and repartitioning is not required.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Jorge.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>
> >>
> >
> >
>
