Hi all,

I've updated the KIP based on the feedback. Major changes I made:
1. Add rack aware assignment to `StickyTaskAssignor`
2. Reject `Prefer reliability and then find optimal cost` option in standby
task assignment.


On Wed, May 31, 2023 at 12:09 PM Hao Li <h...@confluent.io> wrote:

> Hi all,
>
> Thanks for the feedback! I will update the KIP accordingly.
>
> *For Sophie's comments:*
>
> 1 and 2. Good catch. Fixed these.
>
> 3 and 4 Yes. We can make this public config and call out the
> clientConsumer config users need to set.
>
> 5. It's ideal to take the previous assignment in HAAssignor into
> consideration when we compute our target assignment, the complications come
> with making sure the assignment can eventually converge and we don't do
> probing rebalance infinitely. It's not only about storing the previous
> assignment or get it somehow. We can actually get the previous assignment
> now like we do in StickyAssignor. But the previous assignment will change
> in each round of probing rebalance. The proposal which added some weight to
> make the rack aware assignment lean towards the original HAA's target
> assignment will add benefits of stability in some corner cases in case of
> tie in cross rack traffic cost. But it's not sticky. But the bottom line is
> it won't be worse than current HAA's stickiness.
>
> 6. I'm fine with changing the assignor config to public. Actually, I think
> we can min-cost algorithm with StickyAssignor as well to mitigate the
> problem of 5. So we can have one public config to choose an assignor and
> one public config to enable the rack aware assignment.
>
> *For Bruno's comments:*
>
> The proposal was to implement all the options and use configs to choose
> them during runtime. We can make those configs public as suggested.
> 1, 2, 3, 4, 5: agree and will fix those.
> 6: subscription protocol is not changed.
> 7: yeah. Let me fix the notations.
> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
> 9: I'm also ok with just optimizing reliability for standby tasks. Or we
> could simply run the "balance reliability over cost" greedy algorithm to
> see if any cost could be reduced.
> 10: Make sense. Will fix the wording.
> 11: Make sense. Will update the test part.
>
> *For Walker's comments:*
> 1. Stability for HAA is an issue. See my comments for Sophie's feedback 5
> and 6. I think we could use the rack aware assignment for StickyAssignor as
> well. For HAA assignments, it's less sticky and we can only shoot for
> minimizing the cross rack traffic eventually when everything is stable.
> 2. Yeah. This is a good point and we can also turn it on for
> StickyAssignor.
>
> Thanks,
> Hao
>
>
> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> ableegold...@gmail.com> wrote:
>
>> Hey Hao, thanks for the KIP!
>>
>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
>> config,
>> this
>> should be internal.rack.aware.assignment.strategy.
>>
>> 2.
>>
>> >  For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>> Number of
>> > edges E is T * N where T is the number of clients and N is the number of
>> > Tasks. This is because a task can be assigned to any client so there
>> will
>> > be an edge between every task and every client. The total complexity
>> would
>> > be O(T * N) if we want to be more specific.
>>
>> I feel like I'm missing something here, but if E = T * N and the
>> complexity
>> is ~O(E^2), doesn't
>> this make the total complexity order of O(T^2 * N^2)?
>>
>> 3.
>>
>> > Since 3.C.I and 3.C.II have different tradeoffs and work better in
>> > different workloads etc, we
>>
>> could add an internal configuration to choose one of them at runtime.
>> >
>> Why only an internal configuration? Same goes for
>> internal.rack.aware.assignment.standby.strategry (which also has the typo)
>>
>> 4.
>>
>> >  There are no changes in public interfaces.
>>
>> I think it would be good to explicitly call out that users can utilize
>> this
>> new feature by setting the
>> ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example
>>
>> 5.
>>
>> > The idea is that if we always try to make it overlap as much with
>> > HAAssignor’s target
>>
>> assignment, at least there’s a higher chance that tasks won’t be shuffled
>> a
>> > lot if the clients
>>
>> remain the same across rebalances.
>> >
>> This line definitely gave me some pause -- if there was one major takeaway
>> I had after KIP-441,
>> one thing that most limited the feature's success, it was our assumption
>> that clients are relatively
>> stable across rebalances. This was mostly true at limited scale or for
>> on-prem setups, but
>> unsurprisingly broke down in cloud environments or larger clusters. Not
>> only do clients naturally
>> fall in and out of the group, autoscaling is becoming more and more of a
>> thing.
>>
>> Lastly, and this is more easily solved but still worth calling out, an
>> assignment is only deterministic
>> as long as the client.id is persisted. Currently in Streams, we only
>> write
>> the process UUID to the
>> state directory if there is one, ie if at least one persistent stateful
>> task exists in the topology. This
>> made sense in the context of KIP-441, which targeted heavily stateful
>> deployments, but this KIP
>> presumably intends to target more than just the persistent & stateful
>> subset of applications. To
>> make matters even worse,  "persistent" is defined in a semantically
>> inconsistent way throughout
>> Streams.
>>
>> All this is to say, it may sound more complicated to remember the previous
>> assignment, but (a)
>> imo it only introduces a lot more complexity and shaky assumptions to
>> continue down this
>> path, and (b) we actually already do persist some amount of state, like
>> the
>> process UUID, and
>> (c) it seems like this is the perfect opportunity to finally rid ourselves
>> of the determinism constraint
>> which has frankly caused more trouble and time lost in sum than it would
>> have taken us to just
>> write the HighAvailabilityTaskAssignor to consider the previous assignment
>> from the start in KIP-441
>>
>> 6.
>>
>> > StickyTaskAssignor  users who would like to use rack aware assignment
>> > should upgrade their
>>
>> Kafka Streams version to the version in which HighAvailabilityTaskAssignor
>> > and rack awareness
>>
>> assignment are available.
>>
>> Building off of the above, the HAAssignor hasn't worked out perfectly for
>> everybody up until now,
>> given that we are only adding complexity to it now, on the flipside I
>> would
>> hesitate to try and force
>> everyone to use it if they want to upgrade. We added a "secret" backdoor
>> internal config to allow
>> users to set the task assignor back in KIP-441 for this reason. WDYT about
>> bumping this to a public
>> config on the side in this KIP?
>>
>>
>> On Tue, May 23, 2023 at 11:46 AM Hao Li <h...@confluent.io.invalid> wrote:
>>
>> > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
>> it to
>> > the test plan. I will also add tests to verify the new optimizer will
>> > produce a balanced assignment which has no worse cross AZ cost than the
>> > existing assignor.
>> >
>> > Hao
>> >
>> > On Mon, May 22, 2023 at 3:39 PM John Roesler <vvcep...@apache.org>
>> wrote:
>> >
>> > > Hi Hao,
>> > >
>> > > Thanks for the KIP!
>> > >
>> > > Overall, I think this is a great idea. I always wanted to circle back
>> > > after the Smooth Scaling KIP to put a proper optimization algorithm
>> into
>> > > place. I think this has the promise to really improve the quality of
>> the
>> > > balanced assignments we produce.
>> > >
>> > > Thanks for providing the details about the MaxCut/MinFlow algorithm.
>> It
>> > > seems like a good choice for me, assuming we choose the right scaling
>> > > factors for the weights we add to the graph. Unfortunately, I don't
>> think
>> > > that there's a good way to see how easy or hard this is going to be
>> until
>> > > we actually implement it and test it.
>> > >
>> > > That leads to the only real piece of feedback I had on the KIP, which
>> is
>> > > the testing portion. You mentioned system/integration/unit tests, but
>> > > there's not too much information about what those tests will do. I'd
>> like
>> > > to suggest that we invest in more simulation testing specifically,
>> > similar
>> > > to what we did in
>> > >
>> >
>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
>> > > .
>> > >
>> > > In fact, it seems like we _could_ write the simulation up front, and
>> then
>> > > implement the algorithm in a dummy way and just see whether it passes
>> the
>> > > simulations or not, before actually integrating it with Kafka Streams.
>> > >
>> > > Basically, I'd be +1 on this KIP today, but I'd feel confident about
>> it
>> > if
>> > > we had a little more detail regarding how we are going to verify that
>> the
>> > > new optimizer is actually going to produce more optimal plans than the
>> > > existing assigner we have today.
>> > >
>> > > Thanks again!
>> > > -John
>> > >
>> > > On 2023/05/22 16:49:22 Hao Li wrote:
>> > > > Hi Colt,
>> > > >
>> > > > Thanks for the comments.
>> > > >
>> > > > > and I struggle to see how the algorithm isn't at least O(N) where
>> N
>> > is
>> > > > the number of Tasks...?
>> > > >
>> > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>> Number
>> > > of
>> > > > edges E is T * N where T is the number of clients and N is the
>> number
>> > of
>> > > > Tasks. This is because a task can be assigned to any client so there
>> > will
>> > > > be an edge between every task and every client. The total complexity
>> > > would
>> > > > be O(T * N) if we want to be more specific.
>> > > >
>> > > > > But if the leaders for each partition are spread across multiple
>> > zones,
>> > > > how will you handle that?
>> > > >
>> > > > This is what the min-cost flow solution is trying to solve? i.e.
>> Find
>> > an
>> > > > assignment of tasks to clients where across AZ traffic can be
>> > minimized.
>> > > > But there are some constraints to the solution and one of them is we
>> > need
>> > > > to balance task assignment first (
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
>> > > ).
>> > > > So in your example of three tasks' partitions being in the same AZ
>> of a
>> > > > client, if there are other clients, we still want to balance the
>> tasks
>> > to
>> > > > other clients even if putting all tasks to a single client can
>> result
>> > in
>> > > 0
>> > > > cross AZ traffic. In
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
>> > > > section, the algorithm will try to find a min-cost solution based on
>> > > > balanced assignment instead of pure min-cost.
>> > > >
>> > > > Thanks,
>> > > > Hao
>> > > >
>> > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <c...@littlehorse.io>
>> > wrote:
>> > > >
>> > > > > Hello Hao,
>> > > > >
>> > > > > First of all, THANK YOU for putting this together. I had been
>> hoping
>> > > > > someone might bring something like this forward. A few comments:
>> > > > >
>> > > > > **1: Runtime Complexity
>> > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
>> > > problem in
>> > > > > O(E^2CU) time where C is max cost and U is max capacity. In our
>> > > particular
>> > > > > case, C is 1 and U is at most 3 (A task can have at most 3 topics
>> > > including
>> > > > > changelog topic?). So the algorithm runs in O(E^2) time for our
>> case.
>> > > > >
>> > > > > A Task can have multiple input topics, and also multiple state
>> > stores,
>> > > and
>> > > > > multiple output topics. The most common case is three topics as
>> you
>> > > > > described, but this is not necessarily guaranteed. Also, math is
>> one
>> > > of my
>> > > > > weak points, but to me O(E^2) is equivalent to O(1), and I
>> struggle
>> > to
>> > > see
>> > > > > how the algorithm isn't at least O(N) where N is the number of
>> > > Tasks...?
>> > > > >
>> > > > > **2: Broker-Side Partition Assignments
>> > > > > Consider the case with just three topics in a Task (one input, one
>> > > output,
>> > > > > one changelog). If all three partition leaders are in the same
>> Rack
>> > (or
>> > > > > better yet, the same broker), then we could get massive savings by
>> > > > > assigning the Task to that Rack/availability zone. But if the
>> leaders
>> > > for
>> > > > > each partition are spread across multiple zones, how will you
>> handle
>> > > that?
>> > > > > Is that outside the scope of this KIP, or is it worth introducing
>> a
>> > > > > kafka-streams-generate-rebalance-proposal.sh tool?
>> > > > >
>> > > > > Colt McNealy
>> > > > > *Founder, LittleHorse.io*
>> > > > >
>> > > > >
>> > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <h...@confluent.io.invalid>
>> > > wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > I have submitted KIP-925 to add rack awareness logic in task
>> > > assignment
>> > > > > in
>> > > > > > Kafka Streams and would like to start a discussion:
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>> > > > > >
>> > > > > > --
>> > > > > > Thanks,
>> > > > > > Hao
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > > Thanks,
>> > > > Hao
>> > > >
>> > >
>> >
>> >
>> > --
>> > Thanks,
>> > Hao
>> >
>>
>
>
> --
> Thanks,
> Hao
>


-- 
Thanks,
Hao

Reply via email to