Now to respond to Matthias:

FYI, I'm following the numbering scheme from your email but added **** to
mark responses with further questions or feedback and/or aren't yet
addressed in the KIP and need to be followed up on. You can more or less
just skip over the ones without stars to save time

100: I think this is leftover from a previous approach we considered.
Removed this line

101: I agree. Perhaps we hadn't fully committed to this decision when the
KIP was first written ;P Added a "Consumer Assignments" section under
"Public Changes" to address this more carefully and explicitly

102: fixed

103: fixed

****104: I do agree with Bruno on the structure of this callback name, ie
that it should start with "on", but any of the suggestions with that sound
good to me. I really don't feel too strongly but just to throw in a bit of
extra context, there is an analogous callback on the
ConsumerPartitionAssignor that is called #onAssignment. So personally I
would slightly prefer to just call it #onAssignment. However I'm happy to
go with whatever the consensus is -- @Bruno/@Matthias WDYT?

105: done (fyi I did leave the config doc string which is technically a
private variable but is part of the public contract)

106: good point about the numKafkaStreamsClients and toString methods --
removed those

****107: I guess this reflects how long it's been since the KIP was first
written :P But that's a fair point -- and yes, we should write this in a
forward looking manner. However, we can't build it in a way that's so
forward-looking that it doesn't work for the current version. Are you
proposing to remove this API altogether, or just rename it to
#numConsumerClients (or something like that) and update the javadocs
accordingly?  Assuming the latter, I totally agree, and have made the
change. But we definitely can't just remove it altogether (it may even be
relevant in the new threading model if we eventually allow configuring the
number of consumer clients independently of the processing threads -- but
that's a different conversation. The important thing is that this KIP be
compatible with the old/current threading model, in which case we need this
API).
Anyways, please take a look at the new javadocs and method name and lmk if
that makes sense to you

****108: The #followupRebalanceDeadline allows the custom assignor to
request followup rebalances, for example in order to probe for restoration
progress or other conditions for task assignment. This is fundamental to
the HighAvailabilityTaskAssignor (ie the default assignor) and may be
useful to custom assignors with similar such approaches. So it's definitely
necessary -- are you asking why we have it at all, or why it's an API on
the KafkaStreamsAssignment class and not, say, the TaskAssignment class?

109: Makes sense to me -- done

110: ack -- updated all mentions of "node" to "KafkaStreams client"

111: ack -- updated name from "consumer" to "consumerClientId"

112: that's fair -- I do think it's valuable to have "#allTasks" since some
assignors may not care about the stateful vs stateless distinction, but
it's weird to have #statefulTasks without #statelessTasks. Let's just have
all three. Added to the KIP

113: this makes sense to me. Updated computeTaskLags  to be an input
parameter instead of a mutating API. Also noted that it can throw a
TimeoutException in this case (this is relevant for point 115 below)

114: fixed

****115: Reasonable question -- I think the way it's described right now is
a bit awkward, and there's a better way to approach the issue. Ultimately
the "issue" is how we can handle failures in things like the task lag
computation, which notably makes a remote call to the brokers and can/has
been known to fail at times. Right now if this API fails, the
StreamsPartitionAssignor will just return the same assignment as the
previous one and trigger an immediate followup rebalance. This exception
was meant to be a "utility" that can be thrown to indicate to the
StreamsPartitionAssignor to just return the old assignment and trigger an
immediate followup.
That said, this exception does feel like an awkward way to do it,
especially since the TaskAssignor can already do all of this via native
APIs: it can request a followup rebalance (and better yet, determine for
itself what a reasonable retry/backoff interval would be). It can also just
return the same assignment -- the only issue is that implementing this is
kind of annoying. So I would propose that instead of throwing a
RetryableException, we should just add an additional utility method to the
TaskAssignmentUtils that does this "fallback" assignment and returns the
same tasks to their previous clients.
Added TaskAssignmentUtils#identityAssignment for this (though I'm happy to
take other name suggestions)

That's a long paragraph, sorry -- but hopefully it makes sense?

116: fair point. Updated the wording -- hopefully it makes more sense now.

****117: This is an interesting question. I think it's worth looking at
each of the cases you listed individually (and thinking about other cases
we might want to allow/disallow):

117a. Task assigned to two different clients: invalid assignment (but
should we enforce this?)
Imo we should look to the analogous ConsumerPartitionAssignor for this
question, which does not do any verification of the basic "each partition
is assigned to exactly one consumer" requirement) In the end this is an
advanced feature, and we should trust users to correctly implement and test
their custom assignors. So I would personally opt not to do any
post-assignment verification -- although I'm flexible on this if anyone
else feels strongly.

117b. Task not assigned to any client: valid assignment
Imo this should be allowed and could be used to implement some interesting
features. For example let's say you wanted to introduce a "pause" or
checkpoint-like feature, wherein Streams will stop processing new records
from the input topics but will continue processing everything downstream
until all intermediary records are flushed, ie the repartition topics are
drained. One way to do this would be to trigger a rebalance and assign only
downstream subtopologies, so that tasks which read from the input topics
aren't processed (until it's "restarted").

117c. Active and standby on the same KafkaStreams client: valid(?)
assignment
I'm not quite convinced either way on this, but think there could be edge
cases for which advanced users might want to have an active and standby
version of an in-memory store on the same client. Obviously this doesn't
make sense if you scale in/out by adding/removing entire KafkaStreams
clients, but those who scale up/down by adding/removing single
StreamThreads would benefit from relaxing this constraint.

117d. Unknown ProcessId: invalid assignment (should be enforced)
This one definitely sounds the most like a clear contract violation and in
fact, would make it impossible for the StreamsPartitionAssignor to
correctly process the returned KafkaStreamsAssignment into a set of
individual consumer assignments. So we almost certainly need to throw an
exception here/call out an error here. Maybe we can use the #onAssignment
callback to return an error code as well? WDYT?

118. Agreed, right now it's a bit hard to tell which APIs are for the user
to access, and which ones are for them to implement. Updated the KIP to
make this more clear by breaking up the classes into "User APIs" and
"Read-only APIs" (with a short description of each category).

119. Good point, added a public constructor for AssignedTask

120. Also good point, gave the #onAssignment callback a default no-op
implementation


On Wed, Apr 24, 2024 at 2:32 PM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> Responding to Bruno first:
>
> (1) I actually think "KafkaStreams" is exactly right here -- for the
> reason you said, ultimately this is describing a literal instance of the
> "KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went with
> StreamsClient but i also prefer KafkaStreams)
>
> (4) Rohan is  right about what I was saying -- but I'm now realizing that
> I completely misinterpreted what your concern was. Sorry for the
> long-winded and ultimately irrelevant answer. I'm completely fine with
> having the return type be a simple Set with additional info such as TaskId
> in the AssignedTask class (and I see Rohan already made this change so
> we're all good)
>
> (5) I don't insist either way :)   ApplicationState works for me
>
> On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> One more thing. It might be good to clearly call out, which interfaced a
>> user would implement, vs the other ones Kafka Streams implements and
>> TaskAssignor only uses.
>>
>> My understanding is, that users would implement `TaskAssignor`,
>> `TaskAssignment`, and `StreamsClientAssignment`.
>>
>> For `AssignedTask` it seems that users would actually only need to
>> instantiate them. Should we add a public constructor?
>>
>> Also wondering if we should add an empty default implementation for
>> `onAssignmentComputed()` as it seems not to be strictly necessary to use
>> this method?
>>
>>
>> -Matthias
>>
>> On 4/19/24 7:30 PM, Matthias J. Sax wrote:
>> > Great KIP. I have some minor comments/questions:
>> >
>> >
>> > 100 The KIP says: "In the future, additional plugins can use the same
>> > partition.assignor  prefix". What does this mean?
>> >
>> >
>> > 101 (nit) The KIP says: "Note that the thread-level assignment will
>> > remain an un-configurable internal implementation detail of the
>> > partition assignor (see "Rejected Alternatives" for further thoughts
>> and
>> > reasoning)." -- When I was reading this the first time, I did not
>> > understand it, and it did only become clear later (eg while reading the
>> > discussion thread). I think it would be good to be a little bit more
>> > explicit, because this is not just some minor thing, but a core design
>> > decision (which I, btw, support).
>> >
>> >
>> > 102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).
>> >
>> >
>> > 103 (nit): "new non-internal package" -> replace 'non-internal' with
>> > 'public' :)
>> >
>> >
>> > 104: Method name `TaskAssignor#onAssignmentComputed()` -> the name
>> seems
>> > to be a little bit clumsy? I kinda like the original
>> `finalAssignment()`
>> > -- I would also be happy with `onFinalAssignment` to address Bruno's
>> > line of thinking (which I think is a good call out). (Btw:
>> > `finalAssignment` is still used in the text on the KIP and should also
>> > be updated.)
>> >
>> >
>> > 105: Please remove all `private` variables. We should only show public
>> > stuff on the KIP. Everything else is an implementation detail.
>> >
>> >
>> > 106: `TaskAssignment#numStreamsClients()` -- why do we need this
>> method?
>> > Seems calling `assignment()` gives as a collection and we can just call
>> > size() on it to get the same value? -- Also, why do we explicitly call
>> > out the overwrite of `toString()`; seems unnecessary?
>> >
>> >
>> > 107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the
>> > number of StreamThreads on this client, which is equal to the number of
>> > main consumers and represents its overall capacity." -- Given our
>> > planned thread refactoring, this might not hold correct for long (and I
>> > am sure we will forget to updated the JavaDocs later). Talking to Lucas
>> > the plan is to cut down `StreamsThread` to host the consumer (and there
>> > will be only one, and it won't be configurable any longer), and we
>> would
>> > introduce a number of configurable "processing threads". Can/should we
>> > build this API in a forward looking manner?
>> >
>> >
>> > 108: Why do we need
>> > `StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how
>> > this would be useful?
>> >
>> >
>> > 109 `StreamsClientState#consumers`: should we rename this to
>> > `#consumerClientIds()`?
>> >
>> >
>> > 110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc
>> > says 'owned by consumers on this node' -- Should we just say `owned by
>> > the Streams client`?
>> >
>> >
>> > 111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer`
>> > parameter -- not clear what this is -- I guess it's a consumer's
>> > client.id? If yes, should we rename the parameter `consumerClientId`?
>> >
>> >
>> > 112 `ApplicationState`: what is the reason to have `allTasks()` and
>> > `stafefulTasks() -- why not have `statelessTasks()` and
>> > `statefulTasks()` instead? Or all three?
>> >
>> >
>> > 113 `ApplicationState#computeTaskLags()`: I understand the
>> indent/reason
>> > why we have this one, but it seems to be somewhat difficult to use
>> > correctly, as it triggers an internal side-effect... Would it be
>> > possible to replace this method in favor of passing in a `boolean
>> > computeTaskLag` parameter into #streamClientState() instead, what might
>> > make it less error prone to use, as it seems the returned
>> > `StreamsClient` object would be modified when calling
>> #computeTaskTags()
>> > and thus both are related to each other?
>> >
>> >
>> > 114 nit/typo: `ApplicationState#streamsClientStates()` returns
>> > `StreamsClientState` not `StreamsClient`.
>> >
>> >
>> > 115 `StreamsAssignorRetryableException`: not sure if I fully understand
>> > the purpose of this exception.
>> >
>> >
>> > 116 "No actual changes to functionality": allowing to plug in customer
>> > TaskAssignor sounds like adding new functionality. Can we rephrase this?
>> >
>> >
>> >
>> > 117: What happens if the returned assignment is "invalid" -- for
>> > example, a task might not have been assigned, or is assigned to two
>> > nodes? Or a standby is assigned to the same node as its active? Or a
>> > `StreamsClientAssigment` returns an unknown `ProcessId`? (Not sure if
>> > this list of potential issues is complete or not...)
>> >
>> >
>> >
>> > -Matthias
>> >
>> >
>> >
>> > On 4/18/24 2:05 AM, Bruno Cadonna wrote:
>> >> Hi Sophie,
>> >>
>> >> Thanks for the clarifications!
>> >>
>> >> (1)
>> >> What about replacing Node* with KafkaStreams* or StreamsClient*? I
>> >> prefer KafkaStreams* since that class represents the Kafka Streams
>> >> client. I am also fine with KafkaStreamsClient*. I really would like
>> >> to avoid introducing a new term in Kafka Streams for which we already
>> >> have an equivalent term even if it is used on the brokers since that
>> >> is a different level of abstraction. Additionally, I have never been a
>> >> big fan of the term "instance".
>> >>
>> >> (4)
>> >> I think the question is if we need to retrieve assignment metadata by
>> >> task for a Kafka client or if it is enough to iterate over the
>> >> assigned tasks. Could you explain why we cannot add additional
>> >> metadata to the class AssignedTask?
>> >> The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) )
>> >> could be something like
>> >>
>> >> public interface NodeAssignment {
>> >>      ProcessID processId();
>> >>
>> >>      Instant followupRebalanceDeadline();
>> >>
>> >>      Set<AssignedTask> assignment();
>> >>
>> >>      enum AssignedTaskType {
>> >>      STATELESS,
>> >>          STATEFUL,
>> >>          STANDBY
>> >>      }
>> >>
>> >>      static class AssignedTask {
>> >>          AssignedTaskType type();
>> >>          TaskId id();
>> >>
>> >>          ... other metadata needed in future
>> >>      }
>> >> }
>> >> If we need to retrieve assigned task by task ID, maybe it is better to
>> >> add methods like assignedFor(TaskId) and not to expose the Map.
>> >>
>> >> (5)
>> >> I am in favor of ApplicationState but I am also fine
>> >> ApplicationMetadata if you insist.
>> >>
>> >> (6)
>> >> Is
>> >>
>> >> void finalAssignment(GroupAssignment assignment, GroupSubscription
>> >> subscription);
>> >>
>> >> kind of a callback? If yes, would it make sense to call it
>> >> onAssignmentComputed()?
>> >>
>> >>
>> >> (7)
>> >> What do you think of changing the TaskAssignmentUtils signatures to
>> >>
>> >> public static TaskAssignment default*Assignment(final ApplicationState
>> >> applicationState, final TaskAssignment taskAssignment, ...) {...}
>> >>
>> >> to avoid to mutate the assignment in place?
>> >>
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >> On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:
>> >>> Thanks Bruno! I can provide a bit of context behind some of these
>> >>> decisions but I just want to say up front that I agree with every
>> >>> single one
>> >>> of your points, though I am going to push back a bit on the first one.
>> >>>
>> >>> [1] The idea here is to help avoid some confusion around the
>> overloaded
>> >>> term "client", which can mean either "an instance of Kafka Streams" or
>> >>> "a consumer/producer client". The problem is that the former applies
>> to
>> >>> the entire Streams process and therefore should be interpreted as "all
>> >>> of the StreamThread on an instance" whereas the latter is typically
>> used
>> >>> interchangeably to mean the consumer client in the consumer group,
>> >>> which implies a scope of just a single StreamThread on an instance.
>> >>> The "Node" name here was an attempt to clear this up, since
>> >>> differentiating
>> >>> between instance and thread level is critical to understanding and
>> >>> properly
>> >>> implementing the custom assignor.
>> >>>
>> >>> I do see what you mean about there not being a concept of Node in the
>> >>> Kafka Streams codebase, and that we usually do use "instance" when we
>> >>> need to differentiate between consumer client/one StreamThread and
>> >>> Kafka Streams client/all StreamThreads. As I'm typing this I'm
>> >>> convincing
>> >>> myself even more that we shouldn't just use "Client" without further
>> >>> distinction, but I'm not sure "Node" has to be the answer either.
>> >>>
>> >>> Could we replace "Node" with "KafkaStreamsClient" or is that too
>> wordy?
>> >>> I honestly do still like Node personally, and don't see what's wrong
>> >>> with
>> >>> introducing a new term since the "node" terminology is used heavily
>> >>> on the broker side and it means effectively the same thing in theory.
>> >>> But if we can't compromise between "Node" and "Client" then maybe
>> >>> we can settle on "Instance"? (Does feel a bit wordy too...maybe
>> >>> "Process"?)
>> >>>
>> >>> [2] Good catch(es). Makes sense to me
>> >>>
>> >>> [3] Totally agree, a single enum makes way more sense
>> >>>
>> >>> [4] Here again I can provide some background -- this is actually
>> >>> following
>> >>> a pattern that we used when refactoring the old PartitionAssignor into
>> >>> the new (at the time) Consumer PartitionAssignor interface. The idea
>> was
>> >>> to wrap the return type to protect the assign method in case we ever
>> >>> wanted
>> >>> to add something to what was returned, such as metadata for the entire
>> >>> group. This way we could avoid a massively disruptive deprecation-and-
>> >>> migration cycle for everyone who implements a custom assignor.
>> >>> That said, I just checked the GroupAssignment class we added for this
>> >>> in the ConsumerPartitionAssignor interface, and to this day we've
>> never
>> >>> added anything other that the map of consumer client to assignment.
>> >>>
>> >>> So maybe that was overly cautious. I'd be ok with flattening this map
>> >>> out.
>> >>> I guess the question is just, can we imagine any case in which we
>> might
>> >>> want the custom assignor to return additional metadata? To be honest
>> >>> I think this might be more likely than with the plain consumer client
>> >>> case,
>> >>> but again, I'm totally fine with just flattening it to a plain map
>> >>> return
>> >>> type
>> >>>
>> >>> [5] I guess not. I think ApplicationMetadata was added during the
>> >>> initial
>> >>> KIP discussion so that's probably why it doesn't follow the same
>> naming
>> >>> pattern. Personally I'm fine either way (I do think
>> ApplicationMetadata
>> >>> sounds a bit better but that's not a good enough reason :P)
>> >>>
>> >>> Thanks Bruno!
>> >>>
>> >>> On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna <cado...@apache.org>
>> >>> wrote:
>> >>>
>> >>>> Hi,
>> >>>>
>> >>>> sorry, I am late to the party.
>> >>>>
>> >>>> I have a couple of comments:
>> >>>>
>> >>>> (1)
>> >>>> I would prefer Client* instead of Node* in the names. In Kafka
>> Streams
>> >>>> we do not really have the concept of node but we have the concept of
>> >>>> client (admittedly, we sometimes also use instance). I would like to
>> >>>> avoid introducing a new term to basically describe the Streams
>> client.
>> >>>> I know that we already have a ClientState but that would be in a
>> >>>> different package.
>> >>>>
>> >>>> (2)
>> >>>> Did you consider to use Instant instead of long as return type of
>> >>>> followupRebalanceDeadline()? Instant is a bit more flexible and
>> >>>> readable
>> >>>> as a plain long, IMO. BTW, you list followupRebalanceDeadline()
>> >>>> twice in
>> >>>> interface NodeAssignment.
>> >>>>
>> >>>> (3)
>> >>>> Did you consider to use an enum instead of class AssignedTask? As
>> >>>> far as
>> >>>> I understand not all combinations are possible. A stateless standby
>> >>>> task
>> >>>> does not exist. An enum with values STATELESS, STATEFUL, STANDBY
>> would
>> >>>> be clearer. Or even better instead of two methods in AssignedTask
>> that
>> >>>> return a boolean you could have one method -- say type() -- that
>> >>>> returns
>> >>>> the enum.
>> >>>>
>> >>>> (4)
>> >>>> Does the return type of assignment need to be a map from task ID to
>> >>>> AssignedTask? Wouldn't it be enough to be a collection of
>> AssignedTasks
>> >>>> with AssignedTask containing the task ID?
>> >>>>
>> >>>> (5)
>> >>>> I there a semantic difference between *State and *Metadata? I was
>> >>>> wondering whether ApplicationMetadata could also be ApplicationState
>> >>>> for
>> >>>> the sake of consistency.
>> >>>>
>> >>>> Best,
>> >>>> Bruno
>> >>>>
>> >>>>
>> >>>> On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:
>> >>>>> Cool, looks good to me!
>> >>>>>
>> >>>>> Seems like there is no further feedback, so maybe we can start to
>> call
>> >>>> for
>> >>>>> a vote?
>> >>>>>
>> >>>>> However, since as noted we are setting aside time to discuss this
>> >>>>> during
>> >>>>> the sync next Thursday, we can also wait until after that meeting to
>> >>>>> officially kick off the vote.
>> >>>>>
>> >>>>> On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai <
>> desai.p.ro...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>>> Thanks for the feedback Sophie!
>> >>>>>>
>> >>>>>> re1: Totally agree. The fact that it's related to the partition
>> >>>> assignor is
>> >>>>>> clear from just `task.assignor`. I'll update.
>> >>>>>> re3: This is a good point, and something I would find useful
>> >>>> personally. I
>> >>>>>> think its worth adding an interface that lets the plugin observe
>> the
>> >>>> final
>> >>>>>> assignment. I'll add that.
>> >>>>>> re4: I like the new `NodeAssignment` type. I'll update the KIP with
>> >>>> that.
>> >>>>>>
>> >>>>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai <
>> desai.p.ro...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Thanks for the feedback so far! I think pretty much all of it is
>> >>>>>>> reasonable. I'll reply to it inline:
>> >>>>>>>
>> >>>>>>>> 1. All the API logic is granular at the Task level, except the
>> >>>>>>> previousOwnerForPartition func. I’m not clear what’s the
>> motivation
>> >>>>>> behind
>> >>>>>>> it, does our controller also want to change how the
>> >>>>>>> partitions->tasks
>> >>>>>>> mapping is formed?
>> >>>>>>> You're right that this is out of place. I've removed this method
>> as
>> >>>> it's
>> >>>>>>> not needed by the task assignor.
>> >>>>>>>
>> >>>>>>>> 2. Just on the API layering itself: it feels a bit weird to have
>> >>>>>>>> the
>> >>>>>>> three built-in functions (defaultStandbyTaskAssignment etc)
>> >>>>>>> sitting in
>> >>>>>> the
>> >>>>>>> ApplicationMetadata class. If we consider them as some default
>> util
>> >>>>>>> functions, how about introducing moving those into their own
>> static
>> >>>> util
>> >>>>>>> methods to separate from the ApplicationMetadata “fact objects” ?
>> >>>>>>> Agreed. Updated in the latest revision of the kip. These have been
>> >>>> moved
>> >>>>>>> to TaskAssignorUtils
>> >>>>>>>
>> >>>>>>>> 3. I personally prefer `NodeAssignment` to be a read-only object
>> >>>>>>> containing the decisions made by the assignor, including the
>> >>>>>>> requestFollowupRebalance flag. For manipulating the half-baked
>> >>>>>>> results
>> >>>>>>> inside the assignor itself, maybe we can just be flexible to let
>> >>>>>>> users
>> >>>>>> use
>> >>>>>>> whatever struts / their own classes even, if they like. WDYT?
>> >>>>>>> Agreed. Updated in the latest version of the kip.
>> >>>>>>>
>> >>>>>>>> 1. For the API, thoughts on changing the method signature to
>> >>>>>>>> return a
>> >>>>>>> (non-Optional) TaskAssignor? Then we can either have the default
>> >>>>>>> implementation return new HighAvailabilityTaskAssignor or just
>> >>>>>>> have a
>> >>>>>>> default implementation class that people can extend if they don't
>> >>>>>>> want
>> >>>> to
>> >>>>>>> implement every method.
>> >>>>>>> Based on some other discussion, I actually decided to get rid of
>> the
>> >>>>>>> plugin interface, and instead use config to specify individual
>> >>>>>>> plugin
>> >>>>>>> behaviour. So the method you're referring to is no longer part of
>> >>>>>>> the
>> >>>>>>> proposal.
>> >>>>>>>
>> >>>>>>>> 3. Speaking of ApplicationMetadata, the javadoc says it's read
>> only
>> >>>> but
>> >>>>>>> theres methods that return void on it? It's not totally clear to
>> >>>>>>> me how
>> >>>>>>> that interface is supposed to be used by the assignor. It'd be
>> >>>>>>> nice if
>> >>>> we
>> >>>>>>> could flip that interface such that it becomes part of the output
>> >>>> instead
>> >>>>>>> of an input to the plugin.
>> >>>>>>> I've moved those methods to a util class. They're really utility
>> >>>> methods
>> >>>>>>> the assignor might want to call to do some default or optimized
>> >>>>>> assignment
>> >>>>>>> for some cases like rack-awareness.
>> >>>>>>>
>> >>>>>>>> 4. We should consider wrapping UUID in a ProcessID class so that
>> we
>> >>>>>>> control
>> >>>>>>> the interface (there are a few places where UUID is directly
>> used).
>> >>>>>>> I like it. Updated the proposal.
>> >>>>>>>
>> >>>>>>>> 5. What does NodeState#newAssignmentForNode() do? I thought the
>> >>>>>>>> point
>> >>>>>>> was
>> >>>>>>> for the plugin to make the assignment? Is that the result of the
>> >>>> default
>> >>>>>>> logic?
>> >>>>>>> It doesn't need to be part of the interface. I've removed it.
>> >>>>>>>
>> >>>>>>>> re 2/6:
>> >>>>>>>
>> >>>>>>> I generally agree with these points, but I'd rather hash that out
>> >>>>>>> in a
>> >>>> PR
>> >>>>>>> than in the KIP review, as it'll be clearer what gets used how. It
>> >>>> seems
>> >>>>>> to
>> >>>>>>> me (committers please correct me if I'm wrong) that as long as
>> >>>>>>> we're on
>> >>>>>> the
>> >>>>>>> same page about what information the interfaces are returning,
>> >>>>>>> that's
>> >>>> ok
>> >>>>>> at
>> >>>>>>> this level of discussion.
>> >>>>>>>
>> >>>>>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
>> >>>>>>> <desai.p.ro...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hello All,
>> >>>>>>>>
>> >>>>>>>> I'd like to start a discussion on KIP-924 (
>> >>>>>>>>
>> >>>>>>
>> >>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
>> >>>>>> )
>> >>>>>>>> which proposes an interface to allow users to plug into the
>> streams
>> >>>>>>>> partition assignor. The motivation section in the KIP goes into
>> >>>>>>>> some
>> >>>>>> more
>> >>>>>>>> detail on why we think this is a useful addition. Thanks in
>> advance
>> >>>> for
>> >>>>>>>> your feedback!
>> >>>>>>>>
>> >>>>>>>> Best Regards,
>> >>>>>>>>
>> >>>>>>>> Rohan
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>>
>

Reply via email to