Neha and Ewen,

About the metadata change frequency: it really depends on how often the
metadata changes. If we run Kafka as a service, I can see that happening
from time to time. For example, people may create a topic, test it, and
then delete it as part of some automated test. If so, the proposed
protocol might be a little bit vulnerable.

More specifically, the scenario I am thinking of is:
1. Consumer 0 periodically refreshes metadata and detects a metadata
change, so it sends a JoinGroupRequest with metadata_hash_0.
2. Consumer 1 is notified by the coordinator to start a rebalance, so it
refreshes its metadata and sends a JoinGroupRequest with metadata_hash_1,
which is different from metadata_hash_0.
3. The rebalance fails and both consumers refresh their metadata again
from different brokers.
4. Depending on the metadata change frequency (or some admin operation
like a partition reassignment), they may or may not get the same metadata
back, so they restart from step 3 again.

I agree that step 4 might not be a big concern if consumers update their
metadata at almost the same time, but I'm a little bit worried about
whether that assumption really holds, because we do not have control over
how frequently the metadata can change.
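
Just to make this concrete, here is a rough, self-contained Java sketch of
the kind of hash each consumer might compute over its local metadata
snapshot. The class and method names are hypothetical (this is not the
actual consumer code), but it shows why two consumers whose snapshots came
from different brokers around a topic creation/deletion would send
different hashes and fail the JoinGroup round:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;

public class MetadataHashExample {
    static String metadataHash(Map<String, Integer> topicPartitions) throws Exception {
        // Sort topics so every consumer serializes an identical snapshot the same way.
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : new TreeMap<>(topicPartitions).entrySet())
            sb.append(e.getKey()).append(':').append(e.getValue()).append(';');
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) hex.append(String.format("%02x", b));
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Consumer 1 already sees the newly created "test-topic"; consumer 0 does not.
        Map<String, Integer> consumer0View = Map.of("topicA", 8, "topicB", 4);
        Map<String, Integer> consumer1View = Map.of("topicA", 8, "topicB", 4, "test-topic", 1);
        System.out.println(metadataHash(consumer0View).equals(metadataHash(consumer1View))); // prints false
    }
}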

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 14, 2015 at 2:03 AM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> On Thu, Aug 13, 2015 at 11:07 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> > Becket,
> >
> > As you say, the metadata hash addresses the concern you originally raised
> > about large topic subscriptions. Can you please list other problems you
> are
> > raising more clearly? It is more helpful to know problems that the
> proposal
> > does not address or addresses poorly.
> >
> > Regarding other things you said -
> >
> > it is required that each
> > > consumer refresh their metadata before sending a JoinGroupRequest
> > >
> >
> > This is required for wildcard topic subscriptions anyway. So this
> proposal
> > does not introduce a regression. We had agreed earlier that it does not
> > make sense for the server to deserialize regular expressions sent by the
> > consumer.
> >
>
> I don't think consumers need to do a metadata refresh before sending a
> JoinGroupRequest. Metadata changes that affect assignment are rare -- it
> requires changing the number of partitions in a topic. But you might send a
> JoinGroupRequest simply because a new member is trying to join the group.
> That case is presumably much more common.
>
> I think it's actually a good idea to have the first JoinGroup cycle fail in
> some cases, and it has little impact. Let's say the metadata does change
> because partitions are added. Then we might fail in the first round, but
> then all members detect that issue *immediately*, refresh their metadata,
> and submit a new join group request. This second cycle does not require a
> full heartbeat cycle. It happens much more quickly because everyone
> detected the inconsistency based on the first JoinGroupResponse. The
> inconsistency should be resolved very quickly (barring other failures like
> a member leaving mid-rebalance)




> >
> >
> > > the metadata might still be inconsistent if there is a topic or
> partition
> > > change because the
> > > UpdateMetadataRequest from controller might be handled at different
> time.
> > >
> >
> > Topic metadata does not change frequently and even if it did, a couple
> > rebalance attempts will be needed whether the coordinator drives the
> > assignments or the consumer. Because guess how the coordinator knows
> about
> > the topic metadata changes -- indirectly through either a zk callback or
> > UpdateMetadataRequest, so it is completely possible the coordinator sees
> > the topic metadata changes in batches, not all at once.
> >
>
> > On Thu, Aug 13, 2015 at 10:50 PM, Neha Narkhede <n...@confluent.io>
> wrote:
> >
> > > Ewen/Jason,
> > >
> > > The metadata hash is a clever approach and certainly addresses the
> > problem
> > > of large metadata for consumers like mirror maker. Few comments -
> > >
> > >
> > >    1. In the interest of simplifying the format of the consumer's
> > >    metadata - Why not just always include only the topic names in the
> > metadata
> > >    followed by the metadata hash? If the metadata hash check succeeds,
> > each
> > >    consumer uses the # of partitions it had fetched. If it fails, a
> > rebalance
> > >    happens and the metadata is not used anyway.
> >
>
> Doing this requires that every consumer always fetch the full metadata. The
> most common use case is consumers that just want to consume one or a couple
> of topics, in which case grabbing all metadata for the entire cluster is
> wasteful. If I subscribe only to topic A, why make all consumers grab
> metadata for the entire cluster (and need to rebalance every time it
> changes!). Including the # of partitions for each topic lets you avoid
> having to grab the global set of metadata.
>
> So if you're just subscribing to one or a couple of topics, why not just
> compute the hash by filtering out everything but the topics you are
> subscribed to? The problem there is if you ever add/remove subscriptions
> and want to support rolling upgrades. If the group was subscribed to topic
> A, but later changes require subscribing to A + B, then to achieve a
> seamless rolling upgrade would require one (old) consumer to be subscribing
> to A and one (new) consumer to be subscribing to A+B. If we computed
> metadata hashes based on filtered metadata, those two would disagree and we
> could not perform assignment while the upgrade was in progress.
>
> The solution is to differentiate between the cases when a very small amount
> of the metadata is needed (one or a couple of topic subscriptions;
> communicate and share this via metadata in the JoinGroup protocol) vs when
> *all* the metadata is needed (regex subscription; verify agreement via
> hash).
>
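
A tiny illustration of the rolling upgrade point above. This is
hypothetical code (hashCode() stands in for a real digest): if the hash
were computed over each consumer's *filtered* view, a consumer still
subscribed to {A} and one already upgraded to {A, B} could never agree,
even though their cluster metadata is identical:

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class FilteredHashProblem {
    static int filteredHash(Map<String, Integer> clusterMetadata, Set<String> subscription) {
        // Keep only the subscribed topics, then "hash" the filtered view.
        Map<String, Integer> filtered = new TreeMap<>();
        for (String topic : subscription)
            if (clusterMetadata.containsKey(topic))
                filtered.put(topic, clusterMetadata.get(topic));
        return filtered.hashCode(); // stand-in for a real digest
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = Map.of("A", 8, "B", 4, "C", 16);
        int oldConsumer = filteredHash(cluster, Set.of("A"));       // still on the old subscription
        int newConsumer = filteredHash(cluster, Set.of("A", "B"));  // already upgraded to A + B
        System.out.println(oldConsumer == newConsumer);             // false: they can never agree
    }
}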

>
>
> > >    2. Do you need a topic list and topic regex to be separate? A single
> > >    topic or list of topics can be expressed as a regex.
> >
>
> See above note about collecting all metadata when you really only need it
> for 1 or 2 topics. There's probably some debate to be had about whether
> this cost would be too high -- every consumer would need to request the
> metadata for all topics, and they'd need to request that all every time
> they might be out of date.
>
Are we going to allow consumers in the same group to subscribe to
different topic sets? If we do, we need to let them refresh metadata for
all the topics the group is consuming from. If we don't, then the
protocol only needs a subscription set hash.

>
>
> > >    3. Let's include a version explicitly at the beginning of the
> > >    ProtocolMetadata. The version dictates how to deserialize the
> > >    ProtocolMetadata blob and is consistent with the rest of Kafka.
> >
>
> If I'm understanding correctly, in JoinGroupRequest I would change
>
> GroupProtocols          => [Protocol ProtocolMetadata]
>
> to
>
> GroupProtocols          => [Protocol ProtocolVersion ProtocolMetadata]
>
> We had been talking about just baking the version into the Protocol field,
> but making it separate seems perfectly reasonable to me. Jason, any issue
> with splitting the version out into a separate field like this?
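
For what it's worth, a minimal sketch of how a client could dispatch on a
leading version field when deserializing the otherwise opaque
ProtocolMetadata blob. The version numbers and descriptions here are
purely illustrative, not the actual wire format:

import java.nio.ByteBuffer;

public class VersionedMetadata {
    static String describe(ByteBuffer protocolMetadata) {
        short version = protocolMetadata.getShort();   // always read the version first
        switch (version) {
            case 0:  return "v0: subscription list only";
            case 1:  return "v1: subscription list plus metadata hash";
            default: return "unknown version " + version + ": skip this member or fail the join";
        }
    }

    public static void main(String[] args) {
        ByteBuffer blob = ByteBuffer.allocate(2).putShort((short) 1);
        blob.flip();
        System.out.println(describe(blob));  // v1: subscription list plus metadata hash
    }
}
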
>
> >
> > > That can simplify the metadata format to the following:
> > >
> > > GroupType => "consumer"
> > >>
> > >> Protocol => AssignmentStrategy
> > >>   AssignmentStrategy   => String
> > >>
> > >> ProtocolMetadata => Version Subscription AssignmentStrategyMetadata
> > >
> > >     Version                    => String
> > >
> > >   Subscription                 => TopicRegex MetadataHash
> > >>     TopicRegex                 => String
> > >>     MetadataHash               => String
> > >>   AssignmentStrategyMetadata   => bytes
> >
> >
> > >
> > > On Thu, Aug 13, 2015 at 6:28 PM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > wrote:
> > >
> > >> Ewen and Jason,
> > >>
> > >> Thanks for the reply. Sorry I missed the metadata hash. Yes, that is a
> > >> clever approach and would solve my concern about the data passing
> > around.
> > >> I
> > >> can see both pros and cons from doing this, though. The advantage is
> we
> > >> don't need the topic metadata in JoinGroupResponse anymore. The
> downside
> > >> is
> > >> that now rebalance have extra dependency on the consensus of metadata
> of
> > >> all consumers, which is obtained separately. So it is required that
> each
> > >> consumer refresh their metadata before sending a JoinGroupRequest,
> > >> otherwise in some cases (e.g. wildcard consumers) will almost
> certainly
> > >> fail for the first rebalance attempt. Even if we do that, since the
> > >> consumers are getting metadata from different brokers, the metadata
> > might
> > >> still be inconsistent if there is a topic or partition change because
> > the
> > >> UpdateMetadataRequest from controller might be handled at different
> > time.
> > >> Just want to make sure we think through the cases so the protocol does
> > not
> > >> cause us unexpected issues.
> > >>
> > >> About the number of consumers, I think with the current liveliness
> > >> definition, we can tolerate churns by bumping up the session timeout.
> > Also
> > >> I guess we will see an increasing number of consumers for new
> consumer,
> > >> because every old consumer thread will probably become a consumer.
> > >>
> > >> It is a valid concern for consumers that have large subscription set.
> > This
> > >> might not be avoided though for client side assignment approach. One
> > >> solution is having topic names associate with a topic ID. And only use
> > >> topic ID in JoinGroupRequest and JoinGroupResponse, There is a
> > discussion
> > >> thread about this to solve the topic renaming case but this is a
> > >> completely
> > >> different discussion.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >> On Thu, Aug 13, 2015 at 2:14 PM, Jason Gustafson <ja...@confluent.io>
> > >> wrote:
> > >>
> > >> > Thanks Jiangjie, that information helps. I agree the protocol must
> > >> consider
> > >> > scalability. My point was that the synchronization barrier in the
> > >> current
> > >> > protocol already effectively limits the number of consumers since it
> > >> > provides no way to gracefully handle churn. It wouldn't be worth
> > >> worrying
> > >> > about scaling up to 100,000 members, for example, because there's no
> > way
> > >> > the group would be stable. So we just need to set some clear
> > >> expectations
> > >> > on the size we can scale to, and that can help inform the discussion
> > on
> > >> the
> > >> > size of messages in this protocol.
> > >> >
> > >> > Ewen and I were discussing this morning along similar lines to what
> > >> you're
> > >> > suggesting. However, even if the coordinator decides on the metadata
> > for
> > >> > the group, each member still needs to communicate its subscriptions
> to
> > >> the
> > >> > rest of the group. This is nice for the regex case since the regex
> is
> > >> > probably small, but if the members have a large topic list, then we
> > have
> > >> > the same problem. One thing I was thinking about was whether we
> really
> > >> need
> > >> > to handle different subscriptions for every member. If the
> coordinator
> > >> > could guarantee that all members had the same subscription, then
> there
> > >> > would be no need for the coordinator to return the subscriptions for
> > >> each
> > >> > member. However, this would prevent graceful upgrades. We might be
> > able
> > >> to
> > >> > fix that problem by allowing the consumer to provide two
> subscriptions
> > >> to
> > >> > allowing rolling updates, but that starts to sound pretty nasty.
> > >> >
> > >> > -Jason
> > >> >
> > >> > On Thu, Aug 13, 2015 at 1:41 PM, Jiangjie Qin
> > <j...@linkedin.com.invalid
> > >> >
> > >> > wrote:
> > >> >
> > >> > > Jason,
> > >> > >
> > >> > > The protocol has to consider the scalability. The protocol in the
> > wiki
> > >> > > means the JoinGroupResoponse size would be:
> > >> > > NumberOfTopics * (AvgTopicNameLength + 4) * (NumberOfConsumers)^2
> > >> > >
> > >> > > To give some real number, we have 26-node Mirror Maker cluster,
> each
> > >> > with 4
> > >> > > consumers. That is 104 consumers using regex ".*". And most of our
> > >> > clusters
> > >> > > have around 3000 topics, whose topic name are typically around 20
> > >> > > characters.
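
(Plugging those numbers into the formula above as a rough check: 3000
topics * (20 + 4) bytes * 104^2 consumers comes to roughly 780 MB of
JoinGroupResponse data sent by the coordinator for a single rebalance,
before any compression.)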
> > >> > >
> > >> > > I think the key issue for client side partition assignment logic
> is
> > to
> > >> > make
> > >> > > sure 1) all the clients run the same algorithm. 2) all the clients
> > >> make
> > >> > > decision on the same topic metadata. The second purpose can be
> done
> > by
> > >> > > simply letting coordinator provide the topic metadata and all the
> > >> member
> > >> > > information as source of truth. Is it necessary to pass topic
> > >> metadata of
> > >> > > each consumer around? Can we keep the protocol metadata field
> > >> completely
> > >> > > independent of topic metadata? I think In the JoinGroupResponse,
> we
> > >> > should
> > >> > > have only one copy of topic metadata provided by coordinator and
> is
> > >> > outside
> > >> > > of protocol metadata. If user decides to put some metadata in the
> > >> > > JoinGroupRequest and let coordinator pass around, they are
> > responsible
> > >> > for
> > >> > > understanding the risk.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jiangjie (Becket) Qin
> > >> > >
> > >> > >
> > >> > > On Thu, Aug 13, 2015 at 12:41 PM, Jason Gustafson <
> > ja...@confluent.io
> > >> >
> > >> > > wrote:
> > >> > >
> > >> > > > Hey Onur and Jiangjie,
> > >> > > >
> > >> > > > I've updated that wiki with a proposal to add regex
> subscriptions
> > to
> > >> > the
> > >> > > > consumer metadata. Can you have a look to see if it addresses
> your
> > >> > > concern?
> > >> > > > In general, I think we should be a little careful when we are
> > >> talking
> > >> > > about
> > >> > > > the scalability of the protocol. Regardless of whether
> assignment
> > is
> > >> > done
> > >> > > > on the server or the client, the protocol assumes a relatively
> > >> stable
> > >> > > > configuration. When the number of consumers increases beyond a
> > >> certain
> > >> > > > limit, then membership churn becomes a major concern. Similarly
> > >> there
> > >> > is
> > >> > > a
> > >> > > > notion of metadata churn when topics are added, deleted, or
> > >> resized. If
> > >> > > > either membership or metadata changes, then the protocol forces
> > all
> > >> > > > consumers to stop consumption and rejoin the group. If this
> > happens
> > >> > often
> > >> > > > enough, then it can severely impact the ability of the consumer
> to
> > >> make
> > >> > > > progress. The point is that the protocol may already be unsuited
> > to
> > >> > cases
> > >> > > > where there are either a large number of consumers or topics. I
> > >> wonder
> > >> > if
> > >> > > > you guys can share your thoughts about your scaling
> expectations?
> > >> > > >
> > >> > > > -Jason
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson <
> > >> ja...@confluent.io>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Hey Jiangjie,
> > >> > > > >
> > >> > > > > That's a great point. In the worst case (the mirror maker
> case I
> > >> > > guess),
> > >> > > > > the join group response can be massive. This would be
> especially
> > >> > deadly
> > >> > > > > when there is a lot of churn in the group (e.g. in a rolling
> > >> > upgrade).
> > >> > > > The
> > >> > > > > current protocol is not great for this case either, but it's
> > >> > > > significantly
> > >> > > > > better. Here are a couple ways to deal with the size:
> > >> > > > >
> > >> > > > > 1. First, we could have the coordinator compress the
> responses.
> > >> This
> > >> > > > would
> > >> > > > > probably be pretty effective if applied across the metadata
> from
> > >> all
> > >> > > > > members.
> > >> > > > >
> > >> > > > > 2. I think the regex case is the main problem. Is that right?
> We
> > >> > could
> > >> > > > > extend the metadata to allow the consumer to embed its regex
> > >> > > subscription
> > >> > > > > in the metadata directly (note this might be a good idea
> > >> regardless
> > >> > of
> > >> > > > the
> > >> > > > > rest of this proposal). To support regex on the consumer, we
> > must
> > >> > fetch
> > >> > > > > metadata for all topics. Rather than having all regex
> > subscribers
> > >> > embed
> > >> > > > all
> > >> > > > > of this metadata in their join group requests, they could
> > instead
> > >> > > embed a
> > >> > > > > hash of it. Then after the join group responses are received,
> > they
> > >> > just
> > >> > > > > need to check that the hashes are the same. If there is a
> > mismatch
> > >> > > (which
> > >> > > > > should only occur when topics are created, deleted, or
> resized),
> > >> then
> > >> > > the
> > >> > > > > group members must refetch the metadata and rejoin the group.
> > >> This is
> > >> > > > also
> > >> > > > > how the current protocol behaves when there is a change in the
> > >> topic
> > >> > > > > metadata affecting the group--someone (either the coordinator
> or
> > >> the
> > >> > > > > consumer) detects the change and forces the group to
> rebalance.
> > >> > > > >
> > >> > > > > What do you think?
> > >> > > > >
> > >> > > > > (Also I think adding groupId/generationId to fetch and produce
> > >> > requests
> > >> > > > > seems like an interesting line of thought.)
> > >> > > > >
> > >> > > > > -Jason
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin
> > >> > > <j...@linkedin.com.invalid
> > >> > > > >
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > >> Hey Ewen,
> > >> > > > >>
> > >> > > > >> Onur and I discussed this a little bit more. And we are still
> > >> > worrying
> > >> > > > >> about passing all the metadata of all consumers around.
> > >> > > > >>
> > >> > > > >> Let's say I have a cluster has 10,000 topics, the average
> topic
> > >> name
> > >> > > > >> length
> > >> > > > >> is 10 bytes. In this case, the opaque metadata will have 10 *
> > >> > 10,000 =
> > >> > > > >> 100KB for topic name, for each topic, there is a 4-byte
> integer
> > >> of
> > >> > > > number
> > >> > > > >> of partitions, that's another 40KB. So one global topic
> > metadata
> > >> > will
> > >> > > > have
> > >> > > > >> 140KB data. If I have 100 consumers who are using wildcard to
> > >> > consume
> > >> > > > from
> > >> > > > >> all the topics. That means the protocol metadata end up in
> the
> > >> > > > >> JoinGroupResponse will be 140KB * 100 = 14MB. And the
> > >> > > JoinGroupResponse
> > >> > > > >> will need to be sent to 100 different consumers, that means
> > 14MB
> > >> *
> > >> > > 100 =
> > >> > > > >> 1.4GB need to be sent by the consumer coordinator for one
> > >> rebalance.
> > >> > > How
> > >> > > > >> would that work?
> > >> > > > >>
> > >> > > > >> Also, having two consumers (old owner and new owner)
> consuming
> > >> from
> > >> > > the
> > >> > > > >> same partition might also be a problem. e.g. people are
> > updating
> > >> > > > database.
> > >> > > > >> One thing might worth doing is to add GroupId and Generation
> ID
> > >> to
> > >> > > > >> ProducerRequest and FetchRequest as well. This will also help
> > >> with
> > >> > the
> > >> > > > >> single producer use case. However, this is probably
> orthogonal
> > to
> > >> > this
> > >> > > > >> thread given the current new consumer also has this problem
> > and I
> > >> > > > believe
> > >> > > > >> we need to fix it.
> > >> > > > >>
> > >> > > > >> Thanks,
> > >> > > > >>
> > >> > > > >> Jiangjie (Becket) Qin
> > >> > > > >>
> > >> > > > >> On Tue, Aug 11, 2015 at 11:43 PM, Ewen Cheslack-Postava <
> > >> > > > >> e...@confluent.io>
> > >> > > > >> wrote:
> > >> > > > >>
> > >> > > > >> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie Qin
> > >> > > > >> <j...@linkedin.com.invalid>
> > >> > > > >> > wrote:
> > >> > > > >> >
> > >> > > > >> > > Ewen,
> > >> > > > >> > >
> > >> > > > >> > > Thanks for the explanation.
> > >> > > > >> > >
> > >> > > > >> > > For (1), I am more concerned about the failure case
> instead
> > >> of
> > >> > > > normal
> > >> > > > >> > case.
> > >> > > > >> > > What if a consumer somehow was kick out of a group but is
> > >> still
> > >> > > > >> consuming
> > >> > > > >> > > and committing offsets? Does that mean the new owner and
> > old
> > >> > owner
> > >> > > > >> might
> > >> > > > >> > > potentially consuming from and committing offsets for the
> > >> same
> > >> > > > >> partition?
> > >> > > > >> > > In the old consumer, this won't happen because the new
> > >> consumer
> > >> > > will
> > >> > > > >> not
> > >> > > > >> > be
> > >> > > > >> > > able to start consumption unless the previous owner has
> > >> released
> > >> > > its
> > >> > > > >> > > ownership. Basically, without the ownership guarantee, I
> > >> don't
> > >> > see
> > >> > > > how
> > >> > > > >> > the
> > >> > > > >> > > communication among consumers themselves alone can solve
> > the
> > >> > > problem
> > >> > > > >> > here.
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >> > The generation ID check still applies to offset commits. If
> > >> one of
> > >> > > the
> > >> > > > >> > consumers is kicked out and misbehaving, it can obviously
> > still
> > >> > > fetch
> > >> > > > >> and
> > >> > > > >> > process messages, but offset commits will not work since it
> > >> will
> > >> > not
> > >> > > > >> have
> > >> > > > >> > the current generation ID.
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> > >
> > >> > > > >> > > For (2) and (3), now I understand how metadata are used.
> > But
> > >> I
> > >> > > still
> > >> > > > >> > don't
> > >> > > > >> > > see why should we let the consumers to pass the topic
> > >> > information
> > >> > > > >> across
> > >> > > > >> > > instead of letting coordinator give the information. The
> > >> single
> > >> > > > >> producer
> > >> > > > >> > > use case does not solve the ownership problem in abnormal
> > >> case
> > >> > > > either,
> > >> > > > >> > > which seems to be a little bit vulnerable.
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >> > One of the goals here was to generalize group membership so
> > we
> > >> > can,
> > >> > > > for
> > >> > > > >> > example, use it for balancing Copycat tasks across workers.
> > >> > There's
> > >> > > no
> > >> > > > >> > topic subscription info in that case. The metadata for
> > copycat
> > >> > > workers
> > >> > > > >> > would instead need to somehow indicate the current set of
> > tasks
> > >> > that
> > >> > > > >> need
> > >> > > > >> > to be assigned to workers. By making the metadata
> completely
> > >> > opaque
> > >> > > to
> > >> > > > >> the
> > >> > > > >> > protocol, it becomes more generally useful since it focuses
> > >> > squarely
> > >> > > > on
> > >> > > > >> the
> > >> > > > >> > group membership problem, allowing for that additional bit
> of
> > >> > > metadata
> > >> > > > >> so
> > >> > > > >> > you don't just get a list of members, but also get a little
> > >> bit of
> > >> > > > info
> > >> > > > >> > about each of them.
> > >> > > > >> >
> > >> > > > >> > A different option that we explored is to use a sort of
> mixed
> > >> > model
> > >> > > --
> > >> > > > >> > still bake all the topic subscriptions directly into the
> > >> protocol
> > >> > > but
> > >> > > > >> also
> > >> > > > >> > include metadata. That would allow us to maintain the
> > existing
> > >> > > > >> > coordinator-driven approach to handling the metadata and
> > change
> > >> > > events
> > >> > > > >> like
> > >> > > > >> > the ones Onur pointed out. Then something like the Copycat
> > >> workers
> > >> > > > would
> > >> > > > >> > just not fill in any topic subscriptions and it would be
> > >> handled
> > >> > as
> > >> > > a
> > >> > > > >> > degenerate case. Based on the way I explained that we can
> > >> handle
> > >> > > those
> > >> > > > >> > types of events, I personally feel its cleaner and a nicer
> > >> > > > >> generalization
> > >> > > > >> > to not include the subscriptions in the join group
> protocol,
> > >> > making
> > >> > > it
> > >> > > > >> part
> > >> > > > >> > of the metadata instead.
> > >> > > > >> >
> > >> > > > >> > For the single producer case, are you saying it doesn't
> solve
> > >> > > > ownership
> > >> > > > >> in
> > >> > > > >> > the abnormal case because a producer that doesn't know it
> has
> > >> been
> > >> > > > >> kicked
> > >> > > > >> > out of the group yet can still produce data even though it
> > >> > shouldn't
> > >> > > > be
> > >> > > > >> > able to anymore? I definitely agree that that is a risk --
> > this
> > >> > > > >> provides a
> > >> > > > >> > way to get closer to a true single-writer, but there are
> > >> > definitely
> > >> > > > >> still
> > >> > > > >> > failure modes that this does not address.
> > >> > > > >> >
> > >> > > > >> > -Ewen
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> > >
> > >> > > > >> > > Thanks,
> > >> > > > >> > >
> > >> > > > >> > > Jiangjie (Becket) Qin
> > >> > > > >> > >
> > >> > > > >> > >
> > >> > > > >> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen Cheslack-Postava <
> > >> > > > >> > e...@confluent.io
> > >> > > > >> > > >
> > >> > > > >> > > wrote:
> > >> > > > >> > >
> > >> > > > >> > > > On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie Qin
> > >> > > > >> > <j...@linkedin.com.invalid
> > >> > > > >> > > >
> > >> > > > >> > > > wrote:
> > >> > > > >> > > >
> > >> > > > >> > > > > Hi Jason,
> > >> > > > >> > > > >
> > >> > > > >> > > > > Thanks for writing this up. It would be useful to
> > >> generalize
> > >> > > the
> > >> > > > >> > group
> > >> > > > >> > > > > concept. I have a few questions below.
> > >> > > > >> > > > >
> > >> > > > >> > > > > 1. In old consumer actually the partition assignment
> > are
> > >> > done
> > >> > > by
> > >> > > > >> > > > consumers
> > >> > > > >> > > > > themselves. We used zookeeper to guarantee that a
> > >> partition
> > >> > > will
> > >> > > > >> only
> > >> > > > >> > > be
> > >> > > > >> > > > > consumed by one consumer thread who successfully
> > claimed
> > >> its
> > >> > > > >> > ownership.
> > >> > > > >> > > > > Does the new protocol plan to provide the same
> > guarantee?
> > >> > > > >> > > > >
> > >> > > > >> > > >
> > >> > > > >> > > > Once you have all the metadata from all the consumers,
> > >> > > assignment
> > >> > > > >> > should
> > >> > > > >> > > > just be a simple function mapping that Map<ConsumerId,
> > >> > Metadata>
> > >> > > > to
> > >> > > > >> > > > Map<ConsumerId, List<TopicPartition>>. If everyone is
> > >> > consistent
> > >> > > > in
> > >> > > > >> > > > computing that, you don't need ZK involved at all.
> > >> > > > >> > > >
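
To make that concrete, here is a toy example of such a pure, deterministic
function that every member can evaluate locally over the shared membership
list. It is hypothetical (not the proposed built-in assignor), just to show
that consistency only requires sorting the inputs:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ToyRoundRobinAssignor {
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);                       // same order on every client
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String id : members)
            assignment.put(id, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++)
            assignment.get(members.get(p % members.size())).add(p);
        return assignment;
    }

    public static void main(String[] args) {
        // Every member computes this locally from the shared membership list.
        System.out.println(assign(List.of("consumer-2", "consumer-1"), 5));
        // {consumer-1=[0, 2, 4], consumer-2=[1, 3]}
    }
}
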
> > >> > > > >> > > > In practice, this shouldn't be that hard to ensure for
> > most
> > >> > > > >> assignment
> > >> > > > >> > > > strategies just by having decent unit testing on them.
> > You
> > >> > just
> > >> > > > >> have to
> > >> > > > >> > > do
> > >> > > > >> > > > things like ensure your assignment strategy sorts lists
> > >> into a
> > >> > > > >> > consistent
> > >> > > > >> > > > order.
> > >> > > > >> > > >
> > >> > > > >> > > > You do give up the ability to use some techniques (e.g.
> > any
> > >> > > > >> randomized
> > >> > > > >> > > > algorithm if you can't distribute the seed w/ the
> > metadata)
> > >> > and
> > >> > > > it's
> > >> > > > >> > true
> > >> > > > >> > > > that nothing validates the assignment, but if that
> > >> assignment
> > >> > > > >> algorithm
> > >> > > > >> > > > step is kept simple, small, and well tested, the risk
> is
> > >> very
> > >> > > > >> minimal.
> > >> > > > >> > > >
> > >> > > > >> > > >
> > >> > > > >> > > > >
> > >> > > > >> > > > > 2. It looks that both JoinGroupRequest and
> > >> JoinGroupResponse
> > >> > > has
> > >> > > > >> the
> > >> > > > >> > > > > ProtocolMetadata.AssignmentStrategyMetadata, what
> would
> > >> be
> > >> > the
> > >> > > > >> > metadata
> > >> > > > >> > > > be
> > >> > > > >> > > > > sent and returned by coordinator? How will the
> > >> coordinator
> > >> > > > handle
> > >> > > > >> the
> > >> > > > >> > > > > metadata?
> > >> > > > >> > > > >
> > >> > > > >> > > >
> > >> > > > >> > > > The coordinator is basically just blindly broadcasting
> > all
> > >> of
> > >> > it
> > >> > > > to
> > >> > > > >> > group
> > >> > > > >> > > > members so they have a consistent view.
> > >> > > > >> > > >
> > >> > > > >> > > > So from the coordinators perspective, it sees something
> > >> like:
> > >> > > > >> > > >
> > >> > > > >> > > > Consumer 1 -> JoinGroupRequest with GroupProtocols = [
> > >> > > "consumer"
> > >> > > > >> > > > <Consumer1 opaque byte[]>]
> > >> > > > >> > > > Consumer 2 -> JoinGroupRequest with GroupProtocols = [
> > >> > > "consumer"
> > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > >> > > > >> > > >
> > >> > > > >> > > > Then, in the responses would look like:
> > >> > > > >> > > >
> > >> > > > >> > > > Consumer 1 <- JoinGroupResponse with GroupProtocol =
> > >> > "consumer"
> > >> > > > and
> > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>,
> > >> > Consumer
> > >> > > 2
> > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > >> > > > >> > > > Consumer 2 <- JoinGroupResponse with GroupProtocol =
> > >> > "consumer"
> > >> > > > and
> > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>,
> > >> > Consumer
> > >> > > 2
> > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > >> > > > >> > > >
> > >> > > > >> > > > So all the responses include all the metadata for every
> > >> member
> > >> > > in
> > >> > > > >> the
> > >> > > > >> > > > group, and everyone can use that to consistently decide
> > on
> > >> > > > >> assignment.
> > >> > > > >> > > The
> > >> > > > >> > > > broker doesn't care and cannot even understand the
> > metadata
> > >> > > since
> > >> > > > >> the
> > >> > > > >> > > data
> > >> > > > >> > > > format for it is dependent on the assignment strategy
> > being
> > >> > > used.
> > >> > > > >> > > >
> > >> > > > >> > > > As another example that is *not* a consumer, let's say
> > you
> > >> > just
> > >> > > > >> want to
> > >> > > > >> > > > have a single writer in the group which everyone will
> > >> forward
> > >> > > > >> requests
> > >> > > > >> > > to.
> > >> > > > >> > > > To accomplish this, you could use a very dumb
> assignment
> > >> > > strategy:
> > >> > > > >> > there
> > >> > > > >> > > is
> > >> > > > >> > > > no metadata (empty byte[]) and all we care about is who
> > is
> > >> the
> > >> > > > first
> > >> > > > >> > > member
> > >> > > > >> > > > in the group (e.g. when IDs are sorted
> > lexicographically).
> > >> > That
> > >> > > > >> member
> > >> > > > >> > is
> > >> > > > >> > > > selected as the writer. In that case, we actually just
> > care
> > >> > > about
> > >> > > > >> the
> > >> > > > >> > > > membership list, there's no additional info about each
> > >> member
> > >> > > that
> > >> > > > >> is
> > >> > > > >> > > > required to determine who is the writer.
> > >> > > > >> > > >
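
That strategy is small enough to sketch in a few lines (again hypothetical,
just to show that nothing beyond the membership list is needed):

import java.util.Collections;
import java.util.List;

public class SingleWriterElection {
    static String electWriter(List<String> memberIds) {
        return Collections.min(memberIds);  // lexicographically first member id wins
    }

    public static void main(String[] args) {
        System.out.println(electWriter(List.of("member-b", "member-a", "member-c")));  // member-a
    }
}
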
> > >> > > > >> > > >
> > >> > > > >> > > > > 3. Do you mean that the number of partitions in
> > >> > > > JoinGroupResponse
> > >> > > > >> > will
> > >> > > > >> > > be
> > >> > > > >> > > > > the max partition number of a topic among all the
> > >> reported
> > >> > > > >> partition
> > >> > > > >> > > > number
> > >> > > > >> > > > > by consumers? Is there any reason not just let
> > >> Coordinator
> > >> > to
> > >> > > > >> return
> > >> > > > >> > > the
> > >> > > > >> > > > > number of partitions of a topic in its metadata
> cache?
> > >> > > > >> > > > >
> > >> > > > >> > > >
> > >> > > > >> > > > Nothing from the embedded protocol is touched by the
> > >> broker.
> > >> > The
> > >> > > > >> broker
> > >> > > > >> > > > just collects opaque bytes of metadata, does the
> > selection
> > >> of
> > >> > > the
> > >> > > > >> > > strategy
> > >> > > > >> > > > if multiple are supported by some consumers, and then
> > >> returns
> > >> > > that
> > >> > > > >> > opaque
> > >> > > > >> > > > metadata for all the members back to every member. In
> > that
> > >> way
> > >> > > > they
> > >> > > > >> all
> > >> > > > >> > > > have a consistent view of the group. For regular
> > consumers,
> > >> > that
> > >> > > > >> view
> > >> > > > >> > of
> > >> > > > >> > > > the group includes information about how many
> partitions
> > >> each
> > >> > > > >> consumer
> > >> > > > >> > > > currently thinks the topics it is subscribed to has.
> > These
> > >> > could
> > >> > > > be
> > >> > > > >> > > > inconsistent due to out of date metadata and it would
> be
> > >> up to
> > >> > > the
> > >> > > > >> > > > assignment strategy on the *client* to resolve that. As
> > you
> > >> > > point
> > >> > > > >> out,
> > >> > > > >> > in
> > >> > > > >> > > > that case they could just take the max value that any
> > >> consumer
> > >> > > > >> reported
> > >> > > > >> > > > seeing and use that. The consumers that notice that
> their
> > >> > > metadata
> > >> > > > >> had
> > >> > > > >> > a
> > >> > > > >> > > > smaller # of partitions should also trigger a metadata
> > >> update
> > >> > > when
> > >> > > > >> they
> > >> > > > >> > > see
> > >> > > > >> > > > someone else observing a larger # of partitions.
> > >> > > > >> > > >
> > >> > > > >> > > >
> > >> > > > >> > > > >
> > >> > > > >> > > > > Thanks,
> > >> > > > >> > > > >
> > >> > > > >> > > > > Jiangjie (Becket) Qin
> > >> > > > >> > > > >
> > >> > > > >> > > > >
> > >> > > > >> > > > >
> > >> > > > >> > > > >
> > >> > > > >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <
> > >> > > > >> ja...@confluent.io
> > >> > > > >> > >
> > >> > > > >> > > > > wrote:
> > >> > > > >> > > > >
> > >> > > > >> > > > > > Hi Kafka Devs,
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > One of the nagging issues in the current design of
> > the
> > >> new
> > >> > > > >> consumer
> > >> > > > >> > > has
> > >> > > > >> > > > > > been the need to support a variety of assignment
> > >> > strategies.
> > >> > > > >> We've
> > >> > > > >> > > > > > encountered this in particular in the design of
> > copycat
> > >> > and
> > >> > > > the
> > >> > > > >> > > > > processing
> > >> > > > >> > > > > > framework (KIP-28). From what I understand, Samza
> > also
> > >> > has a
> > >> > > > >> number
> > >> > > > >> > > of
> > >> > > > >> > > > > use
> > >> > > > >> > > > > > cases with custom assignment needs. The new
> consumer
> > >> > > protocol
> > >> > > > >> > > supports
> > >> > > > >> > > > > new
> > >> > > > >> > > > > > assignment strategies by hooking them into the
> > broker.
> > >> For
> > >> > > > many
> > >> > > > >> > > > > > environments, this is a major pain and in some
> > cases, a
> > >> > > > >> > non-starter.
> > >> > > > >> > > It
> > >> > > > >> > > > > > also challenges the validation that the coordinator
> > can
> > >> > > > provide.
> > >> > > > >> > For
> > >> > > > >> > > > > > example, some assignment strategies call for
> > >> partitions to
> > >> > > be
> > >> > > > >> > > assigned
> > >> > > > >> > > > > > multiple times, which means that the coordinator
> can
> > >> only
> > >> > > > check
> > >> > > > >> > that
> > >> > > > >> > > > > > partitions have been assigned at least once.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > To solve these issues, we'd like to propose moving
> > >> > > assignment
> > >> > > > to
> > >> > > > >> > the
> > >> > > > >> > > > > > client. I've written a wiki which outlines some
> > >> protocol
> > >> > > > >> changes to
> > >> > > > >> > > > > achieve
> > >> > > > >> > > > > > this:
> > >> > > > >> > > > > >
> > >> > > > >> > > > > >
> > >> > > > >> > > > >
> > >> > > > >> > > >
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > >> > > > >> > > > > > .
> > >> > > > >> > > > > > To summarize briefly, instead of the coordinator
> > >> assigning
> > >> > > the
> > >> > > > >> > > > partitions
> > >> > > > >> > > > > > itself, all subscriptions are forwarded to each
> > member
> > >> of
> > >> > > the
> > >> > > > >> group
> > >> > > > >> > > > which
> > >> > > > >> > > > > > then decides independently which partitions it
> should
> > >> > > consume.
> > >> > > > >> The
> > >> > > > >> > > > > protocol
> > >> > > > >> > > > > > provides a mechanism for the coordinator to
> validate
> > >> that
> > >> > > all
> > >> > > > >> > > consumers
> > >> > > > >> > > > > use
> > >> > > > >> > > > > > the same assignment strategy, but it does not
> ensure
> > >> that
> > >> > > the
> > >> > > > >> > > resulting
> > >> > > > >> > > > > > assignment is "correct." This provides a powerful
> > >> > capability
> > >> > > > for
> > >> > > > >> > > users
> > >> > > > >> > > > to
> > >> > > > >> > > > > > control the full data flow on the client side. They
> > >> > control
> > >> > > > how
> > >> > > > >> > data
> > >> > > > >> > > is
> > >> > > > >> > > > > > written to partitions through the Partitioner
> > interface
> > >> > and
> > >> > > > they
> > >> > > > >> > > > control
> > >> > > > >> > > > > > how data is consumed through the assignment
> strategy,
> > >> all
> > >> > > > >> without
> > >> > > > >> > > > > touching
> > >> > > > >> > > > > > the server.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > Of course nothing comes for free. In particular,
> this
> > >> > change
> > >> > > > >> > removes
> > >> > > > >> > > > the
> > >> > > > >> > > > > > ability of the coordinator to validate that commits
> > are
> > >> > made
> > >> > > > by
> > >> > > > >> > > > consumers
> > >> > > > >> > > > > > who were assigned the respective partition. This
> > might
> > >> not
> > >> > > be
> > >> > > > >> too
> > >> > > > >> > bad
> > >> > > > >> > > > > since
> > >> > > > >> > > > > > we retain the ability to validate the generation
> id,
> > >> but
> > >> > it
> > >> > > > is a
> > >> > > > >> > > > > potential
> > >> > > > >> > > > > > concern. We have considered alternative protocols
> > which
> > >> > add
> > >> > > a
> > >> > > > >> > second
> > >> > > > >> > > > > > round-trip to the protocol in order to give the
> > >> > coordinator
> > >> > > > the
> > >> > > > >> > > ability
> > >> > > > >> > > > > to
> > >> > > > >> > > > > > confirm the assignment. As mentioned above, the
> > >> > coordinator
> > >> > > is
> > >> > > > >> > > somewhat
> > >> > > > >> > > > > > limited in what it can actually validate, but this
> > >> would
> > >> > > > return
> > >> > > > >> its
> > >> > > > >> > > > > ability
> > >> > > > >> > > > > > to validate commits. The tradeoff is that it
> > increases
> > >> the
> > >> > > > >> > protocol's
> > >> > > > >> > > > > > complexity which means more ways for the protocol
> to
> > >> fail
> > >> > > and
> > >> > > > >> > > > > consequently
> > >> > > > >> > > > > > more edge cases in the code.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > It also misses an opportunity to generalize the
> group
> > >> > > > membership
> > >> > > > >> > > > protocol
> > >> > > > >> > > > > > for additional use cases. In fact, after you've
> gone
> > to
> > >> > the
> > >> > > > >> trouble
> > >> > > > >> > > of
> > >> > > > >> > > > > > moving assignment to the client, the main thing
> that
> > is
> > >> > left
> > >> > > > in
> > >> > > > >> > this
> > >> > > > >> > > > > > protocol is basically a general group management
> > >> > capability.
> > >> > > > >> This
> > >> > > > >> > is
> > >> > > > >> > > > > > exactly what is needed for a few cases that are
> > >> currently
> > >> > > > under
> > >> > > > >> > > > > discussion
> > >> > > > >> > > > > > (e.g. copycat or single-writer producer). We've
> taken
> > >> this
> > >> > > > >> further
> > >> > > > >> > > step
> > >> > > > >> > > > > in
> > >> > > > >> > > > > > the proposal and attempted to envision what that
> > >> general
> > >> > > > >> protocol
> > >> > > > >> > > might
> > >> > > > >> > > > > > look like and how it could be used both by the
> > consumer
> > >> > and
> > >> > > > for
> > >> > > > >> > some
> > >> > > > >> > > of
> > >> > > > >> > > > > > these other cases.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > Anyway, since time is running out on the new
> > consumer,
> > >> we
> > >> > > have
> > >> > > > >> > > perhaps
> > >> > > > >> > > > > one
> > >> > > > >> > > > > > last chance to consider a significant change in the
> > >> > protocol
> > >> > > > >> like
> > >> > > > >> > > this,
> > >> > > > >> > > > > so
> > >> > > > >> > > > > > have a look at the wiki and share your thoughts.
> I've
> > >> no
> > >> > > doubt
> > >> > > > >> that
> > >> > > > >> > > > some
> > >> > > > >> > > > > > ideas seem clearer in my mind than they do on
> paper,
> > so
> > >> > ask
> > >> > > > >> > questions
> > >> > > > >> > > > if
> > >> > > > >> > > > > > there is any confusion.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > Thanks!
> > >> > > > >> > > > > > Jason
> > >> > > > >> > > > > >
> > >> > > > >> > > > >
> > >> > > > >> > > >
> > >> > > > >> > > >
> > >> > > > >> > > >
> > >> > > > >> > > > --
> > >> > > > >> > > > Thanks,
> > >> > > > >> > > > Ewen
> > >> > > > >> > > >
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> > --
> > >> > > > >> > Thanks,
> > >> > > > >> > Ewen
> > >> > > > >> >
> > >> > > > >>
> > >> > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>
>
>
> --
> Thanks,
> Ewen
>
