Thanks for the update Sophie!

On 5/31/24 3:41 PM, Sophie Blee-Goldman wrote:
Hi all! Coming in with hopefully the last set of minor updates. During
implementation of the new out-of-the-box assignors and the
TaskAssignmentUtils which make heavy use of the new API, we determined a
few quality-of-life improvements were needed to make some of the new
classes/methods a bit easier to use and assign tasks with. These are the
changes:

1. The return type of the ApplicationState#allTasks method was changed from
a Set<TaskInfo> to a Map<TaskId, TaskInfo>

2. Similarly, we changed the return type of KafkaStreamsAssignment#tasks
from a Set<AssignedTask> to a Map<TaskId, AssignedTask>

3. Added two new methods to KafkaStreamsAssignment to allow in-place
modification of the assigned tasks. While not strictly necessary, this
makes it possible to perform iterative assignment strategies and
post-assignment optimizations like the rack-aware algorithm without a huge
amount of code complexity and overhead from converting between mutable
Collections and the immutable KafkaStreamsAssignment. The two new methods
are #assignTask and #removeTask. You can find the full method signatures in
the KIP.

4. We realized that two of the rack-aware assignment configs -- trafficCost
and nonOverlapCost -- were exposed as an int but have a default value of
null, which would result in an NPE. We have changed these to be OptionalInt

5. Since the KafkaStreamsAssignment class holds tasks in a map keyed by
TaskId, it's actually not possible to create an assignment that hits the
error code  ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS. We
removed this error code accordingly.
Note that an IllegalStateException will still be thrown if a user does
attempt this, so the error will still be surfaced to the user. It's just
that this will happen immediately rather than after the face (which if
anything is an improvement since the stacktrace will show them exactly
where the bug in their assignment code occurred).

And that's it! Thanks all
Sophie

On Tue, May 28, 2024 at 1:36 PM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

Ah, one more very small thing:

3. We changed the name of a KafkaStreamsAssignment method from #assignment
to just #tasks. The new signature is

  public Set<AssignedTask> tasks();

The reason for this is that the term "assignment" is used a lot already,
and if we call the object itself an "assignment" then we should refer to
the specific tasks that make up this assignment as just the "tasks"

Also, with the original name, this is a valid but very silly sounding
method call chain: TaskAssignment.assignment().get(0).assignment() (like
I said, too much "assignment" in the mix)

On Tue, May 28, 2024 at 1:13 PM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

Hey all,

Two more quick updates to the KIP, please let me know if you have any
questions or feedback or naming suggestions:

1. We'd like to introduce an additional error code with the following
signature:
  * MISSING_PROCESS_ID: A ProcessId present in the input ApplicationState
was not present in the output TaskAssignment

2. While implementing the new TaskInfo class, specifically the
#sourceTopicPartitions and #changelogTopicPartitions APIs, we realized that
the source topic changelog optimization would create some overlap between
these two sets, which might be confusing for users as the API seems to
suggest these are disjoint sets. To make this distinction more clear, we
would like to introduce another small container class called the
TaskTopicPartition, which just contains metadata about how a TopicPartition
relates to a given task, such as whether it is a source topic and whether
it is a changelog topic. The TaskInfo API will then be simplified by
removing the separate #inputTopicPartitions, #changelogTopicPartitions, and
#partitionToRackIds methods, and replacing these with a single method:

Set<TaskTopicPartition> topicPartitions();

Please refer to the updated KIP for the complete definition of the new
TaskTopicPartition class


Thanks!
Sophie


On Wed, May 15, 2024 at 3:41 PM Sophie Blee-Goldman <
sop...@responsive.dev> wrote:

Thanks Bruno!

First, need to make one quick fix to what I said in the previous email
-- the new rackId() getter will be added to KafkaStreamsState, not
KafkaStreamsApplication (The KIP is correct, but what I put in the email
was not)

U1. I would actually prefer to keep the constructors as is, for reasons
I realize I forgot to mention. Let me know if this makes sense to you or
you would still prefer to break up the constructors anyways:

The KafkaStreamsApplication class has two required parameters and one
optional one. The required params are of course the processId and
assignment so imo it would not make sense to break these up across two
different constructors, since both have to be supplied. The
followupRebalanceDeadline on the other hand is purely optional, which is
why that one is in a separate, non-static constructor

Re: for vs of, unfortunately for is a protected keyword in java. I'm
open to other naming suggestions though. I actually personally prefer the
more direct naming style, eg something like #ofProcessIdAndAssignment (or
#forProcessIdAndAssignment if you'd prefer?) But we tend to trends towards
the shorter fluent naming style and others have mentioned a preference for
names like "#of" in the past. I'm happy with any name (any name that's
allowed in java, that is :P)

U3: This is a very good point, and in fact the existence of TaskMetadata
was why I went with TaskInfo here. I'm not super happy with it but at least
this TaskInfo is pretty well encapsulated in the assignment code and
generally won't be mixed up with the TaskMetadata/Task/etc part of the
code. If anyone would like to submit a KIP to clean all this up at some
point, I would definitely be supportive!

On Wed, May 15, 2024 at 2:44 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi,

Thank you for the updates!

I have a couple of comments.

U1
Yeah, that makes sense to me. However, could we break out the
assignment
part to make it more readable? Something like:


KafkaStreamsAssignment.for(processId).withAssignment(assignment).withFollowRebalance(rebalanceDeadline)

nits:
U1.a I slightly prefer "for" compared to "of", but I leave the decision
to others.
U1.b An alternative to "withAssignment()" could be simply "with()".


U3
Yeah, I like the TaskInfo approach. However, task metadata interfaces
start to proliferate a bit too much in our code base. We have
TaskMetadata, TaskInfo, and finally Task that provide similar methods.
I
think we should try to consolidate those interfaces. Does not need to
happen in this KIP, but we should consider it.

Best,
Bruno


On 5/15/24 6:01 AM, Sophie Blee-Goldman wrote:
Hey all,

I have a few updates to mention that have come up during the
implementation
phase. The KIP should reflect the latest proposal and what has been
merged
so far, but I'll list everything again here so that you don't have to
sift
through the entire KIP:

U1: The KafkaStreamsAssignment interface was converted to a class,
and two
public constructors were added in keeping with the fluent API used
elsewhere in Streams:


public  class KafkaStreamsAssignment {

      public static KafkaStreamsAssignment of(final ProcessId
processId,
final Set<AssignedTask> assignment);

      public KafkaStreamsAssignment withFollowupRebalance(final
Instant
rebalanceDeadline);

   }


U2: Any lag-related APIs in the KafkaStreamsState interface will
throw an
UnsupportedOperationException if the user opted out of computing the
task
lags when getting the KafkaStreamsState

U3: While refactoring the RackAwareTaskAssignor, we realized the
current
proposal was missing the requisite rack id information. We will need
to add
both the per-client rackId to the KafkaStreamsState, as well as the
per-task rack ids of all replicas hosting the topic partitions for
that
task. The former is straightforward and leads to this new method on
the
KafkaStreamsState interface:

interface KafkaStreamsAssignment {

      Optional<String> rackId();

   }


For the latter issue, we need to add the per-partition rack ids to the
ApplicationState interface, but the exact API is a bit less
straightforward
since we don't currently have any concept of a partition in the
proposed
API, instead dealing only with tasks.

option 1: https://github.com/apache/kafka/pull/15960
The easiest way to address this would be to add a single method to the
ApplicationState interface returning a map from TaskId to the rack
ids for
its partitions. To avoid a nasty multi-layered nested data structure,
we'd
also introduce a simple container for the partition to rack ids map,
with
separate maps for input topics vs changelogs (since the
RackAwareTaskAssignor needs the ability to differentiate these, and so
would the new rack-aware assignment utility methods). See the short
example
PR linked to above for the complete API being proposed in this option.

option 2: https://github.com/apache/kafka/pull/15959
While it is clear that tasks are the right level of abstraction for
the
TaskAssignor on the whole, it could be argued that the topic partition
information might be valuable to a more sophisticated assignor. So
another
option would be to go all-in and create a new metadata class for each
task
that exposes essential and useful information: eg the set of input
partitions and changelog partitions belonging to each task and the
mapping
of partition to rackIds, and perhaps also whether it is stateful and
the
names of any state stores for that TaskId. This would also allow us to
simplify the ApplicationState interface to return just a single set of
tasks with all metadata encapsulated in the task, rather than having
to
offer a separate API for stateful vs stateless tasks to differentiate
the
two. See the example PR for the full proposal and changes to the
existing
API

I personally am slightly in favor of option #2 (pull/15959
<https://github.com/apache/kafka/pull/15959>) as I believe including
general task metadata may be useful and this API would be easy to
evolve if
we wanted to add anything else in a future KIP. The current KIP was
updated
using this option, although nothing related to the rack ids has been
merged
yet. We're happy to defer to anyone with a strong preference for
either of
these options, or a new suggestion of their own.

As always, let us know if you have any questions or concerns or
feedback of
any kind.

Thanks!


On Mon, May 6, 2024 at 1:33 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

Thanks guys. Updated the error codes in both the code and the
explanation
under "Public Changes". To sum up, here are the error codes listed
in the
KIP:

enum AssignmentError {
      NONE,
      ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
      ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
      INVALID_STANDBY_TASK,
      UNKNOWN_PROCESS_ID,
      UNKNOWN_TASK_ID
}

Anything missing?

(also updated all the code block headings, thanks for noticing that
Bruno)

On Fri, May 3, 2024 at 9:33 AM Matthias J. Sax <mj...@apache.org>
wrote:

117f: Good point by Bruno. We should check for this, and could have
an
additional `INVALID_STANDBY_TASK` error code?


-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:
Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough
to
avoid the error Guozhang mentioned. The StreamsPartitionAssignor
passes
the information whether a task is stateless or stateful into the
task
assignor. However, the task assignor can return a standby task
for a
stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID
error.

nit:
The titles of some code blocks in the KIP are not consistent with
their
content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:
Thanks Sophie. My bad. You are of course right about
`TaskAssignment`
and the StreamsPartitionAssignor's responsibitliy to map tasks
of a
instance to consumers. When I wrote my reply, I forgot about this
detail.

Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by
Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:
Guozhang:

117. All three additions make sense to me. However, while
thinking
about
how users would actually produce an assignment, I realized that
it
seems
silly to make it their responsibility to distinguish between a
stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful
vs
stateless, so there's no need to add this extra step for users
to
figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent
task
types",
I'm proposing to just flatten the AssignedTask.Type enum to only
"ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types
altogether.
Any objections?

-----

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I
don't
personally feel too strongly about this, but the reason for the
return
type
being a Map<ProcessId, KafkaStreamsAssignment> rather than a
TaskAssignment
is because they are meant to be used iteratively/to create a
part of
the
full assignment, and not necessarily a full assignment for each.
Notice
that they all have an input parameter of the same type:
Map<ProcessId,
KafkaStreamsAssignment>. The idea is you can take the output of
any
of
these and pass it in to another to generate or optimize another
piece of
the overall assignment. For example, if you want to perform the
rack-aware
optimization on both active and standby tasks, you would need
to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If
we
return a
TaskAssignment, it will usually need to be unwrapped right away.
Perhaps
more importantly, I worry that returning a TaskAssignment will
make
it
seem
like each of these utility methods return a "full" and final
assignment
that can just be returned as-is from the TaskAssignor's #assign
method.
Whereas they are each just a single step in the full assignment
process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman
<sop...@responsive.dev>
wrote:

Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't
think of
anything
better.  I like your proposals though, will update the KIP
names.
I'll also
add a "NONE" option as well -- much better than just passing
in null
for no
error.

OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned
with
the
same active task

     Would also be an error if assigned to two consumers of the
same
client...
Needs to be rephrased.


Well the TaskAssignor only assigns tasks to KafkaStreams
clients,
it's not
responsible for the assignment of tasks to consumers within a
KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it
received a
valid
assignment from the TaskAssignor with only one copy of a task
assigned to a
single KAfkaStreams client, and then somehow ended up
assigning that
task
to multiple consumers on the KafkaStreams client. It wouldn't
be the
TaskAssignor's fault so imo it would not make sense to include
this
case in
the OVERLAPPING_CLIENT error (or as it's now called,
ACTIVE_TASK_
ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug
that
caused
the StreamsPartitionAssignor to assign a task to multiple
consumers, it
presumably wouldn't even notice since it's a bug -- if it did
notice, it
can just fix the issue. The error codes are about communicating
unfixable
issues due to the TaskAssignor itself returning an invalid
assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor
should
handle/react to invalid assignments though. I'm fine with just
throwing a
StreamsException and crashing after we invoke the
#onAssignmentComputed
callback to notify the user of the error.

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang
<guozhang.wang...@gmail.com>
wrote:

Jumping back to the party here :)

107: I agree with the rationale behind this, and
`numProcessingThreads` looks good to me as it covers both the
current
and future scenarios.

117: I agree with Lucas and Bruno, and would add:
      * 117e: unknown taskID: fail
      * 117f: inconsistent task types (e.g. a known taskID was
indicated
stateless from ApplicationState, but the returned AssignedTask
states
stateful): fail
      * 117g: some ProcessID was not included in the returned
Set:
pass,
and interprets it as no tasks assigned to it.

And I'm open for any creative error codes folks would come up
with
:)

If any of these errors are detected, the
StreamsPartitionAssignor
will
immediately "fail" the rebalance and retry it by scheduling an
immediate
followup rebalance.

I'm also a bit concerned here, as such endless retry loops
have
happened in the past in my memory. Given that we would likely
see
most
of the user implementations be deterministic, I'm also leaning
towards
failing the app immediately and let the crowd educates us if
there
are
some very interesting scenarios out there that are not on our
radar to
re-consider this, rather than getting hard to debug cases in
the
dark.

-----

And here are just some nits about the KIP writings itself:

* I think some bullet points under `User APIs` and `Read-only
APIs`
should have a lower level indention? It caught me for a sec
until I
realized there are just two categories.

* In TaskAssignmentUtils , why not let those util functions
return
`TaskAssignment` (to me it feels more consistent with the user
APIs),
but instead return a Map<ProcessID, KafkaStreamsAssignment>?


Guozhang

On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax <
mj...@apache.org>
wrote:

I like the idea of error codes. Not sure if the name are
ideal?
UNKNOWN_PROCESS_ID makes sense, but the other two seems a
little
bit
difficult to understand?

Should we be very descriptive (and also try to avoid
coupling it
to
the
threading model -- important for the first error code):
      - ACTIVE_TASK_ ASSIGNED_MULTIPLE_TIMES
      - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or
_INSTANCE

I think we also need to add NONE as option or make the error
parameter
an `Optional`?


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned
with
the
same active task

Would also be an error if assigned to two consumers of the
same
client... Needs to be rephrased.



If any of these errors are detected, the
StreamsPartitionAssignor
will immediately "fail" the rebalance and retry it by
scheduling an
immediate followup rebalance.

Does this make sense? If we assume that the task-assignment
is
deterministic, we would end up with an infinite retry loop?
Also,
assuming that an client leave the group, we cannot assign
some
task
any
longer... I would rather throw a StreamsException and let the
client
crash.



-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:
One last thing: I added an error code enum to be returned
from
the
#onAssignmentComputed method in case of an invalid
assignment. I
created
one code for each of the invalid cases we described above.
The
downside is
that this means we'll have to go through a deprecation
cycle if
we
want to
loosen up the restrictions on any of the enforced cases. The
upside
is that
we can very clearly mark what is an invalid assignment and
this
will
(hopefully) assist users who are new to customizing
assignments
by
clearly
denoting the requirements, and returning a clear error if
they
are
not
followed.

Of course the StreamsPartitionAssignor will also do a
"fallback &
retry" in
this case by returning the same assignment to the consumers
and
scheduling
a followup rebalance. I've added all of this to the
TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under
"Public
Changes"
as well.

Please let me know if there are any concerns, or if you have
suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:

Thanks guys! I agree with what Lucas said about 117c, we
can
always
loosen
a restriction later and I don't want to do anything now
that
might
get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll
update
the KIP
to
include what we've discussed

(and will fix the remaining #finalAssignment mention as
well,
thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna <
cado...@apache.org>
wrote:

Hi again,

I forgot to ask whether you could add the agreement about
handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:
Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be
reasonable in
some
situations
c) fail: For the reasons Lucas pointed out. I am missing
a
good
use
case
here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment()
instead
of
the
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named
finalAssignment
which
is
called with the final computed GroupAssignment ..."


Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:
Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open
point
117. I'm
against any fallbacks like "use the default assignor if
the
custom
assignment is invalid", as it's just going to hide
bugs. For
the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an
implementation
bug,
and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be
allowed.
117c) This is the tricky case. However, I'm leaning
towards
not
allowing this, unless we have a concrete use case. This
will
block us
from potentially using a single consumer for active and
standby
tasks
in the future. It's easier to drop the restriction
later if
we
have a
concrete use case.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
<sop...@responsive.dev> wrote:

Yeah I think that sums it up well. Either you computed
a
*possible*
assignment,
or you returned something that makes it literally
impossible for
the
StreamsPartitionAssignor to decipher/translate into an
actual
group
assignment, in which case it should just fail

That's more or less it for the open questions that
have been
raised
so far,
so I just want to remind folks that there's already a
voting
thread
for
this. I cast my vote a few minutes ago so it should
resurface in
everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai <
desai.p.ro...@gmail.com

wrote:

117: as Sophie laid out, there are two cases here
right:
1. cases that are considered invalid by the existing
assignors
but
are
still valid assignments in the sense that they can be
used
to
generate a
valid consumer group assignment (from the perspective
of
the
consumer group
protocol). An assignment that excludes a task is one
such
example,
and
Sophie pointed out a good use case for it. I also
think it
makes
sense to
allow these. It's hard to predict how a user might
want to
use
the
custom
assignor, and its reasonable to expect them to use it
with
care
and
not
hand-hold them.
2. cases that are not valid because it is impossible
to
compute
a
valid
consumer group assignment from them. In this case it
seems
totally
reasonable to just throw a fatal exception that gets
passed to
the
uncaught
exception handler. If this case happens then there is
some
bug
in the
user's assignor and its totally reasonable to fail the
application
in that
case. We _could_ try to be more graceful and default
to
one of
the
existing
assignors. But it's usually better to fail hard and
fast
when
there
is some
illegal state detected imo.

On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai <
desai.p.ro...@gmail.com

wrote:

Bruno, I've incorporated your feedback into the KIP
document.

On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai <
desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback Bruno! For the most part I
think
it
makes
sense,
but leaving a couple follow-up thoughts/questions:

re 4: I think Sophie's point was slightly different
-
that we
might want
to wrap the return type for `assign` in a class so
that
its
easily
extensible. This makes sense to me. Whether we do
that or
not, we
can
have
the return type be a Set instead of a Map as well.

re 6: Yes, it's a callback that's called with the
final
assignment. I
like your suggested name.

On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai <
desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to
the
partition
assignor
is clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would
find
useful
personally.
I think its worth adding an interface that lets the
plugin
observe the
final assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll
update
the
KIP
with
that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai
<desai.p.ro...@gmail.com>
wrote:

Thanks for the feedback so far! I think pretty
much
all of
it is
reasonable. I'll reply to it inline:

1. All the API logic is granular at the Task
level,
except
the
previousOwnerForPartition func. I’m not clear
what’s
the
motivation
behind it, does our controller also want to
change how
the
partitions->tasks mapping is formed?
You're right that this is out of place. I've
removed
this
method
as
it's not needed by the task assignor.

2. Just on the API layering itself: it feels a
bit
weird to
have the
three built-in functions
(defaultStandbyTaskAssignment
etc)
sitting in
the ApplicationMetadata class. If we consider
them as
some
default
util
functions, how about introducing moving those into
their
own
static
util
methods to separate from the ApplicationMetadata
“fact
objects” ?
Agreed. Updated in the latest revision of the kip.
These
have
been
moved to TaskAssignorUtils

3. I personally prefer `NodeAssignment` to be a
read-only
object
containing the decisions made by the assignor,
including
the
requestFollowupRebalance flag. For manipulating
the
half-baked
results
inside the assignor itself, maybe we can just be
flexible
to let
users use
whatever struts / their own classes even, if they
like.
WDYT?
Agreed. Updated in the latest version of the kip.

1. For the API, thoughts on changing the method
signature
to
return
a
(non-Optional) TaskAssignor? Then we can either
have
the
default
implementation return new
HighAvailabilityTaskAssignor
or
just
have a
default implementation class that people can
extend if
they
don't
want to
implement every method.
Based on some other discussion, I actually
decided to
get
rid of
the
plugin interface, and instead use config to
specify
individual
plugin
behaviour. So the method you're referring to is no
longer
part
of the
proposal.

3. Speaking of ApplicationMetadata, the javadoc
says
it's
read
only
but
theres methods that return void on it? It's not
totally
clear to
me
how
that interface is supposed to be used by the
assignor.
It'd
be
nice
if we
could flip that interface such that it becomes
part of
the
output
instead
of an input to the plugin.
I've moved those methods to a util class. They're
really
utility
methods the assignor might want to call to do some
default
or
optimized
assignment for some cases like rack-awareness.

4. We should consider wrapping UUID in a
ProcessID
class so
that we
control
the interface (there are a few places where UUID
is
directly
used).
I like it. Updated the proposal.

5. What does NodeState#newAssignmentForNode()
do? I
thought the
point was
for the plugin to make the assignment? Is that the
result
of the
default logic?
It doesn't need to be part of the interface. I've
removed
it.

re 2/6:

I generally agree with these points, but I'd
rather
hash
that
out in a
PR than in the KIP review, as it'll be clearer
what
gets
used
how. It
seems
to me (committers please correct me if I'm wrong)
that
as
long as
we're on
the same page about what information the
interfaces are
returning,
that's
ok at this level of discussion.

On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai
<desai.p.ro...@gmail.com>
wrote:

Hello All,

I'd like to start a discussion on KIP-924 (





https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
)
which proposes an interface to allow users to
plug
into
the
streams
partition assignor. The motivation section in
the KIP
goes
into
some
more
detail on why we think this is a useful addition.
Thanks in
advance
for
your feedback!

Best Regards,

Rohan
















Reply via email to