Hi Hao,

Thanks for the updates!

What do you think about dropping config rack.aware.assignment.enabled and add value NONE to the enum for the possible values of config rack.aware.assignment.strategy?

Best,
Bruno

On 31.05.23 23:31, Hao Li wrote:
Hi all,

I've updated the KIP based on the feedback. Major changes I made:
1. Add rack aware assignment to `StickyTaskAssignor`
2. Reject `Prefer reliability and then find optimal cost` option in standby
task assignment.


On Wed, May 31, 2023 at 12:09 PM Hao Li <h...@confluent.io> wrote:

Hi all,

Thanks for the feedback! I will update the KIP accordingly.

*For Sophie's comments:*

1 and 2. Good catch. Fixed these.

3 and 4 Yes. We can make this public config and call out the
clientConsumer config users need to set.

5. It's ideal to take the previous assignment in HAAssignor into
consideration when we compute our target assignment, the complications come
with making sure the assignment can eventually converge and we don't do
probing rebalance infinitely. It's not only about storing the previous
assignment or get it somehow. We can actually get the previous assignment
now like we do in StickyAssignor. But the previous assignment will change
in each round of probing rebalance. The proposal which added some weight to
make the rack aware assignment lean towards the original HAA's target
assignment will add benefits of stability in some corner cases in case of
tie in cross rack traffic cost. But it's not sticky. But the bottom line is
it won't be worse than current HAA's stickiness.

6. I'm fine with changing the assignor config to public. Actually, I think
we can min-cost algorithm with StickyAssignor as well to mitigate the
problem of 5. So we can have one public config to choose an assignor and
one public config to enable the rack aware assignment.

*For Bruno's comments:*

The proposal was to implement all the options and use configs to choose
them during runtime. We can make those configs public as suggested.
1, 2, 3, 4, 5: agree and will fix those.
6: subscription protocol is not changed.
7: yeah. Let me fix the notations.
8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
9: I'm also ok with just optimizing reliability for standby tasks. Or we
could simply run the "balance reliability over cost" greedy algorithm to
see if any cost could be reduced.
10: Make sense. Will fix the wording.
11: Make sense. Will update the test part.

*For Walker's comments:*
1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
and 6. I think we could use the rack aware assignment for StickyAssignor as
well. For HAA assignments, it's less sticky and we can only shoot for
minimizing the cross rack traffic eventually when everything is stable.
2. Yeah. This is a good point and we can also turn it on for
StickyAssignor.

Thanks,
Hao


On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
ableegold...@gmail.com> wrote:

Hey Hao, thanks for the KIP!

1. There's a typo in the "internal.rack.aware.assignment.strategry"
config,
this
should be internal.rack.aware.assignment.strategy.

2.

  For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
Number of
edges E is T * N where T is the number of clients and N is the number of
Tasks. This is because a task can be assigned to any client so there
will
be an edge between every task and every client. The total complexity
would
be O(T * N) if we want to be more specific.

I feel like I'm missing something here, but if E = T * N and the
complexity
is ~O(E^2), doesn't
this make the total complexity order of O(T^2 * N^2)?

3.

Since 3.C.I and 3.C.II have different tradeoffs and work better in
different workloads etc, we

could add an internal configuration to choose one of them at runtime.

Why only an internal configuration? Same goes for
internal.rack.aware.assignment.standby.strategry (which also has the typo)

4.

  There are no changes in public interfaces.

I think it would be good to explicitly call out that users can utilize
this
new feature by setting the
ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example

5.

The idea is that if we always try to make it overlap as much with
HAAssignor’s target

assignment, at least there’s a higher chance that tasks won’t be shuffled
a
lot if the clients

remain the same across rebalances.

This line definitely gave me some pause -- if there was one major takeaway
I had after KIP-441,
one thing that most limited the feature's success, it was our assumption
that clients are relatively
stable across rebalances. This was mostly true at limited scale or for
on-prem setups, but
unsurprisingly broke down in cloud environments or larger clusters. Not
only do clients naturally
fall in and out of the group, autoscaling is becoming more and more of a
thing.

Lastly, and this is more easily solved but still worth calling out, an
assignment is only deterministic
as long as the client.id is persisted. Currently in Streams, we only
write
the process UUID to the
state directory if there is one, ie if at least one persistent stateful
task exists in the topology. This
made sense in the context of KIP-441, which targeted heavily stateful
deployments, but this KIP
presumably intends to target more than just the persistent & stateful
subset of applications. To
make matters even worse,  "persistent" is defined in a semantically
inconsistent way throughout
Streams.

All this is to say, it may sound more complicated to remember the previous
assignment, but (a)
imo it only introduces a lot more complexity and shaky assumptions to
continue down this
path, and (b) we actually already do persist some amount of state, like
the
process UUID, and
(c) it seems like this is the perfect opportunity to finally rid ourselves
of the determinism constraint
which has frankly caused more trouble and time lost in sum than it would
have taken us to just
write the HighAvailabilityTaskAssignor to consider the previous assignment
from the start in KIP-441

6.

StickyTaskAssignor  users who would like to use rack aware assignment
should upgrade their

Kafka Streams version to the version in which HighAvailabilityTaskAssignor
and rack awareness

assignment are available.

Building off of the above, the HAAssignor hasn't worked out perfectly for
everybody up until now,
given that we are only adding complexity to it now, on the flipside I
would
hesitate to try and force
everyone to use it if they want to upgrade. We added a "secret" backdoor
internal config to allow
users to set the task assignor back in KIP-441 for this reason. WDYT about
bumping this to a public
config on the side in this KIP?


On Tue, May 23, 2023 at 11:46 AM Hao Li <h...@confluent.io.invalid> wrote:

Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
it to
the test plan. I will also add tests to verify the new optimizer will
produce a balanced assignment which has no worse cross AZ cost than the
existing assignor.

Hao

On Mon, May 22, 2023 at 3:39 PM John Roesler <vvcep...@apache.org>
wrote:

Hi Hao,

Thanks for the KIP!

Overall, I think this is a great idea. I always wanted to circle back
after the Smooth Scaling KIP to put a proper optimization algorithm
into
place. I think this has the promise to really improve the quality of
the
balanced assignments we produce.

Thanks for providing the details about the MaxCut/MinFlow algorithm.
It
seems like a good choice for me, assuming we choose the right scaling
factors for the weights we add to the graph. Unfortunately, I don't
think
that there's a good way to see how easy or hard this is going to be
until
we actually implement it and test it.

That leads to the only real piece of feedback I had on the KIP, which
is
the testing portion. You mentioned system/integration/unit tests, but
there's not too much information about what those tests will do. I'd
like
to suggest that we invest in more simulation testing specifically,
similar
to what we did in


https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
.

In fact, it seems like we _could_ write the simulation up front, and
then
implement the algorithm in a dummy way and just see whether it passes
the
simulations or not, before actually integrating it with Kafka Streams.

Basically, I'd be +1 on this KIP today, but I'd feel confident about
it
if
we had a little more detail regarding how we are going to verify that
the
new optimizer is actually going to produce more optimal plans than the
existing assigner we have today.

Thanks again!
-John

On 2023/05/22 16:49:22 Hao Li wrote:
Hi Colt,

Thanks for the comments.

and I struggle to see how the algorithm isn't at least O(N) where
N
is
the number of Tasks...?

For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
Number
of
edges E is T * N where T is the number of clients and N is the
number
of
Tasks. This is because a task can be assigned to any client so there
will
be an edge between every task and every client. The total complexity
would
be O(T * N) if we want to be more specific.

But if the leaders for each partition are spread across multiple
zones,
how will you handle that?

This is what the min-cost flow solution is trying to solve? i.e.
Find
an
assignment of tasks to clients where across AZ traffic can be
minimized.
But there are some constraints to the solution and one of them is we
need
to balance task assignment first (



https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
).
So in your example of three tasks' partitions being in the same AZ
of a
client, if there are other clients, we still want to balance the
tasks
to
other clients even if putting all tasks to a single client can
result
in
0
cross AZ traffic. In



https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
section, the algorithm will try to find a min-cost solution based on
balanced assignment instead of pure min-cost.

Thanks,
Hao

On Tue, May 9, 2023 at 5:55 PM Colt McNealy <c...@littlehorse.io>
wrote:

Hello Hao,

First of all, THANK YOU for putting this together. I had been
hoping
someone might bring something like this forward. A few comments:

**1: Runtime Complexity
Klein’s cycle canceling algorithm can solve the min-cost flow
problem in
O(E^2CU) time where C is max cost and U is max capacity. In our
particular
case, C is 1 and U is at most 3 (A task can have at most 3 topics
including
changelog topic?). So the algorithm runs in O(E^2) time for our
case.

A Task can have multiple input topics, and also multiple state
stores,
and
multiple output topics. The most common case is three topics as
you
described, but this is not necessarily guaranteed. Also, math is
one
of my
weak points, but to me O(E^2) is equivalent to O(1), and I
struggle
to
see
how the algorithm isn't at least O(N) where N is the number of
Tasks...?

**2: Broker-Side Partition Assignments
Consider the case with just three topics in a Task (one input, one
output,
one changelog). If all three partition leaders are in the same
Rack
(or
better yet, the same broker), then we could get massive savings by
assigning the Task to that Rack/availability zone. But if the
leaders
for
each partition are spread across multiple zones, how will you
handle
that?
Is that outside the scope of this KIP, or is it worth introducing
a
kafka-streams-generate-rebalance-proposal.sh tool?

Colt McNealy
*Founder, LittleHorse.io*


On Tue, May 9, 2023 at 4:03 PM Hao Li <h...@confluent.io.invalid>
wrote:

Hi all,

I have submitted KIP-925 to add rack awareness logic in task
assignment
in
Kafka Streams and would like to start a discussion:






https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams

--
Thanks,
Hao




--
Thanks,
Hao




--
Thanks,
Hao




--
Thanks,
Hao



Reply via email to