Ewen,

To be clearer about the numbers at LinkedIn, including the expected
growth, we have the following table (assuming an average topic name
length of 27 bytes):

#TopicsMirrored  #MirrorMakerNodes  #ConsumersPerNode  #BytesPerRebalance  #BytesPerMirrorMakerRollingBounce
100              26                 4                  33MB                33MB * 26 * 2 = 1.7 GB
300              48                 4                  342MB               342MB * 48 * 2 = 32 GB
500              64                 4                  1GB                 1GB * 64 * 2 = 128 GB

The numbers are planned based on the current growth rate of our traffic
over the next 3 years. The bytes in the table are calculated based on
the proposed protocol. We are concerned about whether the broadcast-based
protocol would be able to support the scale at that point. The table
above only accounts for one mirror maker consumer group; given that we
have four mirror maker consumer groups, even more bytes would be
transferred.
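
For reference, here is a minimal sketch of how the numbers above can be
derived. The per-topic constant (27-byte name plus ~4 bytes of
partition info) and the two-rebalances-per-bounced-node factor are our
working assumptions, not measured values:

public class RebalanceCostEstimate {

  // One subscription set: topic name bytes plus partition-count bytes.
  static long subscriptionBytes(int topics) {
    return topics * (27L + 4L); // ~3.1K for 100 topics
  }

  // Broadcast cost: each of the n consumers receives all n
  // subscription sets in the JoinGroupResponses, hence n^2.
  static long rebalanceBytes(int topics, int nodes, int consumersPerNode) {
    long n = (long) nodes * consumersPerNode;
    return subscriptionBytes(topics) * n * n;
  }

  // A rolling bounce triggers roughly 2 rebalances per bounced node.
  static long rollingBounceBytes(int topics, int nodes, int consumersPerNode) {
    return rebalanceBytes(topics, nodes, consumersPerNode) * nodes * 2L;
  }

  public static void main(String[] args) {
    // 100 topics, 26 nodes, 4 consumers/node -> ~33 MB and ~1.7 GB
    System.out.println(rebalanceBytes(100, 26, 4));
    System.out.println(rollingBounceBytes(100, 26, 4));
  }
}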

In general, I think for Kafka, as a distributed system that can support
thousands of consumers, the consumer rebalance needs to be cheap. This
is important for a few reasons: consumer churn will happen from time to
time when we have so many consumers, and basic operations like rolling
bounces need to be supported, etc.

Thanks,

Jiangjie (Becket) Qin


On Sun, Aug 30, 2015 at 12:58 AM, Jiangjie Qin <j...@linkedin.com> wrote:

> Ewen,
>
> 1. My concern is about the long subscription set that will be included
> in the protocol metadata, because we are using selective copy with a
> long subscription set. 100 topics with an average name length of 27
> gives 2.7K for topic names and 0.4K for the partition numbers, i.e.
> 3.1K per subscription set. For each mirror maker consumer group, the
> total size of the JoinGroupResponses will be 3.1K * 104 * 104 ~ 33 MB.
> We have several other things to think about as well: What happens if
> the four mirror maker consumer groups have the same broker as
> coordinator (unlikely for a large cluster but still possible)? What if
> the number of topics increases to 300 or 500? What if the outbound
> traffic on the coordinator broker is large? What if a quota is applied
> to a client (currently quotas only apply to fetch and produce, but if
> the metadata is big, maybe we also need to take it into account)?
>
> 2. For the rolling bounce: are you saying we can send a
> LeaveGroupRequest if it is a shutdown, and not send one if it is a
> bounce? I am not sure how that would work, because when people do a
> rolling bounce it is typically just a stop-then-start operation. When a
> mirror maker shuts down, how can we know whether there will be a
> follow-up start or not? If we send a LeaveGroupRequest to trigger a
> rebalance, that rebalance would presumably be picked up by all the
> consumers in the group at the next heartbeat, and would likely finish
> within a few seconds if the heartbeat interval is set to 3 seconds. So
> it seems another round of rebalance is unavoidable after the shut-down
> mirror maker node comes back.
>
> 3. For the metadata inconsistency, I am actually not sure how useful it
> is to have all the consumers fetch from the least-used brokers as we do
> now. The "least used node" is purely a client-side view; it does not
> necessarily mean the broker is actually least used. It is entirely
> possible that one consumer thinks a broker is least used while everyone
> else is sending requests to that broker. Also, the amount of bytes for
> topic metadata is only linear in the number of consumers in the group,
> so I don't expect it to cause a big issue. We might get unlucky, for
> example if the four mirror maker consumer groups are hashed to the same
> broker. In that case, the total TopicMetadataResponse sent by that
> broker might be a few MB, but that is way less than the total size of
> the JoinGroupResponses. So the JoinGroupResponse size would be the
> first issue to solve.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Aug 29, 2015 at 12:39 AM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>
>> @Onur - I think you reviewed the KAFKA-2464 patch in some detail. Wrt
>> the specific problem you raised about using the largest # of
>> partitions, you're right: with the updated proposal this isn't
>> possible, because we only send a hash of the metadata in order to
>> avoid the overhead of per-member metadata being transmitted back to
>> members in the join group response.
>>
>> However, the KAFKA-2464 patch addresses this by avoiding
>> inconsistencies as much as possible: it prefers the coordinator for
>> metadata fetches when it detects inconsistencies. If metadata is
>> changing fast enough, this can still result in another round of
>> inconsistency. However, note that this only applies to metadata that
>> would be included in a subscription (i.e., if quickly created/deleted
>> topics were excluded from mirror maker consumers, those consumers
>> would not be affected by those metadata changes, since the hash is
>> computed based on the topics included in the subscription).
>> Additionally, it seems based on Becket's numbers that the frequency at
>> which you *might* hit this is very small, and the delay in getting the
>> group to reconverge should be 2 rounds of heartbeats, or 6 seconds
>> based on current defaults.
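>>
>> (For concreteness, a minimal sketch of how such a subscription-scoped
>> hash could be computed - the input map here is a hypothetical
>> stand-in, not the actual client API:)
>>
>> import java.nio.ByteBuffer;
>> import java.nio.charset.StandardCharsets;
>> import java.security.MessageDigest;
>> import java.util.Map;
>> import java.util.SortedMap;
>>
>> class SubscriptionMetadataHash {
>>   // Hash only the subscribed topics, in canonical (sorted) order, so
>>   // churn in unrelated topics never changes the hash.
>>   static byte[] hash(SortedMap<String, Integer> partitionsByTopic)
>>       throws Exception {
>>     MessageDigest md = MessageDigest.getInstance("MD5");
>>     for (Map.Entry<String, Integer> e : partitionsByTopic.entrySet()) {
>>       md.update(e.getKey().getBytes(StandardCharsets.UTF_8));
>>       md.update(ByteBuffer.allocate(4).putInt(e.getValue()).array());
>>     }
>>     return md.digest();
>>   }
>> }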
>>
>> @Becket - these numbers are *really* useful, thanks for including
>> them. The average topic name length is actually quite useful in
>> sorting out the protocol overhead.
>>
>> 1. I think the only reason this is a question is because of the
>> metadata? Otherwise we always have the standard heartbeat cycle, which
>> is currently set at 3 seconds. If metadata mismatches, all clients
>> need to refresh. Because of concerns about inconsistency, the patch
>> for KAFKA-2464 has all clients in the group hit the coordinator to
>> achieve consistency (only after they detect inconsistency, so the
>> normal case should be spread across brokers). I'm actually not
>> convinced this is a good idea, since it centralizes all metadata
>> lookups for the group to one host, which seems risky. But it does
>> guarantee consistency as long as the metadata doesn't change in that
>> window, and any member that can't fetch metadata should just drop out
>> of the group anyway. Your numbers suggest it's unlikely to be *that*
>> unstable in that window. It seems the bad case is that it takes 9
>> seconds rather than 6 seconds to converge. And those numbers confirm
>> previous arguments that the metadata changes slowly enough that a
>> temporary inconsistency can be resolved very quickly.
>> 2. Rolling bounces are pretty clean regardless of whether assignment
>> is server- or client-side. We're following the same basic process
>> either way. In the normal case, processes restart cleanly and, upon
>> start, they immediately try to join the group. There are two cases. If
>> you're the *first* node to restart, you'll trigger a join group and
>> everyone will need to rejoin at their next heartbeat. It'll take as
>> long as the heartbeat interval. If you're not the first node, then
>> there's probably another node that was bounced that has already
>> triggered the next generation. In that case you're just waiting to
>> join the next generation anyway, and you'll probably be done in the
>> next 3 seconds (again, based on current intervals). I think the major
>> question here is how you schedule the rolling bounce -- if you do it
>> too quickly, you might have a long period where lots of nodes aren't
>> doing any real work because there are always join groups in progress,
>> which requires everyone stopping, committing, then starting again.
>> However, this is true with any version of the current protocol -- it
>> has nothing to do with client-side partition assignment.
>> Note that this is clearly affected by the proposal for a LeaveGroup
>> message. A rolling bounce is a case where you might not want to
>> trigger a LeaveGroup, because you actually expect to restart and
>> rejoin the group immediately. This is a good argument for not using
>> TCP disconnect as a LeaveGroup signal.
>> 3. I think this is addressed in the previous two comments. Maybe one
>> thing to address again is that metadata hashes can be based only on
>> topics matching subscriptions, so any changes in, e.g., test topics
>> that do not affect a consumer group's subscriptions should have 0
>> effect. They never trigger any join groups, never affect membership,
>> etc. Re: the actual sync, I'd be curious if defaulting to the
>> coordinator to ensure consistency has any problems that you can think
>> of? It's not strictly a guarantee of consistency, since the metadata
>> can change between requests from different clients, but practically
>> speaking, I don't see a case where metadata would change so quickly
>> and so consistently that we'd run into problems.
>>
>> -Ewen
>>
>>
>>
>> On Fri, Aug 28, 2015 at 9:32 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>> wrote:
>>
>> > Hi Neha,
>> >
>> > Following are some numbers we have in the pipeline. It would be very
>> > helpful to see how it goes with the proposed protocol. We will try to
>> > do some tests with the current patch as well. Please also let us know
>> > if you want further information.
>> >
>> > 32 brokers, 1Gbps NIC
>> > 547 topics
>> > 27 chars average topic name length
>> > 2-3 consumers for each topic
>> >
>> > Four 26-node mirror maker instances (four different consumer groups).
>> > Each node has 4 consumers. (Each mirror maker instance has 104
>> > consumers.)
>> > We are actually using selective copy, so we have a big whitelist for
>> > each mirror maker, copying about 100 topics (we expect this to grow
>> > to a couple of hundred).
>> > The mirror makers are co-located with the target cluster, so the
>> > consumer traffic goes through the WAN.
>> >
>> > We have 5 to 6 wildcard consumers consuming from all the topics.
>> >
>> > The topic creation frequency is not high now, roughly about 1 / day.
>> >
>> > The scenarios we are interested in are:
>> > 1. The time for one round of rebalance.
>> > 2. The time for a rolling bounce of mirror maker.
>> > 3. For wildcard topics, whether metadata sync-up causes problems.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> > On Fri, Aug 28, 2015 at 1:24 PM, Joel Koshy <jjkosh...@gmail.com>
>> wrote:
>> >
>> > > Another use-case I was thinking of was something like rack-aware
>> > > assignment of partitions to clients. This would require some
>> > > additional topic metadata to be propagated to and from the coordinator
>> > > and you would need some way to resolve conflicts for such strategies.
>> > > I think that could be addressed by attaching a generation id to the
>> > > metadata and using that (i.e., picking the highest) in order to resolve
>> > > conflicts without another round of join-group requests.
>> > >
>> > > Likewise, without delete/recreate, partition counts are a sort of
>> > > generation id since they are non-decreasing. If we need to account for
>> > > delete/recreate that could perhaps be addressed by an explicit
>> > > (per-topic) generation id attached to each topic in the metadata blob.
>> > > Does that make sense? I think that covers my concerns wrt the
>> > > split-brain issues.
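>> > >
>> > > (A sketch of that conflict resolution - the TopicMeta holder is
>> > > hypothetical, just to illustrate keeping the highest generation id
>> > > without another round of join-group requests:)
>> > >
>> > > class TopicMeta {
>> > >   final String topic;
>> > >   final int partitions;
>> > >   final long generationId; // bumped on delete/recreate
>> > >   TopicMeta(String topic, int partitions, long generationId) {
>> > >     this.topic = topic;
>> > >     this.partitions = partitions;
>> > >     this.generationId = generationId;
>> > >   }
>> > >
>> > >   // Every member deterministically keeps the copy with the higher
>> > >   // generation id, so the group converges without a rebalance.
>> > >   static TopicMeta resolve(TopicMeta a, TopicMeta b) {
>> > >     return a.generationId >= b.generationId ? a : b;
>> > >   }
>> > > }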
>> > >
>> > > I'm still a bit wary of the n^2*m sized rebroadcast of all the
>> > > metadata - mainly because for various reasons at LinkedIn, we are
>> > > actually using large explicit whitelists (and not wildcards) in
>> > > several of our mirroring pipelines. At this point I feel that is a
>> > > reasonable cost to pay for having all the logic in one place (i.e.,
>> > > client side) but I would like to think a bit more on that.
>> > >
>> > > Joel
>> > >
>> > >
>> > > On Fri, Aug 28, 2015 at 1:02 PM, Onur Karaman
>> > > <okara...@linkedin.com.invalid> wrote:
>> > > > From what I understand, the "largest number of partitions" trick
>> > > > is based on the assumption that topics can only expand their
>> > > > partitions. What happens when a topic gets deleted and recreated?
>> > > > This breaks that assumption.
>> > > >
>> > > > On Fri, Aug 28, 2015 at 6:33 AM, Neha Narkhede <n...@confluent.io>
>> > > wrote:
>> > > >
>> > > >> Thanks for re-reviewing, Joel.
>> > > >>
>> > > >> On Fri, Aug 28, 2015 at 2:51 AM -0700, "Joel Koshy" <jjkosh...@gmail.com> wrote:
>> > > >>
>> > > >> > I think we think this proposal addresses 100% of the split
>> > > >> > brain issues ever seen in the ZK-based protocol, but I think
>> > > >> > you think there are still issues. Can you explain what you're
>> > > >> > thinking of and when you think it would happen? I want to make
>> > > >> > sure you aren't assuming client-side=>split-brain since I
>> > > >> > think that is totally not the case.
>> > > >>
>> > > >> Yes, I had concluded that client-side assignment would still
>> > > >> result in split-brain wrt partition counts, but I overlooked a
>> > > >> key sentence in the wiki - i.e., that the assignment algorithm
>> > > >> for consumers can just use the largest number of partitions for
>> > > >> each topic reported by any of the consumers. I.e., I assumed
>> > > >> that consumers would just fail the rebalance if the partition
>> > > >> counts were inconsistent, but that is not the case, since this
>> > > >> conflict can easily be resolved as described, without further
>> > > >> join-group requests. Sorry about that. There is still the issue
>> > > >> of the coordinator having to send back n*m worth of metadata,
>> > > >> but that was not my biggest concern. I'll look over it again
>> > > >> and reply back tomorrow.
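>> > > >>
>> > > >> (Something like the following sketch, where each member merges
>> > > >> the partition counts reported by all members by taking the max
>> > > >> per topic - illustrative only, not actual client code:)
>> > > >>
>> > > >> import java.util.HashMap;
>> > > >> import java.util.List;
>> > > >> import java.util.Map;
>> > > >>
>> > > >> class PartitionCountMerge {
>> > > >>   // Every member applies the same deterministic max-per-topic
>> > > >>   // merge, so all members arrive at the same view without
>> > > >>   // failing the rebalance.
>> > > >>   static Map<String, Integer> mergeCounts(
>> > > >>       List<Map<String, Integer>> reportedByMember) {
>> > > >>     Map<String, Integer> merged = new HashMap<>();
>> > > >>     for (Map<String, Integer> report : reportedByMember)
>> > > >>       for (Map.Entry<String, Integer> e : report.entrySet())
>> > > >>         merged.merge(e.getKey(), e.getValue(), Math::max);
>> > > >>     return merged;
>> > > >>   }
>> > > >> }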
>> > > >>
>> > > >> Joel
>> > > >>
>> > > >> On Thu, Aug 27, 2015 at 2:55 PM, Jay Kreps  wrote:
>> > > >> > Hey Joel,
>> > > >> >
>> > > >> > I really don't think we should do both. There are pros and
>> > > >> > cons, but we should make a decision and work on
>> > > >> > operationalizing one approach. Much of really making something
>> > > >> > like this work is getting all the bugs out, getting monitoring
>> > > >> > in place, getting rigorous system tests in place. Trying to do
>> > > >> > those things twice with the same resources will just mean we
>> > > >> > do them half as well. I also think this buys nothing from the
>> > > >> > user's point of view--they want co-ordination that works
>> > > >> > correctly; the debate we are having is purely a "how should we
>> > > >> > build that" debate. So this is really not the kind of thing
>> > > >> > we'd want to make pluggable, and if we did, that would just
>> > > >> > complicate life for the user.
>> > > >> >
>> > > >> > I think we think this proposal addresses 100% of the split
>> > > >> > brain issues ever seen in the ZK-based protocol, but I think
>> > > >> > you think there are still issues. Can you explain what you're
>> > > >> > thinking of and when you think it would happen? I want to make
>> > > >> > sure you aren't assuming client-side=>split-brain since I
>> > > >> > think that is totally not the case.
>> > > >> >
>> > > >> > With respect to "herd issues", I actually think all the
>> > > >> > proposals address this by scaling the co-ordinator out to all
>> > > >> > nodes and making the co-ordination vastly cheaper. No
>> > > >> > proposal, of course, gets rid of the fact that all clients
>> > > >> > rejoin at once when there is a membership change, but that is
>> > > >> > kind of fundamental to the problem.
>> > > >> >
>> > > >> > -Jay
>> > > >> >
>> > > >> > On Thu, Aug 27, 2015 at 2:02 PM, Joel Koshy  wrote:
>> > > >> >
>> > > >> >> I actually feel this set of tests (whatever they may be) is
>> > > >> >> somewhat irrelevant here. My main concern with the current
>> > > >> >> client-side proposal (i.e., without Becket's follow-up
>> > > >> >> suggestions) is that it makes a significant compromise to the
>> > > >> >> original charter of the new consumer - i.e., reduce/eliminate
>> > > >> >> herd and split brain problems in both group management and
>> > > >> >> partition assignment. I understand the need for client-side
>> > > >> >> partition assignment in some use cases (which we are also
>> > > >> >> interested in), but I also think we should make every effort
>> > > >> >> to keep full server-side coordination for the remaining
>> > > >> >> (majority of) use cases, especially if it does not complicate
>> > > >> >> the protocol. The proposed changes do not complicate the
>> > > >> >> protocol IMO - i.e., there is no further modification to the
>> > > >> >> request/response formats beyond the current client-side
>> > > >> >> proposal. It only involves a trivial reinterpretation of the
>> > > >> >> content of the protocol metadata field.
>> > > >> >>
>> > > >> >> Joel
>> > > >> >>
>> > > >> >> On Wed, Aug 26, 2015 at 9:33 PM, Neha Narkhede  wrote:
>> > > >> >> > Hey Becket,
>> > > >> >> >
>> > > >> >> > In that case, the broker side partition assignment would be
>> ideal
>> > > >> because
>> > > >> >> >> it avoids
>> > > >> >> >> issues like metadata inconsistency / split brain / exploding
>> > > >> >> subscription
>> > > >> >> >> set propagation.
>> > > >> >> >
>> > > >> >> >
>> > > >> >> > As per our previous discussions regarding each of those
>> concerns
>> > > >> >> (referring
>> > > >> >> > to this email thread, KIP calls and JIRA comments), we are
>> going
>> > to
>> > > >> run a
>> > > >> >> > set of tests using the LinkedIn deployment numbers that we
>> will
>> > > wait
>> > > >> for
>> > > >> >> > you to share. The purpose is to see if those concerns are
>> really
>> > > >> valid or
>> > > >> >> > not. I'd prefer to see that before making any more changes
>> that
>> > > will
>> > > >> >> > complicate the protocol.
>> > > >> >> >
>> > > >> >> > On Wed, Aug 26, 2015 at 4:57 PM, Jiangjie Qin wrote:
>> > > >> >> >
>> > > >> >> >> Hi folks,
>> > > >> >> >>
>> > > >> >> >> After further discussion at LinkedIn, we found that while
>> > > >> >> >> having a more general group management protocol is very
>> > > >> >> >> useful, the vast majority of clients will not use a
>> > > >> >> >> customized partition assignment strategy. In that case,
>> > > >> >> >> broker-side partition assignment would be ideal because it
>> > > >> >> >> avoids issues like metadata inconsistency / split brain /
>> > > >> >> >> exploding subscription set propagation.
>> > > >> >> >>
>> > > >> >> >> So we have the following proposal, which satisfies the
>> > > >> >> >> majority of clients' needs without changing the currently
>> > > >> >> >> proposed binary protocol, i.e., continue to support
>> > > >> >> >> broker-side assignment if the assignment strategy is
>> > > >> >> >> recognized by the coordinator.
>> > > >> >> >>
>> > > >> >> >> 1. Keep the binary protocol as currently proposed.
>> > > >> >> >>
>> > > >> >> >> 2. Change the way we interpret ProtocolMetadata:
>> > > >> >> >> 2.1 On the consumer side, change partition.assignment.strategy
>> > > >> >> >> to partition.assignor.class. Implement something like the
>> > > >> >> >> following PartitionAssignor interface (a minimal example
>> > > >> >> >> assignor sketch follows after 2.4):
>> > > >> >> >>
>> > > >> >> >> public interface PartitionAssignor {
>> > > >> >> >>   List<String> protocolTypes();
>> > > >> >> >>   byte[] protocolMetadata();
>> > > >> >> >>   // Returns the topic -> partition list map assigned to
>> > > >> >> >>   // this consumer.
>> > > >> >> >>   Map<String, List<Integer>> assignPartitions(
>> > > >> >> >>       String protocolType, byte[] responseProtocolMetadata);
>> > > >> >> >> }
>> > > >> >> >>
>> > > >> >> >> public abstract class AbstractPartitionAssignor
>> > > >> >> >>     implements PartitionAssignor {
>> > > >> >> >>   protected final KafkaConsumer consumer;
>> > > >> >> >>   AbstractPartitionAssignor(KafkaConsumer consumer) {
>> > > >> >> >>     this.consumer = consumer;
>> > > >> >> >>   }
>> > > >> >> >> }
>> > > >> >> >>
>> > > >> >> >> 2.2 The ProtocolMetadata in the JoinGroupRequest will be
>> > > >> >> >> partitionAssignor.protocolMetadata(). When
>> > > >> >> >> partition.assignor.class is "range" or "roundrobin", the
>> > > >> >> >> ProtocolMetadata in the JoinGroupRequest will be a JSON
>> > > >> >> >> subscription set. ("range" and "roundrobin" will be
>> > > >> >> >> reserved words; we can also consider reserving a prefix
>> > > >> >> >> such as "broker-" to be more clear.)
>> > > >> >> >> 2.3 On the broker side, when the ProtocolType is "range" or
>> > > >> >> >> "roundrobin", the coordinator will parse the
>> > > >> >> >> ProtocolMetadata in the JoinGroupRequest and assign the
>> > > >> >> >> partitions for the consumers. In the JoinGroupResponse, the
>> > > >> >> >> ProtocolMetadata will be the global assignment of
>> > > >> >> >> partitions.
>> > > >> >> >> 2.4 On the client side, after receiving the
>> > > >> >> >> JoinGroupResponse, partitionAssignor.assignPartitions()
>> > > >> >> >> will be invoked to return the actual assignment. If the
>> > > >> >> >> assignor is RangeAssignor or RoundRobinAssignor, it will
>> > > >> >> >> parse the assignment from the ProtocolMetadata returned by
>> > > >> >> >> the coordinator.
>> > > >> >> >> This approach has a few merits:
>> > > >> >> >> 1. It does not change the proposed binary protocol, which
>> > > >> >> >> is still general.
>> > > >> >> >> 2. The majority of consumers will not suffer from
>> > > >> >> >> inconsistent metadata / split brain / exploding
>> > > >> >> >> subscription set propagation. This specifically addresses
>> > > >> >> >> the issue that the current proposal caters to a 20%
>> > > >> >> >> use-case while adversely impacting the more common 80% of
>> > > >> >> >> use-cases.
>> > > >> >> >> 3. It is easy to implement. The only thing needed is to
>> > > >> >> >> implement an assignor class. For most users, the default
>> > > >> >> >> range and roundrobin assignors are good enough.
>> > > >> >> >>
>> > > >> >> >> Thoughts?
>> > > >> >> >>
>> > > >> >> >> Thanks,
>> > > >> >> >>
>> > > >> >> >> Jiangjie (Becket) Qin
>> > > >> >> >>
>> > > >> >> >> On Tue, Aug 18, 2015 at 2:51 PM, Jason Gustafson
>> > > >> >> >> wrote:
>> > > >> >> >>
>> > > >> >> >> > Follow-up from the kip call:
>> > > >> >> >> >
>> > > >> >> >> > 1. Onur brought up the question of whether this protocol
>> > > provides
>> > > >> >> enough
>> > > >> >> >> > coordination capabilities to be generally useful in
>> practice
>> > (is
>> > > >> that
>> > > >> >> >> > accurate, Onur?). If it doesn't, then each use case would
>> > > probably
>> > > >> >> need a
>> > > >> >> >> > dependence on zookeeper anyway, and we haven't really
>> gained
>> > > >> anything.
>> > > >> >> >> The
>> > > >> >> >> > group membership provided by this protocol is a useful
>> > primitive
>> > > >> for
>> > > >> >> >> > coordination, but it's limited in the sense that everything
>> > > shared
>> > > >> >> among
>> > > >> >> >> > the group has to be communicated at the time the group is
>> > > created.
>> > > >> If
>> > > >> >> any
>> > > >> >> >> > shared data changes, then the only way the group can ensure
>> > > >> agreement
>> > > >> >> is
>> > > >> >> >> to
>> > > >> >> >> > force a rebalance. This is expensive since all members must
>> > > stall
>> > > >> >> while
>> > > >> >> >> the
>> > > >> >> >> > rebalancing takes place. As we have also seen, there is
>> > > >> >> >> > a practical limit on the amount of metadata that can be
>> > > >> >> >> > sent through this protocol when groups get a little
>> > > >> >> >> > larger. This protocol is therefore not suitable for
>> > > >> >> >> > cases which require frequent communication or which
>> > > >> >> >> > require a large amount of data to be communicated. For
>> > > >> >> >> > the use cases listed on the
>> > > wiki,
>> > > >> >> neither
>> > > >> >> >> > of these appear to be an issue, but there may be other
>> > > limitations
>> > > >> >> which
>> > > >> >> >> > would limit reuse of the protocol. Perhaps it would be
>> > > sufficient
>> > > >> to
>> > > >> >> >> sketch
>> > > >> >> >> > how these cases might work?
>> > > >> >> >> >
>> > > >> >> >> > 2. We talked a little bit about the issue of metadata
>> churn.
>> > > Becket
>> > > >> >> >> brought
>> > > >> >> >> > up the interesting point that not only do we depend on
>> topic
>> > > >> metadata
>> > > >> >> >> > changing relatively infrequently, but we also expect timely
>> > > >> agreement
>> > > >> >> >> among
>> > > >> >> >> > the brokers on what that metadata is. To resolve this, we
>> can
>> > > have
>> > > >> the
>> > > >> >> >> > consumers fetch metadata from the coordinator. We still
>> depend
>> > > on
>> > > >> >> topic
>> > > >> >> >> > metadata not changing frequently, but this should resolve
>> any
>> > > >> >> >> disagreement
>> > > >> >> >> > among the brokers themselves. In fact, since we expect that
>> > > >> >> disagreement
>> > > >> >> >> is
>> > > >> >> >> > relatively rare, we can have the consumers fetch from the
>> > > >> coordinator
>> > > >> >> >> only
>> > > >> >> >> > when a disagreement occurs. The nice thing about this
>> > > >> proposal is
>> > > >> >> >> that
>> > > >> >> >> > it doesn't affect the join group semantics, so the
>> coordinator
>> > > >> would
>> > > >> >> >> remain
>> > > >> >> >> > oblivious to the metadata used by the group for agreement.
>> > > Also, if
>> > > >> >> >> > metadata churn becomes an issue, it might be possible to
>> have
>> > > the
>> > > >> >> >> > coordinator provide a snapshot for the group to ensure
>> that a
>> > > >> >> generation
>> > > >> >> >> > would be able to reach agreement (this would probably
>> require
>> > > >> adding
>> > > >> >> >> > groupId/generation to the metadata request).
>> > > >> >> >> >
>> > > >> >> >> > 3. We talked briefly about support for multiple protocols
>> in
>> > the
>> > > >> join
>> > > >> >> >> group
>> > > >> >> >> > request in order to allow changing the assignment strategy
>> > > without
>> > > >> >> >> > downtime. I think it's a little doubtful that this would
>> get
>> > > much
>> > > >> use
>> > > >> >> in
>> > > >> >> >> > practice, but I agree it's a nice option to have on the
>> table.
>> > > An
>> > > >> >> >> > alternative, for the sake of argument, is to have each
>> member
>> > > >> provide
>> > > >> >> >> only
>> > > >> >> >> > one version of the protocol, and to let the coordinator
>> choose
>> > > the
>> > > >> >> >> protocol
>> > > >> >> >> > with the largest number of supporters. All members which
>> can't
>> > > >> support
>> > > >> >> >> the
>> > > >> >> >> > selected protocol would be kicked out of the group. The
>> > drawback
>> > > >> in a
>> > > >> >> >> > rolling upgrade is that the total capacity of the group
>> would
>> > be
>> > > >> >> >> > momentarily halved. It would also be a little tricky to
>> handle
>> > > the
>> > > >> >> case
>> > > >> >> >> of
>> > > >> >> >> > retrying when a consumer is kicked out of the group. We
>> > wouldn't
>> > > >> want
>> > > >> >> it
>> > > >> >> >> to
>> > > >> >> >> > be able to effect a rebalance, for example, if it would
>> just
>> > be
>> > > >> kicked
>> > > >> >> >> out
>> > > >> >> >> > again. That would probably complicate the group management
>> > > logic on
>> > > >> >> the
>> > > >> >> >> > coordinator.
>> > > >> >> >> >
>> > > >> >> >> >
>> > > >> >> >> > Thanks,
>> > > >> >> >> > Jason
>> > > >> >> >> >
>> > > >> >> >> >
>> > > >> >> >> > > On Tue, Aug 18, 2015 at 11:16 AM, Jiangjie Qin wrote:
>> > > >> >> >> >
>> > > >> >> >> > > Jun,
>> > > >> >> >> > >
>> > > >> >> >> > > Yes, I agree. If the metadata can be synced quickly,
>> > > >> >> >> > > there should not be an issue. It just occurred to me
>> > > >> >> >> > > that there is a proposal to allow consuming from
>> > > >> >> >> > > followers in the ISR; that could potentially cause
>> > > >> >> >> > > more frequent metadata changes for consumers. Would
>> > > >> >> >> > > that be an issue?
>> > > >> >> >> > >
>> > > >> >> >> > > Thanks,
>> > > >> >> >> > >
>> > > >> >> >> > > Jiangjie (Becket) Qin
>> > > >> >> >> > >
>> > > >> >> >> > > On Tue, Aug 18, 2015 at 10:22 AM, Jason Gustafson <
>> > > >> >> ja...@confluent.io>
>> > > >> >> >> > > wrote:
>> > > >> >> >> > >
>> > > >> >> >> > > > Hi Jun,
>> > > >> >> >> > > >
>> > > >> >> >> > > > Answers below:
>> > > >> >> >> > > >
>> > > >> >> >> > > > 1. When there are multiple common protocols in the
>> > > >> >> JoinGroupRequest,
>> > > >> >> >> > > which
>> > > >> >> >> > > > one would the coordinator pick?
>> > > >> >> >> > > >
>> > > >> >> >> > > > I was intending to use the list to indicate
>> > > >> >> >> > > > preference. If all group members support protocols
>> > > >> >> >> > > > ["A", "B"] in that order, then we will choose "A".
>> > > >> >> >> > > > If some support ["B", "A"], then we would either
>> > > >> >> >> > > > choose based on respective counts or just randomly.
>> > > >> >> >> > > > The main use case for supporting the list is rolling
>> > > >> >> >> > > > upgrades when a change is made to the assignment
>> > > >> >> >> > > > strategy. In that case, the new assignment strategy
>> > > >> >> >> > > > would be listed first in the upgraded client. I
>> > > >> >> >> > > > think it's debatable whether this feature would get
>> > > >> >> >> > > > much use in practice, so we might consider dropping
>> > > >> >> >> > > > it.
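>> > > >> >> >> > > >
>> > > >> >> >> > > > (A sketch of that selection rule - hypothetical, not
>> > > >> >> >> > > > the actual coordinator code; assumes a non-empty
>> > > >> >> >> > > > member list, java.util imports elided:)
>> > > >> >> >> > > >
>> > > >> >> >> > > > static String selectProtocol(
>> > > >> >> >> > > >     List<List<String>> memberPreferences) {
>> > > >> >> >> > > >   // Protocols supported by every member.
>> > > >> >> >> > > >   Set<String> common =
>> > > >> >> >> > > >       new HashSet<>(memberPreferences.get(0));
>> > > >> >> >> > > >   for (List<String> prefs : memberPreferences)
>> > > >> >> >> > > >     common.retainAll(prefs);
>> > > >> >> >> > > >   // Count each member's most-preferred common protocol.
>> > > >> >> >> > > >   Map<String, Integer> votes = new HashMap<>();
>> > > >> >> >> > > >   for (List<String> prefs : memberPreferences)
>> > > >> >> >> > > >     for (String p : prefs)
>> > > >> >> >> > > >       if (common.contains(p)) {
>> > > >> >> >> > > >         votes.merge(p, 1, Integer::sum);
>> > > >> >> >> > > >         break;
>> > > >> >> >> > > >       }
>> > > >> >> >> > > >   return votes.entrySet().stream()
>> > > >> >> >> > > >       .max(Map.Entry.comparingByValue())
>> > > >> >> >> > > >       .map(Map.Entry::getKey)
>> > > >> >> >> > > >       .orElse(null); // null: no common protocol
>> > > >> >> >> > > > }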
>> > > >> >> >> > > >
>> > > >> >> >> > > > 2. If the protocols don't agree, the group construction
>> > > fails.
>> > > >> >> What
>> > > >> >> >> > > exactly
>> > > >> >> >> > > > does it mean? Do we send an error in every
>> > JoinGroupResponse
>> > > >> and
>> > > >> >> >> remove
>> > > >> >> >> > > all
>> > > >> >> >> > > > members in the group in the coordinator?
>> > > >> >> >> > > >
>> > > >> >> >> > > > Yes, that is right. It would be handled similarly to
>> > > >> inconsistent
>> > > >> >> >> > > > assignment strategies in the current protocol. The
>> > > coordinator
>> > > >> >> >> returns
>> > > >> >> >> > an
>> > > >> >> >> > > > error in each join group response, and the client
>> > propagates
>> > > >> the
>> > > >> >> >> error
>> > > >> >> >> > to
>> > > >> >> >> > > > the user.
>> > > >> >> >> > > >
>> > > >> >> >> > > > 3. Consumer embedded protocol: The proposal has two
>> > > different
>> > > >> >> formats
>> > > >> >> >> > of
>> > > >> >> >> > > > subscription depending on whether wildcards are used or
>> > not.
>> > > >> This
>> > > >> >> >> > seems a
>> > > >> >> >> > > > bit complicated. Would it be better to always use the
>> > > metadata
>> > > >> >> hash?
>> > > >> >> >> > The
>> > > >> >> >> > > > clients know the subscribed topics already. This way,
>> the
>> > > >> client
>> > > >> >> code
>> > > >> >> >> > > > behaves the same whether wildcards are used or not.
>> > > >> >> >> > > >
>> > > >> >> >> > > > Yeah, I think this is possible (Neha also suggested
>> it). I
>> > > >> haven't
>> > > >> >> >> > > updated
>> > > >> >> >> > > > the wiki yet, but the patch I started working on uses
>> only
>> > > the
>> > > >> >> >> metadata
>> > > >> >> >> > > > hash. In the case that an explicit topic list is
>> provided,
>> > > the
>> > > >> >> hash
>> > > >> >> >> > just
>> > > >> >> >> > > > covers the metadata for those topics.
>> > > >> >> >> > > >
>> > > >> >> >> > > >
>> > > >> >> >> > > > Thanks,
>> > > >> >> >> > > > Jason
>> > > >> >> >> > > >
>> > > >> >> >> > > >
>> > > >> >> >> > > >
>> > > >> >> >> > > > On Tue, Aug 18, 2015 at 10:06 AM, Jun Rao
>> > > >> >> wrote:
>> > > >> >> >> > > >
>> > > >> >> >> > > > > Jason,
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > Thanks for the writeup. A few comments below.
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > 1. When there are multiple common protocols in the
>> > > >> >> >> JoinGroupRequest,
>> > > >> >> >> > > > which
>> > > >> >> >> > > > > one would the coordinator pick?
>> > > >> >> >> > > > > 2. If the protocols don't agree, the group
>> construction
>> > > >> fails.
>> > > >> >> What
>> > > >> >> >> > > > exactly
>> > > >> >> >> > > > > does it mean? Do we send an error in every
>> > > JoinGroupResponse
>> > > >> and
>> > > >> >> >> > remove
>> > > >> >> >> > > > all
>> > > >> >> >> > > > > members in the group in the coordinator?
>> > > >> >> >> > > > > 3. Consumer embedded protocol: The proposal has two
>> > > different
>> > > >> >> >> formats
>> > > >> >> >> > > of
>> > > >> >> >> > > > > subscription depending on whether wildcards are used
>> or
>> > > not.
>> > > >> >> This
>> > > >> >> >> > > seems a
>> > > >> >> >> > > > > bit complicated. Would it be better to always use the
>> > > >> metadata
>> > > >> >> >> hash?
>> > > >> >> >> > > The
>> > > >> >> >> > > > > clients know the subscribed topics already. This way,
>> > the
>> > > >> client
>> > > >> >> >> code
>> > > >> >> >> > > > > behaves the same whether wildcards are used or not.
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > Jiangjie,
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > With respect to rebalance churns due to topics being
>> > > >> >> >> created/deleted.
>> > > >> >> >> > > > With
>> > > >> >> >> > > > > the new consumer, the rebalance can probably settle
>> > within
>> > > >> 200ms
>> > > >> >> >> when
>> > > >> >> >> > > > there
>> > > >> >> >> > > > > is a topic change. So, as long as we are not
>> > > >> >> >> > > > > changing topics more than 5 times per second, there
>> > > >> >> >> > > > > shouldn't be constant churn, right?
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > Thanks,
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > Jun
>> > > >> >> >> > > > >
>> > > >> >> >> > > > >
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <
>> > > >> >> >> ja...@confluent.io
>> > > >> >> >> > >
>> > > >> >> >> > > > > wrote:
>> > > >> >> >> > > > >
>> > > >> >> >> > > > > > Hi Kafka Devs,
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > One of the nagging issues in the current design of
>> the
>> > > new
>> > > >> >> >> consumer
>> > > >> >> >> > > has
>> > > >> >> >> > > > > > been the need to support a variety of assignment
>> > > >> strategies.
>> > > >> >> >> We've
>> > > >> >> >> > > > > > encountered this in particular in the design of
>> > copycat
>> > > and
>> > > >> >> the
>> > > >> >> >> > > > > processing
>> > > >> >> >> > > > > > framework (KIP-28). From what I understand, Samza
>> also
>> > > has
>> > > >> a
>> > > >> >> >> number
>> > > >> >> >> > > of
>> > > >> >> >> > > > > use
>> > > >> >> >> > > > > > cases with custom assignment needs. The new
>> consumer
>> > > >> protocol
>> > > >> >> >> > > supports
>> > > >> >> >> > > > > new
>> > > >> >> >> > > > > > assignment strategies by hooking them into the
>> broker.
>> > > For
>> > > >> >> many
>> > > >> >> >> > > > > > environments, this is a major pain and in some
>> cases,
>> > a
>> > > >> >> >> > non-starter.
>> > > >> >> >> > > It
>> > > >> >> >> > > > > > also challenges the validation that the coordinator
>> > can
>> > > >> >> provide.
>> > > >> >> >> > For
>> > > >> >> >> > > > > > example, some assignment strategies call for
>> > partitions
>> > > to
>> > > >> be
>> > > >> >> >> > > assigned
>> > > >> >> >> > > > > > multiple times, which means that the coordinator
>> can
>> > > only
>> > > >> >> check
>> > > >> >> >> > that
>> > > >> >> >> > > > > > partitions have been assigned at least once.
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > To solve these issues, we'd like to propose moving
>> > > >> assignment
>> > > >> >> to
>> > > >> >> >> > the
>> > > >> >> >> > > > > > client. I've written a wiki which outlines some
>> > protocol
>> > > >> >> changes
>> > > >> >> >> to
>> > > >> >> >> > > > > achieve
>> > > >> >> >> > > > > > this:
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
>> > > >> >> >> > > > > > .
>> > > >> >> >> > > > > > To summarize briefly, instead of the coordinator
>> > > assigning
>> > > >> the
>> > > >> >> >> > > > partitions
>> > > >> >> >> > > > > > itself, all subscriptions are forwarded to each
>> member
>> > > of
>> > > >> the
>> > > >> >> >> group
>> > > >> >> >> > > > which
>> > > >> >> >> > > > > > then decides independently which partitions it
>> should
>> > > >> consume.
>> > > >> >> >> The
>> > > >> >> >> > > > > protocol
>> > > >> >> >> > > > > > provides a mechanism for the coordinator to
>> validate
>> > > that
>> > > >> all
>> > > >> >> >> > > consumers
>> > > >> >> >> > > > > use
>> > > >> >> >> > > > > > the same assignment strategy, but it does not
>> ensure
>> > > that
>> > > >> the
>> > > >> >> >> > > resulting
>> > > >> >> >> > > > > > assignment is "correct." This provides a powerful
>> > > >> capability
>> > > >> >> for
>> > > >> >> >> > > users
>> > > >> >> >> > > > to
>> > > >> >> >> > > > > > control the full data flow on the client side. They
>> > > control
>> > > >> >> how
>> > > >> >> >> > data
>> > > >> >> >> > > is
>> > > >> >> >> > > > > > written to partitions through the Partitioner
>> > interface
>> > > and
>> > > >> >> they
>> > > >> >> >> > > > control
>> > > >> >> >> > > > > > how data is consumed through the assignment
>> strategy,
>> > > all
>> > > >> >> without
>> > > >> >> >> > > > > touching
>> > > >> >> >> > > > > > the server.
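>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > (To make the independent-decision point concrete,
>> > > >> >> >> > > > > > a toy sketch of a deterministic round-robin
>> > > >> >> >> > > > > > assignment that every member could run locally
>> > > >> >> >> > > > > > over the same sorted inputs, keeping only its own
>> > > >> >> >> > > > > > share - illustrative only, not the proposed
>> > > >> >> >> > > > > > implementation; java.util imports elided:)
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > static Map<String, List<String>> assign(
>> > > >> >> >> > > > > >     SortedSet<String> memberIds,
>> > > >> >> >> > > > > >     SortedMap<String, Integer> partitionsByTopic) {
>> > > >> >> >> > > > > >   List<String> members = new ArrayList<>(memberIds);
>> > > >> >> >> > > > > >   Map<String, List<String>> out = new HashMap<>();
>> > > >> >> >> > > > > >   for (String m : members)
>> > > >> >> >> > > > > >     out.put(m, new ArrayList<String>());
>> > > >> >> >> > > > > >   int i = 0;
>> > > >> >> >> > > > > >   // Same iteration order on every member, so all
>> > > >> >> >> > > > > >   // members compute the same global assignment.
>> > > >> >> >> > > > > >   for (Map.Entry<String, Integer> t :
>> > > >> >> >> > > > > >       partitionsByTopic.entrySet())
>> > > >> >> >> > > > > >     for (int p = 0; p < t.getValue(); p++)
>> > > >> >> >> > > > > >       out.get(members.get(i++ % members.size()))
>> > > >> >> >> > > > > >           .add(t.getKey() + "-" + p);
>> > > >> >> >> > > > > >   return out;
>> > > >> >> >> > > > > > }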
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > Of course nothing comes for free. In particular,
>> this
>> > > >> change
>> > > >> >> >> > removes
>> > > >> >> >> > > > the
>> > > >> >> >> > > > > > ability of the coordinator to validate that commits
>> > are
>> > > >> made
>> > > >> >> by
>> > > >> >> >> > > > consumers
>> > > >> >> >> > > > > > who were assigned the respective partition. This
>> might
>> > > not
>> > > >> be
>> > > >> >> too
>> > > >> >> >> > bad
>> > > >> >> >> > > > > since
>> > > >> >> >> > > > > > we retain the ability to validate the generation
>> id,
>> > > but it
>> > > >> >> is a
>> > > >> >> >> > > > > potential
>> > > >> >> >> > > > > > concern. We have considered alternative protocols
>> > which
>> > > >> add a
>> > > >> >> >> > second
>> > > >> >> >> > > > > > round-trip to the protocol in order to give the
>> > > coordinator
>> > > >> >> the
>> > > >> >> >> > > ability
>> > > >> >> >> > > > > to
>> > > >> >> >> > > > > > confirm the assignment. As mentioned above, the
>> > > >> coordinator is
>> > > >> >> >> > > somewhat
>> > > >> >> >> > > > > > limited in what it can actually validate, but this
>> > would
>> > > >> >> return
>> > > >> >> >> its
>> > > >> >> >> > > > > ability
>> > > >> >> >> > > > > > to validate commits. The tradeoff is that it
>> increases
>> > > the
>> > > >> >> >> > protocol's
>> > > >> >> >> > > > > > complexity which means more ways for the protocol
>> to
>> > > fail
>> > > >> and
>> > > >> >> >> > > > > consequently
>> > > >> >> >> > > > > > more edge cases in the code.
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > It also misses an opportunity to generalize the
>> group
>> > > >> >> membership
>> > > >> >> >> > > > protocol
>> > > >> >> >> > > > > > for additional use cases. In fact, after you've
>> gone
>> > to
>> > > the
>> > > >> >> >> trouble
>> > > >> >> >> > > of
>> > > >> >> >> > > > > > moving assignment to the client, the main thing
>> that
>> > is
>> > > >> left
>> > > >> >> in
>> > > >> >> >> > this
>> > > >> >> >> > > > > > protocol is basically a general group management
>> > > >> capability.
>> > > >> >> This
>> > > >> >> >> > is
>> > > >> >> >> > > > > > exactly what is needed for a few cases that are
>> > > currently
>> > > >> >> under
>> > > >> >> >> > > > > discussion
>> > > >> >> >> > > > > > (e.g. copycat or single-writer producer). We've
>> taken
>> > > this
>> > > >> >> >> further
>> > > >> >> >> > > step
>> > > >> >> >> > > > > in
>> > > >> >> >> > > > > > the proposal and attempted to envision what that
>> > general
>> > > >> >> protocol
>> > > >> >> >> > > might
>> > > >> >> >> > > > > > look like and how it could be used both by the
>> > consumer
>> > > and
>> > > >> >> for
>> > > >> >> >> > some
>> > > >> >> >> > > of
>> > > >> >> >> > > > > > these other cases.
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > Anyway, since time is running out on the new
>> consumer,
>> > > we
>> > > >> have
>> > > >> >> >> > > perhaps
>> > > >> >> >> > > > > one
>> > > >> >> >> > > > > > last chance to consider a significant change in the
>> > > >> protocol
>> > > >> >> like
>> > > >> >> >> > > this,
>> > > >> >> >> > > > > so
>> > > >> >> >> > > > > > have a look at the wiki and share your thoughts.
>> I've
>> > no
>> > > >> doubt
>> > > >> >> >> that
>> > > >> >> >> > > > some
>> > > >> >> >> > > > > > ideas seem clearer in my mind than they do on
>> paper,
>> > so
>> > > ask
>> > > >> >> >> > questions
>> > > >> >> >> > > > if
>> > > >> >> >> > > > > > there is any confusion.
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > > > Thanks!
>> > > >> >> >> > > > > > Jason
>> > > >> >> >> > > > > >
>> > > >> >> >> > > > >
>> > > >> >> >> > > >
>> > > >> >> >> > >
>> > > >> >> >> >
>> > > >> >> >>
>> > > >> >> >
>> > > >> >> >
>> > > >> >> >
>> > > >> >> > --
>> > > >> >> > Thanks,
>> > > >> >> > Neha
>> > > >> >>
>> > > >>
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>
