Before I go into my own feedback, which I know not everyone has the time to
read, I just want to announce that we plan to go over this KIP during the
next Streams project sync on April 11. If you are interested in joining the
discussion and need an invitation, just reach out to me in a separate
thread. All are welcome.

Now on to the KIP itself...

Made a pass over the latest KIP and discussion, and had just a few (mostly
minor) points. I'll start with the trivial:

1. nit: the config name is a bit awkward with the double "assignor" in the
name, in my opinion we should either remove the "partition.assignor" part
of the name altogether

2. Just to clarify a point about the `#previousOwnerForPartition` method,
that was originally needed not because users might mix up the
task->partition mapping, but in order to ensure an assignment that
satisfies the cooperative rebalancing protocol of not assigning any
partition to a thread if it had a (different) previous owner this
generation. Since there's no thread-level assignment in this KIP, and
perhaps more importantly I believe the StreamsPartitionAssignor should
still be responsible for guaranteeing cooperative rebalancing semantics
rather than forcing this on the user's TaskAssignor, I completely agree
that we don't need the  `#previousOwnerForPartition` API any more

3. On that note, while I agree that we should leave the thread-level
assignment within a node up to the StreamsPartitionAssignor, and leave it
out of this KIP, one thing that might be valuable is the ability to read
the final thread-level assignment, for example so users could implement
metrics or other monitoring of assignment. Right now the only way to know
which partition/task is assigned to which thread is to look at the logs,
which is hardly optimal. Have you considered including some kind of API for
the StreamsPartitionAssignor to inform the TaskAssignor of the eventual
thread-level assignment that it decides on?
FWIW I am personally fine with leaving this out of the current KIP, as the
current proposal would make it easy to do this as a quick follow-up KIP. It
also would become less useful if/when we decouple the consumers from the
processing threads and do away with thread-level assignment altogether,
though it might be a while before we completely switch over, so it could
still be useful to know the thread-level assignment for a while beyond 3.8.
Just wondering if you've thought about it

4. Lastly, regarding Almog's point 2/6, I do think it would be reasonable
to consolidate the top-level APIs in NodeAssignment in favor of a single
API. For example, something like this:

public interface NodeAssignment {
  ProcessID processId();

  long followupRebalanceDeadline();

  Map<TaskId, AssignedTask> assignment();

  static class AssignedTask {
    boolean isActive;
    boolean isStateful;
  }
}

This would make things easier on the users, as they could build up this map
one task at a time, while also helping to enforce the requirement that a
Node not contain the active and standby version of the same task. The
StreamsPartitionAssignor can then loop through the map and separate the
assigned tasks into the sub-maps that it uses to perform the thread-level
assignment from there. Assuming this is what you had in mind for #2 Almog,
I'm in favor

However, I'm less in favor of combining APIs in NodeState, ie point #6 in
your/Almog's reply. If the goal is to keep things simple for users and make
it so that someone who only wanted to modify the existing assignment logic,
rather than re-build it from the ground up, could do so without a huge
hassle, then keeping all of these individual APIs would be necessary.
There's also a performance element to this, as (IIRC) most of those "extra"
APIs such as sorted tasks by lag are simply exposing pre-computed metadata
that gets attached to each node while the StreamsPartitionAssignor is
processing the rebalance inputs. Moving those to a static utility class
would mean having to recompute everything, as they are getters rather than
utilities, and due to the assignment complexity we have to be performance
sensitive. It will probably make more sense when you see it implemented.



On Sat, Nov 18, 2023 at 4:56 PM Guozhang Wang <guozhang.wang...@gmail.com>
wrote:

> Hi Rohan,
>
> I took another look at the updated wiki page and do not have any major
> questions. Regarding returning a plugin object v.s. configuring a
> plugin object, I do not have a strong opinion except that the latter
> seems more consistent with existing patterns. Just curious, any other
> motivations to go with the latter from you?
>
>
> Guozhang
>
> On Thu, Nov 9, 2023 at 11:19 PM Rohan Desai <desai.p.ro...@gmail.com>
> wrote:
> >
> > Thanks for the feedback so far! I think pretty much all of it is
> > reasonable. I'll reply to it inline:
> >
> > > 1. All the API logic is granular at the Task level, except the
> > previousOwnerForPartition func. I’m not clear what’s the motivation
> behind
> > it, does our controller also want to change how the partitions->tasks
> > mapping is formed?
> > You're right that this is out of place. I've removed this method as it's
> > not needed by the task assignor.
> >
> > > 2. Just on the API layering itself: it feels a bit weird to have the
> > three built-in functions (defaultStandbyTaskAssignment etc) sitting in
> the
> > ApplicationMetadata class. If we consider them as some default util
> > functions, how about introducing moving those into their own static util
> > methods to separate from the ApplicationMetadata “fact objects” ?
> > Agreed. Updated in the latest revision of the kip. These have been moved
> to
> > TaskAssignorUtils
> >
> > > 3. I personally prefer `NodeAssignment` to be a read-only object
> > containing the decisions made by the assignor, including the
> > requestFollowupRebalance flag. For manipulating the half-baked results
> > inside the assignor itself, maybe we can just be flexible to let users
> use
> > whatever struts / their own classes even, if they like. WDYT?
> > Agreed. Updated in the latest version of the kip.
> >
> > > 1. For the API, thoughts on changing the method signature to return a
> > (non-Optional) TaskAssignor? Then we can either have the default
> > implementation return new HighAvailabilityTaskAssignor or just have a
> > default implementation class that people can extend if they don't want to
> > implement every method.
> > Based on some other discussion, I actually decided to get rid of the
> plugin
> > interface, and instead use config to specify individual plugin behaviour.
> > So the method you're referring to is no longer part of the proposal.
> >
> > > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> > theres methods that return void on it? It's not totally clear to me how
> > that interface is supposed to be used by the assignor. It'd be nice if we
> > could flip that interface such that it becomes part of the output instead
> > of an input to the plugin.
> > I've moved those methods to a util class. They're really utility methods
> > the assignor might want to call to do some default or optimized
> assignment
> > for some cases like rack-awareness.
> >
> > > 4. We should consider wrapping UUID in a ProcessID class so that we
> > control
> > the interface (there are a few places where UUID is directly used).
> > I like it. Updated the proposal.
> >
> > > 5. What does NodeState#newAssignmentForNode() do? I thought the point
> was
> > for the plugin to make the assignment? Is that the result of the default
> > logic?
> > It doesn't need to be part of the interface. I've removed it.
> >
> > > re 2/6:
> >
> > I generally agree with these points, but I'd rather hash that out in a PR
> > than in the KIP review, as it'll be clearer what gets used how. It seems
> to
> > me (committers please correct me if I'm wrong) that as long as we're on
> the
> > same page about what information the interfaces are returning, that's ok
> at
> > this level of discussion.
> >
> > On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai <desai.p.ro...@gmail.com>
> wrote:
> >
> > > Hello All,
> > >
> > > I'd like to start a discussion on KIP-924 (
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
> )
> > > which proposes an interface to allow users to plug into the streams
> > > partition assignor. The motivation section in the KIP goes into some
> more
> > > detail on why we think this is a useful addition. Thanks in advance for
> > > your feedback!
> > >
> > > Best Regards,
> > >
> > > Rohan
> > >
> > >
>

Reply via email to