Thanks Hao -- I really like Walker's idea to separate the assignment strategy from the rack awareness option. Having two different configs for these makes a lot of sense to me and seems like it will benefit the most users.
Given your KIP already has a lot going on with it, I would be happy to write up a quick KIP just for the public assignor config to take that aspect off your hands. We can discuss that separately and keep this thread focused on the rack awareness algorithm. If that sounds good I'll kick off a KIP tonight, then you can just update the "Current Task Assignment Logic" section to mention that either assignor may be plugged in, and the rack awareness can be configured independently. If users are able to plug in the StickyAssignor if they experience problems, this is good enough for me w.r.t my other concern regarding task shuffling On Wed, May 31, 2023 at 12:09 PM Hao Li <h...@confluent.io.invalid> wrote: > Hi all, > > Thanks for the feedback! I will update the KIP accordingly. > > *For Sophie's comments:* > > 1 and 2. Good catch. Fixed these. > > 3 and 4 Yes. We can make this public config and call out the clientConsumer > config users need to set. > > 5. It's ideal to take the previous assignment in HAAssignor into > consideration when we compute our target assignment, the complications come > with making sure the assignment can eventually converge and we don't do > probing rebalance infinitely. It's not only about storing the previous > assignment or get it somehow. We can actually get the previous assignment > now like we do in StickyAssignor. But the previous assignment will change > in each round of probing rebalance. The proposal which added some weight to > make the rack aware assignment lean towards the original HAA's target > assignment will add benefits of stability in some corner cases in case of > tie in cross rack traffic cost. But it's not sticky. But the bottom line is > it won't be worse than current HAA's stickiness. > > 6. I'm fine with changing the assignor config to public. Actually, I think > we can min-cost algorithm with StickyAssignor as well to mitigate the > problem of 5. So we can have one public config to choose an assignor and > one public config to enable the rack aware assignment. > > *For Bruno's comments:* > > The proposal was to implement all the options and use configs to choose > them during runtime. We can make those configs public as suggested. > 1, 2, 3, 4, 5: agree and will fix those. > 6: subscription protocol is not changed. > 7: yeah. Let me fix the notations. > 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc. > 9: I'm also ok with just optimizing reliability for standby tasks. Or we > could simply run the "balance reliability over cost" greedy algorithm to > see if any cost could be reduced. > 10: Make sense. Will fix the wording. > 11: Make sense. Will update the test part. > > *For Walker's comments:* > 1. Stability for HAA is an issue. See my comments for Sophie's feedback 5 > and 6. I think we could use the rack aware assignment for StickyAssignor as > well. For HAA assignments, it's less sticky and we can only shoot for > minimizing the cross rack traffic eventually when everything is stable. > 2. Yeah. This is a good point and we can also turn it on for > StickyAssignor. > > Thanks, > Hao > > > On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman < > ableegold...@gmail.com> > wrote: > > > Hey Hao, thanks for the KIP! > > > > 1. There's a typo in the "internal.rack.aware.assignment.strategry" > config, > > this > > should be internal.rack.aware.assignment.strategy. > > > > 2. > > > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant. > Number > > of > > > edges E is T * N where T is the number of clients and N is the number > of > > > Tasks. This is because a task can be assigned to any client so there > will > > > be an edge between every task and every client. The total complexity > > would > > > be O(T * N) if we want to be more specific. > > > > I feel like I'm missing something here, but if E = T * N and the > complexity > > is ~O(E^2), doesn't > > this make the total complexity order of O(T^2 * N^2)? > > > > 3. > > > > > Since 3.C.I and 3.C.II have different tradeoffs and work better in > > > different workloads etc, we > > > > could add an internal configuration to choose one of them at runtime. > > > > > Why only an internal configuration? Same goes for > > internal.rack.aware.assignment.standby.strategry (which also has the > typo) > > > > 4. > > > > > There are no changes in public interfaces. > > > > I think it would be good to explicitly call out that users can utilize > this > > new feature by setting the > > ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example > > > > 5. > > > > > The idea is that if we always try to make it overlap as much with > > > HAAssignor’s target > > > > assignment, at least there’s a higher chance that tasks won’t be > shuffled a > > > lot if the clients > > > > remain the same across rebalances. > > > > > This line definitely gave me some pause -- if there was one major > takeaway > > I had after KIP-441, > > one thing that most limited the feature's success, it was our assumption > > that clients are relatively > > stable across rebalances. This was mostly true at limited scale or for > > on-prem setups, but > > unsurprisingly broke down in cloud environments or larger clusters. Not > > only do clients naturally > > fall in and out of the group, autoscaling is becoming more and more of a > > thing. > > > > Lastly, and this is more easily solved but still worth calling out, an > > assignment is only deterministic > > as long as the client.id is persisted. Currently in Streams, we only > write > > the process UUID to the > > state directory if there is one, ie if at least one persistent stateful > > task exists in the topology. This > > made sense in the context of KIP-441, which targeted heavily stateful > > deployments, but this KIP > > presumably intends to target more than just the persistent & stateful > > subset of applications. To > > make matters even worse, "persistent" is defined in a semantically > > inconsistent way throughout > > Streams. > > > > All this is to say, it may sound more complicated to remember the > previous > > assignment, but (a) > > imo it only introduces a lot more complexity and shaky assumptions to > > continue down this > > path, and (b) we actually already do persist some amount of state, like > the > > process UUID, and > > (c) it seems like this is the perfect opportunity to finally rid > ourselves > > of the determinism constraint > > which has frankly caused more trouble and time lost in sum than it would > > have taken us to just > > write the HighAvailabilityTaskAssignor to consider the previous > assignment > > from the start in KIP-441 > > > > 6. > > > > > StickyTaskAssignor users who would like to use rack aware assignment > > > should upgrade their > > > > Kafka Streams version to the version in which > HighAvailabilityTaskAssignor > > > and rack awareness > > > > assignment are available. > > > > Building off of the above, the HAAssignor hasn't worked out perfectly for > > everybody up until now, > > given that we are only adding complexity to it now, on the flipside I > would > > hesitate to try and force > > everyone to use it if they want to upgrade. We added a "secret" backdoor > > internal config to allow > > users to set the task assignor back in KIP-441 for this reason. WDYT > about > > bumping this to a public > > config on the side in this KIP? > > > > > > On Tue, May 23, 2023 at 11:46 AM Hao Li <h...@confluent.io.invalid> > wrote: > > > > > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add > it > > to > > > the test plan. I will also add tests to verify the new optimizer will > > > produce a balanced assignment which has no worse cross AZ cost than the > > > existing assignor. > > > > > > Hao > > > > > > On Mon, May 22, 2023 at 3:39 PM John Roesler <vvcep...@apache.org> > > wrote: > > > > > > > Hi Hao, > > > > > > > > Thanks for the KIP! > > > > > > > > Overall, I think this is a great idea. I always wanted to circle back > > > > after the Smooth Scaling KIP to put a proper optimization algorithm > > into > > > > place. I think this has the promise to really improve the quality of > > the > > > > balanced assignments we produce. > > > > > > > > Thanks for providing the details about the MaxCut/MinFlow algorithm. > It > > > > seems like a good choice for me, assuming we choose the right scaling > > > > factors for the weights we add to the graph. Unfortunately, I don't > > think > > > > that there's a good way to see how easy or hard this is going to be > > until > > > > we actually implement it and test it. > > > > > > > > That leads to the only real piece of feedback I had on the KIP, which > > is > > > > the testing portion. You mentioned system/integration/unit tests, but > > > > there's not too much information about what those tests will do. I'd > > like > > > > to suggest that we invest in more simulation testing specifically, > > > similar > > > > to what we did in > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java > > > > . > > > > > > > > In fact, it seems like we _could_ write the simulation up front, and > > then > > > > implement the algorithm in a dummy way and just see whether it passes > > the > > > > simulations or not, before actually integrating it with Kafka > Streams. > > > > > > > > Basically, I'd be +1 on this KIP today, but I'd feel confident about > it > > > if > > > > we had a little more detail regarding how we are going to verify that > > the > > > > new optimizer is actually going to produce more optimal plans than > the > > > > existing assigner we have today. > > > > > > > > Thanks again! > > > > -John > > > > > > > > On 2023/05/22 16:49:22 Hao Li wrote: > > > > > Hi Colt, > > > > > > > > > > Thanks for the comments. > > > > > > > > > > > and I struggle to see how the algorithm isn't at least O(N) > where N > > > is > > > > > the number of Tasks...? > > > > > > > > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant. > > Number > > > > of > > > > > edges E is T * N where T is the number of clients and N is the > number > > > of > > > > > Tasks. This is because a task can be assigned to any client so > there > > > will > > > > > be an edge between every task and every client. The total > complexity > > > > would > > > > > be O(T * N) if we want to be more specific. > > > > > > > > > > > But if the leaders for each partition are spread across multiple > > > zones, > > > > > how will you handle that? > > > > > > > > > > This is what the min-cost flow solution is trying to solve? i.e. > Find > > > an > > > > > assignment of tasks to clients where across AZ traffic can be > > > minimized. > > > > > But there are some constraints to the solution and one of them is > we > > > need > > > > > to balance task assignment first ( > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment > > > > ). > > > > > So in your example of three tasks' partitions being in the same AZ > > of a > > > > > client, if there are other clients, we still want to balance the > > tasks > > > to > > > > > other clients even if putting all tasks to a single client can > result > > > in > > > > 0 > > > > > cross AZ traffic. In > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm > > > > > section, the algorithm will try to find a min-cost solution based > on > > > > > balanced assignment instead of pure min-cost. > > > > > > > > > > Thanks, > > > > > Hao > > > > > > > > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <c...@littlehorse.io> > > > wrote: > > > > > > > > > > > Hello Hao, > > > > > > > > > > > > First of all, THANK YOU for putting this together. I had been > > hoping > > > > > > someone might bring something like this forward. A few comments: > > > > > > > > > > > > **1: Runtime Complexity > > > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow > > > > problem in > > > > > > O(E^2CU) time where C is max cost and U is max capacity. In our > > > > particular > > > > > > case, C is 1 and U is at most 3 (A task can have at most 3 topics > > > > including > > > > > > changelog topic?). So the algorithm runs in O(E^2) time for our > > case. > > > > > > > > > > > > A Task can have multiple input topics, and also multiple state > > > stores, > > > > and > > > > > > multiple output topics. The most common case is three topics as > you > > > > > > described, but this is not necessarily guaranteed. Also, math is > > one > > > > of my > > > > > > weak points, but to me O(E^2) is equivalent to O(1), and I > struggle > > > to > > > > see > > > > > > how the algorithm isn't at least O(N) where N is the number of > > > > Tasks...? > > > > > > > > > > > > **2: Broker-Side Partition Assignments > > > > > > Consider the case with just three topics in a Task (one input, > one > > > > output, > > > > > > one changelog). If all three partition leaders are in the same > Rack > > > (or > > > > > > better yet, the same broker), then we could get massive savings > by > > > > > > assigning the Task to that Rack/availability zone. But if the > > leaders > > > > for > > > > > > each partition are spread across multiple zones, how will you > > handle > > > > that? > > > > > > Is that outside the scope of this KIP, or is it worth > introducing a > > > > > > kafka-streams-generate-rebalance-proposal.sh tool? > > > > > > > > > > > > Colt McNealy > > > > > > *Founder, LittleHorse.io* > > > > > > > > > > > > > > > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <h...@confluent.io.invalid> > > > > wrote: > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > I have submitted KIP-925 to add rack awareness logic in task > > > > assignment > > > > > > in > > > > > > > Kafka Streams and would like to start a discussion: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams > > > > > > > > > > > > > > -- > > > > > > > Thanks, > > > > > > > Hao > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > Thanks, > > > > > Hao > > > > > > > > > > > > > > > > > > -- > > > Thanks, > > > Hao > > > > > > > > -- > Thanks, > Hao >