Thanks Sophie! I've just made the updates :)
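
(For 4 below, the consumer config users need to set is just the standard client.rack, roughly like this -- the values here are only placeholders:)

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // Tag this instance with the rack/AZ it runs in; the rack aware assignment
    // reads the rack from the consumer's client.rack config.
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_RACK_CONFIG), "use1-az1");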

On Wed, May 31, 2023 at 2:29 PM Sophie Blee-Goldman <ableegold...@gmail.com> wrote:

> Thanks Hao -- I really like Walker's idea to separate the assignment strategy from the rack awareness option. Having two different configs for these makes a lot of sense to me and seems like it will benefit the most users.
>
> Given your KIP already has a lot going on with it, I would be happy to write up a quick KIP just for the public assignor config to take that aspect off your hands. We can discuss that separately and keep this thread focused on the rack awareness algorithm. If that sounds good I'll kick off a KIP tonight, then you can just update the "Current Task Assignment Logic" section to mention that either assignor may be plugged in, and the rack awareness can be configured independently.
>
> If users are able to plug in the StickyAssignor if they experience problems, this is good enough for me w.r.t. my other concern regarding task shuffling.
>
> On Wed, May 31, 2023 at 12:09 PM Hao Li <h...@confluent.io.invalid> wrote:
>
> > Hi all,
> >
> > Thanks for the feedback! I will update the KIP accordingly.
> >
> > *For Sophie's comments:*
> >
> > 1 and 2. Good catch. Fixed these.
> >
> > 3 and 4. Yes. We can make this a public config and call out the client.rack consumer config users need to set.
> >
> > 5. It's ideal to take the previous assignment into consideration when HAAssignor computes the target assignment; the complication comes with making sure the assignment can eventually converge and we don't do probing rebalances infinitely. It's not only about storing the previous assignment or getting it somehow -- we can actually get the previous assignment now, like we do in StickyAssignor, but the previous assignment will change in each round of probing rebalance. The proposal that adds some weight to make the rack aware assignment lean towards the original HAA's target assignment will add stability in some corner cases where there is a tie in cross rack traffic cost, but it's not sticky. The bottom line is it won't be worse than the current HAA's stickiness.
> >
> > 6. I'm fine with changing the assignor config to public. Actually, I think we can use the min-cost algorithm with StickyAssignor as well to mitigate the problem in 5. So we can have one public config to choose an assignor and one public config to enable the rack aware assignment (rough sketch at the end of this email).
> >
> > *For Bruno's comments:*
> >
> > The proposal was to implement all the options and use configs to choose among them at runtime. We can make those configs public as suggested.
> > 1, 2, 3, 4, 5: agree and will fix those.
> > 6: the subscription protocol is not changed.
> > 7: yeah. Let me fix the notation.
> > 8: it meant clients. In the figure, it maps to `c1_1`, `c1_2`, `c1_3` etc.
> > 9: I'm also ok with just optimizing reliability for standby tasks. Or we could simply run the "balance reliability over cost" greedy algorithm to see if any cost could be reduced.
> > 10: makes sense. Will fix the wording.
> > 11: makes sense. Will update the test section.
> >
> > *For Walker's comments:*
> >
> > 1. Stability for HAA is an issue. See my replies to Sophie's feedback 5 and 6. I think we could use the rack aware assignment for StickyAssignor as well. For HAA assignments it's less sticky, and we can only shoot for minimizing the cross rack traffic eventually, once everything is stable.
> > 2. Yeah. This is a good point and we can also turn it on for StickyAssignor.
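> >
> > To make 6 concrete, the rough shape I have in mind is two independent knobs, something like the sketch below. The config names are placeholders purely for illustration, not the final proposal:
> >
> >     import java.util.Properties;
> >
> >     final Properties props = new Properties();
> >     // Placeholder name: which task assignor to plug in, e.g. swap back to the sticky assignor.
> >     props.put("task.assignor.class",
> >         "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor");
> >     // Placeholder name: whether the min-cost rack aware optimization is applied on top of it.
> >     props.put("rack.aware.assignment.enabled", "true");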
> >
> > Thanks,
> > Hao
> >
> > On Tue, May 30, 2023 at 2:28 PM Sophie Blee-Goldman <ableegold...@gmail.com> wrote:
> >
> > > Hey Hao, thanks for the KIP!
> > >
> > > 1. There's a typo in the "internal.rack.aware.assignment.strategry" config; this should be internal.rack.aware.assignment.strategy.
> > >
> > > 2.
> > >
> > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number of edges E is T * N where T is the number of clients and N is the number of Tasks. This is because a task can be assigned to any client so there will be an edge between every task and every client. The total complexity would be O(T * N) if we want to be more specific.
> > >
> > > I feel like I'm missing something here, but if E = T * N and the complexity is ~O(E^2), doesn't this make the total complexity order of O(T^2 * N^2)?
> > >
> > > 3.
> > >
> > > > Since 3.C.I and 3.C.II have different tradeoffs and work better in different workloads etc, we could add an internal configuration to choose one of them at runtime.
> > >
> > > Why only an internal configuration? Same goes for internal.rack.aware.assignment.standby.strategry (which also has the typo).
> > >
> > > 4.
> > >
> > > > There are no changes in public interfaces.
> > >
> > > I think it would be good to explicitly call out that users can utilize this new feature by setting the ConsumerConfig's CLIENT_RACK_CONFIG, possibly with a brief example.
> > >
> > > 5.
> > >
> > > > The idea is that if we always try to make it overlap as much with HAAssignor's target assignment, at least there's a higher chance that tasks won't be shuffled a lot if the clients remain the same across rebalances.
> > >
> > > This line definitely gave me some pause -- if there was one major takeaway I had after KIP-441, one thing that most limited the feature's success, it was our assumption that clients are relatively stable across rebalances. This was mostly true at limited scale or for on-prem setups, but unsurprisingly broke down in cloud environments or larger clusters. Not only do clients naturally fall in and out of the group, autoscaling is becoming more and more of a thing.
> > >
> > > Lastly, and this is more easily solved but still worth calling out, an assignment is only deterministic as long as the client.id is persisted. Currently in Streams, we only write the process UUID to the state directory if there is one, i.e. if at least one persistent stateful task exists in the topology. This made sense in the context of KIP-441, which targeted heavily stateful deployments, but this KIP presumably intends to target more than just the persistent & stateful subset of applications. To make matters even worse, "persistent" is defined in a semantically inconsistent way throughout Streams.
> > >
> > > All this is to say, it may sound more complicated to remember the previous assignment, but (a) imo continuing down this path only introduces a lot more complexity and shaky assumptions, (b) we actually already do persist some amount of state, like the process UUID, and (c) it seems like this is the perfect opportunity to finally rid ourselves of the determinism constraint, which has frankly caused more trouble and time lost in sum than it would have taken us to just write the HighAvailabilityTaskAssignor to consider the previous assignment from the start in KIP-441.
> > >
> > > 6.
> > >
> > > > StickyTaskAssignor users who would like to use rack aware assignment should upgrade their Kafka Streams version to the version in which HighAvailabilityTaskAssignor and rack awareness assignment are available.
> > >
> > > Building off of the above, the HAAssignor hasn't worked out perfectly for everybody up until now, and given that we are only adding complexity to it, I would hesitate to try and force everyone to use it if they want to upgrade. We added a "secret" backdoor internal config to allow users to set the task assignor back in KIP-441 for this reason. WDYT about bumping this to a public config on the side in this KIP?
> > >
> > > On Tue, May 23, 2023 at 11:46 AM Hao Li <h...@confluent.io.invalid> wrote:
> > >
> > > > Thanks John! Yeah. The ConvergenceTest looks very helpful. I will add it to the test plan. I will also add tests to verify the new optimizer will produce a balanced assignment which has no worse cross-AZ cost than the existing assignor.
> > > >
> > > > Hao
> > > >
> > > > On Mon, May 22, 2023 at 3:39 PM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > > > Hi Hao,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > Overall, I think this is a great idea. I always wanted to circle back after the Smooth Scaling KIP to put a proper optimization algorithm into place. I think this has the promise to really improve the quality of the balanced assignments we produce.
> > > > >
> > > > > Thanks for providing the details about the MaxCut/MinFlow algorithm. It seems like a good choice to me, assuming we choose the right scaling factors for the weights we add to the graph. Unfortunately, I don't think that there's a good way to see how easy or hard this is going to be until we actually implement it and test it.
> > > > >
> > > > > That leads to the only real piece of feedback I had on the KIP, which is the testing portion. You mentioned system/integration/unit tests, but there's not too much information about what those tests will do. I'd like to suggest that we invest in more simulation testing specifically, similar to what we did in https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
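> > > > >
> > > > > (Just to sketch the shape of one simulated run -- every name below is made up, it is only meant to show the loop and the assertions, not real test utilities:)
> > > > >
> > > > >     // Skeleton only: SimulatedCluster, randomClusterWithRacks, runOneRebalance,
> > > > >     // isBalanced, crossRackCost and legacyAssignment are hypothetical helpers.
> > > > >     final Random prng = new Random(seed);
> > > > >     for (int trial = 0; trial < 1000; trial++) {
> > > > >         final SimulatedCluster cluster = randomClusterWithRacks(prng); // random tasks, clients, rack tags
> > > > >         int rebalances = 0;
> > > > >         while (runOneRebalance(cluster) && ++rebalances < MAX_PROBING_REBALANCES) {
> > > > >             // keep going while the assignor requests a follow-up probing rebalance
> > > > >         }
> > > > >         assertTrue(rebalances < MAX_PROBING_REBALANCES);   // the assignment converged
> > > > >         assertTrue(isBalanced(cluster));                   // same invariant the existing test checks
> > > > >         assertTrue(crossRackCost(cluster) <= crossRackCost(legacyAssignment(cluster))); // never worse than today
> > > > >     }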
> > > > >
> > > > > In fact, it seems like we _could_ write the simulation up front, and then implement the algorithm in a dummy way and just see whether it passes the simulations or not, before actually integrating it with Kafka Streams.
> > > > >
> > > > > Basically, I'd be +1 on this KIP today, but I'd feel confident about it if we had a little more detail regarding how we are going to verify that the new optimizer is actually going to produce more optimal plans than the existing assignor we have today.
> > > > >
> > > > > Thanks again!
> > > > > -John
> > > > >
> > > > > On 2023/05/22 16:49:22 Hao Li wrote:
> > > > >
> > > > > > Hi Colt,
> > > > > >
> > > > > > Thanks for the comments.
> > > > > >
> > > > > > > and I struggle to see how the algorithm isn't at least O(N) where N is the number of Tasks...?
> > > > > >
> > > > > > For O(E^2 * (CU)) complexity, C and U can be viewed as constant. Number of edges E is T * N where T is the number of clients and N is the number of Tasks. This is because a task can be assigned to any client so there will be an edge between every task and every client. The total complexity would be O(T * N) if we want to be more specific.
> > > > > >
> > > > > > > But if the leaders for each partition are spread across multiple zones, how will you handle that?
> > > > > >
> > > > > > This is what the min-cost flow solution is trying to solve, i.e. find an assignment of tasks to clients where cross-AZ traffic is minimized. But there are some constraints on the solution, and one of them is that we need to balance the task assignment first ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Designforrackawareassignment ). So in your example of a task's three partitions being in the same AZ as a client, if there are other clients, we still want to balance the tasks across the other clients even if putting all tasks on a single client could result in 0 cross-AZ traffic. In the https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams#KIP925:RackawaretaskassignmentinKafkaStreams-Algorithm section, the algorithm will try to find a min-cost solution based on a balanced assignment instead of pure min-cost.
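> > > > > >
> > > > > > To make the cost we are minimizing concrete, here is a toy version of the cross-AZ cost of a candidate assignment, written with plain types purely for illustration -- the real implementation computes this on the flow graph rather than on maps like these:
> > > > > >
> > > > > >     import java.util.List;
> > > > > >     import java.util.Map;
> > > > > >     import java.util.Set;
> > > > > >
> > > > > >     // One unit of cost for every partition a task consumes whose replica racks do not
> > > > > >     // include the rack of the client the task is assigned to.
> > > > > >     static int crossRackCost(final Map<String, String> taskToClient,                  // taskId -> clientId
> > > > > >                              final Map<String, String> clientToRack,                  // clientId -> rack/AZ
> > > > > >                              final Map<String, List<Set<String>>> taskPartitionRacks) { // taskId -> replica racks per partition
> > > > > >         int cost = 0;
> > > > > >         for (final Map.Entry<String, String> assigned : taskToClient.entrySet()) {
> > > > > >             final String clientRack = clientToRack.get(assigned.getValue());
> > > > > >             for (final Set<String> replicaRacks : taskPartitionRacks.get(assigned.getKey())) {
> > > > > >                 if (!replicaRacks.contains(clientRack)) {
> > > > > >                     cost++;
> > > > > >                 }
> > > > > >             }
> > > > > >         }
> > > > > >         return cost;
> > > > > >     }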
> > > > > >
> > > > > > Thanks,
> > > > > > Hao
> > > > > >
> > > > > > On Tue, May 9, 2023 at 5:55 PM Colt McNealy <c...@littlehorse.io> wrote:
> > > > > >
> > > > > > > Hello Hao,
> > > > > > >
> > > > > > > First of all, THANK YOU for putting this together. I had been hoping someone might bring something like this forward. A few comments:
> > > > > > >
> > > > > > > **1: Runtime Complexity
> > > > > > >
> > > > > > > > Klein's cycle canceling algorithm can solve the min-cost flow problem in O(E^2CU) time where C is max cost and U is max capacity. In our particular case, C is 1 and U is at most 3 (A task can have at most 3 topics including changelog topic?). So the algorithm runs in O(E^2) time for our case.
> > > > > > >
> > > > > > > A Task can have multiple input topics, and also multiple state stores, and multiple output topics. The most common case is three topics as you described, but this is not necessarily guaranteed. Also, math is one of my weak points, but to me O(E^2) is equivalent to O(1), and I struggle to see how the algorithm isn't at least O(N) where N is the number of Tasks...?
> > > > > > >
> > > > > > > **2: Broker-Side Partition Assignments
> > > > > > >
> > > > > > > Consider the case with just three topics in a Task (one input, one output, one changelog). If all three partition leaders are in the same Rack (or better yet, the same broker), then we could get massive savings by assigning the Task to that Rack/availability zone. But if the leaders for each partition are spread across multiple zones, how will you handle that? Is that outside the scope of this KIP, or is it worth introducing a kafka-streams-generate-rebalance-proposal.sh tool?
> > > > > > >
> > > > > > > Colt McNealy
> > > > > > > *Founder, LittleHorse.io*
> > > > > > >
> > > > > > > On Tue, May 9, 2023 at 4:03 PM Hao Li <h...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I have submitted KIP-925 to add rack awareness logic in task assignment in Kafka Streams and would like to start a discussion:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
> > > > > > > >
> > > > > > > > --
> > > > > > > > Thanks,
> > > > > > > > Hao
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Hao
> > > >
> > > > --
> > > > Thanks,
> > > > Hao
> >
> > --
> > Thanks,
> > Hao

--
Thanks,
Hao