Jason,

The way this is handled today (in the old consumer) is by grouping several
rebalance events into one rebalance operation (similar to how we handle log
flushes using "group commit"). The idea is to batch several rebalance
events while one rebalance operation is in progress.

This is naturally handled on the consumer side since the consumer would
only periodically refresh its metadata. Any metadata changes that happened
in between would end up contributing to only a single rebalance operation.
There might be more rebalance operations in the new proposal as compared to
the server side assignment since in the presence of lots of metadata
changes, every consumer might have a slightly different view of the global
metadata.

My take is that the intersection of a large number of metadata operations
and a consumer with a large number of topic subscriptions is fairly low.
Though it will be great to run a quick performance test on this scenario to
see if we can convince ourselves of the impact.

Thanks
Neha

On Fri, Aug 14, 2015 at 6:34 PM, Jason Gustafson <ja...@confluent.io> wrote:

> I think metadata churn is an interesting problem and it would be nice if we
> had some approach to deal with it. I wonder if it would be sufficient to
> have a setting on the client (or the server if we use centralized
> assignment) which dampens the rate of rebalancing. Basically the effect
> would be to delay a rebalance from a metadata change until the group has
> been active for some amount of time. This would mean that the group would
> not respond as quickly to metadata changes, but that is probably preferable
> to the the death by churn scenario. It may make sense to have a similar
> setting to delay group membership from causing a rebalance.
>
> -Jason
>
> On Fri, Aug 14, 2015 at 3:01 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> > >
> > > So if you're just subscribing to one or a couple of topics, why not
> just
> > > compute the hash by filtering out everything but the topics you are
> > > subscribed to? The problem there is if you ever add/remove
> subscriptions
> > > and want to support rolling upgrades. If the group was subscribed to
> > topic
> > > A, but later changes require subscribing to A + B, then to achieve a
> > > seamless rolling upgrade would require one (old) consumer to be
> > subscribing
> > > to A and one (new) consumer to be subscribing to A+B. If we computed
> > > metadata hashes based on filtered metadata, those two would disagree
> and
> > we
> > > could not perform assignment while the upgrade was in progress.
> >
> >
> > When I suggested including just the metadata hash, I didn't mean for all
> > topics in the cluster but just the topics the consumer wants to subscribe
> > to. I understand that it temporarily halts consumption while the topic
> > subscription changes but the question is whether we want to support
> > consumer groups with members that have different subscription
> preferences.
> > This came up during various discussions and the general consensus was
> that
> > we couldn't come up with compelling reasons to allow that. Upgrades are
> one
> > but then the question is how many upgrades would involve letting some
> > instances subscribe to a different set of topics for a long period of
> time
> > as against a simple rolling upgrade.
> >
> > I'm questioning the value of allowing the flexibility of variation in
> > consumer group topic subscriptions while trading off simplicity of the
> > consumer metadata format. The latter is important given the variety of
> > non-java consumer clients that need to exist and evolve with this change.
> >
> > Becket,
> >
> > If # of partitions or topic metadata changes, whether the coordinator
> > handles it or the consumer, there will be rebalance attempts until the
> > changes subside.
> >
> > Thanks,
> > Neha
> >
> > On Fri, Aug 14, 2015 at 12:57 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > wrote:
> >
> > > On Fri, Aug 14, 2015 at 10:59 AM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > wrote:
> > >
> > > > Neha and Ewen,
> > > >
> > > > About the metadata change frequency. I guess it really depends on how
> > > > frequent the metadata change might occur. If we run Kafka as a
> > service, I
> > > > can see that happens from time to time. As I can imagine people will
> > > create
> > > > some topic, test and maybe delete the topic in some automated test.
> If
> > > so,
> > > > the proposed protocol might be a little bit vulnerable.
> > > >
> > > > More specifically the scenario I am thinking is:
> > > > 1. Consumer 0 periodically refresh metadata and detected a metadata
> > > change.
> > > > So it sends a JoinGroupRequest with metadata_hash_0.
> > > > 2. Consumer 1 was notified by controller to start a rebalance, so it
> > > > refreshes its metadata and send a JoingGroupRequest with
> > metadata_hash_1,
> > > > which is different from metadata hash 0.
> > > > 3. Rebalance failed and both consumer refresh there metadata again
> from
> > > > different brokers.
> > > > 4. Depending on the metadata change frequency(or some admin operation
> > > like
> > > > partition reassigment), they may or may not have the same metadata
> > > > returned, so the restart from 3 again.
> > > >
> > > > I agree that step 4 might not be a big concern if consumers updates
> > > > metadata at almost the same time, but I'm a little bit worried
> whether
> > > that
> > > > assumption really stands because we do not have control over how
> > frequent
> > > > the metadata can change.
> > > >
> > > >
> > > Is this really that different from what would happen if the coordinator
> > > distributed the metadata to consumers? In that case you would trivially
> > > have everyone in a consistent state, but those metadata changes would
> > still
> > > cause churn and require JoinGroup rounds, during which processing is
> > > stalled for the nodes that are waiting on other members to re-join the
> > > group.
> > >
> > > -Ewen
> > >
> > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Aug 14, 2015 at 2:03 AM, Ewen Cheslack-Postava <
> > > e...@confluent.io>
> > > > wrote:
> > > >
> > > > > On Thu, Aug 13, 2015 at 11:07 PM, Neha Narkhede <n...@confluent.io
> >
> > > > wrote:
> > > > >
> > > > > > Becket,
> > > > > >
> > > > > > As you say, the metadata hash addresses the concern you
> originally
> > > > raised
> > > > > > about large topic subscriptions. Can you please list other
> problems
> > > you
> > > > > are
> > > > > > raising more clearly? It is more helpful to know problems that
> the
> > > > > proposal
> > > > > > does not address or addresses poorly.
> > > > > >
> > > > > > Regarding other things you said -
> > > > > >
> > > > > > it is required that each
> > > > > > > consumer refresh their metadata before sending a
> JoinGroupRequest
> > > > > > >
> > > > > >
> > > > > > This is required for wildcard topic subscriptions anyway. So this
> > > > > proposal
> > > > > > does not introduce a regression. We had agreed earlier that it
> does
> > > not
> > > > > > make sense for the server to deserialize regular expressions sent
> > by
> > > > the
> > > > > > consumer.
> > > > > >
> > > > >
> > > > > I don't think consumers need to do a metadata refresh before
> sending
> > a
> > > > > JoinGroupRequest. Metadata changes that affect assignment are rare
> --
> > > it
> > > > > requires changing the number of partitions in a topic. But you
> might
> > > > send a
> > > > > JoinGroupRequest simply because a new member is trying to join the
> > > group.
> > > > > That case is presumably much more common.
> > > > >
> > > > > I think it's actually a good idea to have the first JoinGroup cycle
> > > fail
> > > > in
> > > > > some cases, and has little impact. Lets say the metadata does
> change
> > > > > because partitions are added. Then we might fail in the first
> round,
> > > but
> > > > > then all members detect that issue *immediately*, refresh their
> > > metadata,
> > > > > and submit a new join group request. This second cycle does not
> > > require a
> > > > > full heartbeat cycle. It happens much more quickly because everyone
> > > > > detected the inconsistency based on the first JoinGroupResponse.
> The
> > > > > inconsistency should be resolved very quickly (barring other
> failures
> > > > like
> > > > > a member leaving mid-rebalance)
> > > >
> > > >
> > > >
> > > >
> > > > > >
> > > > > >
> > > > > > > the metadata might still be inconsistent if there is a topic or
> > > > > partition
> > > > > > > change because the
> > > > > > > UpdateMetadataRequest from controller might be handled at
> > different
> > > > > time.
> > > > > > >
> > > > > >
> > > > > > Topic metadata does not change frequently and even if it did, a
> > > couple
> > > > > > rebalance attempts will be needed whether the coordinator drives
> > the
> > > > > > assignments or the consumer. Because guess how the coordinator
> > knows
> > > > > about
> > > > > > the topic metadata changes -- indirectly through either a zk
> > callback
> > > > or
> > > > > > UpdateMetadataRequest, so it is completely possible the
> coordinator
> > > > sees
> > > > > > the topic metadata changes in batches, not all at once.
> > > > > >
> > > > >
> > > > > > On Thu, Aug 13, 2015 at 10:50 PM, Neha Narkhede <
> n...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Ewen/Jason,
> > > > > > >
> > > > > > > The metadata hash is a clever approach and certainly addresses
> > the
> > > > > > problem
> > > > > > > of large metadata for consumers like mirror maker. Few
> comments -
> > > > > > >
> > > > > > >
> > > > > > >    1. In the interest of simplifying the format of the
> consumer's
> > > > > > >    metadata - Why not just always include only the topic names
> in
> > > the
> > > > > > metadata
> > > > > > >    followed by the metadata hash? If the metadata hash check
> > > > succeeds,
> > > > > > each
> > > > > > >    consumer uses the # of partitions it had fetched. If it
> > fails, a
> > > > > > rebalance
> > > > > > >    happens and the metadata is not used anyway.
> > > > > >
> > > > >
> > > > > Doing this requires that every consumer always fetch the full
> > metadata.
> > > > The
> > > > > most common use case is consumers that just want to consume one or
> a
> > > > couple
> > > > > of topics, in which case grabbing all metadata for the entire
> cluster
> > > is
> > > > > wasteful. If I subscribe only to topic A, why make all consumers
> grab
> > > > > metadata for the entire topic (and need to rebalance every time it
> > > > > changes!). Including the # of partitions for each topic lets you
> > avoid
> > > > > having to grab the global set of metadata.
> > > > >
> > > > > So if you're just subscribing to one or a couple of topics, why not
> > > just
> > > > > compute the hash by filtering out everything but the topics you are
> > > > > subscribed to? The problem there is if you ever add/remove
> > > subscriptions
> > > > > and want to support rolling upgrades. If the group was subscribed
> to
> > > > topic
> > > > > A, but later changes require subscribing to A + B, then to achieve
> a
> > > > > seamless rolling upgrade would require one (old) consumer to be
> > > > subscribing
> > > > > to A and one (new) consumer to be subscribing to A+B. If we
> computed
> > > > > metadata hashes based on filtered metadata, those two would
> disagree
> > > and
> > > > we
> > > > > could not perform assignment while the upgrade was in progress.
> > > > >
> > > > > The solution is to differentiate between the cases when a very
> small
> > > > amount
> > > > > of the metadata is needed (one or a couple of topic subscriptions;
> > > > > communicate and share this via metadata in the JoinGroup protocol)
> vs
> > > > when
> > > > > *all* the metadata is needed (regex subscription; verify agreement
> > via
> > > > > hash).
> > > > >
> > > >
> > > > >
> > > > >
> > > > > > >    2. Do you need a topic list and topic regex to be separate?
> A
> > > > single
> > > > > > >    topic or list of topics can be expressed as a regex.
> > > > > >
> > > > >
> > > > > See above note about collecting all metadata when you really only
> > need
> > > it
> > > > > for 1 or 2 topics. There's probably some debate to be had about
> > whether
> > > > > this cost would be too high -- every consumer would need to request
> > the
> > > > > metadata for all topics, and they'd need to request that all every
> > time
> > > > > they might be out of date.
> > > > >
> > > > Are we going to allow consumers in the same group to subscribe to
> > > different
> > > > topic set? If we do, we need to let them refresh metadata for all the
> > > > topics a group is consuming from. If we don't then in the protocol we
> > > only
> > > > need a subscription set hash.
> > > >
> > > > >
> > > > >
> > > > > > >    3. Let's include a version explicitly at the beginning of
> the
> > > > > > >    ProtocolMetadata. The version dictates how to deserialize
> the
> > > > > > >    ProtocolMetadata blob and is consistent with the rest of
> > Kafka.
> > > > > >
> > > > >
> > > > > If I'm understanding correctly, in JoinGroupRequest I would change
> > > > >
> > > > > GroupProtocols          => [Protocol ProtocolMetadata]
> > > > >
> > > > > to
> > > > >
> > > > > GroupProtocols          => [Protocol ProtocolVersion
> > ProtocolMetadata]
> > > > >
> > > > > We had been talking about just baking the version into the Protocol
> > > > field,
> > > > > but making it separate seems perfectly reasonable to me. Jason, any
> > > issue
> > > > > with splitting the version out into a separate field like this?
> > > > >
> > > > > >
> > > > > > > That can simplify the metadata format to the following:
> > > > > > >
> > > > > > > GroupType => "consumer"
> > > > > > >>
> > > > > > >> Protocol => AssignmentStrategy
> > > > > > >>   AssignmentStrategy   => String
> > > > > > >>
> > > > > > >> ProtocolMetadata => Version Subscription
> > > AssignmentStrategyMetadata
> > > > > > >
> > > > > > >     Version                    => String
> > > > > > >
> > > > > > >   Subscription                 => TopicRegex MetadataHash
> > > > > > >>     TopicRegex                 => String
> > > > > > >>     MetadataHash               => String
> > > > > > >>   AssignmentStrategyMetadata   => bytes
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > On Thu, Aug 13, 2015 at 6:28 PM, Jiangjie Qin
> > > > > <j...@linkedin.com.invalid
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Ewen and Jason,
> > > > > > >>
> > > > > > >> Thanks for the reply. Sorry I missed the metadata hash. Yes,
> > that
> > > > is a
> > > > > > >> clever approach and would solve my concern about the data
> > passing
> > > > > > around.
> > > > > > >> I
> > > > > > >> can see both pros and cons from doing this, though. The
> > advantage
> > > is
> > > > > we
> > > > > > >> don't need the topic metadata in JoinGroupResponse anymore.
> The
> > > > > downside
> > > > > > >> is
> > > > > > >> that now rebalance have extra dependency on the consensus of
> > > > metadata
> > > > > of
> > > > > > >> all consumers, which is obtained separately. So it is required
> > > that
> > > > > each
> > > > > > >> consumer refresh their metadata before sending a
> > JoinGroupRequest,
> > > > > > >> otherwise in some cases (e.g. wildcard consumers) will almost
> > > > > certainly
> > > > > > >> fail for the first rebalance attempt. Even if we do that,
> since
> > > the
> > > > > > >> consumers are getting metadata from different brokers, the
> > > metadata
> > > > > > might
> > > > > > >> still be inconsistent if there is a topic or partition change
> > > > because
> > > > > > the
> > > > > > >> UpdateMetadataRequest from controller might be handled at
> > > different
> > > > > > time.
> > > > > > >> Just want to make sure we think through the cases so the
> > protocol
> > > > does
> > > > > > not
> > > > > > >> cause us unexpected issues.
> > > > > > >>
> > > > > > >> About the number of consumers, I think with the current
> > liveliness
> > > > > > >> definition, we can tolerate churns by bumping up the session
> > > > timeout.
> > > > > > Also
> > > > > > >> I guess we will see an increasing number of consumers for new
> > > > > consumer,
> > > > > > >> because every the old consumer thread will probably become a
> > > > consumer.
> > > > > > >>
> > > > > > >> It is a valid concern for consumers that have large
> subscription
> > > > set.
> > > > > > This
> > > > > > >> might not be avoided though for client side assignment
> approach.
> > > One
> > > > > > >> solution is having topic names associate with a topic ID. And
> > only
> > > > use
> > > > > > >> topic ID in JoinGroupRequest and JoinGroupResponse, There is a
> > > > > > discussion
> > > > > > >> thread about this to solve the topic renaming case but this
> is a
> > > > > > >> completely
> > > > > > >> different discussion.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Jiangjie (Becket) Qin
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Aug 13, 2015 at 2:14 PM, Jason Gustafson <
> > > > ja...@confluent.io>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Thanks Jiangjie, that information helps. I agree the
> protocol
> > > must
> > > > > > >> consider
> > > > > > >> > scalability. My point was that the synchronization barrier
> in
> > > the
> > > > > > >> current
> > > > > > >> > protocol already effectively limits the number of consumers
> > > since
> > > > it
> > > > > > >> > provides no way to gracefully handle churn. It wouldn't be
> > worth
> > > > > > >> worrying
> > > > > > >> > about scaling up to 100,000 members, for example, because
> > > there's
> > > > no
> > > > > > way
> > > > > > >> > the group would be stable. So we just need to set some clear
> > > > > > >> expectations
> > > > > > >> > on the size we can scale to, and that can help inform the
> > > > discussion
> > > > > > on
> > > > > > >> the
> > > > > > >> > size of messages in this protocol.
> > > > > > >> >
> > > > > > >> > Ewen and I were discussing this morning along similar lines
> to
> > > > what
> > > > > > >> you're
> > > > > > >> > suggesting. However, even if the coordinator decides on the
> > > > metadata
> > > > > > for
> > > > > > >> > the group, each member still needs to communicate its
> > > > subscriptions
> > > > > to
> > > > > > >> the
> > > > > > >> > rest of the group. This is nice for the regex case since the
> > > regex
> > > > > is
> > > > > > >> > probably small, but if the members have a large topic list,
> > then
> > > > we
> > > > > > have
> > > > > > >> > the same problem. One thing I was thinking about was whether
> > we
> > > > > really
> > > > > > >> need
> > > > > > >> > to handle different subscriptions for every member. If the
> > > > > coordinator
> > > > > > >> > could guarantee that all members had the same subscription,
> > then
> > > > > there
> > > > > > >> > would be no need for the coordinator to return the
> > subscriptions
> > > > for
> > > > > > >> each
> > > > > > >> > member. However, this would prevent graceful upgrades. We
> > might
> > > be
> > > > > > able
> > > > > > >> to
> > > > > > >> > fix that problem by allowing the consumer to provide two
> > > > > subscriptions
> > > > > > >> to
> > > > > > >> > allowing rolling updates, but that starts to sound pretty
> > nasty.
> > > > > > >> >
> > > > > > >> > -Jason
> > > > > > >> >
> > > > > > >> > On Thu, Aug 13, 2015 at 1:41 PM, Jiangjie Qin
> > > > > > <j...@linkedin.com.invalid
> > > > > > >> >
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Jason,
> > > > > > >> > >
> > > > > > >> > > The protocol has to consider the scalability. The protocol
> > in
> > > > the
> > > > > > wiki
> > > > > > >> > > means the JoinGroupResoponse size would be:
> > > > > > >> > > NumberOfTopics * (AvgTopicNameLength + 4) *
> > > > (NumberOfConsumers)^2
> > > > > > >> > >
> > > > > > >> > > To give some real number, we have 26-node Mirror Maker
> > > cluster,
> > > > > each
> > > > > > >> > with 4
> > > > > > >> > > consumers. That is 104 consumers using regex ".*". And
> most
> > of
> > > > our
> > > > > > >> > clusters
> > > > > > >> > > have around 3000 topics, whose topic name are typically
> > around
> > > > 20
> > > > > > >> > > characters.
> > > > > > >> > >
> > > > > > >> > > I think the key issue for client side partition assignment
> > > logic
> > > > > is
> > > > > > to
> > > > > > >> > make
> > > > > > >> > > sure 1) all the clients run the same algorithm. 2) all the
> > > > clients
> > > > > > >> make
> > > > > > >> > > decision on the same topic metadata. The second purpose
> can
> > be
> > > > > done
> > > > > > by
> > > > > > >> > > simply letting coordinator provide the topic metadata and
> > all
> > > > then
> > > > > > >> member
> > > > > > >> > > information as source of truth. Is it necessary to pass
> > topic
> > > > > > >> metadata of
> > > > > > >> > > each consumer around? Can we keep the protocol metadata
> > field
> > > > > > >> completely
> > > > > > >> > > independent of topic metadata? I think In the
> > > JoinGroupResponse,
> > > > > we
> > > > > > >> > should
> > > > > > >> > > have only one copy of topic metadata provided by
> coordinator
> > > and
> > > > > is
> > > > > > >> > outside
> > > > > > >> > > of protocol metadata. If user decides to put some metadata
> > in
> > > > the
> > > > > > >> > > JoinGroupRequest and let coordinator pass around, they are
> > > > > > responsible
> > > > > > >> > for
> > > > > > >> > > understanding the risk.
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > >
> > > > > > >> > > Jiangjie (Becket) Qin
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Aug 13, 2015 at 12:41 PM, Jason Gustafson <
> > > > > > ja...@confluent.io
> > > > > > >> >
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hey Onur and Jiangjie,
> > > > > > >> > > >
> > > > > > >> > > > I've updated that wiki with a proposal to add regex
> > > > > subscriptions
> > > > > > to
> > > > > > >> > the
> > > > > > >> > > > consumer metadata. Can you have a look to see if it
> > > addresses
> > > > > your
> > > > > > >> > > concern?
> > > > > > >> > > > In general, I think we should be a little careful when
> we
> > > are
> > > > > > >> talking
> > > > > > >> > > about
> > > > > > >> > > > the scalability of the protocol. Regardless of whether
> > > > > assignment
> > > > > > is
> > > > > > >> > done
> > > > > > >> > > > on the server or the client, the protocol assumes a
> > > relatively
> > > > > > >> stable
> > > > > > >> > > > configuration. When the number of consumers increases
> > > beyond a
> > > > > > >> certain
> > > > > > >> > > > limit, then membership churn becomes a major concern.
> > > > Similarly
> > > > > > >> there
> > > > > > >> > is
> > > > > > >> > > a
> > > > > > >> > > > notion of metadata churn when topics are added, deleted,
> > or
> > > > > > >> resized. If
> > > > > > >> > > > either membership or metadata changes, then the protocol
> > > > forces
> > > > > > all
> > > > > > >> > > > consumers to stop consumption and rejoin the group. If
> > this
> > > > > > happens
> > > > > > >> > often
> > > > > > >> > > > enough, then it can severely impact the ability of the
> > > > consumer
> > > > > to
> > > > > > >> make
> > > > > > >> > > > progress. The point is that the protocol may already be
> > > > unsuited
> > > > > > to
> > > > > > >> > cases
> > > > > > >> > > > where there are either a large number of consumers or
> > > topics.
> > > > I
> > > > > > >> wonder
> > > > > > >> > if
> > > > > > >> > > > you guys can share your thoughts about your scaling
> > > > > expectations?
> > > > > > >> > > >
> > > > > > >> > > > -Jason
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson <
> > > > > > >> ja...@confluent.io>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hey Jiangjie,
> > > > > > >> > > > >
> > > > > > >> > > > > That's a great point. In the worst case (the mirror
> > maker
> > > > > case I
> > > > > > >> > > guess),
> > > > > > >> > > > > the join group response can be massive. This would be
> > > > > especially
> > > > > > >> > deadly
> > > > > > >> > > > > when there is a lot of churn in the group (e.g. in a
> > > rolling
> > > > > > >> > upgrade).
> > > > > > >> > > > The
> > > > > > >> > > > > current protocol is not great for this case either,
> but
> > > it's
> > > > > > >> > > > significantly
> > > > > > >> > > > > better. Here are a couple ways to deal with the size:
> > > > > > >> > > > >
> > > > > > >> > > > > 1. First, we could have the coordinator compress the
> > > > > responses.
> > > > > > >> This
> > > > > > >> > > > would
> > > > > > >> > > > > probably be pretty effective if applied across the
> > > metadata
> > > > > from
> > > > > > >> all
> > > > > > >> > > > > members.
> > > > > > >> > > > >
> > > > > > >> > > > > 2. I think the regex case is the main problem. Is that
> > > > right?
> > > > > We
> > > > > > >> > could
> > > > > > >> > > > > extend the metadata to allow the consumer to embed its
> > > regex
> > > > > > >> > > subscription
> > > > > > >> > > > > in the metadata directly (note this might be a good
> idea
> > > > > > >> regardless
> > > > > > >> > of
> > > > > > >> > > > the
> > > > > > >> > > > > rest of this proposal). To support regex on the
> > consumer,
> > > we
> > > > > > must
> > > > > > >> > fetch
> > > > > > >> > > > > metadata for all topics. Rather than having all regex
> > > > > > subscribers
> > > > > > >> > embed
> > > > > > >> > > > all
> > > > > > >> > > > > of this metadata in their join group requests, they
> > could
> > > > > > instead
> > > > > > >> > > embed a
> > > > > > >> > > > > hash of it. Then after the join group responses are
> > > > received,
> > > > > > they
> > > > > > >> > just
> > > > > > >> > > > > need to check that the hashes are the same. If there
> is
> > a
> > > > > > mismatch
> > > > > > >> > > (which
> > > > > > >> > > > > should only occur when topics are created, deleted, or
> > > > > resized),
> > > > > > >> then
> > > > > > >> > > the
> > > > > > >> > > > > group members must refetch the metadata and rejoin the
> > > > group.
> > > > > > >> This is
> > > > > > >> > > > also
> > > > > > >> > > > > how the current protocol behaves when there is a
> change
> > in
> > > > the
> > > > > > >> topic
> > > > > > >> > > > > metadata affecting the group--someone (either the
> > > > coordinator
> > > > > or
> > > > > > >> the
> > > > > > >> > > > > consumer) detects the change and forces the group to
> > > > > rebalance.
> > > > > > >> > > > >
> > > > > > >> > > > > What do you think?
> > > > > > >> > > > >
> > > > > > >> > > > > (Also I think adding groupId/generationId to fetch and
> > > > produce
> > > > > > >> > requests
> > > > > > >> > > > > seems like an interesting line of thought.)
> > > > > > >> > > > >
> > > > > > >> > > > > -Jason
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin
> > > > > > >> > > <j...@linkedin.com.invalid
> > > > > > >> > > > >
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > >> Hey Ewen,
> > > > > > >> > > > >>
> > > > > > >> > > > >> Onur and I discussed this a little bit more. And we
> are
> > > > still
> > > > > > >> > worrying
> > > > > > >> > > > >> about passing all the metadata of all consumers
> around.
> > > > > > >> > > > >>
> > > > > > >> > > > >> Let's say I have a cluster has 10,000 topics, the
> > average
> > > > > topic
> > > > > > >> name
> > > > > > >> > > > >> length
> > > > > > >> > > > >> is 10 bytes. In this case, the opaque metadata will
> > have
> > > > 10 *
> > > > > > >> > 10,000 =
> > > > > > >> > > > >> 100KB for topic name, for each topic, there is a
> 4-byte
> > > > > integer
> > > > > > >> of
> > > > > > >> > > > number
> > > > > > >> > > > >> of partitions, that's another 40KB. So one global
> topic
> > > > > > metadata
> > > > > > >> > will
> > > > > > >> > > > have
> > > > > > >> > > > >> 140KB data. If I have 100 consumers who are using
> > > wildcard
> > > > to
> > > > > > >> > consume
> > > > > > >> > > > from
> > > > > > >> > > > >> all the topics. That means the protocol metadata end
> up
> > > in
> > > > > the
> > > > > > >> > > > >> JoinGroupResponse will be 140KB * 100 = 14MB. And the
> > > > > > >> > > JoinGroupResponse
> > > > > > >> > > > >> will need to be sent to 100 different consumers, that
> > > means
> > > > > > 14MB
> > > > > > >> *
> > > > > > >> > > 100 =
> > > > > > >> > > > >> 1.4GB need to be sent by the consumer coordinator for
> > one
> > > > > > >> rebalance.
> > > > > > >> > > How
> > > > > > >> > > > >> would that work?
> > > > > > >> > > > >>
> > > > > > >> > > > >> Also, having two consumers (old owner and new owner)
> > > > > consuming
> > > > > > >> from
> > > > > > >> > > the
> > > > > > >> > > > >> same partition might also be a problem. e.g. people
> are
> > > > > > updating
> > > > > > >> > > > database.
> > > > > > >> > > > >> One thing might worth doing is to add GroupId and
> > > > Generation
> > > > > ID
> > > > > > >> to
> > > > > > >> > > > >> ProducerRequest and FetchRequest as well. This will
> > also
> > > > help
> > > > > > >> with
> > > > > > >> > the
> > > > > > >> > > > >> single producer use case. However, this is probably
> > > > > orthogonal
> > > > > > to
> > > > > > >> > this
> > > > > > >> > > > >> thread given the current new consumer also has this
> > > problem
> > > > > > and I
> > > > > > >> > > > believe
> > > > > > >> > > > >> we need to fix it.
> > > > > > >> > > > >>
> > > > > > >> > > > >> Thanks,
> > > > > > >> > > > >>
> > > > > > >> > > > >> Jiangjie (Becket) Qin
> > > > > > >> > > > >>
> > > > > > >> > > > >> On Tue, Aug 11, 2015 at 11:43 PM, Ewen
> > Cheslack-Postava <
> > > > > > >> > > > >> e...@confluent.io>
> > > > > > >> > > > >> wrote:
> > > > > > >> > > > >>
> > > > > > >> > > > >> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie Qin
> > > > > > >> > > > >> <j...@linkedin.com.invalid>
> > > > > > >> > > > >> > wrote:
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > > Ewen,
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Thanks for the explanation.
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > For (1), I am more concerned about the failure
> case
> > > > > instead
> > > > > > >> of
> > > > > > >> > > > normal
> > > > > > >> > > > >> > case.
> > > > > > >> > > > >> > > What if a consumer somehow was kick out of a
> group
> > > but
> > > > is
> > > > > > >> still
> > > > > > >> > > > >> consuming
> > > > > > >> > > > >> > > and committing offsets? Does that mean the new
> > owner
> > > > and
> > > > > > old
> > > > > > >> > owner
> > > > > > >> > > > >> might
> > > > > > >> > > > >> > > potentially consuming from and committing offsets
> > for
> > > > the
> > > > > > >> same
> > > > > > >> > > > >> partition?
> > > > > > >> > > > >> > > In the old consumer, this won't happen because
> the
> > > new
> > > > > > >> consumer
> > > > > > >> > > will
> > > > > > >> > > > >> not
> > > > > > >> > > > >> > be
> > > > > > >> > > > >> > > able to start consumption unless the previous
> owner
> > > has
> > > > > > >> released
> > > > > > >> > > its
> > > > > > >> > > > >> > > ownership. Basically, without the ownership
> > > guarantee,
> > > > I
> > > > > > >> don't
> > > > > > >> > see
> > > > > > >> > > > how
> > > > > > >> > > > >> > the
> > > > > > >> > > > >> > > communication among consumers themselves alone
> can
> > > > solve
> > > > > > the
> > > > > > >> > > problem
> > > > > > >> > > > >> > here.
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > The generation ID check still applies to offset
> > > commits.
> > > > If
> > > > > > >> one of
> > > > > > >> > > the
> > > > > > >> > > > >> > consumers is kicked out and misbehaving, it can
> > > obviously
> > > > > > still
> > > > > > >> > > fetch
> > > > > > >> > > > >> and
> > > > > > >> > > > >> > process messages, but offset commits will not work
> > > since
> > > > it
> > > > > > >> will
> > > > > > >> > not
> > > > > > >> > > > >> have
> > > > > > >> > > > >> > the current generation ID.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > For (2) and (3), now I understand how metadata
> are
> > > > used.
> > > > > > But
> > > > > > >> I
> > > > > > >> > > still
> > > > > > >> > > > >> > don't
> > > > > > >> > > > >> > > see why should we let the consumers to pass the
> > topic
> > > > > > >> > information
> > > > > > >> > > > >> across
> > > > > > >> > > > >> > > instead of letting coordinator give the
> > information.
> > > > The
> > > > > > >> single
> > > > > > >> > > > >> producer
> > > > > > >> > > > >> > > use case does not solve the ownership problem in
> > > > abnormal
> > > > > > >> case
> > > > > > >> > > > either,
> > > > > > >> > > > >> > > which seems to be a little bit vulnerable.
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > One of the goals here was to generalize group
> > > membership
> > > > so
> > > > > > we
> > > > > > >> > can,
> > > > > > >> > > > for
> > > > > > >> > > > >> > example, use it for balancing Copycat tasks across
> > > > workers.
> > > > > > >> > There's
> > > > > > >> > > no
> > > > > > >> > > > >> > topic subscription info in that case. The metadata
> > for
> > > > > > copycat
> > > > > > >> > > workers
> > > > > > >> > > > >> > would instead need to somehow indicate the current
> > set
> > > of
> > > > > > tasks
> > > > > > >> > that
> > > > > > >> > > > >> need
> > > > > > >> > > > >> > to be assigned to workers. By making the metadata
> > > > > completely
> > > > > > >> > opaque
> > > > > > >> > > to
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > protocol, it becomes more generally useful since it
> > > > focuses
> > > > > > >> > squarely
> > > > > > >> > > > on
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > group membership problem, allowing for that
> > additional
> > > > bit
> > > > > of
> > > > > > >> > > metadata
> > > > > > >> > > > >> so
> > > > > > >> > > > >> > you don't just get a list of members, but also get
> a
> > > > little
> > > > > > >> bit of
> > > > > > >> > > > info
> > > > > > >> > > > >> > about each of them.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > A different option that we explored is to use a
> sort
> > of
> > > > > mixed
> > > > > > >> > model
> > > > > > >> > > --
> > > > > > >> > > > >> > still bake all the topic subscriptions directly
> into
> > > the
> > > > > > >> protocol
> > > > > > >> > > but
> > > > > > >> > > > >> also
> > > > > > >> > > > >> > include metadata. That would allow us to maintain
> the
> > > > > > existing
> > > > > > >> > > > >> > coordinator-driven approach to handling the
> metadata
> > > and
> > > > > > change
> > > > > > >> > > events
> > > > > > >> > > > >> like
> > > > > > >> > > > >> > the ones Onur pointed out. Then something like the
> > > > Copycat
> > > > > > >> workers
> > > > > > >> > > > would
> > > > > > >> > > > >> > just not fill in any topic subscriptions and it
> would
> > > be
> > > > > > >> handled
> > > > > > >> > as
> > > > > > >> > > a
> > > > > > >> > > > >> > degenerate case. Based on the way I explained that
> we
> > > can
> > > > > > >> handle
> > > > > > >> > > those
> > > > > > >> > > > >> > types of events, I personally feel its cleaner and
> a
> > > > nicer
> > > > > > >> > > > >> generalization
> > > > > > >> > > > >> > to not include the subscriptions in the join group
> > > > > protocol,
> > > > > > >> > making
> > > > > > >> > > it
> > > > > > >> > > > >> part
> > > > > > >> > > > >> > of the metadata instead.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > For the single producer case, are you saying it
> > doesn't
> > > > > solve
> > > > > > >> > > > ownership
> > > > > > >> > > > >> in
> > > > > > >> > > > >> > the abnormal case because a producer that doesn't
> > know
> > > it
> > > > > has
> > > > > > >> been
> > > > > > >> > > > >> kicked
> > > > > > >> > > > >> > out of the group yet can still produce data even
> > though
> > > > it
> > > > > > >> > shouldn't
> > > > > > >> > > > be
> > > > > > >> > > > >> > able to anymore? I definitely agree that that is a
> > risk
> > > > --
> > > > > > this
> > > > > > >> > > > >> provides a
> > > > > > >> > > > >> > way to get closer to a true single-writer, but
> there
> > > are
> > > > > > >> > definitely
> > > > > > >> > > > >> still
> > > > > > >> > > > >> > failure modes that this does not address.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > -Ewen
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Thanks,
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Jiangjie (Becket) Qin
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen
> > > > Cheslack-Postava <
> > > > > > >> > > > >> > e...@confluent.io
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > wrote:
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > > On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie Qin
> > > > > > >> > > > >> > <j...@linkedin.com.invalid
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > wrote:
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > > Hi Jason,
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Thanks for writing this up. It would be
> useful
> > to
> > > > > > >> generalize
> > > > > > >> > > the
> > > > > > >> > > > >> > group
> > > > > > >> > > > >> > > > > concept. I have a few questions below.
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > 1. In old consumer actually the partition
> > > > assignment
> > > > > > are
> > > > > > >> > done
> > > > > > >> > > by
> > > > > > >> > > > >> > > > consumers
> > > > > > >> > > > >> > > > > themselves. We used zookeeper to guarantee
> > that a
> > > > > > >> partition
> > > > > > >> > > will
> > > > > > >> > > > >> only
> > > > > > >> > > > >> > > be
> > > > > > >> > > > >> > > > > consumed by one consumer thread who
> > successfully
> > > > > > claimed
> > > > > > >> its
> > > > > > >> > > > >> > ownership.
> > > > > > >> > > > >> > > > > Does the new protocol plan to provide the
> same
> > > > > > guarantee?
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Once you have all the metadata from all the
> > > > consumers,
> > > > > > >> > > assignment
> > > > > > >> > > > >> > should
> > > > > > >> > > > >> > > > just be a simple function mapping that
> > > > Map<ConsumerId,
> > > > > > >> > Metadata>
> > > > > > >> > > > to
> > > > > > >> > > > >> > > > Map<ConsumerId, List<TopicPartition>>. If
> > everyone
> > > is
> > > > > > >> > consistent
> > > > > > >> > > > in
> > > > > > >> > > > >> > > > computing that, you don't need ZK involved at
> > all.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > In practice, this shouldn't be that hard to
> > ensure
> > > > for
> > > > > > most
> > > > > > >> > > > >> assignment
> > > > > > >> > > > >> > > > strategies just by having decent unit testing
> on
> > > > them.
> > > > > > You
> > > > > > >> > just
> > > > > > >> > > > >> have to
> > > > > > >> > > > >> > > do
> > > > > > >> > > > >> > > > things like ensure your assignment strategy
> sorts
> > > > lists
> > > > > > >> into a
> > > > > > >> > > > >> > consistent
> > > > > > >> > > > >> > > > order.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > You do give up the ability to use some
> techniques
> > > > (e.g.
> > > > > > any
> > > > > > >> > > > >> randomized
> > > > > > >> > > > >> > > > algorithm if you can't distribute the seed w/
> the
> > > > > > metadata)
> > > > > > >> > and
> > > > > > >> > > > it's
> > > > > > >> > > > >> > true
> > > > > > >> > > > >> > > > that nothing validates the assignment, but if
> > that
> > > > > > >> assignment
> > > > > > >> > > > >> algorithm
> > > > > > >> > > > >> > > > step is kept simple, small, and well tested,
> the
> > > risk
> > > > > is
> > > > > > >> very
> > > > > > >> > > > >> minimal.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > 2. It looks that both JoinGroupRequest and
> > > > > > >> JoinGroupResponse
> > > > > > >> > > has
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > > ProtocolMetadata.AssignmentStrategyMetadata,
> > what
> > > > > would
> > > > > > >> be
> > > > > > >> > the
> > > > > > >> > > > >> > metadata
> > > > > > >> > > > >> > > > be
> > > > > > >> > > > >> > > > > sent and returned by coordinator? How will
> the
> > > > > > >> coordinator
> > > > > > >> > > > handle
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > > metadata?
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > The coordinator is basically just blindly
> > > > broadcasting
> > > > > > all
> > > > > > >> of
> > > > > > >> > it
> > > > > > >> > > > to
> > > > > > >> > > > >> > group
> > > > > > >> > > > >> > > > members so they have a consistent view.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > So from the coordinators perspective, it sees
> > > > something
> > > > > > >> like:
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Consumer 1 -> JoinGroupRequest with
> > GroupProtocols
> > > =
> > > > [
> > > > > > >> > > "consumer"
> > > > > > >> > > > >> > > > <Consumer1 opaque byte[]>]
> > > > > > >> > > > >> > > > Consumer 2 -> JoinGroupRequest with
> > GroupProtocols
> > > =
> > > > [
> > > > > > >> > > "consumer"
> > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Then, in the responses would look like:
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Consumer 1 <- JoinGroupResponse with
> > GroupProtocol
> > > =
> > > > > > >> > "consumer"
> > > > > > >> > > > and
> > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque
> > > > byte[]>,
> > > > > > >> > Consumer
> > > > > > >> > > 2
> > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > >> > > > >> > > > Consumer 2 <- JoinGroupResponse with
> > GroupProtocol
> > > =
> > > > > > >> > "consumer"
> > > > > > >> > > > and
> > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque
> > > > byte[]>,
> > > > > > >> > Consumer
> > > > > > >> > > 2
> > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > So all the responses include all the metadata
> for
> > > > every
> > > > > > >> member
> > > > > > >> > > in
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > group, and everyone can use that to
> consistently
> > > > decide
> > > > > > on
> > > > > > >> > > > >> assignment.
> > > > > > >> > > > >> > > The
> > > > > > >> > > > >> > > > broker doesn't care and cannot even understand
> > the
> > > > > > metadata
> > > > > > >> > > since
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > data
> > > > > > >> > > > >> > > > format for it is dependent on the assignment
> > > strategy
> > > > > > being
> > > > > > >> > > used.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > As another example that is *not* a consumer,
> > let's
> > > > say
> > > > > > you
> > > > > > >> > just
> > > > > > >> > > > >> want to
> > > > > > >> > > > >> > > > have a single writer in the group which
> everyone
> > > will
> > > > > > >> forward
> > > > > > >> > > > >> requests
> > > > > > >> > > > >> > > to.
> > > > > > >> > > > >> > > > To accomplish this, you could use a very dumb
> > > > > assignment
> > > > > > >> > > strategy:
> > > > > > >> > > > >> > there
> > > > > > >> > > > >> > > is
> > > > > > >> > > > >> > > > no metadata (empty byte[]) and all we care
> about
> > is
> > > > who
> > > > > > is
> > > > > > >> the
> > > > > > >> > > > first
> > > > > > >> > > > >> > > member
> > > > > > >> > > > >> > > > in the group (e.g. when IDs are sorted
> > > > > > lexicographically).
> > > > > > >> > That
> > > > > > >> > > > >> member
> > > > > > >> > > > >> > is
> > > > > > >> > > > >> > > > selected as the writer. In that case, we
> actually
> > > > just
> > > > > > care
> > > > > > >> > > about
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > membership list, there's no additional info
> about
> > > > each
> > > > > > >> member
> > > > > > >> > > that
> > > > > > >> > > > >> is
> > > > > > >> > > > >> > > > required to determine who is the writer.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > > 3. Do you mean that the number of partitions
> in
> > > > > > >> > > > JoinGroupResponse
> > > > > > >> > > > >> > will
> > > > > > >> > > > >> > > be
> > > > > > >> > > > >> > > > > the max partition number of a topic among all
> > the
> > > > > > >> reported
> > > > > > >> > > > >> partition
> > > > > > >> > > > >> > > > number
> > > > > > >> > > > >> > > > > by consumers? Is there any reason not just
> let
> > > > > > >> Coordinator
> > > > > > >> > to
> > > > > > >> > > > >> return
> > > > > > >> > > > >> > > the
> > > > > > >> > > > >> > > > > number of partitions of a topic in its
> metadata
> > > > > cache?
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Nothing from the embedded protocol is touched
> by
> > > the
> > > > > > >> broker.
> > > > > > >> > The
> > > > > > >> > > > >> broker
> > > > > > >> > > > >> > > > just collects opaque bytes of metadata, does
> the
> > > > > > selection
> > > > > > >> of
> > > > > > >> > > the
> > > > > > >> > > > >> > > strategy
> > > > > > >> > > > >> > > > if multiple are supported by some consumers,
> and
> > > then
> > > > > > >> returns
> > > > > > >> > > that
> > > > > > >> > > > >> > opaque
> > > > > > >> > > > >> > > > metadata for all the members back to every
> > member.
> > > In
> > > > > > that
> > > > > > >> way
> > > > > > >> > > > they
> > > > > > >> > > > >> all
> > > > > > >> > > > >> > > > have a consistent view of the group. For
> regular
> > > > > > consumers,
> > > > > > >> > that
> > > > > > >> > > > >> view
> > > > > > >> > > > >> > of
> > > > > > >> > > > >> > > > the group includes information about how many
> > > > > partitions
> > > > > > >> each
> > > > > > >> > > > >> consumer
> > > > > > >> > > > >> > > > currently thinks the topics it is subscribed to
> > > has.
> > > > > > These
> > > > > > >> > could
> > > > > > >> > > > be
> > > > > > >> > > > >> > > > inconsistent due to out of date metadata and it
> > > would
> > > > > be
> > > > > > >> up to
> > > > > > >> > > the
> > > > > > >> > > > >> > > > assignment strategy on the *client* to resolve
> > > that.
> > > > As
> > > > > > you
> > > > > > >> > > point
> > > > > > >> > > > >> out,
> > > > > > >> > > > >> > in
> > > > > > >> > > > >> > > > that case they could just take the max value
> that
> > > any
> > > > > > >> consumer
> > > > > > >> > > > >> reported
> > > > > > >> > > > >> > > > seeing and use that. The consumers that notice
> > that
> > > > > their
> > > > > > >> > > metadata
> > > > > > >> > > > >> had
> > > > > > >> > > > >> > a
> > > > > > >> > > > >> > > > smaller # of partitions should also trigger a
> > > > metadata
> > > > > > >> update
> > > > > > >> > > when
> > > > > > >> > > > >> they
> > > > > > >> > > > >> > > see
> > > > > > >> > > > >> > > > someone else observing a larger # of
> partitions.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Thanks,
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Jiangjie (Becket) Qin
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason
> > Gustafson
> > > <
> > > > > > >> > > > >> ja...@confluent.io
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > > > wrote:
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > > Hi Kafka Devs,
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > One of the nagging issues in the current
> > design
> > > > of
> > > > > > the
> > > > > > >> new
> > > > > > >> > > > >> consumer
> > > > > > >> > > > >> > > has
> > > > > > >> > > > >> > > > > > been the need to support a variety of
> > > assignment
> > > > > > >> > strategies.
> > > > > > >> > > > >> We've
> > > > > > >> > > > >> > > > > > encountered this in particular in the
> design
> > of
> > > > > > copycat
> > > > > > >> > and
> > > > > > >> > > > the
> > > > > > >> > > > >> > > > > processing
> > > > > > >> > > > >> > > > > > framework (KIP-28). From what I understand,
> > > Samza
> > > > > > also
> > > > > > >> > has a
> > > > > > >> > > > >> number
> > > > > > >> > > > >> > > of
> > > > > > >> > > > >> > > > > use
> > > > > > >> > > > >> > > > > > cases with custom assignment needs. The new
> > > > > consumer
> > > > > > >> > > protocol
> > > > > > >> > > > >> > > supports
> > > > > > >> > > > >> > > > > new
> > > > > > >> > > > >> > > > > > assignment strategies by hooking them into
> > the
> > > > > > broker.
> > > > > > >> For
> > > > > > >> > > > many
> > > > > > >> > > > >> > > > > > environments, this is a major pain and in
> > some
> > > > > > cases, a
> > > > > > >> > > > >> > non-starter.
> > > > > > >> > > > >> > > It
> > > > > > >> > > > >> > > > > > also challenges the validation that the
> > > > coordinator
> > > > > > can
> > > > > > >> > > > provide.
> > > > > > >> > > > >> > For
> > > > > > >> > > > >> > > > > > example, some assignment strategies call
> for
> > > > > > >> partitions to
> > > > > > >> > > be
> > > > > > >> > > > >> > > assigned
> > > > > > >> > > > >> > > > > > multiple times, which means that the
> > > coordinator
> > > > > can
> > > > > > >> only
> > > > > > >> > > > check
> > > > > > >> > > > >> > that
> > > > > > >> > > > >> > > > > > partitions have been assigned at least
> once.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > To solve these issues, we'd like to propose
> > > > moving
> > > > > > >> > > assignment
> > > > > > >> > > > to
> > > > > > >> > > > >> > the
> > > > > > >> > > > >> > > > > > client. I've written a wiki which outlines
> > some
> > > > > > >> protocol
> > > > > > >> > > > >> changes to
> > > > > > >> > > > >> > > > > achieve
> > > > > > >> > > > >> > > > > > this:
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >>
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > > > > > >> > > > >> > > > > > .
> > > > > > >> > > > >> > > > > > To summarize briefly, instead of the
> > > coordinator
> > > > > > >> assigning
> > > > > > >> > > the
> > > > > > >> > > > >> > > > partitions
> > > > > > >> > > > >> > > > > > itself, all subscriptions are forwarded to
> > each
> > > > > > member
> > > > > > >> of
> > > > > > >> > > the
> > > > > > >> > > > >> group
> > > > > > >> > > > >> > > > which
> > > > > > >> > > > >> > > > > > then decides independently which partitions
> > it
> > > > > should
> > > > > > >> > > consume.
> > > > > > >> > > > >> The
> > > > > > >> > > > >> > > > > protocol
> > > > > > >> > > > >> > > > > > provides a mechanism for the coordinator to
> > > > > validate
> > > > > > >> that
> > > > > > >> > > all
> > > > > > >> > > > >> > > consumers
> > > > > > >> > > > >> > > > > use
> > > > > > >> > > > >> > > > > > the same assignment strategy, but it does
> not
> > > > > ensure
> > > > > > >> that
> > > > > > >> > > the
> > > > > > >> > > > >> > > resulting
> > > > > > >> > > > >> > > > > > assignment is "correct." This provides a
> > > powerful
> > > > > > >> > capability
> > > > > > >> > > > for
> > > > > > >> > > > >> > > users
> > > > > > >> > > > >> > > > to
> > > > > > >> > > > >> > > > > > control the full data flow on the client
> > side.
> > > > They
> > > > > > >> > control
> > > > > > >> > > > how
> > > > > > >> > > > >> > data
> > > > > > >> > > > >> > > is
> > > > > > >> > > > >> > > > > > written to partitions through the
> Partitioner
> > > > > > interface
> > > > > > >> > and
> > > > > > >> > > > they
> > > > > > >> > > > >> > > > control
> > > > > > >> > > > >> > > > > > how data is consumed through the assignment
> > > > > strategy,
> > > > > > >> all
> > > > > > >> > > > >> without
> > > > > > >> > > > >> > > > > touching
> > > > > > >> > > > >> > > > > > the server.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > Of course nothing comes for free. In
> > > particular,
> > > > > this
> > > > > > >> > change
> > > > > > >> > > > >> > removes
> > > > > > >> > > > >> > > > the
> > > > > > >> > > > >> > > > > > ability of the coordinator to validate that
> > > > commits
> > > > > > are
> > > > > > >> > made
> > > > > > >> > > > by
> > > > > > >> > > > >> > > > consumers
> > > > > > >> > > > >> > > > > > who were assigned the respective partition.
> > > This
> > > > > > might
> > > > > > >> not
> > > > > > >> > > be
> > > > > > >> > > > >> too
> > > > > > >> > > > >> > bad
> > > > > > >> > > > >> > > > > since
> > > > > > >> > > > >> > > > > > we retain the ability to validate the
> > > generation
> > > > > id,
> > > > > > >> but
> > > > > > >> > it
> > > > > > >> > > > is a
> > > > > > >> > > > >> > > > > potential
> > > > > > >> > > > >> > > > > > concern. We have considered alternative
> > > protocols
> > > > > > which
> > > > > > >> > add
> > > > > > >> > > a
> > > > > > >> > > > >> > second
> > > > > > >> > > > >> > > > > > round-trip to the protocol in order to give
> > the
> > > > > > >> > coordinator
> > > > > > >> > > > the
> > > > > > >> > > > >> > > ability
> > > > > > >> > > > >> > > > > to
> > > > > > >> > > > >> > > > > > confirm the assignment. As mentioned above,
> > the
> > > > > > >> > coordinator
> > > > > > >> > > is
> > > > > > >> > > > >> > > somewhat
> > > > > > >> > > > >> > > > > > limited in what it can actually validate,
> but
> > > > this
> > > > > > >> would
> > > > > > >> > > > return
> > > > > > >> > > > >> its
> > > > > > >> > > > >> > > > > ability
> > > > > > >> > > > >> > > > > > to validate commits. The tradeoff is that
> it
> > > > > > increases
> > > > > > >> the
> > > > > > >> > > > >> > protocol's
> > > > > > >> > > > >> > > > > > complexity which means more ways for the
> > > protocol
> > > > > to
> > > > > > >> fail
> > > > > > >> > > and
> > > > > > >> > > > >> > > > > consequently
> > > > > > >> > > > >> > > > > > more edge cases in the code.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > It also misses an opportunity to generalize
> > the
> > > > > group
> > > > > > >> > > > membership
> > > > > > >> > > > >> > > > protocol
> > > > > > >> > > > >> > > > > > for additional use cases. In fact, after
> > you've
> > > > > gone
> > > > > > to
> > > > > > >> > the
> > > > > > >> > > > >> trouble
> > > > > > >> > > > >> > > of
> > > > > > >> > > > >> > > > > > moving assignment to the client, the main
> > thing
> > > > > that
> > > > > > is
> > > > > > >> > left
> > > > > > >> > > > in
> > > > > > >> > > > >> > this
> > > > > > >> > > > >> > > > > > protocol is basically a general group
> > > management
> > > > > > >> > capability.
> > > > > > >> > > > >> This
> > > > > > >> > > > >> > is
> > > > > > >> > > > >> > > > > > exactly what is needed for a few cases that
> > are
> > > > > > >> currently
> > > > > > >> > > > under
> > > > > > >> > > > >> > > > > discussion
> > > > > > >> > > > >> > > > > > (e.g. copycat or single-writer producer).
> > We've
> > > > > taken
> > > > > > >> this
> > > > > > >> > > > >> further
> > > > > > >> > > > >> > > step
> > > > > > >> > > > >> > > > > in
> > > > > > >> > > > >> > > > > > the proposal and attempted to envision what
> > > that
> > > > > > >> general
> > > > > > >> > > > >> protocol
> > > > > > >> > > > >> > > might
> > > > > > >> > > > >> > > > > > look like and how it could be used both by
> > the
> > > > > > consumer
> > > > > > >> > and
> > > > > > >> > > > for
> > > > > > >> > > > >> > some
> > > > > > >> > > > >> > > of
> > > > > > >> > > > >> > > > > > these other cases.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > Anyway, since time is running out on the
> new
> > > > > > consumer,
> > > > > > >> we
> > > > > > >> > > have
> > > > > > >> > > > >> > > perhaps
> > > > > > >> > > > >> > > > > one
> > > > > > >> > > > >> > > > > > last chance to consider a significant
> change
> > in
> > > > the
> > > > > > >> > protocol
> > > > > > >> > > > >> like
> > > > > > >> > > > >> > > this,
> > > > > > >> > > > >> > > > > so
> > > > > > >> > > > >> > > > > > have a look at the wiki and share your
> > > thoughts.
> > > > > I've
> > > > > > >> no
> > > > > > >> > > doubt
> > > > > > >> > > > >> that
> > > > > > >> > > > >> > > > some
> > > > > > >> > > > >> > > > > > ideas seem clearer in my mind than they do
> on
> > > > > paper,
> > > > > > so
> > > > > > >> > ask
> > > > > > >> > > > >> > questions
> > > > > > >> > > > >> > > > if
> > > > > > >> > > > >> > > > > > there is any confusion.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > Thanks!
> > > > > > >> > > > >> > > > > > Jason
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > --
> > > > > > >> > > > >> > > > Thanks,
> > > > > > >> > > > >> > > > Ewen
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > --
> > > > > > >> > > > >> > Thanks,
> > > > > > >> > > > >> > Ewen
> > > > > > >> > > > >> >
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Ewen
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>



-- 
Thanks,
Neha

Reply via email to