Hi all,

I've updated the KIP based on the feedback. Major changes I made:
1. Added rack-aware assignment to `StickyTaskAssignor`.
2. Rejected the "Prefer reliability and then find optimal cost" option in
standby task assignment.
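To make the opt-in path concrete for reviewers, here is a minimal sketch of what enabling rack awareness might look like from the application side. The consumer's `client.rack` config (and Streams forwarding `consumer.`-prefixed configs to its internal consumers) exists today; the `rack.aware.assignment.strategy` key and its value are placeholders for the public config discussed in this thread, so the final names may differ:

```java
import java.util.Properties;

public class RackAwareConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "rack-aware-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Kafka Streams forwards "consumer."-prefixed configs to its internal
        // consumers, so this sets the consumer's client.rack. It should match
        // the brokers' broker.rack (e.g. the availability zone) so the
        // assignor can compute cross-rack traffic cost.
        props.put("consumer.client.rack", "us-east-1a");
        // Placeholder key/value for the proposed public config enabling
        // rack-aware assignment; the final name is still under discussion.
        props.put("rack.aware.assignment.strategy", "min_traffic");
        System.out.println(props.getProperty("consumer.client.rack"));
    }
}
```
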
On Wed, May 31, 2023 at 12:09 PM Hao Li <h...@confluent.io> wrote:

> Hi all,
>
> Thanks for the feedback! I will update the KIP accordingly.
>
> *For Sophie's comments:*
>
> 1 and 2. Good catch. Fixed these.
>
> 3 and 4. Yes. We can make this a public config and call out the
> client.rack consumer config users need to set.
>
> 5. It's ideal to take the previous assignment in HAAssignor into
> consideration when we compute our target assignment. The complication is
> making sure the assignment can eventually converge and we don't do
> probing rebalances infinitely. It's not only about storing the previous
> assignment or getting it somehow; we can actually get the previous
> assignment now, like we do in StickyAssignor. But the previous assignment
> will change in each round of probing rebalance. The proposal, which adds
> some weight to make the rack-aware assignment lean towards the original
> HAA's target assignment, will add stability in some corner cases where
> there's a tie in cross-rack traffic cost. It's not sticky, but the bottom
> line is it won't be worse than the current HAA's stickiness.
>
> 6. I'm fine with changing the assignor config to public. Actually, I
> think we can use the min-cost algorithm with StickyAssignor as well to
> mitigate the problem in 5. So we can have one public config to choose an
> assignor and one public config to enable the rack-aware assignment.
>
> *For Bruno's comments:*
>
> The proposal was to implement all the options and use configs to choose
> them during runtime. We can make those configs public as suggested.
> 1, 2, 3, 4, 5: agree and will fix those.
> 6: the subscription protocol is not changed.
> 7: yeah. Let me fix the notations.
> 8: It meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
> 9: I'm also ok with just optimizing reliability for standby tasks. Or we
> could simply run the "balance reliability over cost" greedy algorithm to
> see if any cost could be reduced.
> 10: Makes sense. Will fix the wording.
> 11: Makes sense. Will update the test part.
>
> *For Walker's comments:*
> 1. Stability for HAA is an issue. See my comments on Sophie's feedback 5
> and 6. I think we could use the rack-aware assignment for StickyAssignor
> as well. For HAA assignments, it's less sticky, and we can only shoot for
> minimizing the cross-rack traffic eventually, when everything is stable.
> 2. Yeah. This is a good point and we can also turn it on for
> StickyAssignor.
>
> Thanks,
> Hao
>
>
> On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <
> ableegold...@gmail.com> wrote:
>
>> Hey Hao, thanks for the KIP!
>>
>> 1. There's a typo in the "internal.rack.aware.assignment.strategry"
>> config; this should be internal.rack.aware.assignment.strategy.
>>
>> 2.
>>
>> > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>> > Number of edges E is T * N where T is the number of clients and N is
>> > the number of Tasks. This is because a task can be assigned to any
>> > client so there will be an edge between every task and every client.
>> > The total complexity would be O(T * N) if we want to be more specific.
>>
>> I feel like I'm missing something here, but if E = T * N and the
>> complexity is ~O(E^2), doesn't this make the total complexity order of
>> O(T^2 * N^2)?
>>
>> 3.
>>
>> > Since 3.C.I and 3.C.II have different tradeoffs and work better in
>> > different workloads etc, we could add an internal configuration to
>> > choose one of them at runtime.
>>
>> Why only an internal configuration? Same goes for
>> internal.rack.aware.assignment.standby.strategry (which also has the
>> typo)
>>
>> 4.
>>
>> > There are no changes in public interfaces.
>>
>> I think it would be good to explicitly call out that users can utilize
>> this new feature by setting the ConsumerConfig's CLIENT_RACK_CONFIG,
>> possibly with a brief example
>>
>> 5.
>>
>> > The idea is that if we always try to make it overlap as much with
>> > HAAssignor’s target assignment, at least there’s a higher chance that
>> > tasks won’t be shuffled a lot if the clients remain the same across
>> > rebalances.
>>
>> This line definitely gave me some pause -- if there was one major
>> takeaway I had after KIP-441, one thing that most limited the feature's
>> success, it was our assumption that clients are relatively stable across
>> rebalances. This was mostly true at limited scale or for on-prem setups,
>> but unsurprisingly broke down in cloud environments or larger clusters.
>> Not only do clients naturally fall in and out of the group, autoscaling
>> is becoming more and more of a thing.
>>
>> Lastly, and this is more easily solved but still worth calling out, an
>> assignment is only deterministic as long as the client.id is persisted.
>> Currently in Streams, we only write the process UUID to the state
>> directory if there is one, ie if at least one persistent stateful task
>> exists in the topology. This made sense in the context of KIP-441, which
>> targeted heavily stateful deployments, but this KIP presumably intends
>> to target more than just the persistent & stateful subset of
>> applications. To make matters even worse, "persistent" is defined in a
>> semantically inconsistent way throughout Streams.
>>
>> All this is to say, it may sound more complicated to remember the
>> previous assignment, but (a) imo it only introduces a lot more
>> complexity and shaky assumptions to continue down this path, (b) we
>> actually already do persist some amount of state, like the process
>> UUID, and (c) it seems like this is the perfect opportunity to finally
>> rid ourselves of the determinism constraint, which has frankly caused
>> more trouble and time lost in sum than it would have taken us to just
>> write the HighAvailabilityTaskAssignor to consider the previous
>> assignment from the start in KIP-441.
>>
>> 6.
>>
>> > StickyTaskAssignor users who would like to use rack aware assignment
>> > should upgrade their Kafka Streams version to the version in which
>> > HighAvailabilityTaskAssignor and rack awareness assignment are
>> > available.
>>
>> Building off of the above, the HAAssignor hasn't worked out perfectly
>> for everybody up until now, given that we are only adding complexity to
>> it now; on the flipside, I would hesitate to try and force everyone to
>> use it if they want to upgrade. We added a "secret" backdoor internal
>> config to allow users to set the task assignor back in KIP-441 for this
>> reason. WDYT about bumping this to a public config on the side in this
>> KIP?
>>
>>
>> On Tue, May 23, 2023 at 11:46 AM Hao Li <h...@confluent.io.invalid> wrote:
>>
>> > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add
>> > it to the test plan. I will also add tests to verify the new optimizer
>> > will produce a balanced assignment which has no worse cross-AZ cost
>> > than the existing assignor.
>> >
>> > Hao
>> >
>> > On Mon, May 22, 2023 at 3:39 PM John Roesler <vvcep...@apache.org>
>> > wrote:
>> >
>> > > Hi Hao,
>> > >
>> > > Thanks for the KIP!
>> > >
>> > > Overall, I think this is a great idea.
I always wanted to circle back
>> > > after the Smooth Scaling KIP to put a proper optimization algorithm
>> > > into place. I think this has the promise to really improve the
>> > > quality of the balanced assignments we produce.
>> > >
>> > > Thanks for providing the details about the MaxCut/MinFlow algorithm.
>> > > It seems like a good choice to me, assuming we choose the right
>> > > scaling factors for the weights we add to the graph. Unfortunately,
>> > > I don't think there's a good way to see how easy or hard this is
>> > > going to be until we actually implement it and test it.
>> > >
>> > > That leads to the only real piece of feedback I had on the KIP,
>> > > which is the testing portion. You mentioned system/integration/unit
>> > > tests, but there's not too much information about what those tests
>> > > will do. I'd like to suggest that we invest in more simulation
>> > > testing specifically, similar to what we did in
>> > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
>> > > .
>> > >
>> > > In fact, it seems like we _could_ write the simulation up front, and
>> > > then implement the algorithm in a dummy way and just see whether it
>> > > passes the simulations or not, before actually integrating it with
>> > > Kafka Streams.
>> > >
>> > > Basically, I'd be +1 on this KIP today, but I'd feel more confident
>> > > about it if we had a little more detail regarding how we are going
>> > > to verify that the new optimizer is actually going to produce more
>> > > optimal plans than the existing assignor we have today.
>> > >
>> > > Thanks again!
>> > > -John
>> > >
>> > > On 2023/05/22 16:49:22 Hao Li wrote:
>> > > > Hi Colt,
>> > > >
>> > > > Thanks for the comments.
>> > > >
>> > > > > and I struggle to see how the algorithm isn't at least O(N)
>> > > > > where N is the number of Tasks...?
>> > > >
>> > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant.
>> > > > The number of edges E is T * N where T is the number of clients
>> > > > and N is the number of Tasks. This is because a task can be
>> > > > assigned to any client, so there will be an edge between every
>> > > > task and every client. The total complexity would be O(T * N) if
>> > > > we want to be more specific.
>> > > >
>> > > > > But if the leaders for each partition are spread across multiple
>> > > > > zones, how will you handle that?
>> > > >
>> > > > This is what the min-cost flow solution is trying to solve, i.e.
>> > > > find an assignment of tasks to clients where cross-AZ traffic is
>> > > > minimized. But there are some constraints on the solution, and one
>> > > > of them is that we need to balance the task assignment first (
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment
>> > > > ). So in your example of three tasks' partitions being in the same
>> > > > AZ as a client, if there are other clients, we still want to
>> > > > balance the tasks across other clients, even if putting all tasks
>> > > > on a single client could result in 0 cross-AZ traffic. In the
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm
>> > > > section, the algorithm will try to find a min-cost solution based
>> > > > on a balanced assignment instead of pure min-cost.
>> > > >
>> > > > Thanks,
>> > > > Hao
>> > > >
>> > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <c...@littlehorse.io>
>> > > > wrote:
>> > > >
>> > > > > Hello Hao,
>> > > > >
>> > > > > First of all, THANK YOU for putting this together.
I had been hoping
>> > > > > someone might bring something like this forward. A few comments:
>> > > > >
>> > > > > **1: Runtime Complexity
>> > > > > > Klein’s cycle canceling algorithm can solve the min-cost flow
>> > > > > > problem in O(E^2CU) time where C is max cost and U is max
>> > > > > > capacity. In our particular case, C is 1 and U is at most 3 (A
>> > > > > > task can have at most 3 topics including changelog topic?). So
>> > > > > > the algorithm runs in O(E^2) time for our case.
>> > > > >
>> > > > > A Task can have multiple input topics, and also multiple state
>> > > > > stores, and multiple output topics. The most common case is
>> > > > > three topics as you described, but this is not necessarily
>> > > > > guaranteed. Also, math is one of my weak points, but to me
>> > > > > O(E^2) is equivalent to O(1), and I struggle to see how the
>> > > > > algorithm isn't at least O(N) where N is the number of Tasks...?
>> > > > >
>> > > > > **2: Broker-Side Partition Assignments
>> > > > > Consider the case with just three topics in a Task (one input,
>> > > > > one output, one changelog). If all three partition leaders are
>> > > > > in the same Rack (or better yet, on the same broker), then we
>> > > > > could get massive savings by assigning the Task to that
>> > > > > Rack/availability zone. But if the leaders for each partition
>> > > > > are spread across multiple zones, how will you handle that? Is
>> > > > > that outside the scope of this KIP, or is it worth introducing a
>> > > > > kafka-streams-generate-rebalance-proposal.sh tool?
>> > > > >
>> > > > > Colt McNealy
>> > > > > *Founder, LittleHorse.io*
>> > > > >
>> > > > >
>> > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <h...@confluent.io.invalid>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > I have submitted KIP-925 to add rack awareness logic to task
>> > > > > > assignment in Kafka Streams and would like to start a
>> > > > > > discussion:
>> > > > > >
>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>> > > > > >
>> > > > > > --
>> > > > > > Thanks,
>> > > > > > Hao
>> > > > >
>> > > >
>> > > > --
>> > > > Thanks,
>> > > > Hao
>> > >
>> >
>> > --
>> > Thanks,
>> > Hao
>
> --
> Thanks,
> Hao

-- 
Thanks,
Hao
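To ground the complexity discussion in the thread (unit cost C = 1, capacity U bounded by the number of task topics), here is a toy sketch of the cost the min-cost-flow formulation would put on a single task-to-client edge. The topic and rack names are made up for illustration; the real assignor derives racks from partition leader metadata and the consumer's client.rack:

```java
import java.util.Arrays;
import java.util.List;

public class CrossRackCostSketch {
    // One unit of cost per task topic whose partition leader lives in a
    // different rack than the candidate client. This mirrors the KIP's
    // unit cost C = 1 per cross-rack edge; it is a sketch, not the actual
    // assignor code.
    static int crossRackCost(List<String> topicLeaderRacks, String clientRack) {
        int cost = 0;
        for (String rack : topicLeaderRacks) {
            if (!rack.equals(clientRack)) {
                cost++;
            }
        }
        return cost;
    }

    public static void main(String[] args) {
        // Hypothetical task with an input, changelog, and output topic
        // whose leaders span two racks.
        List<String> task = Arrays.asList("rack-a", "rack-a", "rack-b");
        System.out.println(crossRackCost(task, "rack-a")); // 1 cross-rack edge
        System.out.println(crossRackCost(task, "rack-b")); // 2 cross-rack edges
    }
}
```

The min-cost step then minimizes the sum of these edge costs over all tasks, subject to the balance constraint described in the KIP (tasks are spread across clients first, and cost is minimized only among balanced assignments).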