Thanks, John!
This looks very promising.

I will familiarize myself with this approach and update the KIP accordingly.
From what I can see so far, this should cover most of the open issues in
this proposal.

PS.

> Just as a reminder, the current approach with transformers
> is NOT enforced at compile time. Transformers have access to
> a "forwarding disabled" processor context, which still has
> the forward methods that throw a runtime exception when
> invoked.

Agree. I was referring to the value transformers, where `readOnlyKey` is
passed but not forwarded internally. As for the "forwarding disabled"
approach, you're totally right that it is a runtime validation.
Regardless, the approach proposed here will be a much better one.
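A rough sketch of why the "forwarding disabled" approach is only a runtime guarantee. It assumes heavily simplified, hypothetical stand-in types (`SimpleContext`, `CollectingContext`, `ForwardingDisabledContext`), not the real Kafka Streams classes, and uses `IllegalStateException` as a stand-in exception: the disabled context still exposes `forward()`, so misuse compiles and only fails when it runs.

```java
import java.util.ArrayList;
import java.util.List;

class ForwardingDisabledDemo {
    // Returns true if calling forward() on the given context throws at runtime.
    static boolean forwardThrows(SimpleContext<String, Integer> ctx) {
        try {
            ctx.forward("key", 1);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardThrows(new CollectingContext<>()));         // false
        System.out.println(forwardThrows(new ForwardingDisabledContext<>())); // true
    }
}

// Hypothetical stand-in for a processor context.
interface SimpleContext<K, V> {
    void forward(K key, V value);
}

// A normal context: it just records what was forwarded.
class CollectingContext<K, V> implements SimpleContext<K, V> {
    final List<String> forwarded = new ArrayList<>();

    @Override
    public void forward(K key, V value) {
        forwarded.add(key + "=" + value);
    }
}

// Same interface, so calls to forward() still compile, but every call fails.
class ForwardingDisabledContext<K, V> implements SimpleContext<K, V> {
    @Override
    public void forward(K key, V value) {
        throw new IllegalStateException("forward() is disabled in this context");
    }
}
```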


On Sun, 6 Mar 2022 at 18:59, John Roesler <vvcep...@apache.org> wrote:

> Hello all,
>
> It seems like we're making good progress on this discussion.
> If I'm keeping track correctly, if we can resolve this
> question about how to handle processValues(), then we should
> be able to finalize the vote, right?
>
> I share Matthias's preference for having a type-safe API.
>
> Just as a reminder, the current approach with transformers
> is NOT enforced at compile time. Transformers have access to
> a "forwarding disabled" processor context, which still has
> the forward methods that throw a runtime exception when
> invoked.
>
> However, the spirit of the "new processor api" line of work
> is to clean up a lot of the cruft around the original
> processor API, so this is a good opportunity to introduce a
> type-safe version if we can.
>
> Based on my experience adding the new processor API, I felt
> like it should be possible to do what Matthias suggests, but that
> it would be more involved than he described. The biggest thing
> I learned from that effort, though, is that you really have
> to just try it to see what all the complications are.
>
> With that in mind, I went ahead and implemented the
> suggestion: https://github.com/apache/kafka/pull/11854
>
> This is a functional prototype. It only adds processValues,
> which takes a supplier of a new type, FixedKeyProcessor.
> That processor only processes FixedKeyRecords, which have a
> key that cannot be changed. FixedKeyProcessors have a
> special context, a FixedKeyProcessorContext, which can only
> forward FixedKeyRecords.
>
> FixedKeyRecords have "fixed keys" because their key can only
> be set in the constructor, and their constructor is
> package-private.
>
> As you can see, this new record/processor/context ecosystem
> is an independent peer of the general one. This is necessary
> to ensure the desired compiler check. For example, if
> FixedKeyRecord were merely an interface implemented by
> Record, then users could create a new Record with a new key
> and forward it as a FixedKeyRecord, violating the
> constraint.
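To make the compile-time argument concrete, here is a minimal, self-contained sketch of the "fixed key" idea. It is not the code from the PR: `FixedKeyRecord` and `FixedKeyProcessor` borrow the names used above, `FixedKeyContext` is a hypothetical stand-in for FixedKeyProcessorContext, and all signatures are simplified assumptions. Because the record has no `withKey()` and the context only accepts `FixedKeyRecord`, there is no expression a processor could write to forward a different key.

```java
import java.util.ArrayList;
import java.util.List;

class FixedKeyDemo {
    // Plays the framework's role: it creates the record and the context, then
    // runs a user processor that replaces each String value with its length.
    static List<FixedKeyRecord<String, Integer>> run(String key, String value) {
        List<FixedKeyRecord<String, Integer>> sink = new ArrayList<>();
        FixedKeyContext<String, Integer> context = sink::add;
        FixedKeyProcessor<String, String, Integer> lengths =
            (rec, ctx) -> ctx.forward(rec.withValue(rec.value().length()));
        lengths.process(new FixedKeyRecord<>(key, value), context);
        return sink;
    }

    public static void main(String[] args) {
        FixedKeyRecord<String, Integer> out = run("user-1", "hello").get(0);
        System.out.println(out.key() + " -> " + out.value()); // user-1 -> 5
    }
}

// Hypothetical, simplified record: the key is fixed at construction time and
// there is deliberately no withKey() method.
final class FixedKeyRecord<K, V> {
    private final K key;
    private final V value;

    // Package-private: code outside this package cannot mint a FixedKeyRecord
    // with an arbitrary key. (In this single-file demo everything shares one
    // package, so the restriction is only illustrative.)
    FixedKeyRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K key() { return key; }

    public V value() { return value; }

    // Only the value can change; the key is copied over unchanged.
    public <NewV> FixedKeyRecord<K, NewV> withValue(NewV newValue) {
        return new FixedKeyRecord<>(key, newValue);
    }
}

// The context only accepts FixedKeyRecords, so a processor cannot emit a new key.
interface FixedKeyContext<K, VOut> {
    void forward(FixedKeyRecord<K, VOut> rec);
}

interface FixedKeyProcessor<K, VIn, VOut> {
    void process(FixedKeyRecord<K, VIn> rec, FixedKeyContext<K, VOut> context);
}
```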
>
> As I said, with this proposal, the devil is in the details,
> so if anyone thinks the API can be simplified, I suggest you
> check out the branch and try out your proposal. I'd be very
> happy to have a simpler solution, but I'm also pretty sure
> this complexity is necessary.
>
> Taking a step back, I do think this approach results in a
> better API, even though the change is a little complicated.
>
> Thanks,
> -John
>
> On Sun, 2022-03-06 at 10:51 +0000, Jorge Esteban Quilcate Otoya wrote:
> > Matthias, thanks for your feedback.
> >
> > I can see the following alternatives to deal with `processValues()`:
> >
> > 1. Runtime key validation (current proposal)
> > 2. Using Void type. Guozhang already pointed out some important
> > considerations about allocating `Record` twice.
> > 3. Adding a new ValueRecord, proposed by Matthias. This one would carry
> > some of the problems of the second alternative, as ValueRecord will have
> > to be created from a Record. Also, whether via a public constructor or
> > via creation from a Record, the key _can_ be changed without being
> > captured by the Topology.
> > 4. Reducing the KIP scope to `process` only, and removing/postponing
> > `processValues` for a later DSL redesign.
> >
> > A couple of additional comments:
> >
> > About the Record API:
> >
> > IIUC, the issue with allocating new objects comes from the current
> > design of the Record API.
> > If a user calls record.withKey(...).withValue(...), that already leads to
> > a couple of instantiations.
> > My impression is that if the cost/value of immutability has already been
> > weighed, then maybe the considerations for alternative 2 can be
> > disregarded?
> > Either way, if the cost of re-creating objects is something we want to
> > minimize, then maybe adding a Builder to the record would help reduce
> > the allocations.
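A small sketch of the allocation point, assuming a hypothetical `SketchRecord` whose `withKey`/`withValue`/`withTimestamp` each return a fresh copy (loosely mirroring how the thread describes `Record`), plus a hypothetical builder. The counter only tracks record instances; the builder itself is one extra short-lived object, so it mainly pays off when several fields change at once.

```java
class RecordAllocationDemo {
    // Changes key, value and timestamp through chained withX() calls.
    static int chainedAllocations() {
        SketchRecord<String, Integer> base = new SketchRecord<>("a", 1, 0L);
        SketchRecord.allocations = 0; // count only what happens below
        base.withKey("b").withValue(2).withTimestamp(10L);
        return SketchRecord.allocations;
    }

    // Changes the same three fields through a builder.
    static int builderAllocations() {
        SketchRecord<String, Integer> base = new SketchRecord<>("a", 1, 0L);
        SketchRecord.allocations = 0;
        base.toBuilder().key("b").value(2).timestamp(10L).build();
        return SketchRecord.allocations;
    }

    public static void main(String[] args) {
        System.out.println(chainedAllocations()); // 3
        System.out.println(builderAllocations()); // 1
    }
}

// Hypothetical immutable record: every withX() call returns a new copy.
final class SketchRecord<K, V> {
    static int allocations = 0; // demo-only counter; not thread-safe

    final K key;
    final V value;
    final long timestamp;

    SketchRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        allocations++;
    }

    <NK> SketchRecord<NK, V> withKey(NK newKey) {
        return new SketchRecord<>(newKey, value, timestamp);
    }

    <NV> SketchRecord<K, NV> withValue(NV newValue) {
        return new SketchRecord<>(key, newValue, timestamp);
    }

    SketchRecord<K, V> withTimestamp(long newTimestamp) {
        return new SketchRecord<>(key, value, newTimestamp);
    }

    Builder<K, V> toBuilder() {
        return new Builder<>(key, value, timestamp);
    }

    // Hypothetical mutable builder: setters mutate in place and build() creates
    // one record. It keeps K and V fixed, so it cannot change the key/value type.
    static final class Builder<K, V> {
        private K key;
        private V value;
        private long timestamp;

        Builder(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        Builder<K, V> key(K newKey) { this.key = newKey; return this; }

        Builder<K, V> value(V newValue) { this.value = newValue; return this; }

        Builder<K, V> timestamp(long newTimestamp) { this.timestamp = newTimestamp; return this; }

        SketchRecord<K, V> build() {
            return new SketchRecord<>(key, value, timestamp);
        }
    }
}
```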
> >
> > About the key validation:
> >
> > So far, the only way I can see to _really_ validate at compile time that
> > a key doesn't change is by not exposing it at all, as we are doing today
> > with Transform.
> > Otherwise, we have to deal with it at runtime, as we have been doing with
> > Transform without the ability to forward.
> > The Processor API is, by definition, a lower-level abstraction; therefore,
> > users should be aware of the potential runtime exceptions if the key
> > changes.
> > This is why I'm leaning towards alternative 1.
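One way alternative 1 could look, as a sketch only: a hypothetical `KeyCheckingContext` whose `setRecordKey`/`clearRecordKey` borrow the method names mentioned in this thread, while the classes, signatures and exception type are assumptions. It uses a plain reference check, the kind of cheap check discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

class KeyCheckDemo {
    // Simulates one processValues() call; changeKey controls whether the
    // "user" processor forwards the original key object or a different one.
    static boolean forwardSucceeds(boolean changeKey) {
        KeyCheckingContext<String, Integer> ctx = new KeyCheckingContext<>();
        KeyedValue<String, String> in = new KeyedValue<>("user-1", "hello");
        ctx.setRecordKey(in.key); // framework remembers the input key
        try {
            String outKey = changeKey ? "user-2" : in.key;
            ctx.forward(new KeyedValue<>(outKey, in.value.length()));
            return true;
        } catch (IllegalStateException e) {
            return false;
        } finally {
            ctx.clearRecordKey(); // framework forgets the key afterwards
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardSucceeds(false)); // true
        System.out.println(forwardSucceeds(true));  // false
    }
}

final class KeyedValue<K, V> {
    final K key;
    final V value;

    KeyedValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical context used only inside processValues(): it rejects any
// forwarded record whose key is not the very object that came in.
class KeyCheckingContext<K, V> {
    final List<KeyedValue<K, V>> forwarded = new ArrayList<>();
    private K currentKey;
    private boolean keySet = false;

    void setRecordKey(K key) {
        currentKey = key;
        keySet = true;
    }

    void clearRecordKey() {
        currentKey = null;
        keySet = false;
    }

    void forward(KeyedValue<K, V> rec) {
        // Reference comparison is cheap, but it would also reject a key that is
        // equal() to the original yet a different object.
        if (!keySet || rec.key != currentKey) {
            throw new IllegalStateException("key was changed inside processValues()");
        }
        forwarded.add(rec);
    }
}
```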
> >
> > Looking forward to your feedback.
> > As a reminder, the vote thread is still open. Feel free to add your vote
> > or amend it if needed.
> >
> > Cheers,
> >
> >
> > On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax <mj...@apache.org> wrote:
> >
> > > John, thanks for verifying source compatibility. My impression was that
> > > it should be source compatible; I was just not 100% sure.
> > >
> > > The question about `processValues()` is really a hard one. Guozhang's
> > > point is a very good one. Maybe we need to be pragmatic and accept the
> > > runtime check (even if I deeply hate this solution compared to a
> > > compile-time check).
> > >
> > > Would other possibilities to address this issue just become too ugly? It
> > > seems it would require adding a new `ValueProcessorContext` that offers
> > > a `#forward(ValueRecord)` method (with `ValueRecord` being a `Record`
> > > with an immutable key). Not sure if we would be willing to go down this
> > > route? Personally, I would be ok with it, as I strongly prefer
> > > compile-time checks and I am happy to extend the API surface area to
> > > achieve it -- however, I won't be surprised if others don't like this
> > > idea...
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > Thanks, Guozhang.
> > > >
> > > > > Compared with reference checks and runtime exceptions for those who
> > > > > mistakenly change the key, I think that forcing everyone to
> > > > > `setValue` may incur more costs..
> > > >
> > > > This is a fair point. I agree that this may incur more costs than key
> > > > checking.
> > > >
> > > > Will hold for more feedback, but if we agree I will update the KIP
> > > > during the week.
> > > >
> > > > Cheers,
> > > > Jorge.
> > > >
> > > >
> > > > On Sun, 27 Feb 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello folks,
> > > > >
> > > > > Regarding the outstanding question, I'm actually leaning a bit towards
> > > > > the second option, since `withKey()` itself always creates a new Record
> > > > > object. This has a few implications:
> > > > >
> > > > > * We would have to discard the previous Record object, to be GC'ed, in
> > > > > favor of the new object -- note that in practice, processing a value
> > > > > does not mean you'd have to replace the whole value with `withValue`;
> > > > > maybe you just need to manipulate some fields of the value object if it
> > > > > is JSON, etc.
> > > > > * It may become an obstacle for further runtime optimizations, e.g.
> > > > > skipping serdes and interpreting processing as direct byte manipulations.
> > > > >
> > > > > Compared with reference checks and runtime exceptions for those who
> > > > > mistakenly change the key, I think that forcing everyone to `setValue`
> > > > > may incur more costs..
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Appreciate very much all the great feedback received so far.
> > > > > >
> > > > > > > After applying that interface change, I don't see any syntax
> > > > > > > errors in our tests (which use those methods), and the
> > > > > > > StreamBuilderTest still passes for me.
> > > > > >
> > > > > > This is awesome John, thank you for your efforts here.
> > > > > >
> > > > > > > Jorge, do you mind clarifying these points in the Compatibility
> > > > > > > section of your KIP?
> > > > > >
> > > > > > +1. I have clarified the impact of changing the return type in the KIP.
> > > > > >
> > > > > > > I think the other outstanding question for you is whether
> > > > > > > the output key type for processValues should be K or Void.
> > > > > > >
> > > > > > > One thing I realized belatedly was that if we do set it to
> > > > > > > Void, then users will actually have to override the key when
> > > > > > > forwarding, like `record.withKey(null)`, whereas if we keep
> > > > > > > it as K, all users have to do is not touch the key at all.
> > > > > >
> > > > > > This is a tricky one.
> > > > > > On one hand, with a Void type for the output key, we force users to
> > > > > > cast to Void and change the key to null,
> > > > > > though this can be documented in the API, so users are aware of the
> > > > > > peculiarity of forwarding within `processValues`.
> > > > > > On the other hand, keeping the key type as output doesn't _require_
> > > > > > any change of keys,
> > > > > > but this could lead to key-checking runtime exceptions.
> > > > > >
> > > > > > I'm slightly inclined towards the first option: changing the type to
> > > > > > `Void`.
> > > > > > This will impose a bit of pain on users to gain some type-safety and
> > > > > > avoid runtime exceptions.
> > > > > > We can justify this requirement as a way to prove that the key hasn't
> > > > > > changed.
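A minimal sketch of what the Void option means for user code, assuming a hypothetical `VRecord` with copy-on-write `withKey`/`withValue` (modeled loosely on `Record`) and a plain `java.util.function.Function` in place of a processor: the compiler accepts only an explicit `withKey((Void) null)`.

```java
import java.util.function.Function;

class VoidKeyDemo {
    // A processValues-style step whose output key type is Void. The only value
    // of type Void is null, so the key must be replaced explicitly.
    static final Function<VRecord<String, String>, VRecord<Void, Integer>> LENGTHS =
        in -> in.withKey((Void) null).withValue(in.value.length());

    // Neither of these would compile as the body of LENGTHS:
    //   in -> in.withValue(in.value.length())                  // key is still a String
    //   in -> in.withKey("other").withValue(in.value.length()) // "other" is not a Void

    public static void main(String[] args) {
        VRecord<Void, Integer> out = LENGTHS.apply(new VRecord<>("user-1", "hello"));
        System.out.println(out.key + " " + out.value); // null 5
    }
}

// Hypothetical immutable record with copy-on-write withers.
final class VRecord<K, V> {
    final K key;
    final V value;

    VRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    <NK> VRecord<NK, V> withKey(NK newKey) {
        return new VRecord<>(newKey, value);
    }

    <NV> VRecord<K, NV> withValue(NV newValue) {
        return new VRecord<>(key, newValue);
    }
}
```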
> > > > > >
> > > > > > Btw, thanks for this idea Matthias!
> > > > > >
> > > > > >
> > > > > > On Fri, 25 Feb 2022 at 17:10, John Roesler <vvcep...@apache.org> wrote:
> > > > > >
> > > > > > > Oh, one more thing Jorge,
> > > > > > >
> > > > > > > I think the other outstanding question for you is whether
> > > > > > > the output key type for processValues should be K or Void. I
> > > > > > > get the impression that none of us feels too strongly
> > > > > > > about it, so I think the ball is in your court to consider
> > > > > > > everyone's points and make a call (with justification).
> > > > > > >
> > > > > > > One thing I realized belatedly was that if we do set it to
> > > > > > > Void, then users will actually have to override the key when
> > > > > > > forwarding, like `record.withKey(null)`, whereas if we keep
> > > > > > > it as K, all users have to do is not touch the key at all.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > On Fri, 2022-02-25 at 11:07 -0600, John Roesler wrote:
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > I'll chime in again in the interest of trying to do a better
> > > > > > > > job of keeping KIPs moving forward...
> > > > > > > >
> > > > > > > > Matthias raised some very good questions about whether the
> > > > > > > > change is really source compatible. I just checked out the
> > > > > > > > code and made the interface change that Jorge specified in
> > > > > > > > the KIP:
> > > > > > > >
> > > > > > > > > Modified methods:
> > > > > > > > >
> > > > > > > > > KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
> > > > > > > > > KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
> > > > > > > >
> > > > > > > > After applying that interface change, I don't see any syntax
> > > > > > > > errors in our tests (which use those methods), and the
> > > > > > > > StreamBuilderTest still passes for me.
> > > > > > > >
> > > > > > > > The reason is that the existing API already takes a
> > > > > > > > ProcessorSupplier<K, V, Void, Void> and is currently a
> > > > > > > > `void` return.
> > > > > > > >
> > > > > > > > After this interface change, all existing usages will just
> > > > > > > > bind Void to KOut and Void to VOut. In other words, KOut,
> > > > > > > > which is short for `KOut extends Object`, has Object as its
> > > > > > > > upper bound, and Void satisfies that bound, so all existing
> > > > > > > > processor suppliers are still valid arguments.
> > > > > > > >
> > > > > > > > Because the current methods are void returns, no existing
> > > > > > > > code could be assigning the result to any variable, so
> > > > > > > > moving from a void return to a typed return is always
> > > > > > > > compatible.
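A self-contained sketch of the source-compatibility argument, using hypothetical stand-ins `MiniStream` and `MiniSupplier` rather than the real KStream and ProcessorSupplier. It only demonstrates source compatibility: the JVM links method calls by their full descriptor, including the return type, so bytecode compiled against the old `void` signature would not link against the new one, which matches the point elsewhere in the thread that the change is not binary compatible.

```java
import java.util.ArrayList;
import java.util.List;

class CompatDemo {
    // Written against the old `void process(...)` shape: Void output types, and
    // the call is used as a plain statement. It still compiles unchanged.
    static void oldStyleCaller(MiniStream<String, String> stream) {
        MiniSupplier<String, String, Void, Void> terminal = () -> "print";
        stream.process(terminal); // KOut and VOut are inferred as Void; result ignored
    }

    // New code can use the returned stream.
    static MiniStream<String, Integer> newStyleCaller(MiniStream<String, String> stream) {
        MiniSupplier<String, String, String, Integer> lengths = () -> "lengths";
        return stream.process(lengths);
    }

    public static void main(String[] args) {
        MiniStream<String, String> source = new MiniStream<>(new ArrayList<>());
        oldStyleCaller(source);
        System.out.println(newStyleCaller(source).ops); // [lengths]
    }
}

// Hypothetical stand-in for ProcessorSupplier<KIn, VIn, KOut, VOut>.
interface MiniSupplier<KIn, VIn, KOut, VOut> {
    String name();
}

// Hypothetical stand-in for KStream, already using the new process() shape.
final class MiniStream<K, V> {
    final List<String> ops;

    MiniStream(List<String> ops) {
        this.ops = ops;
    }

    <KOut, VOut> MiniStream<KOut, VOut> process(MiniSupplier<K, V, KOut, VOut> supplier) {
        List<String> next = new ArrayList<>(ops);
        next.add(supplier.name());
        return new MiniStream<>(next);
    }
}
```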
> > > > > > > >
> > > > > > > > Jorge, do you mind clarifying these points in the
> > > > > > > > Compatibility section of your KIP?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, 2022-02-23 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > > > > > For this KIP, I also see the value. I was just trying to take a step
> > > > > > > > > back and ask if it's a good short-term solution. If we believe it is,
> > > > > > > > > I am fine with it.
> > > > > > > > >
> > > > > > > > > (I am more worried about the headers KIP...)
> > > > > > > > >
> > > > > > > > > Btw: I am still wondering if we can change the existing `process()` as
> > > > > > > > > proposed in the KIP? Is the proposed change source compatible? (It's
> > > > > > > > > for sure not binary compatible, but this seems fine -- I don't think
> > > > > > > > > we guarantee binary compatibility.)
> > > > > > > > >
> > > > > > > > > Btw: it would be good to clarify what changes for process() -- it
> > > > > > > > > should be the return type change from `void` to `KStream<KOut, VOut>`
> > > > > > > > > as well as the change of the `ProcessorSupplier` generic types (output
> > > > > > > > > types change from `Void` to `KOut` and `VOut`)?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > > On 2/23/22 11:32 AM, Guozhang Wang wrote:
> > > > > > > > > > Hi folks,
> > > > > > > > > >
> > > > > > > > > > I agree with John that this KIP by itself could be a good improvement,
> > > > > > > > > > and I feel it aligns well with the eventual DSL 2.0 proposal, so we do
> > > > > > > > > > not need to hold it until later.
> > > > > > > > > >
> > > > > > > > > > Regarding the last point (i.e. whether we should do enforcement with a
> > > > > > > > > > new interface), here's my 2c: in the past we introduced the public
> > > > > > > > > > `ValueTransformer/etc` for two purposes: 1) to enforce that the key is
> > > > > > > > > > not modifiable, and 2) to indicate inside the library's topology builder
> > > > > > > > > > itself that, since the key is not modified, the direct downstream does
> > > > > > > > > > not need to inject a repartition stage. I think we are more or less on
> > > > > > > > > > the same page that for purpose 1), doing a runtime check could be
> > > > > > > > > > sufficient; as for purpose 2), I think this KIP is similar to what we
> > > > > > > > > > have (i.e. it is just based on the function name "processValue" itself)
> > > > > > > > > > and hence it is not sacrificed either. I do not know if
> > > > > > > > > > `KStream#processValue(ProcessorSupplier<K, V, Void, VOut>
> > > > > > > > > > processorSupplier)` will work, or work better; maybe Jorge could do some
> > > > > > > > > > digging and get back to us.
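Purpose 2) can be illustrated with a toy model, assuming a hypothetical `TinyTopology` builder that is not the actual Kafka Streams graph code: the operation used (process vs. processValues) is enough for the builder to decide whether a downstream key-based step needs a repartition.

```java
import java.util.ArrayList;
import java.util.List;

class RepartitionDemo {
    public static void main(String[] args) {
        System.out.println(new TinyTopology().processValues("parse").countByKey().steps());
        // [parse, count]
        System.out.println(new TinyTopology().process("rekey").countByKey().steps());
        // [rekey, repartition, count]
    }
}

// Hypothetical, heavily simplified topology builder (not Kafka Streams internals).
// It remembers whether any upstream step may have changed the key.
final class TinyTopology {
    private final List<String> steps = new ArrayList<>();
    private boolean keyMayHaveChanged = false;

    // A general process() step may forward new keys, so downstream
    // key-based steps can no longer trust the existing partitioning.
    TinyTopology process(String name) {
        steps.add(name);
        keyMayHaveChanged = true;
        return this;
    }

    // processValues() keeps the key, so the flag is left untouched.
    TinyTopology processValues(String name) {
        steps.add(name);
        return this;
    }

    // A key-based aggregation adds a repartition step only when needed.
    TinyTopology countByKey() {
        if (keyMayHaveChanged) {
            steps.add("repartition");
            keyMayHaveChanged = false;
        }
        steps.add("count");
        return this;
    }

    List<String> steps() {
        return steps;
    }
}
```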
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Feb 18, 2022 at 8:24 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello all,
> > > > > > > > > > >
> > > > > > > > > > > While I sympathize with Matthias’s desire to wipe the slate clean and
> > > > > > > > > > > redesign the DSL with full knowledge of everything we’ve learned in the
> > > > > > > > > > > past few years, that would also be a pretty intense project on its own.
> > > > > > > > > > > It seems better to leave that project for someone who is motivated to
> > > > > > > > > > > take it on.
> > > > > > > > > > >
> > > > > > > > > > > Reading between the lines, it seems like Jorge’s motivation is more
> > > > > > > > > > > along the lines of removing a few specific pain points. I appreciate
> > > > > > > > > > > Matthias extending the offer, but if Jorge doesn’t want to redesign the
> > > > > > > > > > > DSL right now, we’re better off just accepting the work he’s willing to
> > > > > > > > > > > do.
> > > > > > > > > > >
> > > > > > > > > > > Specifically, this KIP is quite a nice improvement. Looking at the
> > > > > > > > > > > KStream interface, roughly half of it is devoted to various flavors of
> > > > > > > > > > > “transform”, which makes it really hard on users to figure out which
> > > > > > > > > > > they are supposed to use for what purpose. This KIP lets us drop all
> > > > > > > > > > > that complexity in favor of just two methods, thanks to the fact that
> > > > > > > > > > > we now have the ability for processors to specify their forwarding type.
> > > > > > > > > > >
> > > > > > > > > > > By the way, I really like Matthias’s suggestion to set the KOut generic
> > > > > > > > > > > bound to Void for processValues. Then, instead of doing an equality
> > > > > > > > > > > check on the key during forward, you’d just set the key back to the one
> > > > > > > > > > > saved before processing (with setRecordKey). This is both more
> > > > > > > > > > > efficient (because we don’t have the equality check) and more
> > > > > > > > > > > foolproof for users (because it’s enforced by the compiler instead of
> > > > > > > > > > > the runtime).
> > > > > > > > > > >
> > > > > > > > > > > Thanks, all!
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Feb 18, 2022, at 00:43, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > On Fri, 18 Feb 2022 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > > It probably deserves its own thread to start discussing ideas.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yes. My question was: if we think it's time to do a DSL 2.0, should
> > > > > > > > > > > > > we drop this KIP and just fix it via DSL 2.0 instead?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > Good question. Would love to hear what others think about this.
> > > > > > > > > > > >
> > > > > > > > > > > > I've stated my position about this here:
> > > > > > > > > > > >
> > > > > > > > > > > > > For this KIP specifically, I think about it as a continuation from
> > > > > > > > > > > > > KIP-478. Therefore, it could make sense to have it as part of the
> > > > > > > > > > > > > current version of the DSL.
> > > > > > > > > > > >
> > > > > > > > > > > > I'd even add that if this KIP is adopted, I would not be that
> > > > > > > > > > > > disappointed if KIP-634 is dropped in favor of a DSL v2.0, as the
> > > > > > > > > > > > access to headers provided by KIP-478 via the Record API is much better
> > > > > > > > > > > > than the previous `.context().headers()`.
> > > > > > > > > > > >
> > > > > > > > > > > > But I'm happy to reconsider if there is an agreement to focus efforts
> > > > > > > > > > > > towards a DSL 2.0.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > > You're right. I'm not proposing the method signature.
> > > > > > > > > > > > >
> > > > > > > > > > > > > What signature do you propose? I don't see an update on the KIP.
> > > > > > > > > > > > >
> > > > > > > > > > > > My bad. I have clarified this in the KIP's public interfaces now:
> > > > > > > > > > > >
> > > > > > > > > > > > ```
> > > > > > > > > > > >
> > > > > > > > > > > > New methods:
> > > > > > > > > > > >
> > > > > > > > > > > >      - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, String... stateStoreNames)
> > > > > > > > > > > >      - KStream<K,VOut> KStream#processValues(ProcessorSupplier<K, V, K, VOut> processorSupplier, Named named, String... stateStoreNames)
> > > > > > > > > > > >
> > > > > > > > > > > > Modified methods:
> > > > > > > > > > > >
> > > > > > > > > > > >      - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)
> > > > > > > > > > > >      - KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, Named named, String... stateStoreNames)
> > > > > > > > > > > >
> > > > > > > > > > > > ```
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Not sure if I understand what this would look like. Do you mean
> > > > > > > > > > > > > > checking it on the Record itself or somewhere else?
> > > > > > > > > > > > >
> > > > > > > > > > > > > @Guozhang: I am not worried about the runtime overhead. I am worried
> > > > > > > > > > > > > about user experience. It's not clear from the method signature that
> > > > > > > > > > > > > you are not allowed to change the key, which seems to be bad API
> > > > > > > > > > > > > design. Even if I understand the desire to keep the API surface area
> > > > > > > > > > > > > small -- I would rather have a compile-time enforcement than a runtime
> > > > > > > > > > > > > check.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For example, we have `map()` and `mapValues()`, and `mapValues()`
> > > > > > > > > > > > > returns a value `V` (enforcing that the key is not changed) instead of
> > > > > > > > > > > > > returning a `KeyValue<KIn,VOut>` and using a runtime check to verify
> > > > > > > > > > > > > that the key is not changed.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Naively, could we enforce something similar by setting the output key
> > > > > > > > > > > > > type as `Void`?
> > > > > > > > > > > > >
> > > > > > > > > > > > >      KStream#processValue(ProcessorSupplier<K, V, Void, VOut> processorSupplier)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Not sure if this would work or not?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Or it might be worth adding a new interface, `ValueProcessorSupplier`,
> > > > > > > > > > > > > that ensures that the key is not modified?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > This is an important discussion, even more so with a DSL v2.0.
> > > > > > > > > > > >
> > > > > > > > > > > > At the moment, the DSL just flags whether repartitioning is required
> > > > > > > > > > > > based on the DSL operation. As mentioned, `mapValues()` enforces that
> > > > > > > > > > > > only the value changes through the DSL, though the only _guarantee_ we
> > > > > > > > > > > > have is that Kafka Streams "owns" the implementation, and we can flag
> > > > > > > > > > > > this properly.
> > > > > > > > > > > >
> > > > > > > > > > > > With a hypothetical v2.0 based on the Record API, this will be harder
> > > > > > > > > > > > to enforce with the current APIs, e.g. with `mapValues(Record<K, V>
> > > > > > > > > > > > record)`, nothing would stop users from using
> > > > > > > > > > > > `record.withKey("needs_partitioning")`.
> > > > > > > > > > > >
> > > > > > > > > > > > The approach defined in this KIP is similar to what we have at the
> > > > > > > > > > > > moment in `ValueTransformer*`, where it validates at runtime that users
> > > > > > > > > > > > are not calling `forward` with `ForwardingDisabledProcessorContext`.
> > > > > > > > > > > > `ValueProcessorSupplier` is not meant to be a public API, only to be
> > > > > > > > > > > > used internally in the `processValues` implementation.
> > > > > > > > > > > >
> > > > > > > > > > > > At first glance, `KStream#processValue(ProcessorSupplier<K, V, Void,
> > > > > > > > > > > > VOut> processorSupplier)` won't work, as it will require the
> > > > > > > > > > > > `Processor` implementation to actually change the key. Will take a
> > > > > > > > > > > > deeper look to validate if this could solve this issue.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On 2/17/22 10:56 AM, Guozhang Wang wrote:
> > > > > > > > > > > > > > Regarding the last question Matthias had, I wonder if it's similar to
> > > > > > > > > > > > > > my first email's point 2) above? I think the rationale is that, since
> > > > > > > > > > > > > > reference checks are relatively very cheap, it is worthwhile to pay for
> > > > > > > > > > > > > > these extra runtime checks and in return have a single consolidated
> > > > > > > > > > > > > > ProcessorSupplier programming interface (i.e. we would eventually
> > > > > > > > > > > > > > deprecate ValueTransformerWithKeySupplier).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 10:57 AM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you Matthias, this is great feedback.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Adding my comments below.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, 16 Feb 2022 at 00:42, Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the KIP.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > In line with my reply to KIP-634, I am wondering if we are heading
> > > > > > > > > > > > > > > > in the right direction, or if we should consider redesigning the DSL
> > > > > > > > > > > > > > > > from scratch?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'm very excited about the idea of a DSL v2.0. It probably deserves its
> > > > > > > > > > > > > > > own thread to start discussing ideas.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > For this KIP specifically, I think about it as a continuation from
> > > > > > > > > > > > > > > KIP-478. Therefore, it could make sense to have it as part of the
> > > > > > > > > > > > > > > current version of the DSL.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Even if we don't do a DSL 2.0 right now, I have some concerns about
> > > > > > > > > > > > > > > > this KIP:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (1) I am not sure if the proposed change is backward compatible? We
> > > > > > > > > > > > > > > > currently have:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >       void KStream#process(ProcessorSupplier, String...)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The newly proposed method:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >       KStream KStream#process(ProcessorSupplier)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > seems to be an incompatible change?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The KIP states:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Modified method KStream#process should be compatible with previous
> > > > > > > > > > > > > > > > > version, that at the moment is fixed to a Void return type.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Why is it backward compatible? Having both the old and new #process()
> > > > > > > > > > > > > > > > does not seem compatible to me? Or are you proposing to _change_ the
> > > > > > > > > > > > > > > > method signature (if yes, the `String...` parameter to add a state
> > > > > > > > > > > > > > > > store seems to be missing)? For this case, it seems that existing
> > > > > > > > > > > > > > > > programs would at least need to be recompiled -- it would only be a
> > > > > > > > > > > > > > > > source compatible change, but not a binary compatible change?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > You're right. I'm not proposing the method signature.
> > > > > > > > > > > > > > > Totally agree about the compatibility issue. I was only considering
> > > > > > > > > > > > > > > source compatibility and was unaware that changing from void to a
> > > > > > > > > > > > > > > specific type would break binary compatibility.
> > > > > > > > > > > > > > > I will update the KIP to reflect this:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Modifications to method KStream#process are source compatible with
> > > > > > > > > > > > > > > > the previous version, though not binary compatible. Therefore, users
> > > > > > > > > > > > > > > > will be required to recompile their applications with the latest
> > > > > > > > > > > > > > > > version.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
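This is a small sketch, not Kafka code, of why the change is source compatible but not binary compatible. The JVM links a call by method name *and* descriptor, and the descriptor includes the return type. `Object` stands in for the real supplier and stream types, which is an assumption made for brevity. The descriptors are produced with `java.lang.invoke.MethodType`.

```java
import java.lang.invoke.MethodType;

// The JVM resolves a call site by name plus descriptor, and the descriptor
// encodes the return type. `Object` stands in for the real parameter and
// return types (an assumption for brevity).
class DescriptorDemo {
    // Shape of `void process(Object supplier, String... storeNames)`
    static String voidDescriptor() {
        return MethodType.methodType(void.class, Object.class, String[].class)
                .toMethodDescriptorString();
    }

    // Same parameters, but returning a value instead of void
    static String returningDescriptor() {
        return MethodType.methodType(Object.class, Object.class, String[].class)
                .toMethodDescriptorString();
    }

    public static void main(String[] args) {
        System.out.println(voidDescriptor());      // (Ljava/lang/Object;[Ljava/lang/String;)V
        System.out.println(returningDescriptor()); // (Ljava/lang/Object;[Ljava/lang/String;)Ljava/lang/Object;
    }
}
```

Source code that ignores the result of `process(...)` compiles against either shape. A class file compiled against the `...)V` descriptor, however, fails at link time with `NoSuchMethodError` once only the non-void method exists, which is why users need to recompile.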
> > > > > > > > > > > > > > > > I am also wondering if/how this change relates to KIP-401:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > From a high level it might not conflict, but I wanted to double check.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Wasn't aware of this KIP, thanks for sharing! As far as I understand,
> > > > > > > > > > > > > > > there is no conflict between the two KIPs.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For `KStream#processValues()`, my main concern is the added runtime
> > > > > > > > > > > > > > > > check of whether the key was modified. It seems to provide a bad user
> > > > > > > > > > > > > > > > experience; enforcing that the key is not modified at the API level
> > > > > > > > > > > > > > > > would seem to be much better.
> > > > > > > > > > > > > > > >
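Below is one possible reading of "enforcing that the key is not modified at the API level". It is a sketch with hypothetical names (`KeyPreservingRecord`, `KeyPreservingForwarder`, `KeyPreservingDemo`): a record type that offers no way to replace its key, so a changed key becomes a compile error rather than a runtime check. It illustrates the idea under those assumptions and is not the design adopted for the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record type whose key cannot be replaced: there is no
// withKey(), so every record derived from an input keeps the input's key.
final class KeyPreservingRecord<K, V> {
    private final K key;
    private final V value;

    // Package-private: in a real design only the framework (in a different
    // package from user code) would build records from consumed input.
    KeyPreservingRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() {
        return key;
    }

    V value() {
        return value;
    }

    // The value, and even its type, may change; the key is carried over.
    <NewV> KeyPreservingRecord<K, NewV> withValue(NewV newValue) {
        return new KeyPreservingRecord<>(key, newValue);
    }
}

// Hypothetical context: forward() only accepts KeyPreservingRecord, so a
// processor cannot hand over a record with an arbitrary key.
interface KeyPreservingForwarder<K, V> {
    void forward(KeyPreservingRecord<K, V> record);
}

final class KeyPreservingDemo {
    static List<KeyPreservingRecord<String, Integer>> run() {
        List<KeyPreservingRecord<String, Integer>> out = new ArrayList<>();
        KeyPreservingForwarder<String, Integer> context = out::add;
        KeyPreservingRecord<String, String> input = new KeyPreservingRecord<>("user-1", "hello");
        // A value-only step: replace the value with its length.
        context.forward(input.withValue(input.value().length()));
        return out;
    }

    public static void main(String[] args) {
        KeyPreservingRecord<String, Integer> r = run().get(0);
        System.out.println(r.key() + " -> " + r.value()); // user-1 -> 5
    }
}
```

The cost of this approach is a second record type alongside `Record`. That cost is the trade-off against the runtime check discussed in this thread.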
> > > > > > > > > > > > > > > > Last, what is the purpose of `setRecordKey()` and `clearRecordKey()`?
> > > > > > > > > > > > > > > > I am not sure I understand their purpose.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Both methods set/clear the context (the current key) that is used
> > > > > > > > > > > > > > > when checking keys in the forward(record) implementation.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > enforcing that the key is not modified on an API level, would seem to
> > > > > > > > > > > > > > > > be much better.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Not sure I understand what this would look like. Do you mean checking
> > > > > > > > > > > > > > > it on the Record itself, or somewhere else?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On 2/15/22 11:53 AM, John Roesler wrote:
> > > > > > > > > > > > > > > > > My apologies, this feedback was intended for KIP-634.
> > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Tue, Feb 15, 2022, at 13:15, John Roesler wrote:
> > > > > > > > > > > > > > > > > > Thanks for the update, Jorge,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I've just looked over the KIP again. Just one more small concern:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 5) We can't just change the type of Record#headers() to a new fully
> > > > > > > > > > > > > > > > > > qualified type. That would be a source-incompatible breaking change
> > > > > > > > > > > > > > > > > > for users.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Our options are:
> > > > > > > > > > > > > > > > > > * Deprecate the existing method and create a new one with the new
> > > > > > > > > > > > > > > > > >   type
> > > > > > > > > > > > > > > > > > * If the existing Headers is "not great but ok", then maybe we leave
> > > > > > > > > > > > > > > > > >   it alone.
> > > > > > > > > > > > > > > > > >
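The first option might look roughly like the sketch below. It uses hypothetical types (`LegacyHeaders`, `NewHeaders`, `ExampleRecord`) in place of the real ones. Java cannot overload a method on return type alone, so the new accessor needs a new name. `typedHeaders()` is a made-up name; the old accessor is kept and only deprecated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the existing header type.
interface LegacyHeaders {
    String lastValue(String key);
}

// Hypothetical stand-in for a proposed new header type.
interface NewHeaders {
    Map<String, String> asMap();
}

final class ExampleRecord {
    private final Map<String, String> headers = new HashMap<>();

    ExampleRecord withHeader(String key, String value) {
        headers.put(key, value);
        return this;
    }

    // Existing accessor: same name and return type, so callers still
    // compile; it is only marked as deprecated.
    @Deprecated
    LegacyHeaders headers() {
        return headers::get;
    }

    // New accessor with the new type. It needs a different name because
    // Java does not allow overloading on return type alone.
    NewHeaders typedHeaders() {
        return () -> Map.copyOf(headers);
    }
}
```

Existing callers of `headers()` only see a deprecation warning. New code can move to the new accessor at its own pace.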
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Mon, 2022-02-14 at 13:58 -0600, Paul Whalen wrote:
> > > > > > > > > > > > > > > > > > > No specific comments, but I just wanted to mention that I like the
> > > > > > > > > > > > > > > > > > > direction of the KIP. My team is a big user of "transform" methods
> > > > > > > > > > > > > > > > > > > because of the ability to chain them, and I have always found the
> > > > > > > > > > > > > > > > > > > terminology challenging to explain alongside "process". It felt like
> > > > > > > > > > > > > > > > > > > one concept with two names. So moving towards a single API that is
> > > > > > > > > > > > > > > > > > > powerful enough to handle both use cases seems absolutely correct to
> > > > > > > > > > > > > > > > > > > me.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya
> > > > > > > > > > > > > > > > > > > <quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Got it. Thanks John, this makes sense.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I've updated the KIP to include the deprecation of:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >        - KStream#transform
> > > > > > > > > > > > > > > > > > > >        - KStream#transformValues
> > > > > > > > > > > > > > > > > > > >        - KStream#flatTransform
> > > > > > > > > > > > > > > > > > > >        - KStream#flatTransformValues
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Fri, 11 Feb 2022 at 15:16, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks, Jorge!
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I think it’ll be better to keep this KIP focused on KStream methods
> > > > > > > > > > > > > > > > > > > > > only. I suspect that the KTable methods may be more complicated than
> > > > > > > > > > > > > > > > > > > > > just that proposed replacement, but it’ll also be easier to consider
> > > > > > > > > > > > > > > > > > > > > that question in isolation.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > The nice thing about just deprecating the KStream methods and not the
> > > > > > > > > > > > > > > > > > > > > Transform* interfaces is that you can keep your proposal scoped to
> > > > > > > > > > > > > > > > > > > > > KStream only, without any consequences for the rest of the DSL.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks again,
> > > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > > > > > > > > > > Thanks, John.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > 4) I agree that we shouldn't deprecate the Transformer*
> > > > > > > > > > > > > > > > > > > > > > > classes, but do you think we should deprecate the
> > > > > > > > > > > > > > > > > > > > > > > KStream#transform* methods? I'm curious if there's any
> > > > > > > > > > > > > > > > > > > > > > > remaining reason to have those methods, or if your KIP
> > > > > > > > > > > > > > > > > > > > > > > completely obviates them.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Good catch.
> > > > > > > > > > > > > > > > > > > > > > I considered that deprecating `Transformer*` and `transform*` would
> > > > > > > > > > > > > > > > > > > > > > go hand in hand; maybe it happened similarly with the old `Processor`
> > > > > > > > > > > > > > > > > > > > > > and `process`? Still, deprecating only the `transform*` operations
> > > > > > > > > > > > > > > > > > > > > > could be a better signal for users than not deprecating anything at
> > > > > > > > > > > > > > > > > > > > > > all, and would pave the way to its deprecation.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Should this deprecation also include `KTable#transformValues`?
> > > > > > > > > > > > > > > > > > > > > > The approach proposed in the KIP,
> > > > > > > > > > > > > > > > > > > > > > `ktable.toStream().processValues().toTable()`, seems fair to me,
> > > > > > > > > > > > > > > > > > > > > > though I will have to test it further.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > I'm happy to update the KIP if there's some consensus around this.
> > > > > > > > > > > > > > > > > > > > > > I will add the deprecation notes in the coming days and wait for any
> > > > > > > > > > > > > > > > > > > > > > additional feedback on this topic before wrapping up the KIP.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > On Fri, 11 Feb 2022 at 04:03, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Thanks for the update, Jorge!
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > I just read over the KIP again, and I'm in support. One more
> > > > > > > > > > > > > > > > > > > > > > > question came up for me, though:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > 4) I agree that we shouldn't deprecate the Transformer*
> > > > > > > > > > > > > > > > > > > > > > > classes, but do you think we should deprecate the
> > > > > > > > > > > > > > > > > > > > > > > KStream#transform* methods? I'm curious if there's any
> > > > > > > > > > > > > > > > > > > > > > > remaining reason to have those methods, or if your KIP
> > > > > > > > > > > > > > > > > > > > > > > completely obviates them.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > On Thu, 2022-02-10 at 21:32 +0000, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > > > > > > > > > > > > Thank you both for your feedback!
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I have added the following note on punctuation:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > > > > > > > > NOTE: The key validation can be defined when processing the message.
> > > > > > > > > > > > > > > > > > > > > > > > However, with punctuations it won't be possible to define the key for
> > > > > > > > > > > > > > > > > > > > > > > > validation before forwarding; therefore, it won't be possible to
> > > > > > > > > > > > > > > > > > > > > > > > forward from a punctuation.
> > > > > > > > > > > > > > > > > > > > > > > > This is similar to how `ValueTransformer`s behave at the moment.
> > > > > > > > > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I also made it explicit that we are going to apply referential
> > > > > > > > > > > > > > > > > > > > > > > > equality for key validation.
> > > > > > > > > > > > > > > > > > > > > > > >
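The minimal sketch below shows one consequence of choosing referential equality. The helper name `sameKeyInstance` is hypothetical. The check is a single pointer comparison, independent of key size, but it also rejects a forwarded key that is `equals()` to the input key yet is a different object, for example a key rebuilt from its parts.

```java
// Reference equality (==) vs. value equality (equals) for a key check.
// Only the very same key instance passes ==; an equal copy does not.
class KeyIdentityDemo {
    static boolean sameKeyInstance(Object inputKey, Object forwardedKey) {
        return inputKey == forwardedKey; // one pointer comparison
    }

    public static void main(String[] args) {
        String inputKey = new String("order-42");
        String rebuiltKey = new String("order-42"); // equal, but a different object

        System.out.println(sameKeyInstance(inputKey, inputKey));   // true
        System.out.println(sameKeyInstance(inputKey, rebuiltKey)); // false
        System.out.println(inputKey.equals(rebuiltKey));           // true
    }
}
```

A processor that passes the incoming key through untouched always satisfies the check. One that reconstructs an equal key would be rejected, which is stricter than `equals()` but avoids calling user-defined equality on every record.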
> > > > > > > > > > > > > > > > > > > > > > > > I hope this covers all your feedback; let me know if I'm missing
> > > > > > > > > > > > > > > > > > > > > > > > anything.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > > > > > > > > > Jorge.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > On Wed, 9 Feb 2022 at 22:19, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I'm +1 on John's point 3) for punctuations.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > And if people are on the same page that a reference equality check
> > > > > > > > > > > > > > > > > > > > > > > > > per record is not a huge overhead, I think enforcing it is better
> > > > > > > > > > > > > > > > > > > > > > > > > than documentation and hand-wavy undefined behaviors.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 9, 2022 at 11:27 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP Jorge,
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > I'm in support of your proposal.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > 1)
> > > > > > > > > > > > > > > > > > > > > > > > > > I do agree with Guozhang's point (1); I think it's the cleanest
> > > > > > > > > > > > > > > > > > > > > > > > > > approach. It's cleaner and better to keep the enforcement internal to
> > > > > > > > > > > > > > > > > > > > > > > > > > the framework than to introduce a public API or context wrapper for
> > > > > > > > > > > > > > > > > > > > > > > > > > processors to use explicitly.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > 2) I tend to agree with you on this one; I think the equality check
> > > > > > > > > > > > > > > > > > > > > > > > > > ought to be fast enough in practice.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > 3) I think this is implicit, but it should be explicit in the KIP:
> > > > > > > > > > > > > > > > > > > > > > > > > > For the `processValues` API, because the framework sets the key on
> > > > > > > > > > > > > > > > > > > > > > > > > > the context before calling `process` and then unsets it afterwards,
> > > > > > > > > > > > > > > > > > > > > > > > > > no key will ever be set during task punctuation. Therefore, while
> > > > > > > > > > > > > > > > > > > > > > > > > > processors may still register punctuators, they will not be able to
> > > > > > > > > > > > > > > > > > > > > > > > > > forward anything from them.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > This is functionally equivalent to the existing transformers, by the
> > > > > > > > > > > > > > > > > > > > > > > > > > way, which are also forbidden to forward anything during punctuation.
> > > > > > > > > > > > > > > > > > > > > > > > > >
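The mechanism described in point 3), together with Guozhang's point 1), can be sketched roughly as below. Every type here (`Rec`, `KeyCheckingContext`, `ValueOnlyProcessor`, `ProcessValuesDriver`) is hypothetical. Only the method names `setRecordKey`/`clearRecordKey` come from this thread. This is not the Kafka Streams implementation, and the value type is kept fixed for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal record type used only in this sketch.
final class Rec<K, V> {
    final K key;
    final V value;

    Rec(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical internal context: the framework sets the current key before
// calling the user's process() and clears it afterwards; forward() compares
// the forwarded key to the current key by reference.
final class KeyCheckingContext<K, V> {
    private final List<Rec<K, V>> downstream = new ArrayList<>();
    private boolean keySet = false; // false outside process(), e.g. in a punctuator
    private K currentKey;

    void setRecordKey(K key) {
        currentKey = key;
        keySet = true;
    }

    void clearRecordKey() {
        currentKey = null;
        keySet = false;
    }

    void forward(Rec<K, V> record) {
        if (!keySet) {
            throw new IllegalStateException("no current key: forwarding is only allowed from process()");
        }
        if (record.key != currentKey) { // reference equality, not equals()
            throw new IllegalArgumentException("key changed; use process() instead of processValues()");
        }
        downstream.add(record);
    }

    List<Rec<K, V>> downstream() {
        return downstream;
    }
}

// Hypothetical user-facing processor interface for value-only processing.
interface ValueOnlyProcessor<K, V> {
    void process(Rec<K, V> record, KeyCheckingContext<K, V> context);
}

final class ProcessValuesDriver {
    // What the framework would do around each call to the user's processor.
    static <K, V> void processOne(ValueOnlyProcessor<K, V> processor,
                                  KeyCheckingContext<K, V> context,
                                  Rec<K, V> input) {
        context.setRecordKey(input.key);
        try {
            processor.process(input, context);
        } finally {
            context.clearRecordKey();
        }
    }
}
```

The sketch tracks whether a key is set with a boolean flag rather than a `null` test, because Kafka record keys may legitimately be `null`. A punctuator that calls `forward` outside `process` hits the `IllegalStateException` branch, matching the restriction described above.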
> > > > > > > > > > > > > > > > > > > > > > > > > > For what it's worth, I think this is the best tradeoff.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > The only alternative I see is not to place any restriction on
> > > > > > > > > > > > > > > > > > > > > > > > > > forwarded keys at all and just document that if users don't maintain
> > > > > > > > > > > > > > > > > > > > > > > > > > proper partitioning, they'll get undefined behavior. That might be
> > > > > > > > > > > > > > > > > > > > > > > > > > more powerful, but it's also a usability problem.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 2022-02-09 at 11:34 +0000, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Guozhang.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Does `ValueProcessorContext` have to be a public API? It seems to me
> > > > > > > > > > > > > > > > > > > > > > > > > > > > that this can be completely abstracted away from user interfaces as an
> > > > > > > > > > > > > > > > > > > > > > > > > > > > internal class
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Totally agree. No intention to add these as public APIs. I will
> > > > > > > > > > > > > > > > > > > > > > > > > > > update the KIP to reflect this.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > in the past the rationale for enforcing it at the interface layer
> > > > > > > > > > > > > > > > > > > > > > > > > > > > rather than do runtime checks is that it is more efficient.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm not sure how much overhead it may incur to check if the key did
> > > > > > > > > > > > > > > > > > > > > > > > > > > > not change: if it is just a reference equality check maybe it's okay.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > What's your take on this?
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Agree, reference equality should cover this validation, and the
> > > > > > > > > > > > > > > > > > > > > > > > > > > overhead should not be meaningful.
> > > > > > > > > > > > > > > > > > > > > > > > > > > I will update the KIP to reflect this as well.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, 8 Feb 2022 at 19:05, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Hello Jorge,
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for bringing up this KIP! I think it is a nice idea to consider
> > > > > > > > > > > > > > > > > > > > > > > > > > > > using a single overloaded function name for #process. Just a couple of
> > > > > > > > > > > > > > > > > > > > > > > > > > > > quick questions after reading the proposal:
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 1) Does `ValueProcessorContext` have to be a public API? It seems to
> > > > > > > > > > > > > > > > > > > > > > > > > > > > me that this can be completely abstracted away from user interfaces
> > > > > > > > > > > > > > > > > > > > > > > > > > > > as an internal class: we call `setKey` before calling the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > user-instantiated `process` function, and then its overridden
> > > > > > > > > > > > > > > > > > > > > > > > > > > > `forward` can just check whether the key changed.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > 2) Related to 1) above, in the past the rationale for enforcing it at
> > > > > > > > > > > > > > > > > > > > > > > > > > > > the interface layer rather than doing runtime checks was that it is
> > > > > > > > > > > > > > > > > > > > > > > > > > > > more efficient. I'm not sure how much overhead it may incur to check
> > > > > > > > > > > > > > > > > > > > > > > > > > > > that the key did not change: if it is just a reference equality
> > > > > > > > > > > > > > > > > > > > > > > > > > > > check, maybe it's okay. What's your take on this?
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Feb 8, 2022 at 5:17 AM Jorge Esteban Quilcate Otoya
> > > > > > > > > > > > > > > > > > > > > > > > > > > > <quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Dev team,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd like to start a new discussion thread on Kafka Streams KIP-820:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > This KIP aims to extend the current `KStream#process` API to return
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > output values that can be chained across the topology, as well as to
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > introduce a new `KStream#processValues` that uses a processor while
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > validating that keys haven't changed, so that repartitioning is not
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > required.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Jorge.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
>
>
