From what I understand, the "largest number of partitions" trick is based on the assumption that topics can only expand their partitions. What happens when a topic gets deleted and recreated? This breaks that assumption.
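For illustration, the "largest number of partitions" resolution being questioned here (and described by Joel in the quoted mail below) can be sketched roughly as follows. This is a hypothetical Java sketch with invented names, not code from the KIP or the consumer: each member reports the partition count it currently sees per subscribed topic, and the assignment logic takes the per-topic maximum. As the question above points out, that is only safe while partition counts grow monotonically; a topic deleted and recreated with fewer partitions leaves the resolved maximum pointing at partitions that no longer exist.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch (hypothetical names): resolve disagreeing metadata by
    // taking the largest partition count any member reported for each topic.
    public class MaxPartitionCountResolver {

        // reportedCounts holds one topic -> partitionCount map per group member;
        // the result is the per-topic maximum across all members.
        public static Map<String, Integer> resolve(List<Map<String, Integer>> reportedCounts) {
            Map<String, Integer> resolved = new HashMap<>();
            for (Map<String, Integer> memberView : reportedCounts) {
                for (Map.Entry<String, Integer> entry : memberView.entrySet()) {
                    resolved.merge(entry.getKey(), entry.getValue(), Math::max);
                }
            }
            // Caveat raised above: if a topic is deleted and recreated with fewer
            // partitions, this maximum can reference partitions that no longer exist.
            return resolved;
        }
    }
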
On Fri, Aug 28, 2015 at 6:33 AM, Neha Narkhede <n...@confluent.io> wrote: > Thanks for re-reviewing, Joel. > > > > > > > On Fri, Aug 28, 2015 at 2:51 AM -0700, "Joel Koshy" <jjkosh...@gmail.com> > wrote: > > > > > > > > > > > > I think we think this proposal addresses 100% of the split brain issues > > ever seen in the ZK-based protocol, but I think you think there are still > > issues. Can you explain what you're thinking of and when you think it would > > happen? I want to make sure you aren't assuming client-side=>split-brain > > since I think that is totally not the case. > > Yes I had concluded that client-side assignment would still result in > split-brain wrt partition counts, but I overlooked a key sentence in > the wiki - i.e., that the assignment algorithm for consumers can just > use the largest number of partitions for each topic reported by any of > the consumers. i.e., I assumed that consumers would just fail > rebalance if the partition counts were inconsistent but that is not > the case since this conflict can be easily resolved as described > without further join-group requests. Sorry about that. There is still > the issue of the coordinator having to send back n*m worth of > metadata, but that was not my biggest concern. I'll look over it again > and reply back tomorrow. > > Joel > > On Thu, Aug 27, 2015 at 2:55 PM, Jay Kreps wrote: > > Hey Joel, > > > > I really don't think we should do both. There are pros and cons but we > > should make a decision and work on operationalizing one approach. Much of > > really making something like this work is getting all the bugs out, > getting > > monitoring in place, getting rigorous system tests in place. Trying to do > > those things twice with the same resources will just mean we do them half > > as well. I also think this buys nothing from the user's point of > view--they > > want co-ordination that works correctly, the debate we are having is > purely > > a "how should we build that" debate. So this is really not the kind of > > thing we'd want to make pluggable and if we did that would just > complicate > > life for the user. > > > > I think we think this proposal addresses 100% of the split brain issues > > ever seen in the ZK-based protocol, but I think you think there are still > > issues. Can you explain what you're thinking of and when you think it would > > happen? I want to make sure you aren't assuming client-side=>split-brain > > since I think that is totally not the case. > > > > With respect to "herd issues" I actually think all the proposals address > > this by scaling the co-ordinator out to all nodes and making the > > co-ordination vastly cheaper. No proposal, of course, gets rid of the > fact > > that all clients rejoin at once when there is a membership change, but > that > > is kind of fundamental to the problem. > > > > -Jay > > > > On Thu, Aug 27, 2015 at 2:02 PM, Joel Koshy wrote: > > > >> I actually feel this set of tests (whatever they may be) is somewhat > >> irrelevant here. My main concern with the current client-side proposal > >> (i.e., without Becket's follow-up suggestions) is that it makes a > >> significant compromise to the original charter of the new consumer - > >> i.e., reduce/eliminate herd and split brain problems in both group > >> management and partition assignment. 
I understand the need for > >> client-side partition assignment in some use cases (which we are also > >> interested in), but I also think we should make every effort to keep > >> full server-side coordination for the remaining (majority) of use > >> cases, especially if it does not complicate the protocol. The proposed > >> changes do not complicate the protocol IMO - i.e., there is no further > >> modification to the request/response formats beyond the current > >> client-side proposal. It only involves a trivial reinterpretation of > >> the content of the protocol metadata field. > >> > >> Joel > >> > >> On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede wrote: > >> > Hey Becket, > >> > > >> > In that case, the broker side partition assignment would be ideal > because > >> >> it avoids > >> >> issues like metadata inconsistency / split brain / exploding > >> subscription > >> >> set propagation. > >> > > >> > > >> > As per our previous discussions regarding each of those concerns > >> (referring > >> > to this email thread, KIP calls and JIRA comments), we are going to > run a > >> > set of tests using the LinkedIn deployment numbers that we will wait > for > >> > you to share. The purpose is to see if those concerns are really > valid or > >> > not. I'd prefer to see that before making any more changes that will > >> > complicate the protocol. > >> > > >> > On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin > > > >> > wrote: > >> > > >> >> Hi folks, > >> >> > >> >> After further discussion at LinkedIn, we found that while having a > more > >> >> general group management protocol is very useful, the vast majority > of > >> the > >> >> clients will not use a customized partition assignment strategy. In > that > >> >> case, the broker side partition assignment would be ideal because it > >> avoids > >> >> issues like metadata inconsistency / split brain / exploding > >> subscription > >> >> set propagation. > >> >> > >> >> So we have the following proposal that satisfies the majority of the > >> >> clients' needs without changing the currently proposed binary > protocol. > >> >> i.e., continue to support broker-side assignment if the assignment > >> strategy > >> >> is recognized by the coordinator. > >> >> > >> >> 1. Keep the binary protocol as currently proposed. > >> >> > >> >> 2. Change the way we interpret ProtocolMetadata: > >> >> 2.1 On the consumer side, change partition.assignment.strategy to > >> >> partition.assignor.class. Implement something like the following > >> >> PartitionAssignor interface: > >> >> > >> >> public interface PartitionAssignor { > >> >> List protocolTypes(); > >> >> byte[] protocolMetadata(); > >> >> // return the Topic->List map that are assigned to this > >> >> consumer. > >> >> List assignPartitions(String protocolType, byte[] > >> >> responseProtocolMetadata); > >> >> } > >> >> > >> >> public abstract class AbstractPartitionAssignor implements > >> >> PartitionAssignor { > >> >> protected final KafkaConsumer consumer; > >> >> AbstractPartitionAssignor(KafkaConsumer consumer) { > >> >> this.consumer = consumer; > >> >> } > >> >> } > >> >> > >> >> 2.2 The ProtocolMetadata in JoinGroupRequest will be > >> >> partitionAssignor.protocolMetadata(). When partition.assignor.class > is > >> >> "range" or "roundrobin", the ProtocolMetadata in JoinGroupRequest > will > >> be a > >> >> JSON subscription set. 
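The interface in the quoted proposal above lost its generic type parameters when the archive stripped the angle brackets, which is why it reads as raw List types. Below is a best-guess reconstruction; the type parameters are assumptions inferred from the surrounding comments, not taken from the original mail, and the return type of assignPartitions could just as plausibly have been a list of topic-partitions rather than a map.

    import java.util.List;
    import java.util.Map;

    // Best-guess reconstruction of the proposed interface; the generic type
    // parameters are assumed, since the archive stripped them from the mail.
    public interface PartitionAssignor {
        // Assignment strategies this assignor can handle, e.g. "range", "roundrobin".
        List<String> protocolTypes();

        // Metadata to place in the JoinGroupRequest (e.g. a JSON subscription set).
        byte[] protocolMetadata();

        // Return the topic -> partition-list map assigned to this consumer, derived
        // from the ProtocolMetadata carried in the JoinGroupResponse.
        Map<String, List<Integer>> assignPartitions(String protocolType,
                                                    byte[] responseProtocolMetadata);
    }

Under this reading, a "range" assignor would return "range" from protocolTypes(), serialize its subscription as JSON in protocolMetadata(), and parse the coordinator-computed global assignment in assignPartitions(), matching items 2.2-2.4 of the proposal.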
("range", "roundrobin" will be reserved > words, we > >> >> can also consider reserving some Prefix such as "broker-" to be more > >> clear) > >> >> 2.3 On broker side when ProtocolType is "range" or "roundroubin", > >> >> coordinator will parse the ProtocolMetadata in the JoinGroupRequest > and > >> >> assign the partitions for consumers. In the JoinGroupResponse, the > >> >> ProtocolMetadata will be the global assignment of partitions. > >> >> 2.4 On client side, after receiving the JoinGroupResponse, > >> >> partitionAssignor.assignPartitions() will be invoked to return the > >> actual > >> >> assignment. If the assignor is RangeAssignor or RoundRobinAssignor, > they > >> >> will parse the assignment from the ProtocolMetadata returned by > >> >> coordinator. > >> >> > >> >> This approach has a few merits: > >> >> 1. Does not change the proposed binary protocol, which is still > general. > >> >> 2. The majority of the consumers will not suffer from inconsistent > >> metadata > >> >> / split brain / exploding subscription set propagation. This is > >> >> specifically to deal with the issue that the current proposal caters > to > >> a > >> >> 20% use-case while adversely impacting the more common 80% use-cases. > >> >> 3. Easy to implement. The only thing needed is implement a > partitioner > >> >> class. For most users, the default range and roundrobin partitioner > are > >> >> good enough. > >> >> > >> >> Thoughts? > >> >> > >> >> Thanks, > >> >> > >> >> Jiangjie (Becket) Qin > >> >> > >> >> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson > >> >> wrote: > >> >> > >> >> > Follow-up from the kip call: > >> >> > > >> >> > 1. Onur brought up the question of whether this protocol provides > >> enough > >> >> > coordination capabilities to be generally useful in practice (is > that > >> >> > accurate, Onur?). If it doesn't, then each use case would probably > >> need a > >> >> > dependence on zookeeper anyway, and we haven't really gained > anything. > >> >> The > >> >> > group membership provided by this protocol is a useful primitive > for > >> >> > coordination, but it's limited in the sense that everything shared > >> among > >> >> > the group has to be communicated at the time the group is created. > If > >> any > >> >> > shared data changes, then the only way the group can ensure > agreement > >> is > >> >> to > >> >> > force a rebalance. This is expensive since all members must stall > >> while > >> >> the > >> >> > rebalancing takes place. As we have also seen, there is a practical > >> limit > >> >> > on the amount of metadata that can be sent through this protocol > when > >> >> > groups get a little larger. This protocol is therefore not > suitable to > >> >> > cases which require frequent communication or which require a large > >> >> amount > >> >> > of data to be communicated. For the use cases listed on the wiki, > >> neither > >> >> > of these appear to be an issue, but there may be other limitations > >> which > >> >> > would limit reuse of the protocol. Perhaps it would be sufficient > to > >> >> sketch > >> >> > how these cases might work? > >> >> > > >> >> > 2. We talked a little bit about the issue of metadata churn. Becket > >> >> brought > >> >> > up the interesting point that not only do we depend on topic > metadata > >> >> > changing relatively infrequently, but we also expect timely > agreement > >> >> among > >> >> > the brokers on what that metadata is. To resolve this, we can have > the > >> >> > consumers fetch metadata from the coordinator. 
We still depend on > >> topic > >> > metadata not changing frequently, but this should resolve any > >> >> disagreement > >> >> among the brokers themselves. In fact, since we expect that > >> disagreement > >> >> is > >> >> relatively rare, we can have the consumers fetch from the > coordinator > >> >> only > >> >> > when a disagreement occurs. The nice thing about this > proposal is > >> >> that > >> >> > it doesn't affect the join group semantics, so the coordinator > would > >> >> remain > >> >> > oblivious to the metadata used by the group for agreement. Also, if > >> >> > metadata churn becomes an issue, it might be possible to have the > >> >> > coordinator provide a snapshot for the group to ensure that a > >> generation > >> >> > would be able to reach agreement (this would probably require > adding > >> >> > groupId/generation to the metadata request). > >> >> > > >> >> > 3. We talked briefly about support for multiple protocols in the > join > >> >> group > >> >> > request in order to allow changing the assignment strategy without > >> >> > downtime. I think it's a little doubtful that this would get much > use > >> in > >> >> > practice, but I agree it's a nice option to have on the table. An > >> >> > alternative, for the sake of argument, is to have each member > provide > >> >> only > >> >> > one version of the protocol, and to let the coordinator choose the > >> >> protocol > >> >> > with the largest number of supporters. All members which can't > support > >> >> the > >> >> > selected protocol would be kicked out of the group. The drawback > in a > >> >> > rolling upgrade is that the total capacity of the group would be > >> >> > momentarily halved. It would also be a little tricky to handle the > >> case > >> >> of > >> >> > retrying when a consumer is kicked out of the group. We wouldn't > want > >> it > >> >> to > >> >> > be able to effect a rebalance, for example, if it would just be > kicked > >> >> out > >> >> > again. That would probably complicate the group management logic on > >> the > >> >> > coordinator. > >> >> > > >> >> > > >> >> > Thanks, > >> >> > Jason > >> >> > > >> >> > > >> >> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin > >> > >> > > >> >> > wrote: > >> >> > > >> >> > > Jun, > >> >> > > > >> >> > > Yes, I agree. If the metadata can be synced quickly there should > >> not be > >> >> > an > >> >> > > issue. It just occurred to me that there is a proposal to allow > >> >> consuming > >> >> > > from followers in ISR, which could potentially cause more frequent > >> >> > metadata > >> >> > > changes for consumers. Would that be an issue? > >> >> > > > >> >> > > Thanks, > >> >> > > > >> >> > > Jiangjie (Becket) Qin > >> >> > > > >> >> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson < > >> ja...@confluent.io> > >> >> > > wrote: > >> >> > > > >> >> > > > Hi Jun, > >> >> > > > > >> >> > > > Answers below: > >> >> > > > > >> >> > > > 1. When there are multiple common protocols in the > >> JoinGroupRequest, > >> >> > > which > >> >> > > > one would the coordinator pick? > >> >> > > > > >> >> > > > I was intending to use the list to indicate preference. If all > >> group > >> >> > > > members support protocols ["A", "B"] in that order, then we > will > >> >> choose > >> >> > > > "A." If some support ["B", "A"], then we would either choose > >> based on > >> >> > > > respective counts or just randomly. 
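As a rough sketch of the selection logic described here (a preference-ordered protocol list per member, with the coordinator picking a protocol every member supports), something like the following could work. The class and method names, and the tie-breaking by first-choice counts, are assumptions for illustration only, not the KIP's specification.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch of coordinator-side protocol selection: keep only the
    // protocols every member supports, then pick the one most members listed first.
    public class ProtocolSelector {

        public static String select(List<List<String>> memberPreferences) {
            if (memberPreferences.isEmpty())
                return null;

            // Intersect the supported protocols across all members.
            Set<String> common = new HashSet<>(memberPreferences.get(0));
            for (List<String> prefs : memberPreferences)
                common.retainAll(prefs);
            if (common.isEmpty())
                return null; // group construction fails: error in each JoinGroupResponse

            // Count first-choice votes among the common protocols.
            Map<String, Integer> votes = new HashMap<>();
            for (List<String> prefs : memberPreferences)
                for (String p : prefs)
                    if (common.contains(p)) {
                        votes.merge(p, 1, Integer::sum);
                        break; // only a member's most-preferred common protocol counts
                    }

            String best = null;
            for (Map.Entry<String, Integer> e : votes.entrySet())
                if (best == null || e.getValue() > votes.get(best))
                    best = e.getKey();
            return best;
        }
    }

The single-protocol alternative mentioned above (each member advertises exactly one protocol and the minority is kicked out) would reduce this to a simple majority count over one-element lists.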
The main use case of > >> supporting > >> >> the > >> >> > > > list is for rolling upgrades when a change is made to the > >> assignment > >> >> > > > strategy. In that case, the new assignment strategy would be > >> listed > >> >> > first > >> >> > > > in the upgraded client. I think it's debatable whether this > >> feature > >> >> > would > >> >> > > > get much use in practice, so we might consider dropping it. > >> >> > > > > >> >> > > > 2. If the protocols don't agree, the group construction fails. > >> What > >> >> > > exactly > >> >> > > > does it mean? Do we send an error in every JoinGroupResponse > and > >> >> remove > >> >> > > all > >> >> > > > members in the group in the coordinator? > >> >> > > > > >> >> > > > Yes, that is right. It would be handled similarly to > inconsistent > >> >> > > > assignment strategies in the current protocol. The coordinator > >> >> returns > >> >> > an > >> >> > > > error in each join group response, and the client propagates > the > >> >> error > >> >> > to > >> >> > > > the user. > >> >> > > > > >> >> > > > 3. Consumer embedded protocol: The proposal has two different > >> formats > >> >> > of > >> >> > > > subscription depending on whether wildcards are used or not. > This > >> >> > seems a > >> >> > > > bit complicated. Would it be better to always use the metadata > >> hash? > >> >> > The > >> >> > > > clients know the subscribed topics already. This way, the > client > >> code > >> >> > > > behaves the same whether wildcards are used or not. > >> >> > > > > >> >> > > > Yeah, I think this is possible (Neha also suggested it). I > haven't > >> >> > > updated > >> >> > > > the wiki yet, but the patch I started working on uses only the > >> >> metadata > >> >> > > > hash. In the case that an explicit topic list is provided, the > >> hash > >> >> > just > >> >> > > > covers the metadata for those topics. > >> >> > > > > >> >> > > > > >> >> > > > Thanks, > >> >> > > > Jason > >> >> > > > > >> >> > > > > >> >> > > > > >> >> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao > >> wrote: > >> >> > > > > >> >> > > > > Jason, > >> >> > > > > > >> >> > > > > Thanks for the writeup. A few comments below. > >> >> > > > > > >> >> > > > > 1. When there are multiple common protocols in the > >> >> JoinGroupRequest, > >> >> > > > which > >> >> > > > > one would the coordinator pick? > >> >> > > > > 2. If the protocols don't agree, the group construction > fails. > >> What > >> >> > > > exactly > >> >> > > > > does it mean? Do we send an error in every JoinGroupResponse > and > >> >> > remove > >> >> > > > all > >> >> > > > > members in the group in the coordinator? > >> >> > > > > 3. Consumer embedded protocol: The proposal has two different > >> >> formats > >> >> > > of > >> >> > > > > subscription depending on whether wildcards are used or not. > >> This > >> >> > > seems a > >> >> > > > > bit complicated. Would it be better to always use the > metadata > >> >> hash? > >> >> > > The > >> >> > > > > clients know the subscribed topics already. This way, the > client > >> >> code > >> >> > > > > behaves the same whether wildcards are used or not. > >> >> > > > > > >> >> > > > > Jiangjie, > >> >> > > > > > >> >> > > > > With respect to rebalance churns due to topics being > >> >> created/deleted. > >> >> > > > With > >> >> > > > > the new consumer, the rebalance can probably settle within > 200ms > >> >> when > >> >> > > > there > >> >> > > > > is a topic change. 
So, as long as we are not changing topic > more > >> >> > than 5 > >> >> > > > > times per sec, there shouldn't be constant churns, right? > >> >> > > > > > >> >> > > > > Thanks, > >> >> > > > > > >> >> > > > > Jun > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson < > >> >> ja...@confluent.io > >> >> > > > >> >> > > > > wrote: > >> >> > > > > > >> >> > > > > > Hi Kafka Devs, > >> >> > > > > > > >> >> > > > > > One of the nagging issues in the current design of the new > >> >> consumer > >> >> > > has > >> >> > > > > > been the need to support a variety of assignment > strategies. > >> >> We've > >> >> > > > > > encountered this in particular in the design of copycat and > >> the > >> >> > > > > processing > >> >> > > > > > framework (KIP-28). From what I understand, Samza also has > a > >> >> number > >> >> > > of > >> >> > > > > use > >> >> > > > > > cases with custom assignment needs. The new consumer > protocol > >> >> > > supports > >> >> > > > > new > >> >> > > > > > assignment strategies by hooking them into the broker. For > >> many > >> >> > > > > > environments, this is a major pain and in some cases, a > >> >> > non-starter. > >> >> > > It > >> >> > > > > > also challenges the validation that the coordinator can > >> provide. > >> >> > For > >> >> > > > > > example, some assignment strategies call for partitions to > be > >> >> > > assigned > >> >> > > > > > multiple times, which means that the coordinator can only > >> check > >> >> > that > >> >> > > > > > partitions have been assigned at least once. > >> >> > > > > > > >> >> > > > > > To solve these issues, we'd like to propose moving > assignment > >> to > >> >> > the > >> >> > > > > > client. I've written a wiki which outlines some protocol > >> changes > >> >> to > >> >> > > > > achieve > >> >> > > > > > this: > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal > >> >> > > > > > . > >> >> > > > > > To summarize briefly, instead of the coordinator assigning > the > >> >> > > > partitions > >> >> > > > > > itself, all subscriptions are forwarded to each member of > the > >> >> group > >> >> > > > which > >> >> > > > > > then decides independently which partitions it should > consume. > >> >> The > >> >> > > > > protocol > >> >> > > > > > provides a mechanism for the coordinator to validate that > all > >> >> > > consumers > >> >> > > > > use > >> >> > > > > > the same assignment strategy, but it does not ensure that > the > >> >> > > resulting > >> >> > > > > > assignment is "correct." This provides a powerful > capability > >> for > >> >> > > users > >> >> > > > to > >> >> > > > > > control the full data flow on the client side. They control > >> how > >> >> > data > >> >> > > is > >> >> > > > > > written to partitions through the Partitioner interface and > >> they > >> >> > > > control > >> >> > > > > > how data is consumed through the assignment strategy, all > >> without > >> >> > > > > touching > >> >> > > > > > the server. > >> >> > > > > > > >> >> > > > > > Of course nothing comes for free. In particular, this > change > >> >> > removes > >> >> > > > the > >> >> > > > > > ability of the coordinator to validate that commits are > made > >> by > >> >> > > > consumers > >> >> > > > > > who were assigned the respective partition. 
This might not > be > >> too > >> >> > bad > >> >> > > > > since > >> >> > > > > > we retain the ability to validate the generation id, but it > >> is a > >> >> > > > > potential > >> >> > > > > > concern. We have considered alternative protocols which > add a > >> >> > second > >> >> > > > > > round-trip to the protocol in order to give the coordinator > >> the > >> >> > > ability > >> >> > > > > to > >> >> > > > > > confirm the assignment. As mentioned above, the > coordinator is > >> >> > > somewhat > >> >> > > > > > limited in what it can actually validate, but this would > >> return > >> >> its > >> >> > > > > ability > >> >> > > > > > to validate commits. The tradeoff is that it increases the > >> >> > protocol's > >> >> > > > > > complexity which means more ways for the protocol to fail > and > >> >> > > > > consequently > >> >> > > > > > more edge cases in the code. > >> >> > > > > > > >> >> > > > > > It also misses an opportunity to generalize the group > >> membership > >> >> > > > protocol > >> >> > > > > > for additional use cases. In fact, after you've gone to the > >> >> trouble > >> >> > > of > >> >> > > > > > moving assignment to the client, the main thing that is > left > >> in > >> >> > this > >> >> > > > > > protocol is basically a general group management > capability. > >> This > >> >> > is > >> >> > > > > > exactly what is needed for a few cases that are currently > >> under > >> >> > > > > discussion > >> >> > > > > > (e.g. copycat or single-writer producer). We've taken this > >> >> further > >> >> > > step > >> >> > > > > in > >> >> > > > > > the proposal and attempted to envision what that general > >> protocol > >> >> > > might > >> >> > > > > > look like and how it could be used both by the consumer and > >> for > >> >> > some > >> >> > > of > >> >> > > > > > these other cases. > >> >> > > > > > > >> >> > > > > > Anyway, since time is running out on the new consumer, we > have > >> >> > > perhaps > >> >> > > > > one > >> >> > > > > > last chance to consider a significant change in the > protocol > >> like > >> >> > > this, > >> >> > > > > so > >> >> > > > > > have a look at the wiki and share your thoughts. I've no > doubt > >> >> that > >> >> > > > some > >> >> > > > > > ideas seem clearer in my mind than they do on paper, so ask > >> >> > questions > >> >> > > > if > >> >> > > > > > there is any confusion. > >> >> > > > > > > >> >> > > > > > Thanks! > >> >> > > > > > Jason > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> >> > >> > > >> > > >> > > >> > -- > >> > Thanks, > >> > Neha > >> >
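To make the client-side assignment idea in this thread concrete, here is a minimal, hypothetical sketch of a range-style computation that every member could run independently over identical inputs (the sorted member list and the topic's partition count), so that all members reach the same assignment without the coordinator computing it. The names and structure are illustrative only and are not taken from the KIP.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Hypothetical sketch: each member runs this over the same sorted inputs and
    // keeps only its own slice, so no server-side assignment is needed.
    public class ClientSideRangeAssignor {

        // memberIds: all member ids in the group (same list forwarded to every member)
        // numPartitions: partition count of the subscribed topic
        // selfId: the id of the member running the computation
        public static List<Integer> assign(List<String> memberIds, int numPartitions, String selfId) {
            List<String> sorted = new ArrayList<>(memberIds);
            Collections.sort(sorted); // identical ordering on every member

            int index = sorted.indexOf(selfId);
            int count = sorted.size();
            int base = numPartitions / count;   // partitions per member
            int extra = numPartitions % count;  // first 'extra' members get one more

            int start = index * base + Math.min(index, extra);
            int length = base + (index < extra ? 1 : 0);

            List<Integer> mine = new ArrayList<>();
            for (int p = start; p < start + length; p++)
                mine.add(p);
            return mine;
        }
    }

Determinism is what makes this workable: as long as every member sees the same member list and the same metadata for a given generation, they agree on the assignment without a second round-trip to the coordinator.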