Ewen,

Honestly, I am not sure whether we can keep all the topics in the same
cluster. By default, we will mirror the topics, because when
application teams run their tests, they expect the environment to be
as similar to production as possible. We are also developing a
comprehensive continuous verification test framework. A number of
tests will run continuously, including some that periodically
create a topic, produce some data, verify data integrity and
performance statistics, and then clean up. The test framework will also
cover the entire pipeline, not just one cluster.

Another use case for wildcard subscriptions besides mirror maker is our
auditing service. Currently, our auditing framework also reads from all the
topics and compares the message counts with the producers' counts to see if
there is any data loss.

So I think we can let mirror maker exclude some topics, but it might be
difficult to exclude them from every wildcard consumer group.
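
To make the trade-off concrete, here is a rough sketch of the idea as I
understand it: the hash is computed only over the topics a group actually
processes, so a group that excludes the test topics is unaffected when they
come and go, while a group that reads everything (like auditing) still sees a
change. The function names, the "test-.*" naming convention, and the use of
SHA-256 are assumptions for illustration only, not part of the proposed
protocol.

```python
import hashlib
import re


def subscribed_metadata(metadata, include, exclude=None):
    """Keep only topics matching `include` and not matching `exclude`.

    `metadata` maps topic name -> partition count (a simplified,
    hypothetical stand-in for real topic metadata).
    """
    inc = re.compile(include)
    exc = re.compile(exclude) if exclude else None
    return {
        topic: partitions
        for topic, partitions in metadata.items()
        if inc.fullmatch(topic) and not (exc and exc.fullmatch(topic))
    }


def metadata_hash(metadata):
    """Hash (topic, partition count) pairs in sorted order so the result is deterministic."""
    digest = hashlib.sha256()
    for topic in sorted(metadata):
        digest.update(f"{topic}:{metadata[topic]}\n".encode("utf-8"))
    return digest.hexdigest()


# Cluster metadata before and after an automated test creates a topic.
before = {"orders": 8, "clicks": 16}
after = {"orders": 8, "clicks": 16, "test-verify-001": 1}

# A mirror maker group that excludes test topics sees no change.
mm_before = metadata_hash(subscribed_metadata(before, ".*", exclude="test-.*"))
mm_after = metadata_hash(subscribed_metadata(after, ".*", exclude="test-.*"))

# An auditing group that reads every topic does see a change.
audit_before = metadata_hash(subscribed_metadata(before, ".*"))
audit_after = metadata_hash(subscribed_metadata(after, ".*"))

print(mm_before == mm_after)        # hashes agree, no rejoin needed
print(audit_before == audit_after)  # hashes differ, group must rejoin
```

So the exclusion helps mirror maker, but any group that really needs all
topics would still rejoin on every test topic creation or deletion.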

Thanks,

Jiangjie (Becket) Qin



On Mon, Aug 17, 2015 at 5:06 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> Becket,
>
> Just to clarify, when topics are being created/resized for automated tests,
> are those excluded by mirrormaker? I assume you don't want to bother with
> copying them since they are just for tests. If they are excluded, then they
> don't need to affect the metadata used by the consumer and so wouldn't
> trigger rebalancing in the scheme proposed -- the hash being reported can
> be restricted to the set of topics that will be processed by the consumer
> group. Consumers should only ever need to rejoin the group if they detect a
> metadata change that would affect the group's behavior.
>
> -Ewen
>
> On Mon, Aug 17, 2015 at 11:14 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Becket,
> >
> > Thanks for raising these concerns. I think the rolling upgrade problem is
> > handled quite a bit better with the new consumer if Onur's LeaveGroup
> patch
> > (or a variant) is accepted. Without it, the coordinator has to wait for
> the
> > full session timeout to detect that a node has left the group, which can
> > significantly delay rebalancing.
> >
> > -Jason
> >
> > On Sun, Aug 16, 2015 at 11:42 AM, Neha Narkhede <n...@confluent.io>
> wrote:
> >
> > > Hey Becket,
> > >
> > > These are all fair points. Regarding running Kafka as a service, it
> will
> > be
> > > good for everyone to know some numbers around topic creation and
> changes
> > > around # of partitions. I don't think the test usage is a good one
> since
> > no
> > > one should be creating and deleting topics in a loop on a production
> > > cluster.
> > >
> > > Regarding your point about mirror maker, I think we should definitely
> > > ensure that a mirror maker with a large topic subscription list should
> be
> > > well supported. The reason for the half an hour stalls in LinkedIn
> mirror
> > > makers is due to the fact that it still uses the old consumer. The 3
> > major
> > > reasons for long rebalance operations in the mirror maker using the old
> > > consumer are -
> > > 1. Every rebalance operation involves many writes to ZK. For large
> > consumer
> > > groups and large number of partitions, this adds up to several seconds
> > > 2. The view of the members in a group is inconsistent and hence there
> are
> > > typically several rebalance attempts. Note that this is fixed in the
> new
> > > design where the group membership is always consistently communicated
> to
> > > all members.
> > > 3. The rebalance backoff time is high (of the order of seconds).
> > >
> > > The good news is that the new design has none of these problems. As we
> > > said, we will run some tests to ensure that the mirror maker case is
> > > handled well. Thank you for the feedback!
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Sat, Aug 15, 2015 at 6:17 PM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > wrote:
> > >
> > > > Neha, Ewen and Jason,
> > > >
> > > > > Maybe I am overly concerned, and I agree that it does depend on the
> > > metadata
> > > > change frequency. As Neha said, a few tests will be helpful. We can
> see
> > > how
> > > > it goes.
> > > >
> > > > > What worries me is that at LinkedIn we are in the process of running
> > > Kafka
> > > > as a service. That means user will have the ability to create/delete
> > > topics
> > > > and update their topic configurations programmatically or through UI
> > > using
> > > > LinkedIn internal cloud service platform. And some automated test
> will
> > > also
> > > > be consistently running to create topic, produce/consume some data,
> > then
> > > > clean up the topics. This brings us some new challenges for mirror
> > maker
> > > > because we need to make sure it actually copies data instead of
> > spending
> > > > too much time on rebalance. As what we see now is that each rebalance
> > > will
> > > > probably take 30 seconds or more to finish even with some tricks and
> > > > tuning.
> > > >
> > > > Another related use case I want to bring up is that today when we
> want
> > to
> > > > do a rolling upgrade of mirror maker (26 nodes), there will be two
> > rounds
> > > > of rebalance to bounce each node, each rebalance takes about 30
> > seconds.
> > > So
> > > > bouncing 26 nodes takes roughly half an hour. Awkwardly, during the
> > > rolling
> > > > > bounce, because mirror maker keeps rebalancing, it actually does
> > not
> > > > really copy any data! So the pipeline will literally stall for half
> an
> > > > hour! Since we are designing the new protocol, it will also be good
> if
> > we
> > > > make sure this use case is addressed.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Sat, Aug 15, 2015 at 10:50 AM, Neha Narkhede <n...@confluent.io>
> > > wrote:
> > > >
> > > > > Becket,
> > > > >
> > > > > > This is a clever approach to ensure that only one thing
> > > communicates
> > > > > the metadata so even if it is stale, the entire group has the same
> > > view.
> > > > > However, the big assumption this makes is that the coordinator is
> > that
> > > > one
> > > > > process that has the ability to know the metadata for group
> members,
> > > > which
> > > > > does not work for any non-consumer use case.
> > > > >
> > > > > I wonder if we may be complicating the design of 95% use cases for
> > the
> > > > > remaining 5%. For instance, how many times do people create and
> > remove
> > > > > topics or even add partitions? We operated LI clusters for a long
> > time
> > > > and
> > > > > this wasn't a frequent event that would need us to optimize this
> > design
> > > > > for.
> > > > >
> > > > > Also, this is something we can easily validate by running a few
> tests
> > > on
> > > > > the patch and I suggest we wait for that.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Sat, Aug 15, 2015 at 9:14 AM, Jason Gustafson <
> ja...@confluent.io
> > >
> > > > > wrote:
> > > > >
> > > > > > Hey Jiangjie,
> > > > > >
> > > > > > I was thinking about the same problem. When metadata is changing
> > > > > > frequently, the clients may not be able to ever find agreement on
> > the
> > > > > > current state. The server doesn't have this problem, as you say,
> > > > because
> > > > > it
> > > > > > can just take a snapshot and send that to the clients. Adding a
> > > > dampening
> > > > > > setting to the client would help if churn is sporadic, but not if
> > it
> > > is
> > > > > > steady. I think the metadata would have to be changing really
> > > > frequently
> > > > > > for this to be a problem practically, but this is a case where
> the
> > > > > > server-side approach has an advantage.
> > > > > >
> > > > > > Including the metadata in the join group response would require
> > > making
> > > > > the
> > > > > > subscription available to the coordinator, right? We lose a
> little
> > > bit
> > > > of
> > > > > > the generality of the protocol, but it might not be too bad since
> > > most
> > > > > use
> > > > > > cases for reusing this protocol have a similar need for metadata
> > (and
> > > > > they
> > > > > > can always pass an empty subscription if they don't). We've gone
> > back
> > > > and
> > > > > > forth a few times on this, and I'm generally not opposed. It
> might
> > > help
> > > > > if
> > > > > > we try to quantify the impact of the metadata churn in practice.
> I
> > > > think
> > > > > > the rate of change would have to be in the low seconds for this
> to
> > > be a
> > > > > > real problem. It does seem nice though that we have a way to
> manage
> > > > even
> > > > > > this much churn when we do this.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 14, 2015 at 9:03 PM, Jiangjie Qin
> > > > <j...@linkedin.com.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Ewen,
> > > > > > >
> > > > > > > I agree that if there is a churn in metadata, the consumers
> need
> > > > > several
> > > > > > > rounds of rebalances to succeed. The difference I am thinking
> is
> > > that
> > > > > > with
> > > > > > > coordinator as single source of truth, we can let the consumer
> > > finish
> > > > > one
> > > > > > > round of rebalance, work for a while and start the next round
> of
> > > > > > rebalance.
> > > > > > > If we purely depend on the consumers to synchronize by
> themselves
> > > > based
> > > > > > on
> > > > > > > different metadata sources, is it possible we have some groups
> > > > > spending a
> > > > > > > lot of time on rebalancing but not able to make too much
> progress
> > > in
> > > > > > > consuming?
> > > > > > >
> > > > > > > > Can we let consumers fetch metadata only from
> > their
> > > > > > > coordinator? So the rebalance can be done in the following way:
> > > > > > >
> > > > > > > 1. Consumers refresh their metadata periodically
> > > > > > > > 2. If one of the consumers sees a change in metadata that
> > triggers a
> > > > > > > rebalance, it sends JoinGroupRequest to coordinator.
> > > > > > > 3. Once the coordinator receives the first JoinGroupRequest of
> a
> > > > group,
> > > > > > it
> > > > > > > takes a snapshot of current metadata and the group enters
> > > > > > prepare-rebalance
> > > > > > > state.
> > > > > > > 4. The metadata snapshot will be used for this round of
> > rebalance.
> > > > i.e.
> > > > > > the
> > > > > > > metadata snapshot will be sent to consumers in
> JoinGroupResponse.
> > > > > > > 4.1 If the consumers are subscribing to explicit topic lists
> (not
> > > > > regex),
> > > > > > > the JoinGroupResponse needs only contain the metadata of all
> > topics
> > > > the
> > > > > > > group is interested.
> > > > > > > 4.2 If the consumers are subscribing using regex, all the topic
> > > > > metadata
> > > > > > > will be returned to the consumer.
> > > > > > > 5. Consumers get JoinGroupResponse, refresh metadata using the
> > > > metadata
> > > > > > in
> > > > > > > JoinGroupResponse, run algorithm to assign partitions and start
> > > > > consume.
> > > > > > > 6. Go back to 1.
> > > > > > >
> > > > > > > The benefit here is that we can let rebalance finish in one
> > round,
> > > > and
> > > > > > all
> > > > > > > the rest of changes will be captured in next consumer metadata
> > > > refresh
> > > > > -
> > > > > > so
> > > > > > > we get group commit. One concern might be letting consumer
> > refresh
> > > > > > metadata
> > > > > > > from coordinator might cause issue for big consumer groups.
> Maybe
> > > > that
> > > > > is
> > > > > > > OK because metadata refresh is infrequent.
> > > > > > >
> > > > > > > This approach actually is very similar to what is proposed now:
> > > > > rebalance
> > > > > > > is triggered by metadata refresh, consumer provides
> subscription
> > > list
> > > > > to
> > > > > > > pass around. The only difference is that we don't need metadata
> > > hash
> > > > > > > anymore because the metadata is guaranteed to be the same.
> > > Replacing
> > > > > > > metadata hash with actual metadata will not have too much
> > overhead
> > > > for
> > > > > > > small subscription groups. There will be some overhead for
> regex
> > > > > > > subscriptions, but this can save the potential extra round of
> > > > metadata
> > > > > > > fetch and will only occur when consumer see metadata change,
> > which
> > > is
> > > > > > > infrequent.
> > > > > > >
> > > > > > > Any thoughts?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 14, 2015 at 12:57 PM, Ewen Cheslack-Postava <
> > > > > > e...@confluent.io
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > On Fri, Aug 14, 2015 at 10:59 AM, Jiangjie Qin
> > > > > > <j...@linkedin.com.invalid
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Neha and Ewen,
> > > > > > > > >
> > > > > > > > > About the metadata change frequency. I guess it really
> > depends
> > > on
> > > > > how
> > > > > > > > > frequent the metadata change might occur. If we run Kafka
> as
> > a
> > > > > > > service, I
> > > > > > > > > can see that happens from time to time. As I can imagine
> > people
> > > > > will
> > > > > > > > create
> > > > > > > > > some topic, test and maybe delete the topic in some
> automated
> > > > test.
> > > > > > If
> > > > > > > > so,
> > > > > > > > > the proposed protocol might be a little bit vulnerable.
> > > > > > > > >
> > > > > > > > > More specifically the scenario I am thinking is:
> > > > > > > > > > 1. Consumer 0 periodically refreshes metadata and detects a
> > > > metadata
> > > > > > > > change.
> > > > > > > > > So it sends a JoinGroupRequest with metadata_hash_0.
> > > > > > > > > 2. Consumer 1 was notified by controller to start a
> > rebalance,
> > > so
> > > > > it
> > > > > > > > > > refreshes its metadata and sends a JoinGroupRequest with
> > > > > > > metadata_hash_1,
> > > > > > > > > which is different from metadata hash 0.
> > > > > > > > > > 3. Rebalance fails and both consumers refresh their
> metadata
> > > > again
> > > > > > from
> > > > > > > > > different brokers.
> > > > > > > > > 4. Depending on the metadata change frequency(or some admin
> > > > > operation
> > > > > > > > like
> > > > > > > > > > partition reassignment), they may or may not have the same
> > > > metadata
> > > > > > > > > > returned, so they restart from 3 again.
> > > > > > > > >
> > > > > > > > > I agree that step 4 might not be a big concern if consumers
> > > > updates
> > > > > > > > > metadata at almost the same time, but I'm a little bit
> > worried
> > > > > > whether
> > > > > > > > that
> > > > > > > > > assumption really stands because we do not have control
> over
> > > how
> > > > > > > frequent
> > > > > > > > > the metadata can change.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > Is this really that different from what would happen if the
> > > > > coordinator
> > > > > > > > distributed the metadata to consumers? In that case you would
> > > > > trivially
> > > > > > > > have everyone in a consistent state, but those metadata
> changes
> > > > would
> > > > > > > still
> > > > > > > > cause churn and require JoinGroup rounds, during which
> > processing
> > > > is
> > > > > > > > stalled for the nodes that are waiting on other members to
> > > re-join
> > > > > the
> > > > > > > > group.
> > > > > > > >
> > > > > > > > -Ewen
> > > > > > > >
> > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > > On Fri, Aug 14, 2015 at 2:03 AM, Ewen Cheslack-Postava <
> > > > > > > > e...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > On Thu, Aug 13, 2015 at 11:07 PM, Neha Narkhede <
> > > > > n...@confluent.io
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Becket,
> > > > > > > > > > >
> > > > > > > > > > > As you say, the metadata hash addresses the concern you
> > > > > > originally
> > > > > > > > > raised
> > > > > > > > > > > about large topic subscriptions. Can you please list
> > other
> > > > > > problems
> > > > > > > > you
> > > > > > > > > > are
> > > > > > > > > > > raising more clearly? It is more helpful to know
> problems
> > > > that
> > > > > > the
> > > > > > > > > > proposal
> > > > > > > > > > > does not address or addresses poorly.
> > > > > > > > > > >
> > > > > > > > > > > Regarding other things you said -
> > > > > > > > > > >
> > > > > > > > > > > it is required that each
> > > > > > > > > > > > consumer refresh their metadata before sending a
> > > > > > JoinGroupRequest
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > This is required for wildcard topic subscriptions
> anyway.
> > > So
> > > > > this
> > > > > > > > > > proposal
> > > > > > > > > > > does not introduce a regression. We had agreed earlier
> > that
> > > > it
> > > > > > does
> > > > > > > > not
> > > > > > > > > > > make sense for the server to deserialize regular
> > > expressions
> > > > > sent
> > > > > > > by
> > > > > > > > > the
> > > > > > > > > > > consumer.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I don't think consumers need to do a metadata refresh
> > before
> > > > > > sending
> > > > > > > a
> > > > > > > > > > JoinGroupRequest. Metadata changes that affect assignment
> > are
> > > > > rare
> > > > > > --
> > > > > > > > it
> > > > > > > > > > requires changing the number of partitions in a topic.
> But
> > > you
> > > > > > might
> > > > > > > > > send a
> > > > > > > > > > JoinGroupRequest simply because a new member is trying to
> > > join
> > > > > the
> > > > > > > > group.
> > > > > > > > > > That case is presumably much more common.
> > > > > > > > > >
> > > > > > > > > > I think it's actually a good idea to have the first
> > JoinGroup
> > > > > cycle
> > > > > > > > fail
> > > > > > > > > in
> > > > > > > > > > > some cases, and has little impact. Let's say the metadata
> > does
> > > > > > change
> > > > > > > > > > because partitions are added. Then we might fail in the
> > first
> > > > > > round,
> > > > > > > > but
> > > > > > > > > > then all members detect that issue *immediately*, refresh
> > > their
> > > > > > > > metadata,
> > > > > > > > > > and submit a new join group request. This second cycle
> does
> > > not
> > > > > > > > require a
> > > > > > > > > > full heartbeat cycle. It happens much more quickly
> because
> > > > > everyone
> > > > > > > > > > detected the inconsistency based on the first
> > > > JoinGroupResponse.
> > > > > > The
> > > > > > > > > > inconsistency should be resolved very quickly (barring
> > other
> > > > > > failures
> > > > > > > > > like
> > > > > > > > > > a member leaving mid-rebalance)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > the metadata might still be inconsistent if there is
> a
> > > > topic
> > > > > or
> > > > > > > > > > partition
> > > > > > > > > > > > change because the
> > > > > > > > > > > > UpdateMetadataRequest from controller might be
> handled
> > at
> > > > > > > different
> > > > > > > > > > time.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Topic metadata does not change frequently and even if
> it
> > > > did, a
> > > > > > > > couple
> > > > > > > > > > > rebalance attempts will be needed whether the
> coordinator
> > > > > drives
> > > > > > > the
> > > > > > > > > > > assignments or the consumer. Because guess how the
> > > > coordinator
> > > > > > > knows
> > > > > > > > > > about
> > > > > > > > > > > the topic metadata changes -- indirectly through
> either a
> > > zk
> > > > > > > callback
> > > > > > > > > or
> > > > > > > > > > > UpdateMetadataRequest, so it is completely possible the
> > > > > > coordinator
> > > > > > > > > sees
> > > > > > > > > > > the topic metadata changes in batches, not all at once.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 13, 2015 at 10:50 PM, Neha Narkhede <
> > > > > > n...@confluent.io
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Ewen/Jason,
> > > > > > > > > > > >
> > > > > > > > > > > > The metadata hash is a clever approach and certainly
> > > > > addresses
> > > > > > > the
> > > > > > > > > > > problem
> > > > > > > > > > > > of large metadata for consumers like mirror maker.
> Few
> > > > > > comments -
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >    1. In the interest of simplifying the format of
> the
> > > > > > consumer's
> > > > > > > > > > > >    metadata - Why not just always include only the
> > topic
> > > > > names
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > metadata
> > > > > > > > > > > >    followed by the metadata hash? If the metadata
> hash
> > > > check
> > > > > > > > > succeeds,
> > > > > > > > > > > each
> > > > > > > > > > > >    consumer uses the # of partitions it had fetched.
> If
> > > it
> > > > > > > fails, a
> > > > > > > > > > > rebalance
> > > > > > > > > > > >    happens and the metadata is not used anyway.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Doing this requires that every consumer always fetch the
> > full
> > > > > > > metadata.
> > > > > > > > > The
> > > > > > > > > > most common use case is consumers that just want to
> consume
> > > one
> > > > > or
> > > > > > a
> > > > > > > > > couple
> > > > > > > > > > of topics, in which case grabbing all metadata for the
> > entire
> > > > > > cluster
> > > > > > > > is
> > > > > > > > > > wasteful. If I subscribe only to topic A, why make all
> > > > consumers
> > > > > > grab
> > > > > > > > > > > metadata for the entire cluster (and need to rebalance
> every
> > > time
> > > > > it
> > > > > > > > > > changes!). Including the # of partitions for each topic
> > lets
> > > > you
> > > > > > > avoid
> > > > > > > > > > having to grab the global set of metadata.
> > > > > > > > > >
> > > > > > > > > > So if you're just subscribing to one or a couple of
> topics,
> > > why
> > > > > not
> > > > > > > > just
> > > > > > > > > > compute the hash by filtering out everything but the
> topics
> > > you
> > > > > are
> > > > > > > > > > subscribed to? The problem there is if you ever
> add/remove
> > > > > > > > subscriptions
> > > > > > > > > > and want to support rolling upgrades. If the group was
> > > > subscribed
> > > > > > to
> > > > > > > > > topic
> > > > > > > > > > A, but later changes require subscribing to A + B, then
> to
> > > > > achieve
> > > > > > a
> > > > > > > > > > seamless rolling upgrade would require one (old) consumer
> > to
> > > be
> > > > > > > > > subscribing
> > > > > > > > > > to A and one (new) consumer to be subscribing to A+B. If
> we
> > > > > > computed
> > > > > > > > > > metadata hashes based on filtered metadata, those two
> would
> > > > > > disagree
> > > > > > > > and
> > > > > > > > > we
> > > > > > > > > > could not perform assignment while the upgrade was in
> > > progress.
> > > > > > > > > >
> > > > > > > > > > The solution is to differentiate between the cases when a
> > > very
> > > > > > small
> > > > > > > > > amount
> > > > > > > > > > of the metadata is needed (one or a couple of topic
> > > > > subscriptions;
> > > > > > > > > > communicate and share this via metadata in the JoinGroup
> > > > > protocol)
> > > > > > vs
> > > > > > > > > when
> > > > > > > > > > *all* the metadata is needed (regex subscription; verify
> > > > > agreement
> > > > > > > via
> > > > > > > > > > hash).
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > >    2. Do you need a topic list and topic regex to be
> > > > > separate?
> > > > > > A
> > > > > > > > > single
> > > > > > > > > > > >    topic or list of topics can be expressed as a
> regex.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > See above note about collecting all metadata when you
> > really
> > > > only
> > > > > > > need
> > > > > > > > it
> > > > > > > > > > for 1 or 2 topics. There's probably some debate to be had
> > > about
> > > > > > > whether
> > > > > > > > > > this cost would be too high -- every consumer would need
> to
> > > > > request
> > > > > > > the
> > > > > > > > > > metadata for all topics, and they'd need to request that
> > all
> > > > > every
> > > > > > > time
> > > > > > > > > > they might be out of date.
> > > > > > > > > >
> > > > > > > > > Are we going to allow consumers in the same group to
> > subscribe
> > > to
> > > > > > > > different
> > > > > > > > > topic set? If we do, we need to let them refresh metadata
> for
> > > all
> > > > > the
> > > > > > > > > topics a group is consuming from. If we don't then in the
> > > > protocol
> > > > > we
> > > > > > > > only
> > > > > > > > > need a subscription set hash.
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > >    3. Let's include a version explicitly at the
> > beginning
> > > > of
> > > > > > the
> > > > > > > > > > > >    ProtocolMetadata. The version dictates how to
> > > > deserialize
> > > > > > the
> > > > > > > > > > > >    ProtocolMetadata blob and is consistent with the
> > rest
> > > of
> > > > > > > Kafka.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > If I'm understanding correctly, in JoinGroupRequest I
> would
> > > > > change
> > > > > > > > > >
> > > > > > > > > > GroupProtocols          => [Protocol ProtocolMetadata]
> > > > > > > > > >
> > > > > > > > > > to
> > > > > > > > > >
> > > > > > > > > > GroupProtocols          => [Protocol ProtocolVersion
> > > > > > > ProtocolMetadata]
> > > > > > > > > >
> > > > > > > > > > We had been talking about just baking the version into
> the
> > > > > Protocol
> > > > > > > > > field,
> > > > > > > > > > but making it separate seems perfectly reasonable to me.
> > > Jason,
> > > > > any
> > > > > > > > issue
> > > > > > > > > > with splitting the version out into a separate field like
> > > this?
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > That can simplify the metadata format to the
> following:
> > > > > > > > > > > >
> > > > > > > > > > > > GroupType => "consumer"
> > > > > > > > > > > >>
> > > > > > > > > > > >> Protocol => AssignmentStrategy
> > > > > > > > > > > >>   AssignmentStrategy   => String
> > > > > > > > > > > >>
> > > > > > > > > > > >> ProtocolMetadata => Version Subscription
> > > > > > > > AssignmentStrategyMetadata
> > > > > > > > > > > >
> > > > > > > > > > > >     Version                    => String
> > > > > > > > > > > >
> > > > > > > > > > > >   Subscription                 => TopicRegex
> > MetadataHash
> > > > > > > > > > > >>     TopicRegex                 => String
> > > > > > > > > > > >>     MetadataHash               => String
> > > > > > > > > > > >>   AssignmentStrategyMetadata   => bytes
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 13, 2015 at 6:28 PM, Jiangjie Qin
> > > > > > > > > > <j...@linkedin.com.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Ewen and Jason,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for the reply. Sorry I missed the metadata
> > hash.
> > > > Yes,
> > > > > > > that
> > > > > > > > > is a
> > > > > > > > > > > >> clever approach and would solve my concern about the
> > > data
> > > > > > > passing
> > > > > > > > > > > around.
> > > > > > > > > > > >> I
> > > > > > > > > > > >> can see both pros and cons from doing this, though.
> > The
> > > > > > > advantage
> > > > > > > > is
> > > > > > > > > > we
> > > > > > > > > > > >> don't need the topic metadata in JoinGroupResponse
> > > > anymore.
> > > > > > The
> > > > > > > > > > downside
> > > > > > > > > > > >> is
> > > > > > > > > > > > >> that now rebalance has an extra dependency on the
> > > consensus
> > > > of
> > > > > > > > > metadata
> > > > > > > > > > of
> > > > > > > > > > > >> all consumers, which is obtained separately. So it
> is
> > > > > required
> > > > > > > > that
> > > > > > > > > > each
> > > > > > > > > > > >> consumer refresh their metadata before sending a
> > > > > > > JoinGroupRequest,
> > > > > > > > > > > >> otherwise in some cases (e.g. wildcard consumers)
> will
> > > > > almost
> > > > > > > > > > certainly
> > > > > > > > > > > >> fail for the first rebalance attempt. Even if we do
> > > that,
> > > > > > since
> > > > > > > > the
> > > > > > > > > > > >> consumers are getting metadata from different
> brokers,
> > > the
> > > > > > > > metadata
> > > > > > > > > > > might
> > > > > > > > > > > >> still be inconsistent if there is a topic or
> partition
> > > > > change
> > > > > > > > > because
> > > > > > > > > > > the
> > > > > > > > > > > >> UpdateMetadataRequest from controller might be
> handled
> > > at
> > > > > > > > different
> > > > > > > > > > > time.
> > > > > > > > > > > >> Just want to make sure we think through the cases so
> > the
> > > > > > > protocol
> > > > > > > > > does
> > > > > > > > > > > not
> > > > > > > > > > > >> cause us unexpected issues.
> > > > > > > > > > > >>
> > > > > > > > > > > >> About the number of consumers, I think with the
> > current
> > > > > > > liveliness
> > > > > > > > > > > >> definition, we can tolerate churns by bumping up the
> > > > session
> > > > > > > > > timeout.
> > > > > > > > > > > Also
> > > > > > > > > > > >> I guess we will see an increasing number of
> consumers
> > > for
> > > > > new
> > > > > > > > > > consumer,
> > > > > > > > > > > > >> because every old consumer thread will probably
> > > > become a
> > > > > > > > > consumer.
> > > > > > > > > > > >>
> > > > > > > > > > > >> It is a valid concern for consumers that have large
> > > > > > subscription
> > > > > > > > > set.
> > > > > > > > > > > This
> > > > > > > > > > > >> might not be avoided though for client side
> assignment
> > > > > > approach.
> > > > > > > > One
> > > > > > > > > > > >> solution is having topic names associate with a
> topic
> > > ID.
> > > > > And
> > > > > > > only
> > > > > > > > > use
> > > > > > > > > > > >> topic ID in JoinGroupRequest and JoinGroupResponse,
> > > There
> > > > > is a
> > > > > > > > > > > discussion
> > > > > > > > > > > >> thread about this to solve the topic renaming case
> but
> > > > this
> > > > > > is a
> > > > > > > > > > > >> completely
> > > > > > > > > > > >> different discussion.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Jiangjie (Becket) Qin
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Thu, Aug 13, 2015 at 2:14 PM, Jason Gustafson <
> > > > > > > > > ja...@confluent.io>
> > > > > > > > > > > >> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Thanks Jiangjie, that information helps. I agree
> the
> > > > > > protocol
> > > > > > > > must
> > > > > > > > > > > >> consider
> > > > > > > > > > > >> > scalability. My point was that the synchronization
> > > > barrier
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > >> current
> > > > > > > > > > > >> > protocol already effectively limits the number of
> > > > > consumers
> > > > > > > > since
> > > > > > > > > it
> > > > > > > > > > > >> > provides no way to gracefully handle churn. It
> > > wouldn't
> > > > be
> > > > > > > worth
> > > > > > > > > > > >> worrying
> > > > > > > > > > > >> > about scaling up to 100,000 members, for example,
> > > > because
> > > > > > > > there's
> > > > > > > > > no
> > > > > > > > > > > way
> > > > > > > > > > > >> > the group would be stable. So we just need to set
> > some
> > > > > clear
> > > > > > > > > > > >> expectations
> > > > > > > > > > > >> > on the size we can scale to, and that can help
> > inform
> > > > the
> > > > > > > > > discussion
> > > > > > > > > > > on
> > > > > > > > > > > >> the
> > > > > > > > > > > >> > size of messages in this protocol.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Ewen and I were discussing this morning along similar lines to what
> > > > > > > > > > > > >> > you're suggesting. However, even if the coordinator decides on the
> > > > > > > > > > > > >> > metadata for the group, each member still needs to communicate its
> > > > > > > > > > > > >> > subscriptions to the rest of the group. This is nice for the regex case
> > > > > > > > > > > > >> > since the regex is probably small, but if the members have a large
> > > > > > > > > > > > >> > topic list, then we have the same problem. One thing I was thinking
> > > > > > > > > > > > >> > about was whether we really need to handle different subscriptions for
> > > > > > > > > > > > >> > every member. If the coordinator could guarantee that all members had
> > > > > > > > > > > > >> > the same subscription, then there would be no need for the coordinator
> > > > > > > > > > > > >> > to return the subscriptions for each member. However, this would
> > > > > > > > > > > > >> > prevent graceful upgrades. We might be able to fix that problem by
> > > > > > > > > > > > >> > allowing the consumer to provide two subscriptions to allow rolling
> > > > > > > > > > > > >> > updates, but that starts to sound pretty nasty.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > -Jason
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Thu, Aug 13, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > > Jason,
> > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > The protocol has to consider scalability. The protocol in the wiki
> > > > > > > > > > > > >> > > means the JoinGroupResponse size would be:
> > > > > > > > > > > > >> > > NumberOfTopics * (AvgTopicNameLength + 4) * (NumberOfConsumers)^2
> > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > To give some real numbers, we have a 26-node Mirror Maker cluster,
> > > > > > > > > > > > >> > > each node with 4 consumers. That is 104 consumers using the regex
> > > > > > > > > > > > >> > > ".*". And most of our clusters have around 3000 topics, whose topic
> > > > > > > > > > > > >> > > names are typically around 20 characters.
> > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > I think the key issue for client-side partition assignment logic is
> > > > > > > > > > > > >> > > to make sure that 1) all the clients run the same algorithm, and 2)
> > > > > > > > > > > > >> > > all the clients make decisions based on the same topic metadata. The
> > > > > > > > > > > > >> > > second goal can be achieved by simply letting the coordinator provide
> > > > > > > > > > > > >> > > the topic metadata and all the member information as the source of
> > > > > > > > > > > > >> > > truth. Is it necessary to pass the topic metadata of each consumer
> > > > > > > > > > > > >> > > around? Can we keep the protocol metadata field completely
> > > > > > > > > > > > >> > > independent of topic metadata? I think the JoinGroupResponse should
> > > > > > > > > > > > >> > > have only one copy of the topic metadata, provided by the coordinator
> > > > > > > > > > > > >> > > and kept outside of the protocol metadata. If users decide to put
> > > > > > > > > > > > >> > > some metadata in the JoinGroupRequest and let the coordinator pass it
> > > > > > > > > > > > >> > > around, they are responsible for understanding the risk.
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Jiangjie (Becket) Qin
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > On Thu, Aug 13, 2015 at 12:41 PM, Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > > Hey Onur and Jiangjie,
> > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > I've updated that wiki with a proposal to add regex subscriptions
> > > > > > > > > > > > >> > > > to the consumer metadata. Can you have a look to see if it
> > > > > > > > > > > > >> > > > addresses your concern? In general, I think we should be a little
> > > > > > > > > > > > >> > > > careful when we are talking about the scalability of the protocol.
> > > > > > > > > > > > >> > > > Regardless of whether assignment is done on the server or the
> > > > > > > > > > > > >> > > > client, the protocol assumes a relatively stable configuration.
> > > > > > > > > > > > >> > > > When the number of consumers increases beyond a certain limit,
> > > > > > > > > > > > >> > > > then membership churn becomes a major concern. Similarly there is
> > > > > > > > > > > > >> > > > a notion of metadata churn when topics are added, deleted, or
> > > > > > > > > > > > >> > > > resized. If either membership or metadata changes, then the
> > > > > > > > > > > > >> > > > protocol forces all consumers to stop consumption and rejoin the
> > > > > > > > > > > > >> > > > group. If this happens often enough, then it can severely impact
> > > > > > > > > > > > >> > > > the ability of the consumer to make progress. The point is that
> > > > > > > > > > > > >> > > > the protocol may already be unsuited to cases where there are
> > > > > > > > > > > > >> > > > either a large number of consumers or topics. I wonder if you guys
> > > > > > > > > > > > >> > > > can share your thoughts about your scaling expectations?
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > -Jason
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > Hey Jiangjie,
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > That's a great point. In the worst case (the mirror maker case I
> > > > > > > > > > > > >> > > > > guess), the join group response can be massive. This would be
> > > > > > > > > > > > >> > > > > especially deadly when there is a lot of churn in the group (e.g.
> > > > > > > > > > > > >> > > > > in a rolling upgrade). The current protocol is not great for this
> > > > > > > > > > > > >> > > > > case either, but it's significantly better. Here are a couple
> > > > > > > > > > > > >> > > > > ways to deal with the size:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > 1. First, we could have the coordinator compress the responses.
> > > > > > > > > > > > >> > > > > This would probably be pretty effective if applied across the
> > > > > > > > > > > > >> > > > > metadata from all members.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > 2. I think the regex case is the main problem. Is that right? We
> > > > > > > > > > > > >> > > > > could extend the metadata to allow the consumer to embed its
> > > > > > > > > > > > >> > > > > regex subscription in the metadata directly (note this might be a
> > > > > > > > > > > > >> > > > > good idea regardless of the rest of this proposal). To support
> > > > > > > > > > > > >> > > > > regex on the consumer, we must fetch metadata for all topics.
> > > > > > > > > > > > >> > > > > Rather than having all regex subscribers embed all of this
> > > > > > > > > > > > >> > > > > metadata in their join group requests, they could instead embed a
> > > > > > > > > > > > >> > > > > hash of it. Then after the join group responses are received,
> > > > > > > > > > > > >> > > > > they just need to check that the hashes are the same. If there is
> > > > > > > > > > > > >> > > > > a mismatch (which should only occur when topics are created,
> > > > > > > > > > > > >> > > > > deleted, or resized), then the group members must refetch the
> > > > > > > > > > > > >> > > > > metadata and rejoin the group. This is also how the current
> > > > > > > > > > > > >> > > > > protocol behaves when there is a change in the topic metadata
> > > > > > > > > > > > >> > > > > affecting the group--someone (either the coordinator or the
> > > > > > > > > > > > >> > > > > consumer) detects the change and forces the group to rebalance.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > What do you think?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > (Also I think adding groupId/generationId to fetch and produce
> > > > > > > > > > > > >> > > > > requests seems like an interesting line of thought.)
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > -Jason
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > > > > > > > > > > >> > > > > wrote:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > >> Hey Ewen,
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > > >> > > > >> Onur and I discussed this a little bit more, and we are still
> > > > > > > > > > > > >> > > > >> worried about passing all the metadata of all consumers around.
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > > >> > > > >> Let's say I have a cluster that has 10,000 topics, and the
> > > > > > > > > > > > >> > > > >> average topic name length is 10 bytes. In this case, the opaque
> > > > > > > > > > > > >> > > > >> metadata will have 10 * 10,000 = 100KB for topic names. For each
> > > > > > > > > > > > >> > > > >> topic, there is also a 4-byte integer for the number of
> > > > > > > > > > > > >> > > > >> partitions, which is another 40KB. So one copy of the global
> > > > > > > > > > > > >> > > > >> topic metadata will be 140KB of data. Suppose I have 100
> > > > > > > > > > > > >> > > > >> consumers that use a wildcard to consume from all the topics.
> > > > > > > > > > > > >> > > > >> That means the protocol metadata that ends up in the
> > > > > > > > > > > > >> > > > >> JoinGroupResponse will be 140KB * 100 = 14MB. And the
> > > > > > > > > > > > >> > > > >> JoinGroupResponse will need to be sent to 100 different
> > > > > > > > > > > > >> > > > >> consumers, which means 14MB * 100 = 1.4GB needs to be sent by
> > > > > > > > > > > > >> > > > >> the consumer coordinator for one rebalance. How would that work?
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > > >> > > > >> Also, having two consumers (old owner and new owner) consuming
> > > > > > > > > > > > >> > > > >> from the same partition might be a problem, e.g. when people are
> > > > > > > > > > > > >> > > > >> updating a database. One thing that might be worth doing is to
> > > > > > > > > > > > >> > > > >> add GroupId and Generation ID to ProducerRequest and
> > > > > > > > > > > > >> > > > >> FetchRequest as well. This will also help with the single
> > > > > > > > > > > > >> > > > >> producer use case. However, this is probably orthogonal to this
> > > > > > > > > > > > >> > > > >> thread, given that the current new consumer also has this
> > > > > > > > > > > > >> > > > >> problem, and I believe we need to fix it.
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > >> > > > >> Thanks,
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > >> > > > >> Jiangjie (Becket) Qin
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > > >> > > > >> On Tue, Aug 11, 2015 at 11:43 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > > > > > > > > > > > >> > > > >> wrote:
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > > >> > > > >> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > > > > > > > > > > >> > > > >> > wrote:
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> > > Ewen,
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> > > Thanks for the explanation.
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > > >> > > > >> > > For (1), I am more concerned about the failure case than the
> > > > > > > > > > > > >> > > > >> > > normal case. What if a consumer somehow was kicked out of a
> > > > > > > > > > > > >> > > > >> > > group but is still consuming and committing offsets? Does
> > > > > > > > > > > > >> > > > >> > > that mean the new owner and the old owner might potentially
> > > > > > > > > > > > >> > > > >> > > be consuming from and committing offsets for the same
> > > > > > > > > > > > >> > > > >> > > partition? In the old consumer, this won't happen because
> > > > > > > > > > > > >> > > > >> > > the new consumer will not be able to start consumption
> > > > > > > > > > > > >> > > > >> > > unless the previous owner has released its ownership.
> > > > > > > > > > > > >> > > > >> > > Basically, without the ownership guarantee, I don't see how
> > > > > > > > > > > > >> > > > >> > > the communication among consumers themselves alone can solve
> > > > > > > > > > > > >> > > > >> > > the problem here.
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > > >> > > > >> > The generation ID check still applies to offset commits. If
> > > > > > > > > > > > >> > > > >> > one of the consumers is kicked out and misbehaving, it can
> > > > > > > > > > > > >> > > > >> > obviously still fetch and process messages, but offset commits
> > > > > > > > > > > > >> > > > >> > will not work since it will not have the current generation
> > > > > > > > > > > > >> > > > >> > ID.
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > > >> > > > >> > > For (2) and (3), now I understand how the metadata is used.
> > > > > > > > > > > > >> > > > >> > > But I still don't see why we should let the consumers pass
> > > > > > > > > > > > >> > > > >> > > the topic information around instead of letting the
> > > > > > > > > > > > >> > > > >> > > coordinator provide it. The single producer use case does
> > > > > > > > > > > > >> > > > >> > > not solve the ownership problem in the abnormal case either,
> > > > > > > > > > > > >> > > > >> > > which seems to be a little bit vulnerable.
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > > >> > > > >> > One of the goals here was to generalize group membership so we
> > > > > > > > > > > > >> > > > >> > can, for example, use it for balancing Copycat tasks across
> > > > > > > > > > > > >> > > > >> > workers. There's no topic subscription info in that case. The
> > > > > > > > > > > > >> > > > >> > metadata for copycat workers would instead need to somehow
> > > > > > > > > > > > >> > > > >> > indicate the current set of tasks that need to be assigned to
> > > > > > > > > > > > >> > > > >> > workers. By making the metadata completely opaque to the
> > > > > > > > > > > > >> > > > >> > protocol, it becomes more generally useful since it focuses
> > > > > > > > > > > > >> > > > >> > squarely on the group membership problem, allowing for that
> > > > > > > > > > > > >> > > > >> > additional bit of metadata so you don't just get a list of
> > > > > > > > > > > > >> > > > >> > members, but also get a little bit of info about each of them.
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > > >> > > > >> > A different option that we explored is to use a sort of mixed
> > > > > > > > > > > > >> > > > >> > model -- still bake all the topic subscriptions directly into
> > > > > > > > > > > > >> > > > >> > the protocol but also include metadata. That would allow us to
> > > > > > > > > > > > >> > > > >> > maintain the existing coordinator-driven approach to handling
> > > > > > > > > > > > >> > > > >> > the metadata and change events like the ones Onur pointed out.
> > > > > > > > > > > > >> > > > >> > Then something like the Copycat workers would just not fill in
> > > > > > > > > > > > >> > > > >> > any topic subscriptions and it would be handled as a
> > > > > > > > > > > > >> > > > >> > degenerate case. Based on the way I explained that we can
> > > > > > > > > > > > >> > > > >> > handle those types of events, I personally feel it's cleaner
> > > > > > > > > > > > >> > > > >> > and a nicer generalization to not include the subscriptions in
> > > > > > > > > > > > >> > > > >> > the join group protocol, making it part of the metadata
> > > > > > > > > > > > >> > > > >> > instead.
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > > >> > > > >> > For the single producer case, are you saying it doesn't solve
> > > > > > > > > > > > >> > > > >> > ownership in the abnormal case because a producer that doesn't
> > > > > > > > > > > > >> > > > >> > know it has been kicked out of the group yet can still produce
> > > > > > > > > > > > >> > > > >> > data even though it shouldn't be able to anymore? I definitely
> > > > > > > > > > > > >> > > > >> > agree that that is a risk -- this provides a way to get closer
> > > > > > > > > > > > >> > > > >> > to a true single-writer, but there are definitely still
> > > > > > > > > > > > >> > > > >> > failure modes that this does not address.
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> > -Ewen
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> > > Thanks,
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> > > Jiangjie (Becket) Qin
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > > >> > > > >> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > > > > > > > > > > > >> > > > >> > > wrote:
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > > >> > > > >> > > > On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > > > > > > > > > > >> > > > >> > > > wrote:
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > > > Hi Jason,
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > > >> > > > >> > > > > Thanks for writing this up. It would be useful to
> > > > > > > > > > > > >> > > > >> > > > > generalize the group concept. I have a few questions
> > > > > > > > > > > > >> > > > >> > > > > below.
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > > >> > > > >> > > > > 1. In the old consumer, the partition assignment is
> > > > > > > > > > > > >> > > > >> > > > > actually done by the consumers themselves. We used
> > > > > > > > > > > > >> > > > >> > > > > ZooKeeper to guarantee that a partition will only be
> > > > > > > > > > > > >> > > > >> > > > > consumed by the one consumer thread that successfully
> > > > > > > > > > > > >> > > > >> > > > > claimed its ownership. Does the new protocol plan to
> > > > > > > > > > > > >> > > > >> > > > > provide the same guarantee?
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > Once you have all the metadata from all the consumers,
> > > > > > > > > > > > >> > > > >> > > > assignment should just be a simple function mapping that
> > > > > > > > > > > > >> > > > >> > > > Map<ConsumerId, Metadata> to
> > > > > > > > > > > > >> > > > >> > > > Map<ConsumerId, List<TopicPartition>>. If everyone is
> > > > > > > > > > > > >> > > > >> > > > consistent in computing that, you don't need ZK involved
> > > > > > > > > > > > >> > > > >> > > > at all.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > In practice, this shouldn't be that hard to ensure for
> > > > > > > > > > > > >> > > > >> > > > most assignment strategies just by having decent unit
> > > > > > > > > > > > >> > > > >> > > > testing on them. You just have to do things like ensure
> > > > > > > > > > > > >> > > > >> > > > your assignment strategy sorts lists into a consistent
> > > > > > > > > > > > >> > > > >> > > > order.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > You do give up the ability to use some techniques (e.g.
> > > > > > > > > > > > >> > > > >> > > > any randomized algorithm if you can't distribute the seed
> > > > > > > > > > > > >> > > > >> > > > w/ the metadata) and it's true that nothing validates the
> > > > > > > > > > > > >> > > > >> > > > assignment, but if that assignment algorithm step is kept
> > > > > > > > > > > > >> > > > >> > > > simple, small, and well tested, the risk is very minimal.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > > >> > > > >> > > > > 2. It looks like both JoinGroupRequest and
> > > > > > > > > > > > >> > > > >> > > > > JoinGroupResponse have
> > > > > > > > > > > > >> > > > >> > > > > ProtocolMetadata.AssignmentStrategyMetadata. What
> > > > > > > > > > > > >> > > > >> > > > > metadata would be sent to and returned by the
> > > > > > > > > > > > >> > > > >> > > > > coordinator? How will the coordinator handle the
> > > > > > > > > > > > >> > > > >> > > > > metadata?
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > The coordinator is basically just blindly broadcasting
> > > > > > > > > > > > >> > > > >> > > > all of it to group members so they have a consistent
> > > > > > > > > > > > >> > > > >> > > > view.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > So from the coordinator's perspective, it sees something like:
> > > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > Consumer 1 -> JoinGroupRequest with GroupProtocols = [ "consumer" <Consumer1 opaque byte[]>]
> > > > > > > > > > > > >> > > > >> > > > Consumer 2 -> JoinGroupRequest with GroupProtocols = [ "consumer" <Consumer2 opaque byte[]>]
> > > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > Then the responses would look like:
> > > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > Consumer 1 <- JoinGroupResponse with GroupProtocol = "consumer" and
> > > > > > > > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>, Consumer 2 <Consumer2 opaque byte[]>]
> > > > > > > > > > > > >> > > > >> > > > Consumer 2 <- JoinGroupResponse with GroupProtocol = "consumer" and
> > > > > > > > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>, Consumer 2 <Consumer2 opaque byte[]>]
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > So all the responses include all the metadata for every member in the
> > > > > > > > > > > > >> > > > >> > > > group, and everyone can use that to consistently decide on assignment.
> > > > > > > > > > > > >> > > > >> > > > The broker doesn't care and cannot even understand the metadata since
> > > > > > > > > > > > >> > > > >> > > > the data format for it is dependent on the assignment strategy being
> > > > > > > > > > > > >> > > > >> > > > used.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > As another example that is *not* a consumer, let's say you just want to
> > > > > > > > > > > > >> > > > >> > > > have a single writer in the group which everyone will forward requests
> > > > > > > > > > > > >> > > > >> > > > to. To accomplish this, you could use a very dumb assignment strategy:
> > > > > > > > > > > > >> > > > >> > > > there is no metadata (empty byte[]) and all we care about is who is the
> > > > > > > > > > > > >> > > > >> > > > first member in the group (e.g. when IDs are sorted lexicographically).
> > > > > > > > > > > > >> > > > >> > > > That member is selected as the writer. In that case, we actually just
> > > > > > > > > > > > >> > > > >> > > > care about the membership list, there's no additional info about each
> > > > > > > > > > > > >> > > > >> > > > member that is required to determine who is the writer.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > > 3. Do you mean that the number of partitions in JoinGroupResponse will
> > > > > > > > > > > > >> > > > >> > > > > be the max partition number of a topic among all the partition numbers
> > > > > > > > > > > > >> > > > >> > > > > reported by consumers? Is there any reason not to just let the
> > > > > > > > > > > > >> > > > >> > > > > coordinator return the number of partitions of a topic from its
> > > > > > > > > > > > >> > > > >> > > > > metadata cache?
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > > >> > > > >> > > > Nothing from the embedded protocol is touched by the broker. The broker
> > > > > > > > > > > > >> > > > >> > > > just collects opaque bytes of metadata, does the selection of the
> > > > > > > > > > > > >> > > > >> > > > strategy if multiple are supported by some consumers, and then returns
> > > > > > > > > > > > >> > > > >> > > > that opaque metadata for all the members back to every member. In that
> > > > > > > > > > > > >> > > > >> > > > way they all have a consistent view of the group. For regular consumers,
> > > > > > > > > > > > >> > > > >> > > > that view of the group includes information about how many partitions
> > > > > > > > > > > > >> > > > >> > > > each consumer currently thinks the topics it is subscribed to has. These
> > > > > > > > > > > > >> > > > >> > > > could be inconsistent due to out of date metadata and it would be up to
> > > > > > > > > > > > >> > > > >> > > > the assignment strategy on the *client* to resolve that. As you point
> > > > > > > > > > > > >> > > > >> > > > out, in that case they could just take the max value that any consumer
> > > > > > > > > > > > >> > > > >> > > > reported seeing and use that. The consumers that notice that their
> > > > > > > > > > > > >> > > > >> > > > metadata had a smaller # of partitions should also trigger a metadata
> > > > > > > > > > > > >> > > > >> > > > update when they see someone else observing a larger # of partitions.
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > > > Thanks,
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > > >> > > > >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > >> > > > >> > > > > wrote:
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > > > > Hi Kafka Devs,
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > One of the nagging issues in the current design of the new consumer
> > > > > > > > > > > > >> > > > >> > > > > > has been the need to support a variety of assignment strategies. We've
> > > > > > > > > > > > >> > > > >> > > > > > encountered this in particular in the design of copycat and the
> > > > > > > > > > > > >> > > > >> > > > > > processing framework (KIP-28). From what I understand, Samza also has
> > > > > > > > > > > > >> > > > >> > > > > > a number of use cases with custom assignment needs. The new consumer
> > > > > > > > > > > > >> > > > >> > > > > > protocol supports new assignment strategies by hooking them into the
> > > > > > > > > > > > >> > > > >> > > > > > broker. For many environments, this is a major pain and in some cases,
> > > > > > > > > > > > >> > > > >> > > > > > a non-starter. It also challenges the validation that the coordinator
> > > > > > > > > > > > >> > > > >> > > > > > can provide. For example, some assignment strategies call for
> > > > > > > > > > > > >> > > > >> > > > > > partitions to be assigned multiple times, which means that the
> > > > > > > > > > > > >> > > > >> > > > > > coordinator can only check that partitions have been assigned at least
> > > > > > > > > > > > >> > > > >> > > > > > once.
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > To solve these issues, we'd like to propose moving assignment to the
> > > > > > > > > > > > >> > > > >> > > > > > client. I've written a wiki which outlines some protocol changes to
> > > > > > > > > > > > >> > > > >> > > > > > achieve this:
> > > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > To summarize briefly, instead of the coordinator assigning the
> > > > > > > > > > > > >> > > > >> > > > > > partitions itself, all subscriptions are forwarded to each member of
> > > > > > > > > > > > >> > > > >> > > > > > the group which then decides independently which partitions it should
> > > > > > > > > > > > >> > > > >> > > > > > consume. The protocol provides a mechanism for the coordinator to
> > > > > > > > > > > > >> > > > >> > > > > > validate that all consumers use the same assignment strategy, but it
> > > > > > > > > > > > >> > > > >> > > > > > does not ensure that the resulting assignment is "correct." This
> > > > > > > > > > > > >> > > > >> > > > > > provides a powerful capability for users to control the full data flow
> > > > > > > > > > > > >> > > > >> > > > > > on the client side. They control how data is written to partitions
> > > > > > > > > > > > >> > > > >> > > > > > through the Partitioner interface and they control how data is
> > > > > > > > > > > > >> > > > >> > > > > > consumed through the assignment strategy, all without touching the
> > > > > > > > > > > > >> > > > >> > > > > > server.
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > Of course nothing comes for free. In particular, this change removes
> > > > > > > > > > > > >> > > > >> > > > > > the ability of the coordinator to validate that commits are made by
> > > > > > > > > > > > >> > > > >> > > > > > consumers who were assigned the respective partition. This might not
> > > > > > > > > > > > >> > > > >> > > > > > be too bad since we retain the ability to validate the generation id,
> > > > > > > > > > > > >> > > > >> > > > > > but it is a potential concern. We have considered alternative
> > > > > > > > > > > > >> > > > >> > > > > > protocols which add a second round-trip to the protocol in order to
> > > > > > > > > > > > >> > > > >> > > > > > give the coordinator the ability to confirm the assignment. As
> > > > > > > > > > > > >> > > > >> > > > > > mentioned above, the coordinator is somewhat limited in what it can
> > > > > > > > > > > > >> > > > >> > > > > > actually validate, but this would return its ability to validate
> > > > > > > > > > > > >> > > > >> > > > > > commits. The tradeoff is that it increases the protocol's complexity
> > > > > > > > > > > > >> > > > >> > > > > > which means more ways for the protocol to fail and consequently more
> > > > > > > > > > > > >> > > > >> > > > > > edge cases in the code.
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > It also misses an opportunity to generalize the group membership
> > > > > > > > > > > > >> > > > >> > > > > > protocol for additional use cases. In fact, after you've gone to the
> > > > > > > > > > > > >> > > > >> > > > > > trouble of moving assignment to the client, the main thing that is
> > > > > > > > > > > > >> > > > >> > > > > > left in this protocol is basically a general group management
> > > > > > > > > > > > >> > > > >> > > > > > capability. This is exactly what is needed for a few cases that are
> > > > > > > > > > > > >> > > > >> > > > > > currently under discussion (e.g. copycat or single-writer producer).
> > > > > > > > > > > > >> > > > >> > > > > > We've taken this further step in the proposal and attempted to
> > > > > > > > > > > > >> > > > >> > > > > > envision what that general protocol might look like and how it could
> > > > > > > > > > > > >> > > > >> > > > > > be used both by the consumer and for some of these other cases.
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > > >> > > > >> > > > > > Anyway, since time is running out on the new consumer, we have perhaps
> > > > > > > > > > > > >> > > > >> > > > > > one last chance to consider a significant change in the protocol like
> > > > > > > > > > > > >> > > > >> > > > > > this, so have a look at the wiki and share your thoughts. I've no
> > > > > > > > > > > > >> > > > >> > > > > > doubt that some ideas seem clearer in my mind than they do on paper,
> > > > > > > > > > > > >> > > > >> > > > > > so ask questions if there is any confusion.
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > >> > > > >> > > > > > Thanks!
> > > > > > > > > > > >> > > > >> > > > > > Jason
> > > > > > > > > > > >> > > > >> > > > > >
> > > > > > > > > > > >> > > > >> > > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > > > --
> > > > > > > > > > > >> > > > >> > > > Thanks,
> > > > > > > > > > > >> > > > >> > > > Ewen
> > > > > > > > > > > >> > > > >> > > >
> > > > > > > > > > > >> > > > >> > >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >> > --
> > > > > > > > > > > >> > > > >> > Thanks,
> > > > > > > > > > > >> > > > >> > Ewen
> > > > > > > > > > > >> > > > >> >
> > > > > > > > > > > >> > > > >>
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Neha
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Neha
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > Thanks,
> > > > > > > > > > Ewen
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Thanks,
> > > > > > > > Ewen
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>
