Hi Jason,

Thanks for reviewing the KIP. I will add the details you requested, but to summarize:

Regarding the structure of the user data: Right now the user data contains only the current assignments, i.e. a mapping of consumers to their assigned topic partitions. Is this mapping what you are also suggesting with the CurrentAssignment field? I see how adding a version (as the sticky assignor version) would be useful, and also how a protocol name could be useful, perhaps for validation. But could you clarify the "Subscription" field and how you think it would come into play?

Regarding the algorithm: There could be similarities between how this KIP is implemented and how KIP-49 handles fairness. But since we had to take stickiness into consideration, we started fresh and did not adopt anything from KIP-49. The sticky assignor implementation is comprehensive and guarantees the fairest possible assignment with the highest stickiness. I even have a unit test that randomly generates an assignment problem and verifies that a fair and sticky assignment is calculated. KIP-54 gives priority to fairness over stickiness (which makes the implementation more complex). We could have another strategy that gives priority to stickiness over fairness (which would presumably perform better).

The main distinction between KIP-54 and KIP-49 is that KIP-49 calculates the assignment without considering the previous assignments (fairness only), whereas for KIP-54 previous assignments play a big role (fairness and stickiness). I believe that if there is a situation where the stickiness requirements do not exist, it would make sense to use a fair-only assignment without the overhead of sticky assignment, as you mentioned. So I could see three different strategies that could enrich the assignment policy options. It would be great to have some feedback from the community about the best way to move forward with these two KIPs. In the meantime, I'll add some more details to the KIP about the approach for calculating assignments.

Thanks again.
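For readers following the thread, the two properties discussed above (fairness and stickiness) can be made concrete with a small sketch. The Python below is illustrative only; it is not the KIP-54 implementation or its unit test. The names `imbalance` and `retained`, the tuple representation of a topic partition, and the use of "largest minus smallest partition count" as a fairness measure are all assumptions made for this example, and it assumes every consumer subscribes to the same topics.

```python
from typing import Dict, Set, Tuple

# Hypothetical representations, chosen for this sketch only.
TopicPartition = Tuple[str, int]
Assignment = Dict[str, Set[TopicPartition]]  # consumer id -> owned partitions


def imbalance(assignment: Assignment) -> int:
    """Difference between the largest and smallest per-consumer partition counts."""
    if not assignment:
        return 0
    sizes = [len(parts) for parts in assignment.values()]
    return max(sizes) - min(sizes)


def retained(previous: Assignment, current: Assignment) -> int:
    """Number of partitions that stayed with the consumer that owned them before."""
    return sum(
        len(parts & current.get(consumer, set()))
        for consumer, parts in previous.items()
    )


previous = {
    "c1": {("t", 0), ("t", 1)},
    "c2": {("t", 2), ("t", 3)},
    "c3": {("t", 4), ("t", 5)},
}
# c3 leaves the group; its two partitions are spread over c1 and c2,
# while c1 and c2 keep everything they already owned.
current = {
    "c1": {("t", 0), ("t", 1), ("t", 4)},
    "c2": {("t", 2), ("t", 3), ("t", 5)},
}

print(imbalance(current))           # 0: both consumers own three partitions
print(retained(previous, current))  # 4: every partition of c1 and c2 stayed put
```

Under these assumptions, a randomized test in the spirit of the one mentioned above could generate a previous assignment, change the group membership, and check that the new assignment keeps `imbalance` as small as possible while keeping `retained` as large as possible.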
Regards,
--Vahid

From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/06/2016 01:26 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Hi Vahid,

Can you add some detail to the KIP on the structure of the user data? I'm guessing it would be something like this:

ProtocolName => "sticky"

ProtocolMetadata => Version Subscription UserData
  Version => int16
  Subscription => [Topic]
    Topic => string
  UserData => CurrentAssignment
    CurrentAssignment => [Topic [Partition]]
      Topic => string
      Partition => int32

It would also be helpful to include a little more detail on the algorithm. From what I can tell, it looks like you're adopting some of the strategies from KIP-49 to handle differing subscriptions better. If so, then I wonder if it makes sense to combine the two KIPs? Or do you think there would be an advantage to having the "fair" assignment strategy without the overhead of the sticky assignor?

Thanks,
Jason

On Fri, Jun 3, 2016 at 11:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Sorry for being late on this thread.
>
> The assign() function is auto-triggered during the rebalance by one of the
> consumers when it receives all subscription information collected from the
> server-side coordinator.
>
> More details can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol
>
> As for Kafka Streams, the way it did "stickiness" is by 1) letting all
> consumers put their current assigned topic-partitions and server ids into
> the "metadata" field of the JoinGroupRequest, 2) when the selected consumer
> triggers assign() along with all the subscriptions as well as their
> metadata, it can parse the metadata to learn about the existing assignment
> map; and hence when making the new assignment it will try to assign
> partitions to their current owners "with best effort".
>
> Hope this helps.
>
> Guozhang
>
>
> On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Hi Guozhang,
> >
> > I was looking at the implementation of StreamsPartitionAssignor through
> > its unit tests and expected to find some tests that
> > - verify stickiness by making at least two calls to the assign() method
> >   (so we check the second assign() call output preserves the assignments
> >   coming from the first assign() call output); or
> > - start off with a preset assignment, call assign() after some subscription
> >   change, and verify the previous assignments are preserved.
> > But none of the methods seem to do these. Did I overlook them, or is
> > stickiness being tested in some other fashion?
> >
> > Also, if there is a high-level write-up about how this assignor works,
> > could you please point me to it? Thanks.
> >
> > Regards.
> > --Vahid
> >
> >
> > From: Guozhang Wang <wangg...@gmail.com>
> > To: "dev@kafka.apache.org" <dev@kafka.apache.org>
> > Date: 05/02/2016 10:34 AM
> > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> >
> >
> > Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
> > some sort of sticky partitioning mechanism. This is done through the
> > userData field though; i.e. all group members send their current "assigned
> > partitions" in their join group request, which will be grouped and sent to
> > the leader; the leader then does best-effort sticky partitioning.
> >
> >
> > Guozhang
> >
> > On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> > > I think I'm unclear how we leverage the
> > > onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> > > from our normal usage -- certainly you can use them to generate a diff, but
> > > you still need to commit when partitions are revoked and that has a
> > > non-trivial cost.
> > > Are we just saying that you might be able to save some
> > > overhead, e.g. closing/reopening some other resources by doing a flush but
> > > not a close() or something? You still need to flush any output and commit
> > > offsets before returning from onPartitionsRevoked, right? Otherwise you
> > > couldn't guarantee clean handoff of partitions.
> > >
> > > In terms of the rebalancing, the basic requirements in the KIP seem sound.
> > > Passing previous assignment data via UserData also seems reasonable since
> > > it avoids redistributing all assignment data to all members and doesn't
> > > rely on the next generation leader being a member of the current
> > > generation. Hopefully this shouldn't be surprising since I think I
> > > discussed this w/ Jason before he updated the relevant wiki pages :)
> > >
> > > -Ewen
> > >
> > >
> > > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > I believe your suggestion on how to take advantage of this assignor is
> > > > valid. We can leverage the onPartitionsRevoked() and onPartitionsAssigned()
> > > > callbacks and do a comparison of assigned partitions before and after the
> > > > re-balance and do the cleanup only if there is a change (e.g., if some
> > > > previously assigned partition is not in the assignment).
> > > >
> > > > On your second question, a number of tests that I ran show that the old
> > > > assignments are preserved in the current implementation, except for when
> > > > the consumer group leader is killed, in which case a fresh assignment is
> > > > performed. This is something that needs to be fixed. I tried to use your
> > > > pointers to find out where the best place is to preserve the old
> > > > assignment in such circumstances but have not been able to pinpoint it.
> > > > If you have any suggestions on this, please share. Thanks.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > > >
> > > >
> > > > From: Jason Gustafson <ja...@confluent.io>
> > > > To: dev@kafka.apache.org
> > > > Date: 04/14/2016 11:37 AM
> > > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> > > >
> > > >
> > > > Hi Vahid,
> > > >
> > > > Thanks for the proposal. I think one of the advantages of having sticky
> > > > assignment would be to reduce the need to clean up local partition state
> > > > between rebalances. Do you have any thoughts on how the user would take
> > > > advantage of this assignor in the consumer to do this? Maybe one approach
> > > > is to delay cleanup until you detect a change from the previous assignment
> > > > in the onPartitionsAssigned() callback?
> > > >
> > > > Also, can you provide some detail on how the sticky assignor works at the
> > > > group protocol level? For example, do you pass old assignments through the
> > > > "UserData" field in the consumer's JoinGroup?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > > vahidhashem...@us.ibm.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have started a new KIP under
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > > >
> > > > > The corresponding JIRA is at
> > > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > > > >
> > > > > Your feedback is much appreciated.
> > > > >
> > > > > Regards,
> > > > > Vahid Hashemian
> > >
> > > --
> > > Thanks,
> > > Ewen
> >
> > --
> > -- Guozhang
>
> --
> -- Guozhang
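
The callback-based idea discussed in the thread (compare the partitions owned before and after a rebalance, and clean up local state only for partitions that were actually lost) can be sketched as follows. This is a language-neutral illustration in Python, not the Java consumer API: the class `StickyAwareListener`, its method names, and the `cleanup` callback are hypothetical stand-ins for a rebalance listener with onPartitionsRevoked()/onPartitionsAssigned() callbacks. As noted in the thread, flushing output and committing offsets would still have to happen in the revoked callback; the sketch only defers the state cleanup.

```python
from typing import Callable, Iterable, Set, Tuple

TopicPartition = Tuple[str, int]  # hypothetical (topic, partition) pair


class StickyAwareListener:
    """Hypothetical listener that delays cleanup until the new assignment is known."""

    def __init__(self, cleanup: Callable[[TopicPartition], None]) -> None:
        self._cleanup = cleanup
        self._previous: Set[TopicPartition] = set()

    def on_partitions_revoked(self, partitions: Iterable[TopicPartition]) -> None:
        # Only remember what was owned. Flushing output and committing offsets
        # would still be done here; local state is left untouched for now.
        self._previous = set(partitions)

    def on_partitions_assigned(self, partitions: Iterable[TopicPartition]) -> Set[TopicPartition]:
        current = set(partitions)
        # Partitions owned before the rebalance but missing from the new assignment.
        lost = self._previous - current
        for tp in sorted(lost):
            self._cleanup(tp)
        self._previous = current
        return lost


cleaned = []
listener = StickyAwareListener(cleaned.append)
listener.on_partitions_revoked([("t", 0), ("t", 1)])
listener.on_partitions_assigned([("t", 1), ("t", 2)])
print(cleaned)  # [('t', 0)]: only the partition that moved away is cleaned up
```

With a sticky assignor the set of lost partitions would often be empty, which is where the savings would come from; with an assignor that reshuffles freely, the same listener still behaves correctly but cleans up more.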