I actually feel this set of tests (whatever they may be) is somewhat irrelevant here. My main concern with the current client-side proposal (i.e., without Becket's follow-up suggestions) is that it significantly compromises the original charter of the new consumer - i.e., to reduce/eliminate herd and split-brain problems in both group management and partition assignment. I understand the need for client-side partition assignment in some use cases (which we are also interested in), but I also think we should make every effort to keep full server-side coordination for the remaining (majority of) use cases, especially if it does not complicate the protocol. The proposed changes do not complicate the protocol IMO - i.e., there is no further modification to the request/response formats beyond the current client-side proposal. It only involves a trivial reinterpretation of the content of the protocol metadata field.
Joel On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede <n...@confluent.io> wrote: > Hey Becket, > > In that case, the broker side partition assignment would be ideal because >> it avoids >> issues like metadata inconsistency / split brain / exploding subscription >> set propagation. > > > As per our previous discussions regarding each of those concerns (referring > to this email thread, KIP calls and JIRA comments), we are going to run a > set of tests using the LinkedIn deployment numbers that we will wait for > you to share. The purpose is to see if those concerns are really valid or > not. I'd prefer to see that before making any more changes that will > complicate the protocol. > > On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin <j...@linkedin.com.invalid> > wrote: > >> Hi folks, >> >> After further discussion in LinkedIn, we found that while having a more >> general group management protocol is very useful, the vast majority of the >> clients will not use a customized partition assignment strategy. In that >> case, the broker side partition assignment would be ideal because it avoids >> issues like metadata inconsistency / split brain / exploding subscription >> set propagation. >> >> So we have the following proposal that satisfies the majority of the >> clients' needs without changing the currently proposed binary protocol. >> i.e., Continue to support broker-side assignment if the assignment strategy >> is recognized by the coordinator. >> >> 1. Keep the binary protocol as currently proposed. >> >> 2. Change the way we interpret ProtocolMetadata: >> 2.1 On the consumer side, change partition.assignment.strategy to >> partition.assignor.class. Implement something like the following >> PartitionAssignor interface:
>>
>> public interface PartitionAssignor {
>>     List<String> protocolTypes();
>>     byte[] protocolMetadata();
>>     // return the list of TopicPartitions assigned to this consumer
>>     List<TopicPartition> assignPartitions(String protocolType, byte[] responseProtocolMetadata);
>> }
>>
>> public abstract class AbstractPartitionAssignor implements PartitionAssignor {
>>     protected final KafkaConsumer consumer;
>>
>>     AbstractPartitionAssignor(KafkaConsumer consumer) {
>>         this.consumer = consumer;
>>     }
>> }
>>
>> 2.2 The ProtocolMetadata in JoinGroupRequest will be >> partitionAssignor.protocolMetadata(). When partition.assignor.class is >> "range" or "roundrobin", the ProtocolMetadata in JoinGroupRequest will be a >> JSON subscription set. ("range" and "roundrobin" will be reserved words; we >> can also consider reserving some prefix such as "broker-" to be clearer) >> 2.3 On the broker side, when ProtocolType is "range" or "roundrobin", the >> coordinator will parse the ProtocolMetadata in the JoinGroupRequest and >> assign the partitions for consumers. In the JoinGroupResponse, the >> ProtocolMetadata will be the global assignment of partitions. >> 2.4 On the client side, after receiving the JoinGroupResponse, >> partitionAssignor.assignPartitions() will be invoked to return the actual >> assignment. If the assignor is RangeAssignor or RoundRobinAssignor, it >> will parse the assignment from the ProtocolMetadata returned by the >> coordinator. >> >> This approach has a few merits: >> 1. Does not change the proposed binary protocol, which is still general. >> 2. The majority of the consumers will not suffer from inconsistent metadata >> / split brain / exploding subscription set propagation. This is >> specifically to deal with the issue that the current proposal caters to a >> 20% use-case while adversely impacting the more common 80% use-cases. >> 3. Easy to implement. The only thing needed is to implement a partitioner >> class. For most users, the default range and roundrobin partitioners are >> good enough. >> >> Thoughts? 
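[Editor's note: the "range" assignment the coordinator would compute in step 2.3 above can be sketched in a few lines of plain Java. The class and method names below, and the use of plain partition indexes instead of Kafka's TopicPartition, are illustrative assumptions, not the actual broker implementation.]

```java
import java.util.*;

// Minimal sketch of "range" assignment as a coordinator might compute it for
// one topic: sort members by id, then hand each a contiguous range of
// partitions, with the first (numPartitions % numMembers) members getting one
// extra. Deterministic ordering is what makes the result reproducible.
public class RangeAssignSketch {
    // memberId -> assigned partition indexes for a single topic
    public static Map<String, List<Integer>> assignRange(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);                       // stable, shared ordering
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int n = sorted.size();
        int perMember = numPartitions / n;
        int extra = numPartitions % n;
        int cursor = 0;
        for (int i = 0; i < n; i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = 0; p < count; p++) parts.add(cursor++);
            assignment.put(sorted.get(i), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 5 partitions across 2 consumers: c1 gets [0, 1, 2], c2 gets [3, 4]
        System.out.println(assignRange(Arrays.asList("c2", "c1"), 5));
    }
}
```

Under the proposal, this computation simply moves behind the "range" reserved word on the coordinator, and the resulting global assignment is echoed back in the JoinGroupResponse ProtocolMetadata.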
>> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson <ja...@confluent.io> >> wrote: >> >> > Follow-up from the kip call: >> > >> > 1. Onur brought up the question of whether this protocol provides enough >> > coordination capabilities to be generally useful in practice (is that >> > accurate, Onur?). If it doesn't, then each use case would probably need a >> > dependence on zookeeper anyway, and we haven't really gained anything. >> The >> > group membership provided by this protocol is a useful primitive for >> > coordination, but it's limited in the sense that everything shared among >> > the group has to be communicated at the time the group is created. If any >> > shared data changes, then the only way the group can ensure agreement is >> to >> > force a rebalance. This is expensive since all members must stall while >> the >> > rebalancing takes place. As we have also seen, there is a practical limit >> > on the amount of metadata that can be sent through this protocol when >> > groups get a little larger. This protocol is therefore not suitable to >> > cases which require frequent communication or which require a large >> amount >> > of data to be communicated. For the use cases listed on the wiki, neither >> > of these appear to be an issue, but there may be other limitations which >> > would limit reuse of the protocol. Perhaps it would be sufficient to >> sketch >> > how these cases might work? >> > >> > 2. We talked a little bit about the issue of metadata churn. Becket >> brought >> > up the interesting point that not only do we depend on topic metadata >> > changing relatively infrequently, but we also expect timely agreement >> among >> > the brokers on what that metadata is. To resolve this, we can have the >> > consumers fetch metadata from the coordinator. We still depend on topic >> > metadata not changing frequently, but this should resolve any >> disagreement >> > among the brokers themselves. 
In fact, since we expect that disagreement >> is >> > relatively rare, we can have the consumers fetch from the coordinator >> only >> > when a disagreement occurs. The nice thing about this proposal is >> that >> > it doesn't affect the join group semantics, so the coordinator would >> remain >> > oblivious to the metadata used by the group for agreement. Also, if >> > metadata churn becomes an issue, it might be possible to have the >> > coordinator provide a snapshot for the group to ensure that a generation >> > would be able to reach agreement (this would probably require adding >> > groupId/generation to the metadata request). >> > >> > 3. We talked briefly about support for multiple protocols in the join >> group >> > request in order to allow changing the assignment strategy without >> > downtime. I think it's a little doubtful that this would get much use in >> > practice, but I agree it's a nice option to have on the table. An >> > alternative, for the sake of argument, is to have each member provide >> only >> > one version of the protocol, and to let the coordinator choose the >> protocol >> > with the largest number of supporters. All members which can't support >> the >> > selected protocol would be kicked out of the group. The drawback in a >> > rolling upgrade is that the total capacity of the group would be >> > momentarily halved. It would also be a little tricky to handle the case >> of >> > retrying when a consumer is kicked out of the group. We wouldn't want it >> to >> > be able to effect a rebalance, for example, if it would just be kicked >> out >> > again. That would probably complicate the group management logic on the >> > coordinator. >> > >> > >> > Thanks, >> > Jason >> > >> > >> > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin <j...@linkedin.com.invalid >> > >> > wrote: >> > >> > > Jun, >> > > >> > > Yes, I agree. If the metadata can be synced quickly there should not be >> > an >> > > issue. 
It just occurred to me that there is a proposal to allow >> consuming >> > > from followers in ISR, that could potentially cause more frequent >> > metadata >> > > change for consumers. Would that be an issue? >> > > >> > > Thanks, >> > > >> > > Jiangjie (Becket) Qin >> > > >> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <ja...@confluent.io> >> > > wrote: >> > > >> > > > Hi Jun, >> > > > >> > > > Answers below: >> > > > >> > > > 1. When there are multiple common protocols in the JoinGroupRequest, >> > > which >> > > > one would the coordinator pick? >> > > > >> > > > I was intending to use the list to indicate preference. If all group >> > > > members support protocols ["A", "B"] in that order, then we will >> choose >> > > > "A." If some support ["B", "A"], then we would either choose based on >> > > > respective counts or just randomly. The main use case of supporting >> the >> > > > list is for rolling upgrades when a change is made to the assignment >> > > > strategy. In that case, the new assignment strategy would be listed >> > first >> > > > in the upgraded client. I think it's debatable whether this feature >> > would >> > > > get much use in practice, so we might consider dropping it. >> > > > >> > > > 2. If the protocols don't agree, the group construction fails. What >> > > exactly >> > > > does it mean? Do we send an error in every JoinGroupResponse and >> remove >> > > all >> > > > members in the group in the coordinator? >> > > > >> > > > Yes, that is right. It would be handled similarly to inconsistent >> > > > assignment strategies in the current protocol. The coordinator >> returns >> > an >> > > > error in each join group response, and the client propagates the >> error >> > to >> > > > the user. >> > > > >> > > > 3. Consumer embedded protocol: The proposal has two different formats >> > of >> > > > subscription depending on whether wildcards are used or not. This >> > seems a >> > > > bit complicated. 
Would it be better to always use the metadata hash? >> > The >> > > > clients know the subscribed topics already. This way, the client code >> > > > behaves the same whether wildcards are used or not. >> > > > >> > > > Yeah, I think this is possible (Neha also suggested it). I haven't >> > > updated >> > > > the wiki yet, but the patch I started working on uses only the >> metadata >> > > > hash. In the case that an explicit topic list is provided, the hash >> > just >> > > > covers the metadata for those topics. >> > > > >> > > > >> > > > Thanks, >> > > > Jason >> > > > >> > > > >> > > > >> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao <j...@confluent.io> wrote: >> > > > >> > > > > Jason, >> > > > > >> > > > > Thanks for the writeup. A few comments below. >> > > > > >> > > > > 1. When there are multiple common protocols in the >> JoinGroupRequest, >> > > > which >> > > > > one would the coordinator pick? >> > > > > 2. If the protocols don't agree, the group construction fails. What >> > > > exactly >> > > > > does it mean? Do we send an error in every JoinGroupResponse and >> > remove >> > > > all >> > > > > members in the group in the coordinator? >> > > > > 3. Consumer embedded protocol: The proposal has two different >> formats >> > > of >> > > > > subscription depending on whether wildcards are used or not. This >> > > seems a >> > > > > bit complicated. Would it be better to always use the metadata >> hash? >> > > The >> > > > > clients know the subscribed topics already. This way, the client >> code >> > > > > behaves the same whether wildcards are used or not. >> > > > > >> > > > > Jiangjie, >> > > > > >> > > > > With respect to rebalance churns due to topics being >> created/deleted. >> > > > With >> > > > > the new consumer, the rebalance can probably settle within 200ms >> when >> > > > there >> > > > > is a topic change. So, as long as we are not changing topic more >> > than 5 >> > > > > times per sec, there shouldn't be constant churns, right? 
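[Editor's note: the metadata-hash idea discussed above can be sketched in plain Java. Each member hashes a canonical encoding of the metadata for its subscribed topics only; equal hashes imply the members agree on that metadata. The encoding and class names below are assumptions for illustration, not what the actual patch uses.]

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

public class MetadataHashSketch {
    // Hash a canonical encoding (sorted topic -> partition count) so that any
    // two members holding the same metadata produce the same hash regardless
    // of map iteration order. The "topic:count;" encoding is illustrative.
    public static String metadataHash(Map<String, Integer> partitionCounts) {
        StringBuilder canonical = new StringBuilder();
        new TreeMap<>(partitionCounts).forEach((topic, count) ->
                canonical.append(topic).append(':').append(count).append(';'));
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(canonical.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> a = new HashMap<>();
        a.put("orders", 8);
        a.put("clicks", 16);
        Map<String, Integer> b = new LinkedHashMap<>();
        b.put("clicks", 16);
        b.put("orders", 8);
        // same metadata, different map insertion order -> same hash
        System.out.println(metadataHash(a).equals(metadataHash(b)));
    }
}
```

This is also why the wildcard vs. explicit-subscription distinction can collapse: in both cases the hash just covers the metadata of whatever topics the member currently matches.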
>> > > > > >> > > > > Thanks, >> > > > > >> > > > > Jun >> > > > > >> > > > > >> > > > > >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson < >> ja...@confluent.io >> > > >> > > > > wrote: >> > > > > >> > > > > > Hi Kafka Devs, >> > > > > > >> > > > > > One of the nagging issues in the current design of the new >> consumer >> > > has >> > > > > > been the need to support a variety of assignment strategies. >> We've >> > > > > > encountered this in particular in the design of copycat and the >> > > > > processing >> > > > > > framework (KIP-28). From what I understand, Samza also has a >> number >> > > of >> > > > > use >> > > > > > cases with custom assignment needs. The new consumer protocol >> > > supports >> > > > > new >> > > > > > assignment strategies by hooking them into the broker. For many >> > > > > > environments, this is a major pain and in some cases, a >> > non-starter. >> > > It >> > > > > > also challenges the validation that the coordinator can provide. >> > For >> > > > > > example, some assignment strategies call for partitions to be >> > > assigned >> > > > > > multiple times, which means that the coordinator can only check >> > that >> > > > > > partitions have been assigned at least once. >> > > > > > >> > > > > > To solve these issues, we'd like to propose moving assignment to >> > the >> > > > > > client. I've written a wiki which outlines some protocol changes >> to >> > > > > achieve >> > > > > > this: >> > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal >> > > > > > . >> > > > > > To summarize briefly, instead of the coordinator assigning the >> > > > partitions >> > > > > > itself, all subscriptions are forwarded to each member of the >> group >> > > > which >> > > > > > then decides independently which partitions it should consume. 
>> The >> > > > > protocol >> > > > > > provides a mechanism for the coordinator to validate that all >> > > consumers >> > > > > use >> > > > > > the same assignment strategy, but it does not ensure that the >> > > resulting >> > > > > > assignment is "correct." This provides a powerful capability for >> > > users >> > > > to >> > > > > > control the full data flow on the client side. They control how >> > data >> > > is >> > > > > > written to partitions through the Partitioner interface and they >> > > > control >> > > > > > how data is consumed through the assignment strategy, all without >> > > > > touching >> > > > > > the server. >> > > > > > >> > > > > > Of course nothing comes for free. In particular, this change >> > removes >> > > > the >> > > > > > ability of the coordinator to validate that commits are made by >> > > > consumers >> > > > > > who were assigned the respective partition. This might not be too >> > bad >> > > > > since >> > > > > > we retain the ability to validate the generation id, but it is a >> > > > > potential >> > > > > > concern. We have considered alternative protocols which add a >> > second >> > > > > > round-trip to the protocol in order to give the coordinator the >> > > ability >> > > > > to >> > > > > > confirm the assignment. As mentioned above, the coordinator is >> > > somewhat >> > > > > > limited in what it can actually validate, but this would return >> its >> > > > > ability >> > > > > > to validate commits. The tradeoff is that it increases the >> > protocol's >> > > > > > complexity which means more ways for the protocol to fail and >> > > > > consequently >> > > > > > more edge cases in the code. >> > > > > > >> > > > > > It also misses an opportunity to generalize the group membership >> > > > protocol >> > > > > > for additional use cases. 
In fact, after you've gone to the >> trouble >> > > of >> > > > > > moving assignment to the client, the main thing that is left in >> > this >> > > > > > protocol is basically a general group management capability. This >> > is >> > > > > > exactly what is needed for a few cases that are currently under >> > > > > discussion >> > > > > > (e.g. copycat or single-writer producer). We've taken this >> further >> > > step >> > > > > in >> > > > > > the proposal and attempted to envision what that general protocol >> > > might >> > > > > > look like and how it could be used both by the consumer and for >> > some >> > > of >> > > > > > these other cases. >> > > > > > >> > > > > > Anyway, since time is running out on the new consumer, we have >> > > perhaps >> > > > > one >> > > > > > last chance to consider a significant change in the protocol like >> > > this, >> > > > > so >> > > > > > have a look at the wiki and share your thoughts. I've no doubt >> that >> > > > some >> > > > > > ideas seem clearer in my mind than they do on paper, so ask >> > questions >> > > > if >> > > > > > there is any confusion. >> > > > > > >> > > > > > Thanks! >> > > > > > Jason >> > > > > > >> > > > > >> > > > >> > > >> > >> > > > > -- > Thanks, > Neha
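[Editor's note: to make the client-side proposal concrete, here is a minimal plain-Java sketch of the flow Jason describes: every member receives the full member list, runs the same deterministic round-robin computation locally, and keeps only its own slice. Determinism over sorted inputs is what lets all members agree without the coordinator assigning anything. This assumes, for simplicity, that all members subscribe to all listed topics; all names are illustrative.]

```java
import java.util.*;

// Sketch of client-side assignment: a pure function of (sorted member ids,
// sorted topic metadata), so every member that runs it computes the same
// global assignment and can safely take just its own entry.
public class ClientSideAssignSketch {
    public static Map<String, List<String>> assignRoundRobin(
            Collection<String> memberIds, Map<String, Integer> partitionCounts) {
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);                       // same order on every client
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String m : members) assignment.put(m, new ArrayList<>());
        int i = 0;
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionCounts).entrySet())
            for (int p = 0; p < e.getValue(); p++)
                assignment.get(members.get(i++ % members.size())).add(e.getKey() + "-" + p);
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("t1", 3);
        // each member computes the whole assignment, then takes its own entry
        System.out.println(assignRoundRobin(Arrays.asList("c1", "c2"), counts).get("c1"));
    }
}
```

The trade-off discussed in the thread falls out directly: nothing in this computation is visible to the coordinator, so it can validate the generation id but not that commits match the assignment.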