Hey Onur and Jiangjie,

I've updated that wiki with a proposal to add regex subscriptions to the
consumer metadata. Can you have a look to see if it addresses your concern?
In general, I think we should be a little careful when we are talking about
the scalability of the protocol. Regardless of whether assignment is done
on the server or the client, the protocol assumes a relatively stable
configuration. When the number of consumers increases beyond a certain
limit, then membership churn becomes a major concern. Similarly there is a
notion of metadata churn when topics are added, deleted, or resized. If
either membership or metadata changes, then the protocol forces all
consumers to stop consumption and rejoin the group. If this happens often
enough, then it can severely impact the ability of the consumer to make
progress. The point is that the protocol may already be unsuited to cases
where there are either a large number of consumers or topics. I wonder if
you guys can share your thoughts about your scaling expectations?

-Jason




On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson <ja...@confluent.io>
wrote:

> Hey Jiangjie,
>
> That's a great point. In the worst case (the mirror maker case I guess),
> the join group response can be massive. This would be especially deadly
> when there is a lot of churn in the group (e.g. in a rolling upgrade). The
> current protocol is not great for this case either, but it's significantly
> better. Here are a couple ways to deal with the size:
>
> 1. First, we could have the coordinator compress the responses. This would
> probably be pretty effective if applied across the metadata from all
> members.
>
> 2. I think the regex case is the main problem. Is that right? We could
> extend the metadata to allow the consumer to embed its regex subscription
> in the metadata directly (note this might be a good idea regardless of the
> rest of this proposal). To support regex on the consumer, we must fetch
> metadata for all topics. Rather than having all regex subscribers embed all
> of this metadata in their join group requests, they could instead embed a
> hash of it. Then after the join group responses are received, they just
> need to check that the hashes are the same. If there is a mismatch (which
> should only occur when topics are created, deleted, or resized), then the
> group members must refetch the metadata and rejoin the group. This is also
> how the current protocol behaves when there is a change in the topic
> metadata affecting the group--someone (either the coordinator or the
> consumer) detects the change and forces the group to rebalance.
>
> What do you think?
>
> (Also I think adding groupId/generationId to fetch and produce requests
> seems like an interesting line of thought.)
>
> -Jason
>
>
>
> On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
>> Hey Ewen,
>>
>> Onur and I discussed this a little bit more. And we are still worrying
>> about passing all the metadata of all consumers around.
>>
>> Let's say I have a cluster has 10,000 topics, the average topic name
>> length
>> is 10 bytes. In this case, the opaque metadata will have 10 * 10,000 =
>> 100KB for topic name, for each topic, there is a 4-byte integer of number
>> of partitions, that's another 40KB. So one global topic metadata will have
>> 140KB data. If I have 100 consumers who are using wildcard to consume from
>> all the topics. That means the protocol metadata end up in the
>> JoinGroupResponse will be 140KB * 100 = 14MB. And the JoinGroupResponse
>> will need to be sent to 100 different consumers, that means 14MB * 100 =
>> 1.4GB need to be sent by the consumer coordinator for one rebalance. How
>> would that work?
>>
>> Also, having two consumers (old owner and new owner) consuming from the
>> same partition might also be a problem. e.g. people are updating database.
>> One thing might worth doing is to add GroupId and Generation ID to
>> ProducerRequest and FetchRequest as well. This will also help with the
>> single producer use case. However, this is probably orthogonal to this
>> thread given the current new consumer also has this problem and I believe
>> we need to fix it.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Aug 11, 2015 at 11:43 PM, Ewen Cheslack-Postava <
>> e...@confluent.io>
>> wrote:
>>
>> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie Qin
>> <j...@linkedin.com.invalid>
>> > wrote:
>> >
>> > > Ewen,
>> > >
>> > > Thanks for the explanation.
>> > >
>> > > For (1), I am more concerned about the failure case instead of normal
>> > case.
>> > > What if a consumer somehow was kick out of a group but is still
>> consuming
>> > > and committing offsets? Does that mean the new owner and old owner
>> might
>> > > potentially consuming from and committing offsets for the same
>> partition?
>> > > In the old consumer, this won't happen because the new consumer will
>> not
>> > be
>> > > able to start consumption unless the previous owner has released its
>> > > ownership. Basically, without the ownership guarantee, I don't see how
>> > the
>> > > communication among consumers themselves alone can solve the problem
>> > here.
>> > >
>> >
>> > The generation ID check still applies to offset commits. If one of the
>> > consumers is kicked out and misbehaving, it can obviously still fetch
>> and
>> > process messages, but offset commits will not work since it will not
>> have
>> > the current generation ID.
>> >
>> >
>> > >
>> > > For (2) and (3), now I understand how metadata are used. But I still
>> > don't
>> > > see why should we let the consumers to pass the topic information
>> across
>> > > instead of letting coordinator give the information. The single
>> producer
>> > > use case does not solve the ownership problem in abnormal case either,
>> > > which seems to be a little bit vulnerable.
>> > >
>> >
>> > One of the goals here was to generalize group membership so we can, for
>> > example, use it for balancing Copycat tasks across workers. There's no
>> > topic subscription info in that case. The metadata for copycat workers
>> > would instead need to somehow indicate the current set of tasks that
>> need
>> > to be assigned to workers. By making the metadata completely opaque to
>> the
>> > protocol, it becomes more generally useful since it focuses squarely on
>> the
>> > group membership problem, allowing for that additional bit of metadata
>> so
>> > you don't just get a list of members, but also get a little bit of info
>> > about each of them.
>> >
>> > A different option that we explored is to use a sort of mixed model --
>> > still bake all the topic subscriptions directly into the protocol but
>> also
>> > include metadata. That would allow us to maintain the existing
>> > coordinator-driven approach to handling the metadata and change events
>> like
>> > the ones Onur pointed out. Then something like the Copycat workers would
>> > just not fill in any topic subscriptions and it would be handled as a
>> > degenerate case. Based on the way I explained that we can handle those
>> > types of events, I personally feel its cleaner and a nicer
>> generalization
>> > to not include the subscriptions in the join group protocol, making it
>> part
>> > of the metadata instead.
>> >
>> > For the single producer case, are you saying it doesn't solve ownership
>> in
>> > the abnormal case because a producer that doesn't know it has been
>> kicked
>> > out of the group yet can still produce data even though it shouldn't be
>> > able to anymore? I definitely agree that that is a risk -- this
>> provides a
>> > way to get closer to a true single-writer, but there are definitely
>> still
>> > failure modes that this does not address.
>> >
>> > -Ewen
>> >
>> >
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen Cheslack-Postava <
>> > e...@confluent.io
>> > > >
>> > > wrote:
>> > >
>> > > > On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie Qin
>> > <j...@linkedin.com.invalid
>> > > >
>> > > > wrote:
>> > > >
>> > > > > Hi Jason,
>> > > > >
>> > > > > Thanks for writing this up. It would be useful to generalize the
>> > group
>> > > > > concept. I have a few questions below.
>> > > > >
>> > > > > 1. In old consumer actually the partition assignment are done by
>> > > > consumers
>> > > > > themselves. We used zookeeper to guarantee that a partition will
>> only
>> > > be
>> > > > > consumed by one consumer thread who successfully claimed its
>> > ownership.
>> > > > > Does the new protocol plan to provide the same guarantee?
>> > > > >
>> > > >
>> > > > Once you have all the metadata from all the consumers, assignment
>> > should
>> > > > just be a simple function mapping that Map<ConsumerId, Metadata> to
>> > > > Map<ConsumerId, List<TopicPartition>>. If everyone is consistent in
>> > > > computing that, you don't need ZK involved at all.
>> > > >
>> > > > In practice, this shouldn't be that hard to ensure for most
>> assignment
>> > > > strategies just by having decent unit testing on them. You just
>> have to
>> > > do
>> > > > things like ensure your assignment strategy sorts lists into a
>> > consistent
>> > > > order.
>> > > >
>> > > > You do give up the ability to use some techniques (e.g. any
>> randomized
>> > > > algorithm if you can't distribute the seed w/ the metadata) and it's
>> > true
>> > > > that nothing validates the assignment, but if that assignment
>> algorithm
>> > > > step is kept simple, small, and well tested, the risk is very
>> minimal.
>> > > >
>> > > >
>> > > > >
>> > > > > 2. It looks that both JoinGroupRequest and JoinGroupResponse has
>> the
>> > > > > ProtocolMetadata.AssignmentStrategyMetadata, what would be the
>> > metadata
>> > > > be
>> > > > > sent and returned by coordinator? How will the coordinator handle
>> the
>> > > > > metadata?
>> > > > >
>> > > >
>> > > > The coordinator is basically just blindly broadcasting all of it to
>> > group
>> > > > members so they have a consistent view.
>> > > >
>> > > > So from the coordinators perspective, it sees something like:
>> > > >
>> > > > Consumer 1 -> JoinGroupRequest with GroupProtocols = [ "consumer"
>> > > > <Consumer1 opaque byte[]>]
>> > > > Consumer 2 -> JoinGroupRequest with GroupProtocols = [ "consumer"
>> > > > <Consumer2 opaque byte[]>]
>> > > >
>> > > > Then, in the responses would look like:
>> > > >
>> > > > Consumer 1 <- JoinGroupResponse with GroupProtocol = "consumer" and
>> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>, Consumer 2
>> > > > <Consumer2 opaque byte[]>]
>> > > > Consumer 2 <- JoinGroupResponse with GroupProtocol = "consumer" and
>> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>, Consumer 2
>> > > > <Consumer2 opaque byte[]>]
>> > > >
>> > > > So all the responses include all the metadata for every member in
>> the
>> > > > group, and everyone can use that to consistently decide on
>> assignment.
>> > > The
>> > > > broker doesn't care and cannot even understand the metadata since
>> the
>> > > data
>> > > > format for it is dependent on the assignment strategy being used.
>> > > >
>> > > > As another example that is *not* a consumer, let's say you just
>> want to
>> > > > have a single writer in the group which everyone will forward
>> requests
>> > > to.
>> > > > To accomplish this, you could use a very dumb assignment strategy:
>> > there
>> > > is
>> > > > no metadata (empty byte[]) and all we care about is who is the first
>> > > member
>> > > > in the group (e.g. when IDs are sorted lexicographically). That
>> member
>> > is
>> > > > selected as the writer. In that case, we actually just care about
>> the
>> > > > membership list, there's no additional info about each member that
>> is
>> > > > required to determine who is the writer.
>> > > >
>> > > >
>> > > > > 3. Do you mean that the number of partitions in JoinGroupResponse
>> > will
>> > > be
>> > > > > the max partition number of a topic among all the reported
>> partition
>> > > > number
>> > > > > by consumers? Is there any reason not just let Coordinator to
>> return
>> > > the
>> > > > > number of partitions of a topic in its metadata cache?
>> > > > >
>> > > >
>> > > > Nothing from the embedded protocol is touched by the broker. The
>> broker
>> > > > just collects opaque bytes of metadata, does the selection of the
>> > > strategy
>> > > > if multiple are supported by some consumers, and then returns that
>> > opaque
>> > > > metadata for all the members back to every member. In that way they
>> all
>> > > > have a consistent view of the group. For regular consumers, that
>> view
>> > of
>> > > > the group includes information about how many partitions each
>> consumer
>> > > > currently thinks the topics it is subscribed to has. These could be
>> > > > inconsistent due to out of date metadata and it would be up to the
>> > > > assignment strategy on the *client* to resolve that. As you point
>> out,
>> > in
>> > > > that case they could just take the max value that any consumer
>> reported
>> > > > seeing and use that. The consumers that notice that their metadata
>> had
>> > a
>> > > > smaller # of partitions should also trigger a metadata update when
>> they
>> > > see
>> > > > someone else observing a larger # of partitions.
>> > > >
>> > > >
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jiangjie (Becket) Qin
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <
>> ja...@confluent.io
>> > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Kafka Devs,
>> > > > > >
>> > > > > > One of the nagging issues in the current design of the new
>> consumer
>> > > has
>> > > > > > been the need to support a variety of assignment strategies.
>> We've
>> > > > > > encountered this in particular in the design of copycat and the
>> > > > > processing
>> > > > > > framework (KIP-28). From what I understand, Samza also has a
>> number
>> > > of
>> > > > > use
>> > > > > > cases with custom assignment needs. The new consumer protocol
>> > > supports
>> > > > > new
>> > > > > > assignment strategies by hooking them into the broker. For many
>> > > > > > environments, this is a major pain and in some cases, a
>> > non-starter.
>> > > It
>> > > > > > also challenges the validation that the coordinator can provide.
>> > For
>> > > > > > example, some assignment strategies call for partitions to be
>> > > assigned
>> > > > > > multiple times, which means that the coordinator can only check
>> > that
>> > > > > > partitions have been assigned at least once.
>> > > > > >
>> > > > > > To solve these issues, we'd like to propose moving assignment to
>> > the
>> > > > > > client. I've written a wiki which outlines some protocol
>> changes to
>> > > > > achieve
>> > > > > > this:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
>> > > > > > .
>> > > > > > To summarize briefly, instead of the coordinator assigning the
>> > > > partitions
>> > > > > > itself, all subscriptions are forwarded to each member of the
>> group
>> > > > which
>> > > > > > then decides independently which partitions it should consume.
>> The
>> > > > > protocol
>> > > > > > provides a mechanism for the coordinator to validate that all
>> > > consumers
>> > > > > use
>> > > > > > the same assignment strategy, but it does not ensure that the
>> > > resulting
>> > > > > > assignment is "correct." This provides a powerful capability for
>> > > users
>> > > > to
>> > > > > > control the full data flow on the client side. They control how
>> > data
>> > > is
>> > > > > > written to partitions through the Partitioner interface and they
>> > > > control
>> > > > > > how data is consumed through the assignment strategy, all
>> without
>> > > > > touching
>> > > > > > the server.
>> > > > > >
>> > > > > > Of course nothing comes for free. In particular, this change
>> > removes
>> > > > the
>> > > > > > ability of the coordinator to validate that commits are made by
>> > > > consumers
>> > > > > > who were assigned the respective partition. This might not be
>> too
>> > bad
>> > > > > since
>> > > > > > we retain the ability to validate the generation id, but it is a
>> > > > > potential
>> > > > > > concern. We have considered alternative protocols which add a
>> > second
>> > > > > > round-trip to the protocol in order to give the coordinator the
>> > > ability
>> > > > > to
>> > > > > > confirm the assignment. As mentioned above, the coordinator is
>> > > somewhat
>> > > > > > limited in what it can actually validate, but this would return
>> its
>> > > > > ability
>> > > > > > to validate commits. The tradeoff is that it increases the
>> > protocol's
>> > > > > > complexity which means more ways for the protocol to fail and
>> > > > > consequently
>> > > > > > more edge cases in the code.
>> > > > > >
>> > > > > > It also misses an opportunity to generalize the group membership
>> > > > protocol
>> > > > > > for additional use cases. In fact, after you've gone to the
>> trouble
>> > > of
>> > > > > > moving assignment to the client, the main thing that is left in
>> > this
>> > > > > > protocol is basically a general group management capability.
>> This
>> > is
>> > > > > > exactly what is needed for a few cases that are currently under
>> > > > > discussion
>> > > > > > (e.g. copycat or single-writer producer). We've taken this
>> further
>> > > step
>> > > > > in
>> > > > > > the proposal and attempted to envision what that general
>> protocol
>> > > might
>> > > > > > look like and how it could be used both by the consumer and for
>> > some
>> > > of
>> > > > > > these other cases.
>> > > > > >
>> > > > > > Anyway, since time is running out on the new consumer, we have
>> > > perhaps
>> > > > > one
>> > > > > > last chance to consider a significant change in the protocol
>> like
>> > > this,
>> > > > > so
>> > > > > > have a look at the wiki and share your thoughts. I've no doubt
>> that
>> > > > some
>> > > > > > ideas seem clearer in my mind than they do on paper, so ask
>> > questions
>> > > > if
>> > > > > > there is any confusion.
>> > > > > >
>> > > > > > Thanks!
>> > > > > > Jason
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Thanks,
>> > > > Ewen
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > Thanks,
>> > Ewen
>> >
>>
>
>

Reply via email to