104. Fair enough -- also happy to defer to Rohan on this (or Bruno if he
feels super strongly)

107. That's a good point. Ultimately the task load should reflect the
processing capacity, and that's something that will exist in both the new
and old threading model. I like #processingCapacity for the name. And for
the javadocs, I think we can just say something generic enough that it will
be accurate in both the old and new model and won't need to be updated
(since, as you mentioned before, we always forget to update random
javadocs).

As for the NOTE, however, I do feel that is necessary. At the very least,
it's necessary if we rename the method to something with the word
"capacity", since that tends to imply a "maximum limit" rather than a
"minimum load". In fact I think the original method was named capacity and
that's why I added this NOTE to the javadoc in the first place, to avoid
confusion. I can see why it doesn't make sense if the name doesn't have the
word "capacity" in it though.

But I do think #processingCapacity feels appropriate. I guess a reasonable
alternative would just be #numProcessingThreads, which is both descriptive
enough to not need the NOTE in the javadocs and also accurately describes
both the old one-consumer-per-StreamThread model and the new one (the only
thing that changes is what "processing thread" means). So how about we
just call it #numProcessingThreads and in the javadocs we can say it's like
a capacity that should correspond to the "weight" of assigned tasks
relative to other KafkaStreams clients? WDYT?
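
To make the "weight" idea concrete, here is a rough sketch of how the
javadoc and a capacity-proportional split might look. ClientState,
targetTaskShare, and the class name are placeholders I made up for
illustration, not names from the KIP:

```java
import java.util.List;

// Illustration only: ClientState is a stand-in for the KIP's per-client state
// interface, reduced to the single method under discussion.
final class ProcessingCapacitySketch {
    private ProcessingCapacitySketch() {}

    interface ClientState {
        /**
         * Returns the number of processing threads on this KafkaStreams client.
         * Treat this as the client's capacity relative to other clients: a
         * client reporting twice as many threads should generally receive
         * about twice the task load. It is not a hard upper limit.
         */
        int numProcessingThreads();
    }

    // Hypothetical helper: the number of tasks a client would get if tasks
    // were split strictly in proportion to processing threads.
    static double targetTaskShare(ClientState client, List<ClientState> allClients, int totalTasks) {
        int totalThreads = 0;
        for (ClientState c : allClients) {
            totalThreads += c.numProcessingThreads();
        }
        if (totalThreads == 0) {
            return 0.0; // no capacity reported anywhere, nothing to split
        }
        return (double) totalTasks * client.numProcessingThreads() / totalThreads;
    }
}
```

With one 2-thread client and one 6-thread client sharing 8 tasks, the
shares come out to 2 and 6 tasks respectively.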

108. Yes, I 100% believe that a custom assignor should be able to schedule
a followup rebalance:

108(1) For one thing, imo the HATaskAssignor shouldn't be a "special case"
but should effectively look just like any other custom assignor. And it
definitely needs the ability to schedule followups. How would the
StreamsPartitionAssignor figure out whether to schedule a followup
rebalance for the HAAssignor? From the StreamsPartitionAssignor POV it just
sees "active" and "standby" tasks -- it doesn't know whether some of those
standby tasks are actually "warmup tasks". And anyways, I think users
should be able to request a followup rebalance. What is the concern over
giving custom assignors control over this? Imo if people abuse this and
shoot themselves in the foot by scheduling followup rebalances for no
reason, that's on them. As always -- "simple things should be easy,
difficult things should be possible"

108(2) This is a fair question, although I still believe we should give the
custom assignor full control over the scheduled followup rebalance
timeline. Again, because more advanced things should be possible -- and
it's not like having this API return an Instant makes things more
complicated for people who want to do simple things. Users are free to
ignore this completely, and returning an Instant doesn't feel more
difficult than returning an enum. So why restrict this?

To take a more specific example: let's say users have a complicated set of
metrics they use to determine task placement. Sometimes these metrics are
unavailable, in which case they want to schedule a followup rebalance but
may want to implement a backoff/retry rather than simply scheduling an
immediate followup. I know for a fact that immediate followup rebalances
triggered by Kafka Streams can actually cause issues in some cases (see
KAFKA-14382 <https://issues.apache.org/jira/browse/KAFKA-14382> and
KAFKA-14419 <https://issues.apache.org/jira/browse/KAFKA-14419> -- same
root cause but note that the second issue is still unresolved to this day,
and I know someone besides the issue reporter who has repeatedly been
affected by it). Giving users the ability to back off and schedule smarter
"immediate" followups if/when they run into issues seems like a sufficient
motivation to me. For another: perhaps these complex metrics are advanced
enough to be able to predict when a given task will be "warmed up" (to take
the HAAssignor example) or otherwise be able to compute an exact time at
which to cut over to a new task assignment. In this case it would be
necessary to have full flexibility over when the followup was triggered.
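
For the backoff case, a minimal sketch of how an assignor could compute
its own deadline. FollowupBackoff and nextDeadline are hypothetical
helpers, not part of the KIP; the only assumption about the API is that
the followup deadline is expressed as an Instant:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the KIP: picks the time of the next
// followup rebalance after repeated failures (e.g. metrics unavailable).
final class FollowupBackoff {
    private FollowupBackoff() {}

    // Exponential backoff: base * 2^failedAttempts, capped at max.
    static Instant nextDeadline(Instant now, int failedAttempts, Duration base, Duration max) {
        // Clamp the exponent so the shift below cannot overflow.
        int exponent = Math.min(Math.max(failedAttempts, 0), 20);
        Duration delay = base.multipliedBy(1L << exponent);
        if (delay.compareTo(max) > 0) {
            delay = max;
        }
        return now.plus(delay);
    }
}
```

An assignor that just wants an "immediate" followup would skip all of this
and return the current time, so the simple case stays simple.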

115. I see what you mean here. I was originally thinking that way, but was
worried that users might "accidentally" catch whatever exception we throw
if the task lag computation fails and not know how to handle it. But I
suppose we can just say in the javadocs that you should not catch it (or
should rethrow it if you do) to allow Streams to recover and re-attempt. I
agree we should have Streams just handle this transparently for users and
not require them to rebuild the assignment on their own. I'll add this to the
javadocs -- and I don't think we need to introduce a new exception type
even. We have the "TaskAssignmentException" already which behaves similarly
now -- ie if there's an error during assignment, we throw this and return
the same assignment back and schedule a followup.

So how about we (a) put in the TaskAssignor#assign javadocs that if a
TaskAssignmentException is thrown, Streams will return the same assignment
and automatically schedule an immediate followup rebalance, and (b) note in
the #kafkaStreamsStates(boolean computeTaskLags) javadocs that this can
throw a TaskAssignmentException, which should be rethrown in order to retry
the rebalance. I think this is basically what you're saying here so I'll go
ahead and update the KIP with this, but lmk if there's anything else.
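
Roughly, the flow described in (a) and (b) could look like the sketch
below. Everything here is a simplified stand-in: the nested
TaskAssignmentException only mimics the real one, assignments are plain
maps of client id to task ids, and assignWithFallback is a made-up name
for what Streams would do internally:

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Simplified stand-ins only; none of these types are the real Streams classes.
final class AssignmentFallbackSketch {
    private AssignmentFallbackSketch() {}

    // Mimics the existing TaskAssignmentException for the purposes of the sketch.
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(String message) {
            super(message);
        }
    }

    // What Streams ends up with: an assignment plus whether to rebalance again right away.
    static final class Result {
        final Map<String, Set<String>> assignment;
        final boolean immediateFollowup;

        Result(Map<String, Set<String>> assignment, boolean immediateFollowup) {
            this.assignment = assignment;
            this.immediateFollowup = immediateFollowup;
        }
    }

    // The custom assignor lets the exception bubble up; the caller (Streams)
    // catches it, keeps the previous assignment, and schedules a followup.
    static Result assignWithFallback(Supplier<Map<String, Set<String>>> customAssignor,
                                     Map<String, Set<String>> previousAssignment) {
        try {
            return new Result(customAssignor.get(), false);
        } catch (TaskAssignmentException e) {
            return new Result(previousAssignment, true);
        }
    }
}
```

The point is that the user's assignor never has to rebuild the old
assignment itself; it only has to avoid swallowing the exception.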

But coming back to what I said in 108(2) -- I do think it would be valuable
for users to be able to determine their own followup rebalance schedule,
even for "immediate" followup rebalances (again, as a way to mitigate
issues like KAFKA-14419 <https://issues.apache.org/jira/browse/KAFKA-14419>).
Let's say there's an error in someone's own assignor code, or again, it's
trying to make a decision based on some custom metrics that are
unavailable. In this case I feel the utility method would still be helpful,
even if we would handle it transparently for errors in things like the task
lag computation. It's a utility method -- there for people who want to use
it, everyone else can ignore it (and most users probably will). But for
those who want ultimate control (mwahaha) it's a nice-to-have so they don't
have to do *everything* from scratch. WDYT?

117. That's fair. I agree this is probably the most tricky of the open
questions, and I'm happy to defer. Although I do strongly believe that we
shouldn't call certain kinds of assignments "invalid" (such as a task not
being assigned to anyone). For the clearly-invalid assignment cases, I'd
err on the side of not holding users' hands too much for now, but again:
would be happy to defer if anyone has another suggestion and/or strong
opinion.

On Wed, Apr 24, 2024 at 10:13 PM Matthias J. Sax <mj...@apache.org> wrote:

> 104: I also don't feel super strong about it. Not sure if
> `onAssignment()` might overload the name in a confusing way? In the end,
> when the method is called, we don't assign anything? -- Guess, I am fine
> with whatever Rohan picks as a name from the suggestions we have so far.
>
>
> 107: Did not think about how to do it yet. Just raised the question to
> see if I am even heading into the right direction or not... I did not
> propose to remove the method; it's clear that we need it.
>
> Thinking about it a little more, what we actually want to convey is a
> certain "processing capacity" an instance has? Thus,
> `numConsumerClients()` might not reflect this in the future? Should we
> just generically call it `processingCapacity()` or similar and for now
> explain in the JavaDocs that it maps to number of (currently running)
> `StreamsThread` (currently running, because users can dynamically
> add/remove them...). We can later update the JavaDocs when we have
> "processing threads" and point to number of processing threads? Maybe
> Lucas/Bruno can provide more input on what/how we plan the future
> threading model and configs.
>
> Nit: not sure if we need the "NOTE" section at all? If we think we want
> it, maybe remove from the KIP and we can discuss in more detail on the
> PR (think it could be improved). Don't think the JavaDocs on the KIP must
> be 100% accurate to what we put into the code later.
>
>
> 108: I guess my question is two-fold. (1) Does user-code need to decide
> to schedule a probing rebalance to begin with? Or could the
> non-customizable part of `StreamsPartitionAssignor` decide it? (2) If
> custom code really needs to make this decision, why would it not just
> return a boolean? It seems unnecessary to compute a deadline, given that
> the probing rebalance interval is a config? -- Or maybe I am missing
> something? If it's about regular probing rebalance vs immediate
> rebalance vs no follow up, maybe an enum would do the trick?
>
>
> 115: Thanks for the explanation. This does make sense. I am wondering if
> we need the new utility method though? Would it not be possible to
> encapsulate all this inside the non-customizable code? The method
> `kafkaStreamsStates(boolean computeTaskLags)` will be provided by us and
> called by the user code. Thus, if we cannot compute the lag, we could
> still throw an exception -- the user code does not need to know anything
> about it, and is not supposed to catch this exception. Hence, it should
> bubble up and get back to our code from
> `TaskAssignor#assign(ApplicationState applicationState)` which is called
> by us, and we can catch our own exception here, and do what we do
> currently: we return the old assignment, and request an immediate follow
> up rebalance? For this case, the user code does not need to know
> anything about it, and does not need to do anything special, and it
> would become a provided built-in feature, which seems desirable?
>
>
> 117: not sure myself... Let's see what others think. I'll think about it
> a little bit more and follow up again later. It's a tricky one.
>
>
> -Matthias
>
>
> On 4/24/24 5:08 PM, Sophie Blee-Goldman wrote:
> > Now to respond to Matthias:
> >
> > FYI, I'm following the numbering scheme from your email but added **** to
> > mark responses with further questions or feedback and/or aren't yet
> > addressed in the KIP and need to be followed up on. You can more or less
> > just skip over the ones without stars to save time
> >
> > 100: I think this is leftover from a previous approach we considered.
> > Removed this line
> >
> > 101: I agree. Perhaps we hadn't fully committed to this decision when the
> > KIP was first written ;P Added a "Consumer Assignments" section under
> > "Public Changes" to address this more carefully and explicitly
> >
> > 102: fixed
> >
> > 103: fixed
> >
> > ****104: I do agree with Bruno on the structure of this callback name, ie
> > that it should start with "on", but any of the suggestions with that
> > sound good to me. I really don't feel too strongly but just to throw in a
> > bit of extra context, there is an analogous callback on the
> > ConsumerPartitionAssignor that is called #onAssignment. So personally I
> > would slightly prefer to just call it #onAssignment. However I'm happy to
> > go with whatever the consensus is -- @Bruno/@Matthias WDYT?
> >
> > 105: done (fyi I did leave the config doc string which is technically a
> > private variable but is part of the public contract)
> >
> > 106: good point about the numKafkaStreamsClients and toString methods --
> > removed those
> >
> > ****107: I guess this reflects how long it's been since the KIP was first
> > written :P But that's a fair point -- and yes, we should write this in a
> > forward looking manner. However, we can't build it in a way that's so
> > forward-looking that it doesn't work for the current version. Are you
> > proposing to remove this API altogether, or just rename it to
> > #numConsumerClients (or something like that) and update the javadocs
> > accordingly? Assuming the latter, I totally agree, and have made the
> > change. But we definitely can't just remove it altogether (it may even be
> > relevant in the new threading model if we eventually allow configuring
> > the number of consumer clients independently of the processing threads --
> > but that's a different conversation. The important thing is that this KIP
> > be compatible with the old/current threading model, in which case we need
> > this API).
> > Anyways, please take a look at the new javadocs and method name and lmk
> > if that makes sense to you
> >
> > ****108: The #followupRebalanceDeadline allows the custom assignor to
> > request followup rebalances, for example in order to probe for
> > restoration progress or other conditions for task assignment. This is
> > fundamental to the HighAvailabilityTaskAssignor (ie the default assignor)
> > and may be useful to custom assignors with similar such approaches. So
> > it's definitely necessary -- are you asking why we have it at all, or why
> > it's an API on the KafkaStreamsAssignment class and not, say, the
> > TaskAssignment class?
> >
> > 109: Makes sense to me -- done
> >
> > 110: ack -- updated all mentions of "node" to "KafkaStreams client"
> >
> > 111: ack -- updated name from "consumer" to "consumerClientId"
> >
> > 112: that's fair -- I do think it's valuable to have "#allTasks" since
> > some assignors may not care about the stateful vs stateless distinction,
> > but it's weird to have #statefulTasks without #statelessTasks. Let's just
> > have all three. Added to the KIP
> >
> > 113: this makes sense to me. Updated computeTaskLags  to be an input
> > parameter instead of a mutating API. Also noted that it can throw a
> > TimeoutException in this case (this is relevant for point 115 below)
> >
> > 114: fixed
> >
> > ****115: Reasonable question -- I think the way it's described right now
> > is a bit awkward, and there's a better way to approach the issue.
> > Ultimately the "issue" is how we can handle failures in things like the
> > task lag computation, which notably makes a remote call to the brokers and
> > can/has been known to fail at times. Right now if this API fails, the
> > StreamsPartitionAssignor will just return the same assignment as the
> > previous one and trigger an immediate followup rebalance. This exception
> > was meant to be a "utility" that can be thrown to indicate to the
> > StreamsPartitionAssignor to just return the old assignment and trigger an
> > immediate followup.
> > That said, this exception does feel like an awkward way to do it,
> > especially since the TaskAssignor can already do all of this via native
> > APIs: it can request a followup rebalance (and better yet, determine for
> > itself what a reasonable retry/backoff interval would be). It can also
> > just return the same assignment -- the only issue is that implementing
> > this is kind of annoying. So I would propose that instead of throwing a
> > RetryableException, we should just add an additional utility method to
> > the TaskAssignmentUtils that does this "fallback" assignment and returns
> > the same tasks to their previous clients.
> > Added TaskAssignmentUtils#identityAssignment for this (though I'm happy
> > to take other name suggestions)
> >
> > That's a long paragraph, sorry -- but hopefully it makes sense?
> >
> > 116: fair point. Updated the wording -- hopefully it makes more sense
> > now.
> >
> > ****117: This is an interesting question. I think it's worth looking at
> > each of the cases you listed individually (and thinking about other cases
> > we might want to allow/disallow):
> >
> > 117a. Task assigned to two different clients: invalid assignment (but
> > should we enforce this?)
> > Imo we should look to the analogous ConsumerPartitionAssignor for this
> > question, which does not do any verification of the basic "each partition
> > is assigned to exactly one consumer" requirement. In the end this is an
> > advanced feature, and we should trust users to correctly implement and
> > test their custom assignors. So I would personally opt not to do any
> > post-assignment verification -- although I'm flexible on this if anyone
> > else feels strongly.
> >
> > 117b. Task not assigned to any client: valid assignment
> > Imo this should be allowed and could be used to implement some
> > interesting features. For example let's say you wanted to introduce a
> > "pause" or checkpoint-like feature, wherein Streams will stop processing
> > new records from the input topics but will continue processing everything
> > downstream until all intermediary records are flushed, ie the repartition
> > topics are drained. One way to do this would be to trigger a rebalance and
> > assign only downstream subtopologies, so that tasks which read from the
> > input topics aren't processed (until it's "restarted").
> >
> > 117c. Active and standby on the same KafkaStreams client: valid(?)
> > assignment
> > I'm not quite convinced either way on this, but think there could be edge
> > cases for which advanced users might want to have an active and standby
> > version of an in-memory store on the same client. Obviously this doesn't
> > make sense if you scale in/out by adding/removing entire KafkaStreams
> > clients, but those who scale up/down by adding/removing single
> > StreamThreads would benefit from relaxing this constraint.
> >
> > 117d. Unknown ProcessId: invalid assignment (should be enforced)
> > This one definitely sounds the most like a clear contract violation and
> > in fact, would make it impossible for the StreamsPartitionAssignor to
> > correctly process the returned KafkaStreamsAssignment into a set of
> > individual consumer assignments. So we almost certainly need to throw an
> > exception here/call out an error here. Maybe we can use the #onAssignment
> > callback to return an error code as well? WDYT?
> >
> > 118. Agreed, right now it's a bit hard to tell which APIs are for the
> > user to access, and which ones are for them to implement. Updated the KIP
> > to make this more clear by breaking up the classes into "User APIs" and
> > "Read-only APIs" (with a short description of each category).
> >
> > 119. Good point, added a public constructor for AssignedTask
> >
> > 120. Also good point, gave the #onAssignment callback a default no-op
> > implementation
> >
> >
> > On Wed, Apr 24, 2024 at 2:32 PM Sophie Blee-Goldman
> > <sop...@responsive.dev> wrote:
> >
> >> Responding to Bruno first:
> >>
> >> (1) I actually think "KafkaStreams" is exactly right here -- for the
> >> reason you said, ultimately this is describing a literal instance of the
> >> "KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went
> >> with StreamsClient but I also prefer KafkaStreams)
> >>
> >> (4) Rohan is right about what I was saying -- but I'm now realizing
> >> that I completely misinterpreted what your concern was. Sorry for the
> >> long-winded and ultimately irrelevant answer. I'm completely fine with
> >> having the return type be a simple Set with additional info such as
> >> TaskId in the AssignedTask class (and I see Rohan already made this
> >> change so we're all good)
> >>
> >> (5) I don't insist either way :)   ApplicationState works for me
> >>
> >> On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>
> >>> One more thing. It might be good to clearly call out which interfaces
> >>> a user would implement, vs the other ones Kafka Streams implements and
> >>> TaskAssignor only uses.
> >>>
> >>> My understanding is, that users would implement `TaskAssignor`,
> >>> `TaskAssignment`, and `StreamsClientAssignment`.
> >>>
> >>> For `AssignedTask` it seems that users would actually only need to
> >>> instantiate them. Should we add a public constructor?
> >>>
> >>> Also wondering if we should add an empty default implementation for
> >>> `onAssignmentComputed()` as it seems not to be strictly necessary to
> >>> use this method?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 4/19/24 7:30 PM, Matthias J. Sax wrote:
> >>>> Great KIP. I have some minor comments/questions:
> >>>>
> >>>>
> >>>> 100 The KIP says: "In the future, additional plugins can use the same
> >>>> partition.assignor  prefix". What does this mean?
> >>>>
> >>>>
> >>>> 101 (nit) The KIP says: "Note that the thread-level assignment will
> >>>> remain an un-configurable internal implementation detail of the
> >>>> partition assignor (see "Rejected Alternatives" for further thoughts
> >>>> and reasoning)." -- When I was reading this the first time, I did not
> >>>> understand it, and it did only become clear later (eg while reading
> >>>> the discussion thread). I think it would be good to be a little bit
> >>>> more explicit, because this is not just some minor thing, but a core
> >>>> design decision (which I, btw, support).
> >>>>
> >>>>
> >>>> 102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).
> >>>>
> >>>>
> >>>> 103 (nit): "new non-internal package" -> replace 'non-internal' with
> >>>> 'public' :)
> >>>>
> >>>>
> >>>> 104: Method name `TaskAssignor#onAssignmentComputed()` -> the name
> >>>> seems to be a little bit clumsy? I kinda like the original
> >>>> `finalAssignment()` -- I would also be happy with `onFinalAssignment`
> >>>> to address Bruno's line of thinking (which I think is a good call out).
> >>>> (Btw: `finalAssignment` is still used in the text on the KIP and should
> >>>> also be updated.)
> >>>>
> >>>>
> >>>> 105: Please remove all `private` variables. We should only show public
> >>>> stuff on the KIP. Everything else is an implementation detail.
> >>>>
> >>>>
> >>>> 106: `TaskAssignment#numStreamsClients()` -- why do we need this
> >>>> method? Seems calling `assignment()` gives us a collection and we can
> >>>> just call size() on it to get the same value? -- Also, why do we
> >>>> explicitly call out the override of `toString()`; seems unnecessary?
> >>>>
> >>>>
> >>>> 107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the
> >>>> number of StreamThreads on this client, which is equal to the number
> >>>> of main consumers and represents its overall capacity." -- Given our
> >>>> planned thread refactoring, this might not hold correct for long (and
> >>>> I am sure we will forget to update the JavaDocs later). Talking to
> >>>> Lucas, the plan is to cut down `StreamsThread` to host the consumer
> >>>> (and there will be only one, and it won't be configurable any longer),
> >>>> and we would introduce a number of configurable "processing threads".
> >>>> Can/should we build this API in a forward looking manner?
> >>>>
> >>>>
> >>>> 108: Why do we need
> >>>> `StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how
> >>>> this would be useful?
> >>>>
> >>>>
> >>>> 109 `StreamsClientState#consumers`: should we rename this to
> >>>> `#consumerClientIds()`?
> >>>>
> >>>>
> >>>> 110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc
> >>>> says 'owned by consumers on this node' -- Should we just say `owned by
> >>>> the Streams client`?
> >>>>
> >>>>
> >>>> 111 `StreamsClientState#prevTasksByLag()`: it takes a `String
> >>>> consumer` parameter -- not clear what this is -- I guess it's a
> >>>> consumer's client.id? If yes, should we rename the parameter
> >>>> `consumerClientId`?
> >>>>
> >>>>
> >>>> 112 `ApplicationState`: what is the reason to have `allTasks()` and
> >>>> `statefulTasks()` -- why not have `statelessTasks()` and
> >>>> `statefulTasks()` instead? Or all three?
> >>>>
> >>>>
> >>>> 113 `ApplicationState#computeTaskLags()`: I understand the
> >>>> intent/reason why we have this one, but it seems to be somewhat
> >>>> difficult to use correctly, as it triggers an internal side-effect...
> >>>> Would it be possible to replace this method in favor of passing in a
> >>>> `boolean computeTaskLag` parameter into #streamClientState() instead,
> >>>> which might make it less error prone to use, as it seems the returned
> >>>> `StreamsClient` object would be modified when calling
> >>>> #computeTaskLags() and thus both are related to each other?
> >>>>
> >>>>
> >>>> 114 nit/typo: `ApplicationState#streamsClientStates()` returns
> >>>> `StreamsClientState` not `StreamsClient`.
> >>>>
> >>>>
> >>>> 115 `StreamsAssignorRetryableException`: not sure if I fully
> >>>> understand the purpose of this exception.
> >>>>
> >>>>
> >>>> 116 "No actual changes to functionality": allowing to plug in a custom
> >>>> TaskAssignor sounds like adding new functionality. Can we rephrase
> >>>> this?
> >>>>
> >>>>
> >>>>
> >>>> 117: What happens if the returned assignment is "invalid" -- for
> >>>> example, a task might not have been assigned, or is assigned to two
> >>>> nodes? Or a standby is assigned to the same node as its active? Or a
> >>>> `StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if
> >>>> this list of potential issues is complete or not...)
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 4/18/24 2:05 AM, Bruno Cadonna wrote:
> >>>>> Hi Sophie,
> >>>>>
> >>>>> Thanks for the clarifications!
> >>>>>
> >>>>> (1)
> >>>>> What about replacing Node* with KafkaStreams* or StreamsClient*? I
> >>>>> prefer KafkaStreams* since that class represents the Kafka Streams
> >>>>> client. I am also fine with KafkaStreamsClient*. I really would like
> >>>>> to avoid introducing a new term in Kafka Streams for which we already
> >>>>> have an equivalent term even if it is used on the brokers since that
> >>>>> is a different level of abstraction. Additionally, I have never been
> >>>>> a big fan of the term "instance".
> >>>>>
> >>>>> (4)
> >>>>> I think the question is if we need to retrieve assignment metadata by
> >>>>> task for a Kafka client or if it is enough to iterate over the
> >>>>> assigned tasks. Could you explain why we cannot add additional
> >>>>> metadata to the class AssignedTask?
> >>>>> The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) )
> >>>>> could be something like
> >>>>>
> >>>>> public interface NodeAssignment {
> >>>>>       ProcessID processId();
> >>>>>
> >>>>>       Instant followupRebalanceDeadline();
> >>>>>
> >>>>>       Set<AssignedTask> assignment();
> >>>>>
> >>>>>       enum AssignedTaskType {
> >>>>>           STATELESS,
> >>>>>           STATEFUL,
> >>>>>           STANDBY
> >>>>>       }
> >>>>>
> >>>>>       static class AssignedTask {
> >>>>>           AssignedTaskType type();
> >>>>>           TaskId id();
> >>>>>
> >>>>>           ... other metadata needed in future
> >>>>>       }
> >>>>> }
> >>>>> If we need to retrieve assigned task by task ID, maybe it is better
> >>>>> to add methods like assignedFor(TaskId) and not to expose the Map.
> >>>>>
> >>>>> (5)
> >>>>> I am in favor of ApplicationState but I am also fine with
> >>>>> ApplicationMetadata if you insist.
> >>>>>
> >>>>> (6)
> >>>>> Is
> >>>>>
> >>>>> void finalAssignment(GroupAssignment assignment, GroupSubscription
> >>>>> subscription);
> >>>>>
> >>>>> kind of a callback? If yes, would it make sense to call it
> >>>>> onAssignmentComputed()?
> >>>>>
> >>>>>
> >>>>> (7)
> >>>>> What do you think of changing the TaskAssignmentUtils signatures to
> >>>>>
> >>>>> public static TaskAssignment default*Assignment(final ApplicationState
> >>>>> applicationState, final TaskAssignment taskAssignment, ...) {...}
> >>>>>
> >>>>> to avoid mutating the assignment in place?
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:
> >>>>>> Thanks Bruno! I can provide a bit of context behind some of these
> >>>>>> decisions but I just want to say up front that I agree with every
> >>>>>> single one of your points, though I am going to push back a bit on
> >>>>>> the first one.
> >>>>>>
> >>>>>> [1] The idea here is to help avoid some confusion around the
> >>>>>> overloaded term "client", which can mean either "an instance of Kafka
> >>>>>> Streams" or "a consumer/producer client". The problem is that the
> >>>>>> former applies to the entire Streams process and therefore should be
> >>>>>> interpreted as "all of the StreamThreads on an instance" whereas the
> >>>>>> latter is typically used interchangeably to mean the consumer client
> >>>>>> in the consumer group, which implies a scope of just a single
> >>>>>> StreamThread on an instance. The "Node" name here was an attempt to
> >>>>>> clear this up, since differentiating between instance and thread
> >>>>>> level is critical to understanding and properly implementing the
> >>>>>> custom assignor.
> >>>>>>
> >>>>>> I do see what you mean about there not being a concept of Node in
> >>>>>> the Kafka Streams codebase, and that we usually do use "instance"
> >>>>>> when we need to differentiate between consumer client/one
> >>>>>> StreamThread and Kafka Streams client/all StreamThreads. As I'm
> >>>>>> typing this I'm convincing myself even more that we shouldn't just
> >>>>>> use "Client" without further distinction, but I'm not sure "Node" has
> >>>>>> to be the answer either.
> >>>>>>
> >>>>>> Could we replace "Node" with "KafkaStreamsClient" or is that too
> >>>>>> wordy? I honestly do still like Node personally, and don't see what's
> >>>>>> wrong with introducing a new term since the "node" terminology is
> >>>>>> used heavily on the broker side and it means effectively the same
> >>>>>> thing in theory. But if we can't compromise between "Node" and
> >>>>>> "Client" then maybe we can settle on "Instance"? (Does feel a bit
> >>>>>> wordy too...maybe "Process"?)
> >>>>>>
> >>>>>> [2] Good catch(es). Makes sense to me
> >>>>>>
> >>>>>> [3] Totally agree, a single enum makes way more sense
> >>>>>>
> >>>>>> [4] Here again I can provide some background -- this is actually
> >>>>>> following a pattern that we used when refactoring the old
> >>>>>> PartitionAssignor into the new (at the time)
> >>>>>> ConsumerPartitionAssignor interface. The idea was to wrap the return
> >>>>>> type to protect the assign method in case we ever wanted to add
> >>>>>> something to what was returned, such as metadata for the entire
> >>>>>> group. This way we could avoid a massively disruptive
> >>>>>> deprecation-and-migration cycle for everyone who implements a custom
> >>>>>> assignor.
> >>>>>> That said, I just checked the GroupAssignment class we added for
> >>>>>> this in the ConsumerPartitionAssignor interface, and to this day
> >>>>>> we've never added anything other than the map of consumer client to
> >>>>>> assignment.
> >>>>>>
> >>>>>> So maybe that was overly cautious. I'd be ok with flattening this
> >>>>>> map out. I guess the question is just, can we imagine any case in
> >>>>>> which we might want the custom assignor to return additional
> >>>>>> metadata? To be honest I think this might be more likely than with
> >>>>>> the plain consumer client case, but again, I'm totally fine with just
> >>>>>> flattening it to a plain map return type
> >>>>>>
> >>>>>> [5] I guess not. I think ApplicationMetadata was added during the
> >>>>>> initial KIP discussion so that's probably why it doesn't follow the
> >>>>>> same naming pattern. Personally I'm fine either way (I do think
> >>>>>> ApplicationMetadata sounds a bit better but that's not a good enough
> >>>>>> reason :P)
> >>>>>>
> >>>>>> Thanks Bruno!
> >>>>>>
> >>>>>> On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> sorry, I am late to the party.
> >>>>>>>
> >>>>>>> I have a couple of comments:
> >>>>>>>
> >>>>>>> (1)
> >>>>>>> I would prefer Client* instead of Node* in the names. In Kafka
> >>>>>>> Streams we do not really have the concept of node but we have the
> >>>>>>> concept of client (admittedly, we sometimes also use instance). I
> >>>>>>> would like to avoid introducing a new term to basically describe the
> >>>>>>> Streams client. I know that we already have a ClientState but that
> >>>>>>> would be in a different package.
> >>>>>>>
> >>>>>>> (2)
> >>>>>>> Did you consider using Instant instead of long as the return type of
> >>>>>>> followupRebalanceDeadline()? Instant is a bit more flexible and
> >>>>>>> readable
> >>>>>>> than a plain long, IMO. BTW, you list followupRebalanceDeadline()
> >>>>>>> twice in
> >>>>>>> interface NodeAssignment.
> >>>>>>>
> >>>>>>> (3)
> >>>>>>> Did you consider using an enum instead of class AssignedTask? As
> >>>>>>> far as
> >>>>>>> I understand not all combinations are possible. A stateless standby
> >>>>>>> task
> >>>>>>> does not exist. An enum with values STATELESS, STATEFUL, STANDBY
> >>> would
> >>>>>>> be clearer. Or even better instead of two methods in AssignedTask
> >>> that
> >>>>>>> return a boolean you could have one method -- say type() -- that
> >>>>>>> returns
> >>>>>>> the enum.
> >>>>>>>
> >>>>>>> (4)
> >>>>>>> Does the return type of assignment need to be a map from task ID to
> >>>>>>> AssignedTask? Wouldn't it be enough to be a collection of
> >>> AssignedTasks
> >>>>>>> with AssignedTask containing the task ID?
> >>>>>>>
> >>>>>>> (5)
> >>>>>>> Is there a semantic difference between *State and *Metadata? I was
> >>>>>>> wondering whether ApplicationMetadata could also be
> ApplicationState
> >>>>>>> for
> >>>>>>> the sake of consistency.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>>
> >>>>>>> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
> >>>>>>>> Cool, looks good to me!
> >>>>>>>>
> >>>>>>>> Seems like there is no further feedback, so maybe we can start to
> >>> call
> >>>>>>> for
> >>>>>>>> a vote?
> >>>>>>>>
> >>>>>>>> However, since as noted we are setting aside time to discuss this
> >>>>>>>> during
> >>>>>>>> the sync next Thursday, we can also wait until after that meeting
> to
> >>>>>>>> officially kick off the vote.
> >>>>>>>>
> >>>>>>>> On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <
> >>> desai.p.ro...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the feedback Sophie!
> >>>>>>>>>
> >>>>>>>>> re1: Totally agree. The fact that it's related to the partition
> >>>>>>> assignor is
> >>>>>>>>> clear from just `task.assignor`. I'll update.
> >>>>>>>>> re3: This is a good point, and something I would find useful
> >>>>>>> personally. I
> >>>>>>>>> think it's worth adding an interface that lets the plugin observe
> >>> the
> >>>>>>> final
> >>>>>>>>> assignment. I'll add that.
> >>>>>>>>> re4: I like the new `NodeAssignment` type. I'll update the KIP
> with
> >>>>>>> that.
> >>>>>>>>>
> >>>>>>>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <
> >>> desai.p.ro...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the feedback so far! I think pretty much all of it is
> >>>>>>>>>> reasonable. I'll reply to it inline:
> >>>>>>>>>>
> >>>>>>>>>>> 1. All the API logic is granular at the Task level, except the
> >>>>>>>>>> previousOwnerForPartition func. I’m not clear what’s the
> >>> motivation
> >>>>>>>>> behind
> >>>>>>>>>> it, does our controller also want to change how the
> >>>>>>>>>> partitions->tasks
> >>>>>>>>>> mapping is formed?
> >>>>>>>>>> You're right that this is out of place. I've removed this method
> >>> as
> >>>>>>> it's
> >>>>>>>>>> not needed by the task assignor.
> >>>>>>>>>>
> >>>>>>>>>>> 2. Just on the API layering itself: it feels a bit weird to
> have
> >>>>>>>>>>> the
> >>>>>>>>>> three built-in functions (defaultStandbyTaskAssignment etc)
> >>>>>>>>>> sitting in
> >>>>>>>>> the
> >>>>>>>>>> ApplicationMetadata class. If we consider them as some default
> >>> util
> >>>>>>>>>> functions, how about moving those into their own
> >>> static
> >>>>>>> util
> >>>>>>>>>> methods to separate from the ApplicationMetadata “fact objects”
> ?
> >>>>>>>>>> Agreed. Updated in the latest revision of the kip. These have
> been
> >>>>>>> moved
> >>>>>>>>>> to TaskAssignorUtils
> >>>>>>>>>>
> >>>>>>>>>>> 3. I personally prefer `NodeAssignment` to be a read-only
> object
> >>>>>>>>>> containing the decisions made by the assignor, including the
> >>>>>>>>>> requestFollowupRebalance flag. For manipulating the half-baked
> >>>>>>>>>> results
> >>>>>>>>>> inside the assignor itself, maybe we can just be flexible to let
> >>>>>>>>>> users
> >>>>>>>>> use
> >>>>>>>>>> whatever structs / their own classes even, if they like. WDYT?
> >>>>>>>>>> Agreed. Updated in the latest version of the kip.
> >>>>>>>>>>
> >>>>>>>>>>> 1. For the API, thoughts on changing the method signature to
> >>>>>>>>>>> return a
> >>>>>>>>>> (non-Optional) TaskAssignor? Then we can either have the default
> >>>>>>>>>> implementation return new HighAvailabilityTaskAssignor or just
> >>>>>>>>>> have a
> >>>>>>>>>> default implementation class that people can extend if they
> don't
> >>>>>>>>>> want
> >>>>>>> to
> >>>>>>>>>> implement every method.
> >>>>>>>>>> Based on some other discussion, I actually decided to get rid of
> >>> the
> >>>>>>>>>> plugin interface, and instead use config to specify individual
> >>>>>>>>>> plugin
> >>>>>>>>>> behaviour. So the method you're referring to is no longer part
> of
> >>>>>>>>>> the
> >>>>>>>>>> proposal.
> >>>>>>>>>>
> >>>>>>>>>>> 3. Speaking of ApplicationMetadata, the javadoc says it's read
> >>> only
> >>>>>>> but
> >>>>>>>>>> there are methods that return void on it? It's not totally clear to
> >>>>>>>>>> me how
> >>>>>>>>>> that interface is supposed to be used by the assignor. It'd be
> >>>>>>>>>> nice if
> >>>>>>> we
> >>>>>>>>>> could flip that interface such that it becomes part of the
> output
> >>>>>>> instead
> >>>>>>>>>> of an input to the plugin.
> >>>>>>>>>> I've moved those methods to a util class. They're really utility
> >>>>>>> methods
> >>>>>>>>>> the assignor might want to call to do some default or optimized
> >>>>>>>>> assignment
> >>>>>>>>>> for some cases like rack-awareness.
> >>>>>>>>>>
> >>>>>>>>>>> 4. We should consider wrapping UUID in a ProcessID class so
> that
> >>> we
> >>>>>>>>>> control
> >>>>>>>>>> the interface (there are a few places where UUID is directly
> >>> used).
> >>>>>>>>>> I like it. Updated the proposal.
> >>>>>>>>>>
> >>>>>>>>>>> 5. What does NodeState#newAssignmentForNode() do? I thought the
> >>>>>>>>>>> point
> >>>>>>>>>> was
> >>>>>>>>>> for the plugin to make the assignment? Is that the result of the
> >>>>>>> default
> >>>>>>>>>> logic?
> >>>>>>>>>> It doesn't need to be part of the interface. I've removed it.
> >>>>>>>>>>
> >>>>>>>>>>> re 2/6:
> >>>>>>>>>>
> >>>>>>>>>> I generally agree with these points, but I'd rather hash that
> out
> >>>>>>>>>> in a
> >>>>>>> PR
> >>>>>>>>>> than in the KIP review, as it'll be clearer what gets used how.
> It
> >>>>>>> seems
> >>>>>>>>> to
> >>>>>>>>>> me (committers please correct me if I'm wrong) that as long as
> >>>>>>>>>> we're on
> >>>>>>>>> the
> >>>>>>>>>> same page about what information the interfaces are returning,
> >>>>>>>>>> that's
> >>>>>>> ok
> >>>>>>>>> at
> >>>>>>>>>> this level of discussion.
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
> >>>>>>>>>> <desai.p.ro...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello All,
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to start a discussion on KIP-924 (
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
> >>>>>>>>> )
> >>>>>>>>>>> which proposes an interface to allow users to plug into the
> >>> streams
> >>>>>>>>>>> partition assignor. The motivation section in the KIP goes into
> >>>>>>>>>>> some
> >>>>>>>>> more
> >>>>>>>>>>> detail on why we think this is a useful addition. Thanks in
> >>> advance
> >>>>>>> for
> >>>>>>>>>>> your feedback!
> >>>>>>>>>>>
> >>>>>>>>>>> Best Regards,
> >>>>>>>>>>>
> >>>>>>>>>>> Rohan
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>
> >>
> >
>
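
For readers following Bruno's comments (2) through (4) quoted above, here is a rough Java sketch of what those three suggestions might look like taken together: a single type() enum instead of two booleans, an Instant deadline instead of a long, and a collection of tasks that carry their own ID instead of a map. This is only an illustration, not the KIP's actual interface. TaskType, TaskId, ClientAssignment and countOfType are hypothetical names invented for the example; only AssignedTask, followupRebalanceDeadline and the enum values STATELESS, STATEFUL and STANDBY come from the thread.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

public class AssignmentSketch {

    /** The three task kinds named in the thread; a "stateless standby" cannot be expressed. */
    enum TaskType { STATELESS, STATEFUL, STANDBY }

    /** Hypothetical stand-in for a Streams task ID. */
    record TaskId(int subtopology, int partition) {}

    /** Carries its own ID, so an assignment can be a plain collection rather than a map. */
    record AssignedTask(TaskId id, TaskType type) {}

    /** Hypothetical per-client assignment; the deadline is an Instant rather than a long. */
    record ClientAssignment(List<AssignedTask> tasks,
                            Optional<Instant> followupRebalanceDeadline) {}

    /** Counts the tasks of one kind in an assignment. */
    static long countOfType(ClientAssignment assignment, TaskType type) {
        return assignment.tasks().stream()
                .filter(task -> task.type() == type)
                .count();
    }

    public static void main(String[] args) {
        ClientAssignment assignment = new ClientAssignment(
                List.of(
                        new AssignedTask(new TaskId(0, 0), TaskType.STATEFUL),
                        new AssignedTask(new TaskId(0, 1), TaskType.STANDBY),
                        new AssignedTask(new TaskId(1, 0), TaskType.STATELESS)),
                // Assumed deadline value, chosen only for the example.
                Optional.of(Instant.parse("2024-04-17T14:08:00Z")));

        System.out.println("standby tasks: " + countOfType(assignment, TaskType.STANDBY));
        System.out.println("follow-up deadline: " + assignment.followupRebalanceDeadline());
    }
}
```

With the enum, the invalid combination Bruno mentions (a stateless standby task) has no representation at all, which is the main argument for it over a pair of boolean accessors.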
