Hey Vahid,

Comments below:
> I'm not very clear on the first part of this paragraph. You could clarify it for me, but in general, balancing out the partitions across the consumers in a group as much as possible would normally mean balancing the load within the cluster, and that's something a user would want, compared to cases where the assignments, and therefore the load, could be quite unbalanced depending on the subscriptions.

I'm just wondering what kind of use cases require differing subscriptions in a steady state. Usually we expect all consumers in the group to have the same subscription, in which case the balance provided by round robin should be even (in terms of the number of assigned partitions). The only case that comes to mind is a rolling upgrade scenario in which the consumers in the group are restarted one by one with an updated subscription. It would be ideal to provide better balance in this situation, but once the upgrade finishes, the assignment should be balanced again, so it's unclear to me how significant the gain is. On the other hand, if there are cases which require differing subscriptions in a long-term state, that would make this feature more compelling.

> Since the new consumer is single-threaded, there is no such problem in its round robin strategy. It simply considers consumers one by one for each partition assignment, and when one consumer is assigned a partition, the next assignment starts by considering the next consumer in the list (and not the same consumer that was just assigned). This removes the possibility of the issue reported in KAFKA-2019 surfacing in the new consumer. In the sticky strategy we do not have this issue either, since every time an assignment is about to happen we start with the consumer with the least number of assignments. So we will not have a scenario where a consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that consumer is lagging behind the other consumers in the number of partitions assigned).

Thanks for checking into this. I think the other factor is that the round robin assignor sorts the consumers using the id given to them by the coordinator, which at the moment looks like this: "{clientId}-{uuid}". So if the group uses a common clientId, then it shouldn't usually be the case that two consumers on the same host get ordered together. We could actually change the order of these fields in a compatible way if we didn't like the dependence on the clientId. It seems anyway that the sticky assignor is not needed to deal with this problem.

> Even though consumer groups are usually stable, it might be the case that consumers do not initially join the group at the same time. The sticky strategy in that situation lets those who joined earlier stick to their partitions to some extent (assuming fairness takes precedence over stickiness). In terms of specific use cases, Andrew touched on examples of how Kafka can benefit from a sticky assignor. I could add those to the KIP if you also think they help build the case in favor of the sticky assignor. I agree with you about the downside and I'll make sure I add that to the KIP as you suggested.

Yep, I agree that it helps in some situations, but I think the impact is amortized over the life of the group. It also takes a bit more work to explain this to users and may require them to change their usage pattern a little bit.
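Just so we're talking about the same heuristic, here is roughly how I read the "start with the consumer with the least number of assignments" step. This is only a sketch based on the description above, not the actual assignor code:

import java.util.*;
import org.apache.kafka.common.TopicPartition;

// Sketch: pick the subscribed consumer that currently owns the fewest partitions,
// so no single consumer keeps getting picked repeatedly as in KAFKA-2019.
class StickySelectionSketch {
    static String leastLoadedConsumer(Map<String, List<TopicPartition>> currentAssignment,
                                      Map<String, Set<String>> subscriptions,
                                      String topic) {
        String candidate = null;
        for (Map.Entry<String, List<TopicPartition>> entry : currentAssignment.entrySet()) {
            if (!subscriptions.get(entry.getKey()).contains(topic))
                continue; // only consumers subscribed to the partition's topic are eligible
            if (candidate == null
                    || entry.getValue().size() < currentAssignment.get(candidate).size())
                candidate = entry.getKey();
        }
        return candidate;
    }
}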
I think we expect users to do something like the following in their rebalance listener:

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class MyRebalanceListener implements ConsumerRebalanceListener {
    // commitOffsets, cleanupState, initializeState and initializeOffset are
    // application-specific helpers (not shown).
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            commitOffsets(partition);
            cleanupState(partition);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            initializeState(partition);
            initializeOffset(partition);
        }
    }
}

This is fairly intuitive, but if you use this pattern, then sticky assignment doesn't give you anything because you always clean up state prior to the rebalance. Instead you need to do something like this:

class MyRebalanceListener implements ConsumerRebalanceListener {
    private Collection<TopicPartition> lastAssignment = Collections.emptyList();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            commitOffsets(partition);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
        // difference(a, b) is a helper returning the elements of a that are not in b
        for (TopicPartition partition : difference(lastAssignment, assignment))
            cleanupState(partition);    // clean up only the partitions we actually lost

        for (TopicPartition partition : difference(assignment, lastAssignment))
            initializeState(partition); // initialize only the partitions we actually gained

        for (TopicPartition partition : assignment)
            initializeOffset(partition);

        this.lastAssignment = assignment;
    }
}

This seems harder to explain and is probably the reason why Andy was suggesting that it would be more ideal if we could simply skip the call to onPartitionsRevoked() if the partitions remain assigned to the consumer after the rebalance. Unfortunately, the need to commit offsets prior to rebalancing makes this tricky. The other option suggested by Andy would be to introduce a third method in the rebalance listener (e.g. doOffsetCommit(partitions)). Then the consumer would call doOffsetCommit() prior to every rebalance, but only invoke onPartitionsRevoked() when partitions have actually been assigned to another consumer following the rebalance. Either way, we're making the API more complex, which would be nice to avoid unless really necessary.

Overall, I think my feeling at the moment is that the sticky assignor is a nice improvement over the currently available assignors, but the gain seems a little marginal and maybe not worth the cost of the complexity mentioned above. It's not a strong feeling though, and it would be nice to hear what others think.

The other thing worth mentioning is that we've talked a few times in the past about the concept of "partial rebalancing," which would allow the group to reassign only a subset of the partitions it was consuming. This would let part of the group continue consuming while the group is rebalancing. We don't have any proposals ready to support this, but if we want to have it long term, then it might reduce some of the benefit provided by the sticky assignor.

Thanks,
Jason

On Thu, Jun 23, 2016 at 5:04 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com> wrote:

> Thank you Andy for your feedback on the KIP.
>
> I agree with Jason on the responses he provided below.
>
> If we give precedence to fairness over stickiness, no assumption can be made about which assignments would remain and which would be revoked. If we give precedence to stickiness over fairness, we can be sure that all existing valid assignments (those whose topic partitions are still valid) would remain.
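If stickiness takes precedence, I take that guarantee to mean something like the following check. This is only a sketch of the invariant with illustrative names, not code from the KIP:

import java.util.*;
import org.apache.kafka.common.TopicPartition;

// Invariant: every partition a still-live consumer owned before the rebalance,
// and which still exists afterwards, remains assigned to that consumer.
class StickinessInvariant {
    static boolean holds(Map<String, Set<TopicPartition>> before,
                         Map<String, Set<TopicPartition>> after,
                         Set<TopicPartition> stillValid) {
        for (Map.Entry<String, Set<TopicPartition>> entry : before.entrySet()) {
            Set<TopicPartition> nowOwned = after.get(entry.getKey());
            if (nowOwned == null)
                continue; // the consumer left the group, so its partitions may move
            for (TopicPartition tp : entry.getValue())
                if (stillValid.contains(tp) && !nowOwned.contains(tp))
                    return false; // a still-valid assignment was taken away
        }
        return true;
    }
}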
> I'll add your example to the KIP, but this is how it should work with the sticky assignor:
>
> We have two consumers, C0 and C1, and two topics, t0 and t1, each with two partitions. Therefore, the partitions are t0p0, t0p1, t1p0, t1p1. Let's assume the two consumers are subscribed to both t0 and t1.
>
> The assignment using the sticky assignor will be:
> * C0: [t0p0, t1p0]
> * C1: [t0p1, t1p1]
>
> Now if we add C2 (subscribed to both topics), this is what we get:
> * C0: [t1p0]
> * C1: [t0p1, t1p1]
> * C2: [t0p0]
>
> I think both range and round robin assignors would produce this:
> * C0: [t0p0, t1p1]
> * C1: [t0p1]
> * C2: [t1p0]
>
> Regards,
> --Vahid
>
>
> From: Jason Gustafson <ja...@confluent.io>
> To: dev@kafka.apache.org
> Date: 06/23/2016 10:06 AM
> Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
> Hey Andy,
>
> Thanks for jumping in. A couple comments:
>
> > In addition, I think it is important that during a rebalance consumers do not first have all partitions revoked, only to have a very similar (or the same!) set reassigned. This is less than intuitive and complicates client code unnecessarily. Instead, the `ConsumerPartitionListener` should only be called for true changes in assignment, i.e. any new partitions assigned and any existing ones revoked, when comparing the new assignment to the previous one.
>
> The problem is that the revocation callback is called before you know what the assignment for the next generation will be. This is necessary for the consumer to be able to commit offsets for its assigned partitions. Once the consumer has a new assignment, it is no longer safe to commit offsets from the previous generation. Unless sticky assignment can give us some guarantee on which partitions will remain after the rebalance, all of them must be included in the revocation callback.
>
> > There is one last scenario I'd like to highlight that I think the KIP should describe: say you have a group consuming from two topics, each topic with two partitions. As of 0.9.0.1 the maximum number of consumers you can have is 2, not 4. With 2 consumers each will get one partition from each topic. A third consumer will not have any partitions assigned. This should be fixed by the 'fair' part of the strategy, but it would be good to see this covered explicitly in the KIP.
>
> This would be true for range assignment, but with 4 partitions total, round-robin assignment would give one partition to each of the 4 consumers (assuming subscriptions match).
>
> Thanks,
> Jason
>
> On Thu, Jun 23, 2016 at 1:42 AM, Andrew Coates <big.andy.coa...@gmail.com> wrote:
>
> > Hi all,
> >
> > I think sticky assignment is immensely important and useful in many situations. Apps that use Kafka are many and varied. Any app that stores any state, either in the form of data from incoming messages, cached results from previous out-of-process calls, or expensive operations (and let's face it, that's most!), can see a big negative impact from partition movement.
> >
> > The main issue partition movement brings is that it makes building elastic services very hard. Consider: you've got an app consuming from Kafka that locally caches data to improve performance. You want the app to auto-scale as the throughput to the topic(s) increases. Currently, when one or more new instances are added and the group rebalances, all existing instances have all partitions revoked, and then a new, potentially quite different, set assigned. An intuitive pattern is to evict partition state, i.e. the cached data, when a partition is revoked. So in this case all apps flush their entire cache, causing throughput to drop massively, right when you want to increase it!
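For concreteness, the eviction pattern described here might look roughly like the sketch below; the per-partition cache map and the loadCacheFor() helper are placeholders, not Kafka APIs:

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Because a rebalance currently revokes every partition, this listener drops the
// entire cache even when most partitions come straight back to the same instance.
class CachingRebalanceListener implements ConsumerRebalanceListener {
    private final Map<TopicPartition, Object> caches = new HashMap<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            caches.remove(partition); // evict cached state for the revoked partition
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            caches.put(partition, loadCacheFor(partition)); // expensive warm-up
    }

    private Object loadCacheFor(TopicPartition partition) {
        return new Object(); // placeholder for rebuilding per-partition cached state
    }
}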
> > Even if the app is not flushing partition state when partitions are revoked, the lack of a 'sticky' strategy means that a proportion of the cached state is now useless, and instances have partitions assigned for which they have no cached state, again negatively impacting throughput.
> >
> > With a 'sticky' strategy, throughput can be maintained and indeed increased, as intended.
> >
> > The same is also true in the presence of failure. An instance failing (maybe due to high load) can invalidate the caching of existing instances, negatively impacting the throughput of the remaining instances (possibly at a time the system needs throughput the most!).
> >
> > My question would be: why move partitions if you don't have to? I will certainly be setting the 'sticky' assignment strategy as the default once it's released, and I have a feeling it will become the default in the community's 'best-practice' guides.
> >
> > In addition, I think it is important that during a rebalance consumers do not first have all partitions revoked, only to have a very similar (or the same!) set reassigned. This is less than intuitive and complicates client code unnecessarily. Instead, the `ConsumerPartitionListener` should only be called for true changes in assignment, i.e. any new partitions assigned and any existing ones revoked, when comparing the new assignment to the previous one.
> >
> > I think the change to how the client listener is called should be part of this work.
> >
> > There is one last scenario I'd like to highlight that I think the KIP should describe: say you have a group consuming from two topics, each topic with two partitions. As of 0.9.0.1 the maximum number of consumers you can have is 2, not 4. With 2 consumers each will get one partition from each topic. A third consumer will not have any partitions assigned. This should be fixed by the 'fair' part of the strategy, but it would be good to see this covered explicitly in the KIP.
> >
> > Thanks,
> >
> > Andy
> >
> > On Thu, 23 Jun 2016, 00:41 Jason Gustafson, <ja...@confluent.io> wrote:
> >
> > > Hey Vahid,
> > >
> > > Thanks for the updates. I think the lack of comments on this KIP suggests that the motivation might need a little work. Here are the two main benefits of this assignor as I see them:
> > >
> > > 1. It can give a more balanced assignment when subscriptions do not match in a group (this is the same problem solved by KIP-49).
> > > 2. It potentially allows applications to save the need to clean up partition state when rebalancing, since partitions are more likely to stay assigned to the same consumer.
> > >
> > > Does that seem right to you?
> > >
> > > I think it's unclear how serious the first problem is. Providing better balance when subscriptions differ is nice, but are rolling updates the only scenario where this is encountered? Or are there more general use cases where differing subscriptions could persist for a longer duration? I'm also wondering if this assignor addresses the problem found in KAFKA-2019. It would be useful to confirm whether this problem still exists with the new consumer's round robin strategy and how (whether?) it is addressed by this assignor.
> > > The major selling point seems to be the second point. This is definitely nice to have, but would you expect a lot of value in practice, since consumer groups are usually assumed to be stable? It might help to describe some specific use cases to help motivate the proposal. One of the downsides is that it requires users to restructure their code to get any benefit from it. In particular, they need to move partition cleanup out of the onPartitionsRevoked() callback and into onPartitionsAssigned(). This is a little awkward and will probably make explaining the consumer more difficult. It's probably worth including a discussion of this point in the proposal with an example.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Tue, Jun 7, 2016 at 4:05 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > I updated the KIP and added some details about the user data, the assignment algorithm, and the alternative strategies to consider.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > >
> > > > Please let me know if I missed adding anything. Thank you.
> > > >
> > > > Regards,
> > > > --Vahid