Hey Becket,

Thanks for raising these concerns. I think the rolling upgrade problem is
handled quite a bit better with the new consumer if Onur's LeaveGroup patch
(or a variant) is accepted. Without it, the coordinator has to wait for the
full session timeout to detect that a node has left the group, which can
significantly delay rebalancing.
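
To make the difference concrete, here is a minimal sketch (the configs are the
new consumer's standard ones; the immediate leave-on-close behavior is the part
Onur's patch would add, so treat that as an assumption):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class LeaveGroupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");
            props.put("group.id", "mirror-maker-group");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // Without LeaveGroup, a clean shutdown looks just like a crash: the
            // coordinator only notices the member is gone after this timeout.
            props.put("session.timeout.ms", "30000");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            // ... subscribe, poll and process ...
            // With the LeaveGroup patch, close() would let the coordinator start
            // the rebalance immediately instead of waiting out session.timeout.ms.
            consumer.close();
        }
    }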

-Jason

On Sun, Aug 16, 2015 at 11:42 AM, Neha Narkhede <n...@confluent.io> wrote:

> Hey Becket,
>
> These are all fair points. Regarding running Kafka as a service, it will be
> good for everyone to know some numbers around topic creation and changes
> around # of partitions. I don't think the test usage is a good one since no
> one should be creating and deleting topics in a loop on a production
> cluster.
>
> Regarding your point about mirror maker, I think we should definitely
> ensure that a mirror maker with a large topic subscription list is well
> supported. The half-hour stalls in LinkedIn's mirror makers come from the
> fact that they still use the old consumer. The 3 major reasons for long
> rebalance operations in the mirror maker using the old consumer are -
> 1. Every rebalance operation involves many writes to ZK. For large consumer
> groups and a large number of partitions, this adds up to several seconds.
> 2. The view of the members in a group is inconsistent, so there are
> typically several rebalance attempts. Note that this is fixed in the new
> design, where the group membership is always consistently communicated to
> all members.
> 3. The rebalance backoff time is high (on the order of seconds); the sketch
> below shows the knobs involved.
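>
> As a concrete illustration of those knobs in the old (ZooKeeper-based)
> consumer - the property values here are only illustrative, not anyone's
> production settings:
>
>     import java.util.Properties;
>     import kafka.consumer.Consumer;
>     import kafka.consumer.ConsumerConfig;
>     import kafka.javaapi.consumer.ConsumerConnector;
>
>     public class OldConsumerRebalanceKnobs {
>         public static void main(String[] args) {
>             Properties props = new Properties();
>             props.put("zookeeper.connect", "zk:2181");
>             props.put("group.id", "mirror-maker-group");
>             // Point 3 above: every failed rebalance attempt sleeps for this
>             // backoff, and several attempts (point 2) multiply it into tens
>             // of seconds per rebalance.
>             props.put("rebalance.backoff.ms", "5000");   // illustrative value
>             props.put("rebalance.max.retries", "10");    // illustrative value
>             ConsumerConnector connector =
>                 Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>         }
>     }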
>
> The good news is that the new design has none of these problems. As we
> said, we will run some tests to ensure that the mirror maker case is
> handled well. Thank you for the feedback!
>
> Thanks,
> Neha
>
> On Sat, Aug 15, 2015 at 6:17 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Neha, Ewen and Jason,
> >
> > Maybe I am overly concerned, and I agree that it does depend on the
> > metadata change frequency. As Neha said, a few tests will be helpful. We
> > can see how it goes.
> >
> > What worries me is that at LinkedIn we are in the process of running Kafka
> > as a service. That means users will have the ability to create/delete
> > topics and update their topic configurations programmatically or through a
> > UI on LinkedIn's internal cloud service platform. And some automated tests
> > will also run continuously to create topics, produce/consume some data,
> > and then clean up the topics. This brings some new challenges for mirror
> > maker, because we need to make sure it actually copies data instead of
> > spending too much time on rebalances. What we see now is that each
> > rebalance takes 30 seconds or more to finish, even with some tricks and
> > tuning.
> >
> > Another related use case I want to bring up is that today, when we want to
> > do a rolling upgrade of mirror maker (26 nodes), there are two rounds of
> > rebalance to bounce each node, and each rebalance takes about 30 seconds.
> > So bouncing 26 nodes takes roughly half an hour. Awkwardly, during the
> > rolling bounce, because the mirror maker keeps rebalancing, it does not
> > really copy any data! So the pipeline will literally stall for half an
> > hour! Since we are designing the new protocol, it would also be good if we
> > make sure this use case is addressed.
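> >
> > To spell out the arithmetic (roughly two rebalances per bounced node, as
> > described above): 26 nodes x 2 rebalances x ~30 seconds per rebalance is
> > about 1560 seconds, i.e. roughly 26 minutes of near-continuous rebalancing
> > for a single rolling bounce, during which very little data gets copied.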
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Aug 15, 2015 at 10:50 AM, Neha Narkhede <n...@confluent.io>
> wrote:
> >
> > > Becket,
> > >
> > > This is a clever approach to ensure that only one thing communicates
> > > the metadata, so even if it is stale, the entire group has the same
> > > view. However, the big assumption this makes is that the coordinator is
> > > the one process that has the ability to know the metadata for group
> > > members, which does not work for any non-consumer use case.
> > >
> > > I wonder if we may be complicating the design of the 95% of use cases
> > > for the remaining 5%. For instance, how often do people create and
> > > remove topics or even add partitions? We operated LI clusters for a long
> > > time and this wasn't a frequent enough event to require optimizing the
> > > design for it.
> > >
> > > Also, this is something we can easily validate by running a few tests
> on
> > > the patch and I suggest we wait for that.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Sat, Aug 15, 2015 at 9:14 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Jiangjie,
> > > >
> > > > I was thinking about the same problem. When metadata is changing
> > > > frequently, the clients may not be able to ever find agreement on the
> > > > current state. The server doesn't have this problem, as you say,
> > because
> > > it
> > > > can just take a snapshot and send that to the clients. Adding a
> > dampening
> > > > setting to the client would help if churn is sporadic, but not if it
> is
> > > > steady. I think the metadata would have to be changing really
> > frequently
> > > > for this to be a problem practically, but this is a case where the
> > > > server-side approach has an advantage.
> > > >
> > > > Including the metadata in the join group response would require
> making
> > > the
> > > > subscription available to the coordinator, right? We lose a little
> bit
> > of
> > > > the generality of the protocol, but it might not be too bad since
> most
> > > use
> > > > cases for reusing this protocol have a similar need for metadata (and
> > > they
> > > > can always pass an empty subscription if they don't). We've gone back
> > and
> > > > forth a few times on this, and I'm generally not opposed. It might
> help
> > > if
> > > > we try to quantify the impact of the metadata churn in practice. I
> > think
> > > > the rate of change would have to be in the low seconds for this to
> be a
> > > > real problem. It does seem nice though that we have a way to manage
> > even
> > > > this much churn when we do this.
> > > >
> > > > -Jason
> > > >
> > > >
> > > >
> > > > On Fri, Aug 14, 2015 at 9:03 PM, Jiangjie Qin
> > <j...@linkedin.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Ewen,
> > > > >
> > > > > I agree that if there is churn in the metadata, the consumers need
> > > > > several rounds of rebalance to succeed. The difference, I think, is
> > > > > that with the coordinator as the single source of truth, we can let
> > > > > the consumers finish one round of rebalance, work for a while, and
> > > > > then start the next round. If we purely depend on the consumers to
> > > > > synchronize by themselves based on different metadata sources, is it
> > > > > possible that some groups spend a lot of time rebalancing without
> > > > > being able to make much progress consuming?
> > > > >
> > > > > I'm wondering whether we can let consumers fetch metadata only from
> > > > > their coordinator. The rebalance could then be done in the following
> > > > > way (a rough sketch of the coordinator side follows the list):
> > > > >
> > > > > 1. Consumers refresh their metadata periodically.
> > > > > 2. If one of the consumers sees a change in metadata that triggers a
> > > > > rebalance, it sends a JoinGroupRequest to the coordinator.
> > > > > 3. Once the coordinator receives the first JoinGroupRequest of a
> > > > > group, it takes a snapshot of the current metadata and the group
> > > > > enters the prepare-rebalance state.
> > > > > 4. The metadata snapshot will be used for this round of rebalance,
> > > > > i.e. the metadata snapshot will be sent to consumers in the
> > > > > JoinGroupResponse.
> > > > > 4.1 If the consumers are subscribing to explicit topic lists (not
> > > > > regex), the JoinGroupResponse only needs to contain the metadata of
> > > > > the topics the group is interested in.
> > > > > 4.2 If the consumers are subscribing using regex, all the topic
> > > > > metadata will be returned to the consumer.
> > > > > 5. Consumers get the JoinGroupResponse, refresh their metadata using
> > > > > the metadata in the JoinGroupResponse, run the algorithm to assign
> > > > > partitions, and start consuming.
> > > > > 6. Go back to 1.
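> > > > >
> > > > > A rough, simplified sketch of the coordinator side of steps 3 and 4
> > > > > (the names are illustrative, not actual broker code):
> > > > >
> > > > >     import java.util.*;
> > > > >
> > > > >     // Hypothetical coordinator-side handling of steps 3 and 4 above.
> > > > >     class GroupRebalanceSketch {
> > > > >         private Map<String, Integer> snapshot;   // topic -> partition count
> > > > >         private final Set<String> members = new TreeSet<>();
> > > > >
> > > > >         // Step 3: the first JoinGroupRequest of a round freezes the metadata.
> > > > >         synchronized void onJoinGroup(String memberId, Map<String, Integer> liveMetadata) {
> > > > >             if (snapshot == null)
> > > > >                 snapshot = new HashMap<>(liveMetadata);  // taken once per round
> > > > >             members.add(memberId);
> > > > >         }
> > > > >
> > > > >         // Step 4: every member gets the same frozen snapshot in its response,
> > > > >         // so the client-side assignment runs on identical inputs.
> > > > >         synchronized Map<String, Integer> responseMetadata() {
> > > > >             return Collections.unmodifiableMap(snapshot);
> > > > >         }
> > > > >     }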
> > > > >
> > > > > The benefit here is that the rebalance can finish in one round, and
> > > > > all later changes will be captured in the next consumer metadata
> > > > > refresh - so we effectively get a group commit. One concern is that
> > > > > letting consumers refresh metadata from the coordinator might cause
> > > > > issues for big consumer groups. Maybe that is OK because metadata
> > > > > refresh is infrequent.
> > > > >
> > > > > This approach is actually very similar to what is proposed now: the
> > > > > rebalance is triggered by a metadata refresh, and the consumer
> > > > > provides a subscription list to pass around. The only difference is
> > > > > that we don't need the metadata hash anymore, because the metadata is
> > > > > guaranteed to be the same. Replacing the metadata hash with the
> > > > > actual metadata will not add much overhead for small subscription
> > > > > groups. There will be some overhead for regex subscriptions, but this
> > > > > can save the potential extra round of metadata fetches and will only
> > > > > occur when consumers see a metadata change, which is infrequent.
> > > > >
> > > > > Any thoughts?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Fri, Aug 14, 2015 at 12:57 PM, Ewen Cheslack-Postava <
> > > > e...@confluent.io
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > On Fri, Aug 14, 2015 at 10:59 AM, Jiangjie Qin
> > > > <j...@linkedin.com.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Neha and Ewen,
> > > > > > >
> > > > > > > About the metadata change frequency: I guess it really depends
> > > > > > > on how frequently the metadata changes. If we run Kafka as a
> > > > > > > service, I can see that happening from time to time, since I can
> > > > > > > imagine people creating a topic, testing, and then maybe deleting
> > > > > > > the topic in some automated test. If so, the proposed protocol
> > > > > > > might be a little bit vulnerable.
> > > > > > >
> > > > > > > More specifically, the scenario I am thinking of is:
> > > > > > > 1. Consumer 0 periodically refreshes metadata and detects a
> > > > > > > metadata change, so it sends a JoinGroupRequest with
> > > > > > > metadata_hash_0.
> > > > > > > 2. Consumer 1 was notified by the controller to start a
> > > > > > > rebalance, so it refreshes its metadata and sends a
> > > > > > > JoinGroupRequest with metadata_hash_1, which is different from
> > > > > > > metadata_hash_0.
> > > > > > > 3. The rebalance fails and both consumers refresh their metadata
> > > > > > > again from different brokers.
> > > > > > > 4. Depending on the metadata change frequency (or some admin
> > > > > > > operation like partition reassignment), they may or may not get
> > > > > > > the same metadata back, so they restart from step 3 again.
> > > > > > >
> > > > > > > I agree that step 4 might not be a big concern if consumers
> > > > > > > update metadata at almost the same time, but I'm a little bit
> > > > > > > worried about whether that assumption really holds, because we do
> > > > > > > not have control over how frequently the metadata can change.
> > > > > > >
> > > > > > >
> > > > > > Is this really that different from what would happen if the
> > > coordinator
> > > > > > distributed the metadata to consumers? In that case you would
> > > trivially
> > > > > > have everyone in a consistent state, but those metadata changes
> > would
> > > > > still
> > > > > > cause churn and require JoinGroup rounds, during which processing
> > is
> > > > > > stalled for the nodes that are waiting on other members to
> re-join
> > > the
> > > > > > group.
> > > > > >
> > > > > > -Ewen
> > > > > >
> > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Fri, Aug 14, 2015 at 2:03 AM, Ewen Cheslack-Postava <
> > > > > > e...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > On Thu, Aug 13, 2015 at 11:07 PM, Neha Narkhede <
> > > n...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Becket,
> > > > > > > > >
> > > > > > > > > As you say, the metadata hash addresses the concern you
> > > > originally
> > > > > > > raised
> > > > > > > > > about large topic subscriptions. Can you please list other
> > > > problems
> > > > > > you
> > > > > > > > are
> > > > > > > > > raising more clearly? It is more helpful to know problems
> > that
> > > > the
> > > > > > > > proposal
> > > > > > > > > does not address or addresses poorly.
> > > > > > > > >
> > > > > > > > > Regarding other things you said -
> > > > > > > > >
> > > > > > > > > it is required that each
> > > > > > > > > > consumer refresh their metadata before sending a
> > > > JoinGroupRequest
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > This is required for wildcard topic subscriptions anyway.
> So
> > > this
> > > > > > > > proposal
> > > > > > > > > does not introduce a regression. We had agreed earlier that
> > it
> > > > does
> > > > > > not
> > > > > > > > > make sense for the server to deserialize regular
> expressions
> > > sent
> > > > > by
> > > > > > > the
> > > > > > > > > consumer.
> > > > > > > > >
> > > > > > > >
> > > > > > > > I don't think consumers need to do a metadata refresh before
> > > > sending
> > > > > a
> > > > > > > > JoinGroupRequest. Metadata changes that affect assignment are
> > > rare
> > > > --
> > > > > > it
> > > > > > > > requires changing the number of partitions in a topic. But
> you
> > > > might
> > > > > > > send a
> > > > > > > > JoinGroupRequest simply because a new member is trying to
> join
> > > the
> > > > > > group.
> > > > > > > > That case is presumably much more common.
> > > > > > > >
> > > > > > > > I think it's actually a good idea to have the first JoinGroup
> > > > > > > > cycle fail in some cases, and it has little impact. Let's say
> > > > > > > > the metadata does change because partitions are added. Then we
> > > > > > > > might fail in the first round, but all members detect that
> > > > > > > > issue *immediately*, refresh their metadata, and submit a new
> > > > > > > > join group request. This second cycle does not require a full
> > > > > > > > heartbeat cycle. It happens much more quickly because everyone
> > > > > > > > detected the inconsistency based on the first
> > > > > > > > JoinGroupResponse. The inconsistency should be resolved very
> > > > > > > > quickly (barring other failures like a member leaving
> > > > > > > > mid-rebalance).
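> > > > > > > >
> > > > > > > > A tiny sketch of that client-side loop, just to pin down the
> > > > > > > > mechanics (the interface is hypothetical, not the real consumer
> > > > > > > > API):
> > > > > > > >
> > > > > > > >     import java.util.Map;
> > > > > > > >
> > > > > > > >     // Hypothetical sketch: keep rejoining until every member reported
> > > > > > > >     // the same metadata hash in the JoinGroup round.
> > > > > > > >     class RejoinLoopSketch {
> > > > > > > >         interface GroupClient {
> > > > > > > >             long metadataHash();
> > > > > > > >             void refreshMetadata();
> > > > > > > >             Map<String, Long> joinGroup(long myHash);   // memberId -> hash
> > > > > > > >         }
> > > > > > > >
> > > > > > > >         static void rebalance(GroupClient client) {
> > > > > > > >             while (true) {
> > > > > > > >                 Map<String, Long> hashes = client.joinGroup(client.metadataHash());
> > > > > > > >                 if (hashes.values().stream().distinct().count() == 1)
> > > > > > > >                     break;                 // everyone agrees; run the assignment
> > > > > > > >                 client.refreshMetadata();  // mismatch: refetch and rejoin at once
> > > > > > > >             }
> > > > > > > >         }
> > > > > > > >     }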
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > the metadata might still be inconsistent if there is a
> > topic
> > > or
> > > > > > > > partition
> > > > > > > > > > change because the
> > > > > > > > > > UpdateMetadataRequest from controller might be handled at
> > > > > different
> > > > > > > > time.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Topic metadata does not change frequently and even if it
> > did, a
> > > > > > couple
> > > > > > > > > rebalance attempts will be needed whether the coordinator
> > > drives
> > > > > the
> > > > > > > > > assignments or the consumer. Because guess how the
> > coordinator
> > > > > knows
> > > > > > > > about
> > > > > > > > > the topic metadata changes -- indirectly through either a
> zk
> > > > > callback
> > > > > > > or
> > > > > > > > > UpdateMetadataRequest, so it is completely possible the
> > > > coordinator
> > > > > > > sees
> > > > > > > > > the topic metadata changes in batches, not all at once.
> > > > > > > > >
> > > > > > > >
> > > > > > > > > On Thu, Aug 13, 2015 at 10:50 PM, Neha Narkhede <
> > > > n...@confluent.io
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Ewen/Jason,
> > > > > > > > > >
> > > > > > > > > > The metadata hash is a clever approach and certainly
> > > addresses
> > > > > the
> > > > > > > > > problem
> > > > > > > > > > of large metadata for consumers like mirror maker. Few
> > > > comments -
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >    1. In the interest of simplifying the format of the
> > > > consumer's
> > > > > > > > > >    metadata - Why not just always include only the topic
> > > names
> > > > in
> > > > > > the
> > > > > > > > > metadata
> > > > > > > > > >    followed by the metadata hash? If the metadata hash
> > check
> > > > > > > succeeds,
> > > > > > > > > each
> > > > > > > > > >    consumer uses the # of partitions it had fetched. If
> it
> > > > > fails, a
> > > > > > > > > rebalance
> > > > > > > > > >    happens and the metadata is not used anyway.
> > > > > > > > >
> > > > > > > >
> > > > > > > > Doing this requires that every consumer always fetch the full
> > > > > metadata.
> > > > > > > The
> > > > > > > > most common use case is consumers that just want to consume
> one
> > > or
> > > > a
> > > > > > > couple
> > > > > > > > of topics, in which case grabbing all metadata for the entire
> > > > cluster
> > > > > > is
> > > > > > > > wasteful. If I subscribe only to topic A, why make all
> > consumers
> > > > grab
> > > > > > > > metadata for the entire topic (and need to rebalance every
> time
> > > it
> > > > > > > > changes!). Including the # of partitions for each topic lets
> > you
> > > > > avoid
> > > > > > > > having to grab the global set of metadata.
> > > > > > > >
> > > > > > > > So if you're just subscribing to one or a couple of topics,
> why
> > > not
> > > > > > just
> > > > > > > > compute the hash by filtering out everything but the topics
> you
> > > are
> > > > > > > > subscribed to? The problem there is if you ever add/remove
> > > > > > subscriptions
> > > > > > > > and want to support rolling upgrades. If the group was
> > subscribed
> > > > to
> > > > > > > topic
> > > > > > > > A, but later changes require subscribing to A + B, then to
> > > achieve
> > > > a
> > > > > > > > seamless rolling upgrade would require one (old) consumer to
> be
> > > > > > > subscribing
> > > > > > > > to A and one (new) consumer to be subscribing to A+B. If we
> > > > computed
> > > > > > > > metadata hashes based on filtered metadata, those two would
> > > > disagree
> > > > > > and
> > > > > > > we
> > > > > > > > could not perform assignment while the upgrade was in
> progress.
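> > > > > > > >
> > > > > > > > To make that disagreement concrete, a toy example (using Java's
> > > > > > > > map hashCode purely as a stand-in for whatever hash we would
> > > > > > > > actually use):
> > > > > > > >
> > > > > > > >     import java.util.TreeMap;
> > > > > > > >
> > > > > > > >     // Illustrative only: two members hashing their *filtered* metadata views.
> > > > > > > >     public class FilteredHashSketch {
> > > > > > > >         public static void main(String[] args) {
> > > > > > > >             TreeMap<String, Integer> oldView = new TreeMap<>();  // subscribed to A
> > > > > > > >             oldView.put("A", 8);
> > > > > > > >             TreeMap<String, Integer> newView = new TreeMap<>();  // subscribed to A + B
> > > > > > > >             newView.put("A", 8);
> > > > > > > >             newView.put("B", 4);
> > > > > > > >             // Both members agree on topic A, but their filtered hashes still
> > > > > > > >             // differ, so assignment can't proceed until the upgrade finishes.
> > > > > > > >             System.out.println(oldView.hashCode() == newView.hashCode());  // false
> > > > > > > >         }
> > > > > > > >     }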
> > > > > > > >
> > > > > > > > The solution is to differentiate between the cases when a
> very
> > > > small
> > > > > > > amount
> > > > > > > > of the metadata is needed (one or a couple of topic
> > > subscriptions;
> > > > > > > > communicate and share this via metadata in the JoinGroup
> > > protocol)
> > > > vs
> > > > > > > when
> > > > > > > > *all* the metadata is needed (regex subscription; verify
> > > agreement
> > > > > via
> > > > > > > > hash).
> > > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > >    2. Do you need a topic list and topic regex to be
> > > separate?
> > > > A
> > > > > > > single
> > > > > > > > > >    topic or list of topics can be expressed as a regex.
> > > > > > > > >
> > > > > > > >
> > > > > > > > See above note about collecting all metadata when you really
> > only
> > > > > need
> > > > > > it
> > > > > > > > for 1 or 2 topics. There's probably some debate to be had
> about
> > > > > whether
> > > > > > > > this cost would be too high -- every consumer would need to
> > > request
> > > > > the
> > > > > > > > metadata for all topics, and they'd need to request that all
> > > every
> > > > > time
> > > > > > > > they might be out of date.
> > > > > > > >
> > > > > > > Are we going to allow consumers in the same group to subscribe
> > > > > > > to different topic sets? If we do, we need to let them refresh
> > > > > > > metadata for all the topics the group is consuming from. If we
> > > > > > > don't, then in the protocol we only need a subscription set hash.
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > >    3. Let's include a version explicitly at the beginning
> > of
> > > > the
> > > > > > > > > >    ProtocolMetadata. The version dictates how to
> > deserialize
> > > > the
> > > > > > > > > >    ProtocolMetadata blob and is consistent with the rest
> of
> > > > > Kafka.
> > > > > > > > >
> > > > > > > >
> > > > > > > > If I'm understanding correctly, in JoinGroupRequest I would
> > > change
> > > > > > > >
> > > > > > > > GroupProtocols          => [Protocol ProtocolMetadata]
> > > > > > > >
> > > > > > > > to
> > > > > > > >
> > > > > > > > GroupProtocols          => [Protocol ProtocolVersion
> > > > > ProtocolMetadata]
> > > > > > > >
> > > > > > > > We had been talking about just baking the version into the
> > > Protocol
> > > > > > > field,
> > > > > > > > but making it separate seems perfectly reasonable to me.
> Jason,
> > > any
> > > > > > issue
> > > > > > > > with splitting the version out into a separate field like
> this?
> > > > > > > >
> > > > > > > > >
> > > > > > > > > > That can simplify the metadata format to the following:
> > > > > > > > > >
> > > > > > > > > >> GroupType => "consumer"
> > > > > > > > > >>
> > > > > > > > > >> Protocol => AssignmentStrategy
> > > > > > > > > >>   AssignmentStrategy           => String
> > > > > > > > > >>
> > > > > > > > > >> ProtocolMetadata => Version Subscription AssignmentStrategyMetadata
> > > > > > > > > >>   Version                      => String
> > > > > > > > > >>   Subscription                 => TopicRegex MetadataHash
> > > > > > > > > >>     TopicRegex                 => String
> > > > > > > > > >>     MetadataHash               => String
> > > > > > > > > >>   AssignmentStrategyMetadata   => bytes
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 13, 2015 at 6:28 PM, Jiangjie Qin
> > > > > > > > <j...@linkedin.com.invalid
> > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Ewen and Jason,
> > > > > > > > > >>
> > > > > > > > > >> Thanks for the reply. Sorry I missed the metadata hash.
> > > > > > > > > >> Yes, that is a clever approach and would solve my concern
> > > > > > > > > >> about the data being passed around. I can see both pros and
> > > > > > > > > >> cons to doing this, though. The advantage is that we don't
> > > > > > > > > >> need the topic metadata in the JoinGroupResponse anymore.
> > > > > > > > > >> The downside is that the rebalance now has an extra
> > > > > > > > > >> dependency on the consensus of all consumers' metadata,
> > > > > > > > > >> which is obtained separately. So it is required that each
> > > > > > > > > >> consumer refresh its metadata before sending a
> > > > > > > > > >> JoinGroupRequest; otherwise some cases (e.g. wildcard
> > > > > > > > > >> consumers) will almost certainly fail on the first
> > > > > > > > > >> rebalance attempt. Even if we do that, since the consumers
> > > > > > > > > >> are getting metadata from different brokers, the metadata
> > > > > > > > > >> might still be inconsistent if there is a topic or
> > > > > > > > > >> partition change, because the UpdateMetadataRequest from
> > > > > > > > > >> the controller might be handled at different times. I just
> > > > > > > > > >> want to make sure we think through the cases so the
> > > > > > > > > >> protocol does not cause us unexpected issues.
> > > > > > > > > >>
> > > > > > > > > >> About the number of consumers, I think with the current
> > > > > > > > > >> liveness definition we can tolerate churn by bumping up the
> > > > > > > > > >> session timeout. Also, I guess we will see an increasing
> > > > > > > > > >> number of consumers with the new consumer, because every
> > > > > > > > > >> old consumer thread will probably become a consumer.
> > > > > > > > > >>
> > > > > > > > > >> It is a valid concern for consumers that have a large
> > > > > > > > > >> subscription set. This might not be avoidable, though, with
> > > > > > > > > >> the client-side assignment approach. One solution is to
> > > > > > > > > >> associate topic names with a topic ID and only use the
> > > > > > > > > >> topic ID in the JoinGroupRequest and JoinGroupResponse.
> > > > > > > > > >> There is a discussion thread about this to solve the topic
> > > > > > > > > >> renaming case, but that is a completely different
> > > > > > > > > >> discussion.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >>
> > > > > > > > > >> Jiangjie (Becket) Qin
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Thu, Aug 13, 2015 at 2:14 PM, Jason Gustafson <
> > > > > > > ja...@confluent.io>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Thanks Jiangjie, that information helps. I agree the
> > > > protocol
> > > > > > must
> > > > > > > > > >> consider
> > > > > > > > > >> > scalability. My point was that the synchronization
> > barrier
> > > > in
> > > > > > the
> > > > > > > > > >> current
> > > > > > > > > >> > protocol already effectively limits the number of
> > > consumers
> > > > > > since
> > > > > > > it
> > > > > > > > > >> > provides no way to gracefully handle churn. It
> wouldn't
> > be
> > > > > worth
> > > > > > > > > >> worrying
> > > > > > > > > >> > about scaling up to 100,000 members, for example,
> > because
> > > > > > there's
> > > > > > > no
> > > > > > > > > way
> > > > > > > > > >> > the group would be stable. So we just need to set some
> > > clear
> > > > > > > > > >> expectations
> > > > > > > > > >> > on the size we can scale to, and that can help inform
> > the
> > > > > > > discussion
> > > > > > > > > on
> > > > > > > > > >> the
> > > > > > > > > >> > size of messages in this protocol.
> > > > > > > > > >> >
> > > > > > > > > >> > Ewen and I were discussing this morning along similar
> > > lines
> > > > to
> > > > > > > what
> > > > > > > > > >> you're
> > > > > > > > > >> > suggesting. However, even if the coordinator decides
> on
> > > the
> > > > > > > metadata
> > > > > > > > > for
> > > > > > > > > >> > the group, each member still needs to communicate its
> > > > > > > subscriptions
> > > > > > > > to
> > > > > > > > > >> the
> > > > > > > > > >> > rest of the group. This is nice for the regex case
> since
> > > the
> > > > > > regex
> > > > > > > > is
> > > > > > > > > >> > probably small, but if the members have a large topic
> > > list,
> > > > > then
> > > > > > > we
> > > > > > > > > have
> > > > > > > > > >> > the same problem. One thing I was thinking about was
> > > whether
> > > > > we
> > > > > > > > really
> > > > > > > > > >> need
> > > > > > > > > >> > to handle different subscriptions for every member. If
> > the
> > > > > > > > coordinator
> > > > > > > > > >> > could guarantee that all members had the same
> > > subscription,
> > > > > then
> > > > > > > > there
> > > > > > > > > >> > would be no need for the coordinator to return the
> > > > > subscriptions
> > > > > > > for
> > > > > > > > > >> each
> > > > > > > > > >> > member. However, this would prevent graceful upgrades.
> > We
> > > > > might
> > > > > > be
> > > > > > > > > able
> > > > > > > > > >> to
> > > > > > > > > >> > fix that problem by allowing the consumer to provide two
> > > > > > > > > >> > subscriptions to allow rolling updates, but that starts
> > > > > > > > > >> > to sound pretty nasty.
> > > > > > > > > >> >
> > > > > > > > > >> > -Jason
> > > > > > > > > >> >
> > > > > > > > > >> > On Thu, Aug 13, 2015 at 1:41 PM, Jiangjie Qin
> > > > > > > > > <j...@linkedin.com.invalid
> > > > > > > > > >> >
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Jason,
> > > > > > > > > >> > >
> > > > > > > > > >> > > The protocol has to consider scalability. The protocol
> > > > > > > > > >> > > in the wiki means the JoinGroupResponse size would be:
> > > > > > > > > >> > > NumberOfTopics * (AvgTopicNameLength + 4) * (NumberOfConsumers)^2
> > > > > > > > > >> > >
> > > > > > > > > >> > > To give some real numbers, we have a 26-node mirror
> > > > > > > > > >> > > maker cluster, each node with 4 consumers. That is 104
> > > > > > > > > >> > > consumers using the regex ".*". And most of our
> > > > > > > > > >> > > clusters have around 3000 topics, whose topic names are
> > > > > > > > > >> > > typically around 20 characters.
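> > > > > > > > > >> > >
> > > > > > > > > >> > > Plugging those numbers into the formula: one member's
> > > > > > > > > >> > > metadata is roughly 3000 * (20 + 4) = 72KB, one
> > > > > > > > > >> > > JoinGroupResponse carrying all 104 members is roughly
> > > > > > > > > >> > > 72KB * 104 = ~7.5MB, and sending that response to all
> > > > > > > > > >> > > 104 members is ~7.5MB * 104 = ~780MB from the
> > > > > > > > > >> > > coordinator for a single rebalance.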
> > > > > > > > > >> > >
> > > > > > > > > >> > > I think the key issue for client-side partition
> > > > > > > > > >> > > assignment logic is to make sure 1) all the clients run
> > > > > > > > > >> > > the same algorithm, and 2) all the clients make
> > > > > > > > > >> > > decisions based on the same topic metadata. The second
> > > > > > > > > >> > > goal can be met by simply letting the coordinator
> > > > > > > > > >> > > provide the topic metadata and all the member
> > > > > > > > > >> > > information as the source of truth. Is it necessary to
> > > > > > > > > >> > > pass each consumer's topic metadata around? Can we keep
> > > > > > > > > >> > > the protocol metadata field completely independent of
> > > > > > > > > >> > > topic metadata? I think in the JoinGroupResponse we
> > > > > > > > > >> > > should have only one copy of the topic metadata,
> > > > > > > > > >> > > provided by the coordinator and kept outside of the
> > > > > > > > > >> > > protocol metadata. If users decide to put some metadata
> > > > > > > > > >> > > in the JoinGroupRequest and let the coordinator pass it
> > > > > > > > > >> > > around, they are responsible for understanding the
> > > > > > > > > >> > > risk.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Thanks,
> > > > > > > > > >> > >
> > > > > > > > > >> > > Jiangjie (Becket) Qin
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > > On Thu, Aug 13, 2015 at 12:41 PM, Jason Gustafson <
> > > > > > > > > ja...@confluent.io
> > > > > > > > > >> >
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Hey Onur and Jiangjie,
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > I've updated that wiki with a proposal to add
> regex
> > > > > > > > subscriptions
> > > > > > > > > to
> > > > > > > > > >> > the
> > > > > > > > > >> > > > consumer metadata. Can you have a look to see if
> it
> > > > > > addresses
> > > > > > > > your
> > > > > > > > > >> > > concern?
> > > > > > > > > >> > > > In general, I think we should be a little careful
> > when
> > > > we
> > > > > > are
> > > > > > > > > >> talking
> > > > > > > > > >> > > about
> > > > > > > > > >> > > > the scalability of the protocol. Regardless of
> > whether
> > > > > > > > assignment
> > > > > > > > > is
> > > > > > > > > >> > done
> > > > > > > > > >> > > > on the server or the client, the protocol assumes
> a
> > > > > > relatively
> > > > > > > > > >> stable
> > > > > > > > > >> > > > configuration. When the number of consumers
> > increases
> > > > > > beyond a
> > > > > > > > > >> certain
> > > > > > > > > >> > > > limit, then membership churn becomes a major
> > concern.
> > > > > > > Similarly
> > > > > > > > > >> there
> > > > > > > > > >> > is
> > > > > > > > > >> > > a
> > > > > > > > > >> > > > notion of metadata churn when topics are added,
> > > deleted,
> > > > > or
> > > > > > > > > >> resized. If
> > > > > > > > > >> > > > either membership or metadata changes, then the
> > > protocol
> > > > > > > forces
> > > > > > > > > all
> > > > > > > > > >> > > > consumers to stop consumption and rejoin the
> group.
> > If
> > > > > this
> > > > > > > > > happens
> > > > > > > > > >> > often
> > > > > > > > > >> > > > enough, then it can severely impact the ability of
> > the
> > > > > > > consumer
> > > > > > > > to
> > > > > > > > > >> make
> > > > > > > > > >> > > > progress. The point is that the protocol may
> already
> > > be
> > > > > > > unsuited
> > > > > > > > > to
> > > > > > > > > >> > cases
> > > > > > > > > >> > > > where there are either a large number of consumers
> > or
> > > > > > topics.
> > > > > > > I
> > > > > > > > > >> wonder
> > > > > > > > > >> > if
> > > > > > > > > >> > > > you guys can share your thoughts about your
> scaling
> > > > > > > > expectations?
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > -Jason
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson
> <
> > > > > > > > > >> ja...@confluent.io>
> > > > > > > > > >> > > > wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > Hey Jiangjie,
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > That's a great point. In the worst case (the
> > mirror
> > > > > maker
> > > > > > > > case I
> > > > > > > > > >> > > guess),
> > > > > > > > > >> > > > > the join group response can be massive. This
> would
> > > be
> > > > > > > > especially
> > > > > > > > > >> > deadly
> > > > > > > > > >> > > > > when there is a lot of churn in the group (e.g.
> > in a
> > > > > > rolling
> > > > > > > > > >> > upgrade).
> > > > > > > > > >> > > > The
> > > > > > > > > >> > > > > current protocol is not great for this case
> > either,
> > > > but
> > > > > > it's
> > > > > > > > > >> > > > significantly
> > > > > > > > > >> > > > > better. Here are a couple ways to deal with the
> > > size:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > 1. First, we could have the coordinator compress
> > the
> > > > > > > > responses.
> > > > > > > > > >> This
> > > > > > > > > >> > > > would
> > > > > > > > > >> > > > > probably be pretty effective if applied across
> the
> > > > > > metadata
> > > > > > > > from
> > > > > > > > > >> all
> > > > > > > > > >> > > > > members.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > 2. I think the regex case is the main problem.
> Is
> > > that
> > > > > > > right?
> > > > > > > > We
> > > > > > > > > >> > could
> > > > > > > > > >> > > > > extend the metadata to allow the consumer to
> embed
> > > its
> > > > > > regex
> > > > > > > > > >> > > subscription
> > > > > > > > > >> > > > > in the metadata directly (note this might be a
> > good
> > > > idea
> > > > > > > > > >> regardless
> > > > > > > > > >> > of
> > > > > > > > > >> > > > the
> > > > > > > > > >> > > > > rest of this proposal). To support regex on the
> > > > > consumer,
> > > > > > we
> > > > > > > > > must
> > > > > > > > > >> > fetch
> > > > > > > > > >> > > > > metadata for all topics. Rather than having all
> > > regex
> > > > > > > > > subscribers
> > > > > > > > > >> > embed
> > > > > > > > > >> > > > all
> > > > > > > > > >> > > > > of this metadata in their join group requests,
> > they
> > > > > could
> > > > > > > > > instead
> > > > > > > > > >> > > embed a
> > > > > > > > > >> > > > > hash of it. Then after the join group responses
> > are
> > > > > > > received,
> > > > > > > > > they
> > > > > > > > > >> > just
> > > > > > > > > >> > > > > need to check that the hashes are the same. If
> > there
> > > > is
> > > > > a
> > > > > > > > > mismatch
> > > > > > > > > >> > > (which
> > > > > > > > > >> > > > > should only occur when topics are created,
> > deleted,
> > > or
> > > > > > > > resized),
> > > > > > > > > >> then
> > > > > > > > > >> > > the
> > > > > > > > > >> > > > > group members must refetch the metadata and
> rejoin
> > > the
> > > > > > > group.
> > > > > > > > > >> This is
> > > > > > > > > >> > > > also
> > > > > > > > > >> > > > > how the current protocol behaves when there is a
> > > > change
> > > > > in
> > > > > > > the
> > > > > > > > > >> topic
> > > > > > > > > >> > > > > metadata affecting the group--someone (either
> the
> > > > > > > coordinator
> > > > > > > > or
> > > > > > > > > >> the
> > > > > > > > > >> > > > > consumer) detects the change and forces the
> group
> > to
> > > > > > > > rebalance.
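> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > As a rough sketch of what that hash could be (the
> > > > > > > > > >> > > > > exact encoding would need to be pinned down; this
> > > > > > > > > >> > > > > is illustrative only):
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >     import java.nio.charset.StandardCharsets;
> > > > > > > > > >> > > > >     import java.security.MessageDigest;
> > > > > > > > > >> > > > >     import java.util.Map;
> > > > > > > > > >> > > > >     import java.util.SortedMap;
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >     // Illustrative only: digest the (topic, partition count) view a
> > > > > > > > > >> > > > >     // regex subscriber fetched, so members can cheaply compare views.
> > > > > > > > > >> > > > >     class MetadataHashSketch {
> > > > > > > > > >> > > > >         static byte[] hash(SortedMap<String, Integer> topicPartitions) throws Exception {
> > > > > > > > > >> > > > >             MessageDigest md = MessageDigest.getInstance("MD5");
> > > > > > > > > >> > > > >             // Sorted iteration keeps the digest deterministic across members.
> > > > > > > > > >> > > > >             for (Map.Entry<String, Integer> e : topicPartitions.entrySet()) {
> > > > > > > > > >> > > > >                 md.update(e.getKey().getBytes(StandardCharsets.UTF_8));
> > > > > > > > > >> > > > >                 md.update((byte) 0);   // separator between name and count
> > > > > > > > > >> > > > >                 md.update(e.getValue().toString().getBytes(StandardCharsets.UTF_8));
> > > > > > > > > >> > > > >             }
> > > > > > > > > >> > > > >             return md.digest();
> > > > > > > > > >> > > > >         }
> > > > > > > > > >> > > > >     }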
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > What do you think?
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > (Also I think adding groupId/generationId to
> fetch
> > > and
> > > > > > > produce
> > > > > > > > > >> > requests
> > > > > > > > > >> > > > > seems like an interesting line of thought.)
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > -Jason
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin
> > > > > > > > > >> > > <j...@linkedin.com.invalid
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >> Hey Ewen,
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> Onur and I discussed this a little bit more, and
> > > > > > > > > >> > > > >> we are still worried about passing all the
> > > > > > > > > >> > > > >> metadata of all consumers around.
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> Let's say I have a cluster that has 10,000 topics
> > > > > > > > > >> > > > >> and the average topic name length is 10 bytes. In
> > > > > > > > > >> > > > >> this case, the opaque metadata will have 10 *
> > > > > > > > > >> > > > >> 10,000 = 100KB for topic names; for each topic
> > > > > > > > > >> > > > >> there is also a 4-byte integer for the number of
> > > > > > > > > >> > > > >> partitions, which is another 40KB. So one copy of
> > > > > > > > > >> > > > >> the global topic metadata is 140KB. If I have 100
> > > > > > > > > >> > > > >> consumers using a wildcard to consume from all the
> > > > > > > > > >> > > > >> topics, the protocol metadata that ends up in the
> > > > > > > > > >> > > > >> JoinGroupResponse will be 140KB * 100 = 14MB. And
> > > > > > > > > >> > > > >> the JoinGroupResponse needs to be sent to 100
> > > > > > > > > >> > > > >> different consumers, which means 14MB * 100 =
> > > > > > > > > >> > > > >> 1.4GB needs to be sent by the consumer coordinator
> > > > > > > > > >> > > > >> for one rebalance. How would that work?
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> Also, having two consumers (the old owner and the
> > > > > > > > > >> > > > >> new owner) consuming from the same partition might
> > > > > > > > > >> > > > >> also be a problem, e.g. when people are updating a
> > > > > > > > > >> > > > >> database. One thing that might be worth doing is
> > > > > > > > > >> > > > >> to add the GroupId and Generation ID to
> > > > > > > > > >> > > > >> ProducerRequest and FetchRequest as well. This
> > > > > > > > > >> > > > >> would also help with the single-producer use case.
> > > > > > > > > >> > > > >> However, this is probably orthogonal to this
> > > > > > > > > >> > > > >> thread, given that the current new consumer also
> > > > > > > > > >> > > > >> has this problem and I believe we need to fix it.
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> Thanks,
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> Jiangjie (Becket) Qin
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> On Tue, Aug 11, 2015 at 11:43 PM, Ewen
> > > > > Cheslack-Postava <
> > > > > > > > > >> > > > >> e...@confluent.io>
> > > > > > > > > >> > > > >> wrote:
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie
> Qin
> > > > > > > > > >> > > > >> <j...@linkedin.com.invalid>
> > > > > > > > > >> > > > >> > wrote:
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > > Ewen,
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > Thanks for the explanation.
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > For (1), I am more concerned about the failure
> > > > > > > > > >> > > > >> > > case than the normal case. What if a consumer
> > > > > > > > > >> > > > >> > > somehow was kicked out of a group but is still
> > > > > > > > > >> > > > >> > > consuming and committing offsets? Does that
> > > > > > > > > >> > > > >> > > mean the new owner and old owner might
> > > > > > > > > >> > > > >> > > potentially be consuming from and committing
> > > > > > > > > >> > > > >> > > offsets for the same partition? In the old
> > > > > > > > > >> > > > >> > > consumer, this won't happen, because the new
> > > > > > > > > >> > > > >> > > consumer will not be able to start consumption
> > > > > > > > > >> > > > >> > > unless the previous owner has released its
> > > > > > > > > >> > > > >> > > ownership. Basically, without the ownership
> > > > > > > > > >> > > > >> > > guarantee, I don't see how communication among
> > > > > > > > > >> > > > >> > > the consumers themselves alone can solve the
> > > > > > > > > >> > > > >> > > problem here.
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > The generation ID check still applies to
> offset
> > > > > > commits.
> > > > > > > If
> > > > > > > > > >> one of
> > > > > > > > > >> > > the
> > > > > > > > > >> > > > >> > consumers is kicked out and misbehaving, it
> can
> > > > > > obviously
> > > > > > > > > still
> > > > > > > > > >> > > fetch
> > > > > > > > > >> > > > >> and
> > > > > > > > > >> > > > >> > process messages, but offset commits will not
> > > work
> > > > > > since
> > > > > > > it
> > > > > > > > > >> will
> > > > > > > > > >> > not
> > > > > > > > > >> > > > >> have
> > > > > > > > > >> > > > >> > the current generation ID.
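> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > Roughly the check being described, as a sketch
> > > > > > > > > >> > > > >> > (the constants are illustrative; the real error
> > > > > > > > > >> > > > >> > codes live in the protocol's Errors class):
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> >     // Hypothetical sketch: a zombie member can still fetch and
> > > > > > > > > >> > > > >> >     // process, but its OffsetCommitRequest carries a stale
> > > > > > > > > >> > > > >> >     // generation and is rejected.
> > > > > > > > > >> > > > >> >     class GenerationCheckSketch {
> > > > > > > > > >> > > > >> >         static final short NONE = 0;
> > > > > > > > > >> > > > >> >         static final short ILLEGAL_GENERATION = 22;  // illustrative value
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> >         static short checkCommit(int requestGeneration, int currentGeneration) {
> > > > > > > > > >> > > > >> >             return requestGeneration == currentGeneration ? NONE : ILLEGAL_GENERATION;
> > > > > > > > > >> > > > >> >         }
> > > > > > > > > >> > > > >> >     }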
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > For (2) and (3), now I understand how the
> > > > > > > > > >> > > > >> > > metadata is used. But I still don't see why we
> > > > > > > > > >> > > > >> > > should let the consumers pass the topic
> > > > > > > > > >> > > > >> > > information around instead of letting the
> > > > > > > > > >> > > > >> > > coordinator provide it. The single-producer use
> > > > > > > > > >> > > > >> > > case does not solve the ownership problem in
> > > > > > > > > >> > > > >> > > the abnormal case either, which seems a little
> > > > > > > > > >> > > > >> > > bit vulnerable.
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > One of the goals here was to generalize group
> > > > > > membership
> > > > > > > so
> > > > > > > > > we
> > > > > > > > > >> > can,
> > > > > > > > > >> > > > for
> > > > > > > > > >> > > > >> > example, use it for balancing Copycat tasks
> > > across
> > > > > > > workers.
> > > > > > > > > >> > There's
> > > > > > > > > >> > > no
> > > > > > > > > >> > > > >> > topic subscription info in that case. The
> > > metadata
> > > > > for
> > > > > > > > > copycat
> > > > > > > > > >> > > workers
> > > > > > > > > >> > > > >> > would instead need to somehow indicate the
> > > current
> > > > > set
> > > > > > of
> > > > > > > > > tasks
> > > > > > > > > >> > that
> > > > > > > > > >> > > > >> need
> > > > > > > > > >> > > > >> > to be assigned to workers. By making the
> > metadata
> > > > > > > > completely
> > > > > > > > > >> > opaque
> > > > > > > > > >> > > to
> > > > > > > > > >> > > > >> the
> > > > > > > > > >> > > > >> > protocol, it becomes more generally useful
> > since
> > > it
> > > > > > > focuses
> > > > > > > > > >> > squarely
> > > > > > > > > >> > > > on
> > > > > > > > > >> > > > >> the
> > > > > > > > > >> > > > >> > group membership problem, allowing for that
> > > > > additional
> > > > > > > bit
> > > > > > > > of
> > > > > > > > > >> > > metadata
> > > > > > > > > >> > > > >> so
> > > > > > > > > >> > > > >> > you don't just get a list of members, but
> also
> > > get
> > > > a
> > > > > > > little
> > > > > > > > > >> bit of
> > > > > > > > > >> > > > info
> > > > > > > > > >> > > > >> > about each of them.
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > A different option that we explored is to
> use a
> > > > sort
> > > > > of
> > > > > > > > mixed
> > > > > > > > > >> > model
> > > > > > > > > >> > > --
> > > > > > > > > >> > > > >> > still bake all the topic subscriptions
> directly
> > > > into
> > > > > > the
> > > > > > > > > >> protocol
> > > > > > > > > >> > > but
> > > > > > > > > >> > > > >> also
> > > > > > > > > >> > > > >> > include metadata. That would allow us to
> > maintain
> > > > the
> > > > > > > > > existing
> > > > > > > > > >> > > > >> > coordinator-driven approach to handling the
> > > > metadata
> > > > > > and
> > > > > > > > > change
> > > > > > > > > >> > > events
> > > > > > > > > >> > > > >> like
> > > > > > > > > >> > > > >> > the ones Onur pointed out. Then something
> like
> > > the
> > > > > > > Copycat
> > > > > > > > > >> workers
> > > > > > > > > >> > > > would
> > > > > > > > > >> > > > >> > just not fill in any topic subscriptions and
> it
> > > > would
> > > > > > be
> > > > > > > > > >> handled
> > > > > > > > > >> > as
> > > > > > > > > >> > > a
> > > > > > > > > >> > > > >> > degenerate case. Based on the way I explained
> > > that
> > > > we
> > > > > > can
> > > > > > > > > >> handle
> > > > > > > > > >> > > those
> > > > > > > > > >> > > > >> > types of events, I personally feel its
> cleaner
> > > and
> > > > a
> > > > > > > nicer
> > > > > > > > > >> > > > >> generalization
> > > > > > > > > >> > > > >> > to not include the subscriptions in the join
> > > group
> > > > > > > > protocol,
> > > > > > > > > >> > making
> > > > > > > > > >> > > it
> > > > > > > > > >> > > > >> part
> > > > > > > > > >> > > > >> > of the metadata instead.
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > For the single producer case, are you saying
> it
> > > > > doesn't
> > > > > > > > solve
> > > > > > > > > >> > > > ownership
> > > > > > > > > >> > > > >> in
> > > > > > > > > >> > > > >> > the abnormal case because a producer that
> > doesn't
> > > > > know
> > > > > > it
> > > > > > > > has
> > > > > > > > > >> been
> > > > > > > > > >> > > > >> kicked
> > > > > > > > > >> > > > >> > out of the group yet can still produce data
> > even
> > > > > though
> > > > > > > it
> > > > > > > > > >> > shouldn't
> > > > > > > > > >> > > > be
> > > > > > > > > >> > > > >> > able to anymore? I definitely agree that that
> > is
> > > a
> > > > > risk
> > > > > > > --
> > > > > > > > > this
> > > > > > > > > >> > > > >> provides a
> > > > > > > > > >> > > > >> > way to get closer to a true single-writer,
> but
> > > > there
> > > > > > are
> > > > > > > > > >> > definitely
> > > > > > > > > >> > > > >> still
> > > > > > > > > >> > > > >> > failure modes that this does not address.
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > -Ewen
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > Thanks,
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > Jiangjie (Becket) Qin
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen
> > > > > > > Cheslack-Postava <
> > > > > > > > > >> > > > >> > e...@confluent.io
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > wrote:
> > > > > > > > > >> > > > >> > >
> > > > > > > > > >> > > > >> > > > On Tue, Aug 11, 2015 at 10:15 PM,
> Jiangjie
> > > Qin
> > > > > > > > > >> > > > >> > <j...@linkedin.com.invalid
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > wrote:
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > > Hi Jason,
> > > > > > > > > >> > > > >> > > > >
> > > > > > > > > >> > > > >> > > > > Thanks for writing this up. It would be
> > > > useful
> > > > > to
> > > > > > > > > >> generalize
> > > > > > > > > >> > > the
> > > > > > > > > >> > > > >> > group
> > > > > > > > > >> > > > >> > > > > concept. I have a few questions below.
> > > > > > > > > >> > > > >> > > > >
> > > > > > > > > >> > > > >> > > > > 1. In the old consumer the partition
> > > > > > > > > >> > > > >> > > > > assignments are actually done by the
> > > > > > > > > >> > > > >> > > > > consumers themselves. We used ZooKeeper to
> > > > > > > > > >> > > > >> > > > > guarantee that a partition will only be
> > > > > > > > > >> > > > >> > > > > consumed by the one consumer thread that
> > > > > > > > > >> > > > >> > > > > successfully claimed its ownership. Does
> > > > > > > > > >> > > > >> > > > > the new protocol plan to provide the same
> > > > > > > > > >> > > > >> > > > > guarantee?
> > > > > > > > > >> > > > >> > > > >
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > Once you have all the metadata from all
> the
> > > > > > > consumers,
> > > > > > > > > >> > > assignment
> > > > > > > > > >> > > > >> > should
> > > > > > > > > >> > > > >> > > > just be a simple function mapping that
> > > > > > > Map<ConsumerId,
> > > > > > > > > >> > Metadata>
> > > > > > > > > >> > > > to
> > > > > > > > > >> > > > >> > > > Map<ConsumerId, List<TopicPartition>>. If
> > > > > everyone
> > > > > > is
> > > > > > > > > >> > consistent
> > > > > > > > > >> > > > in
> > > > > > > > > >> > > > >> > > > computing that, you don't need ZK
> involved
> > at
> > > > > all.
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > In practice, this shouldn't be that hard
> to
> > > > > ensure
> > > > > > > for
> > > > > > > > > most
> > > > > > > > > >> > > > >> assignment
> > > > > > > > > >> > > > >> > > > strategies just by having decent unit
> > testing
> > > > on
> > > > > > > them.
> > > > > > > > > You
> > > > > > > > > >> > just
> > > > > > > > > >> > > > >> have to
> > > > > > > > > >> > > > >> > > do
> > > > > > > > > >> > > > >> > > > things like ensure your assignment
> strategy
> > > > sorts
> > > > > > > lists
> > > > > > > > > >> into a
> > > > > > > > > >> > > > >> > consistent
> > > > > > > > > >> > > > >> > > > order.
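> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > A tiny sketch of such a pure, deterministic
> > > > > > > > > >> > > > >> > > > mapping - round-robin over sorted inputs,
> > > > > > > > > >> > > > >> > > > purely illustrative rather than the actual
> > > > > > > > > >> > > > >> > > > assignor code:
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > >     import java.util.*;
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > >     // Sorting both the members and the partitions is what makes every
> > > > > > > > > >> > > > >> > > >     // member compute the same answer independently, with no ZK locking.
> > > > > > > > > >> > > > >> > > >     class RoundRobinSketch {
> > > > > > > > > >> > > > >> > > >         static Map<String, List<String>> assign(SortedMap<String, Integer> topicPartitions,
> > > > > > > > > >> > > > >> > > >                                                 SortedSet<String> memberIds) {
> > > > > > > > > >> > > > >> > > >             Map<String, List<String>> out = new TreeMap<>();
> > > > > > > > > >> > > > >> > > >             for (String m : memberIds)
> > > > > > > > > >> > > > >> > > >                 out.put(m, new ArrayList<>());
> > > > > > > > > >> > > > >> > > >             List<String> members = new ArrayList<>(memberIds);
> > > > > > > > > >> > > > >> > > >             int i = 0;
> > > > > > > > > >> > > > >> > > >             for (Map.Entry<String, Integer> t : topicPartitions.entrySet())
> > > > > > > > > >> > > > >> > > >                 for (int p = 0; p < t.getValue(); p++)
> > > > > > > > > >> > > > >> > > >                     out.get(members.get(i++ % members.size())).add(t.getKey() + "-" + p);
> > > > > > > > > >> > > > >> > > >             return out;
> > > > > > > > > >> > > > >> > > >         }
> > > > > > > > > >> > > > >> > > >     }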
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > You do give up the ability to use some
> > > > techniques
> > > > > > > (e.g.
> > > > > > > > > any
> > > > > > > > > >> > > > >> randomized
> > > > > > > > > >> > > > >> > > > algorithm if you can't distribute the
> seed
> > w/
> > > > the
> > > > > > > > > metadata)
> > > > > > > > > >> > and
> > > > > > > > > >> > > > it's
> > > > > > > > > >> > > > >> > true
> > > > > > > > > >> > > > >> > > > that nothing validates the assignment,
> but
> > if
> > > > > that
> > > > > > > > > >> assignment
> > > > > > > > > >> > > > >> algorithm
> > > > > > > > > >> > > > >> > > > step is kept simple, small, and well
> > tested,
> > > > the
> > > > > > risk
> > > > > > > > is
> > > > > > > > > >> very
> > > > > > > > > >> > > > >> minimal.
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > >
> > > > > > > > > >> > > > >> > > > > 2. It looks like both JoinGroupRequest and
> > > > > > > > > >> > > > >> > > > > JoinGroupResponse have the
> > > > > > > > > >> > > > >> > > > > ProtocolMetadata.AssignmentStrategyMetadata;
> > > > > > > > > >> > > > >> > > > > what metadata would be sent and returned by
> > > > > > > > > >> > > > >> > > > > the coordinator? How will the coordinator
> > > > > > > > > >> > > > >> > > > > handle the metadata?
> > > > > > > > > >> > > > >> > > > >
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > The coordinator is basically just blindly
> > > > > > > broadcasting
> > > > > > > > > all
> > > > > > > > > >> of
> > > > > > > > > >> > it
> > > > > > > > > >> > > > to
> > > > > > > > > >> > > > >> > group
> > > > > > > > > >> > > > >> > > > members so they have a consistent view.
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > So from the coordinator's perspective, it
> > > > > > > > > >> > > > >> > > > sees something like:
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > Consumer 1 -> JoinGroupRequest with
> > > > > > > > > >> > > > >> > > >   GroupProtocols = [ "consumer" <Consumer1 opaque byte[]> ]
> > > > > > > > > >> > > > >> > > > Consumer 2 -> JoinGroupRequest with
> > > > > > > > > >> > > > >> > > >   GroupProtocols = [ "consumer" <Consumer2 opaque byte[]> ]
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > Then the responses would look like:
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > Consumer 1 <- JoinGroupResponse with GroupProtocol = "consumer" and
> > > > > > > > > >> > > > >> > > >   GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>,
> > > > > > > > > >> > > > >> > > >                    Consumer 2 <Consumer2 opaque byte[]> ]
> > > > > > > > > >> > > > >> > > > Consumer 2 <- JoinGroupResponse with GroupProtocol = "consumer" and
> > > > > > > > > >> > > > >> > > >   GroupMembers = [ Consumer 1 <Consumer1 opaque byte[]>,
> > > > > > > > > >> > > > >> > > >                    Consumer 2 <Consumer2 opaque byte[]> ]
> > > > > > > > > >> > > > >> > > >
> > > > > > > > > >> > > > >> > > > So all the responses include all the
> > metadata
> > > > for
> > > > > > > every
> > > > > > > > > >> member
> > > > > > > > > >> > > in
> > > > > > > > > >> > > > >> the
> > > > > > > > > >> > > > >> > > > group, and everyone can use that to
> > > > consistently
> > > > > > > decide
> > > > > > > > > on
> > > > > > > > > >> > > > >> assignment.
> > > > > > > > > >> > > > >> > > The
> > > > > > > > > >> > > > >> > > > broker doesn't care and cannot even
> > > understand
> > > > > the
> > > > > > > > > metadata
> > > > > > > > > >> > > since
> > > > > > > > > >> > > > >> the
> > > > > > > > > >> > > > >> > > data
> > > > > > > > > >> > > > >> > > > format for it is dependent on the
> > assignment
> > > > > > strategy
> > > > > > > > > being
> > > > > > > > > >> > > used.
> > > > > > > > > >> > > > >> > > >
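>
> To make the shape of that exchange a bit more concrete, here is a rough
> Java sketch of the data involved. The type names are invented for
> illustration; they are not the actual request/response classes:
>
> import java.util.List;
> import java.util.Map;
> import java.util.stream.Collectors;
>
> // Illustration only: simplified stand-ins for the join-group exchange.
> public final class JoinGroupSketch {
>
>     // What each member sends: supported protocol name(s) mapped to opaque metadata.
>     record JoinRequest(String memberId, Map<String, byte[]> protocolMetadata) {}
>
>     // What the coordinator returns to *every* member: the selected protocol and
>     // the opaque metadata of all members, giving everyone the same view of the group.
>     record JoinResponse(String protocol, Map<String, byte[]> memberMetadata) {}
>
>     // The coordinator never interprets the byte[]; it only picks a protocol that is
>     // supported by all members (assumed here to be 'chosenProtocol') and echoes
>     // everyone's metadata back to everyone.
>     static Map<String, JoinResponse> respond(List<JoinRequest> requests, String chosenProtocol) {
>         Map<String, byte[]> allMetadata = requests.stream()
>             .collect(Collectors.toMap(JoinRequest::memberId,
>                                       r -> r.protocolMetadata().get(chosenProtocol)));
>         return requests.stream()
>             .collect(Collectors.toMap(JoinRequest::memberId,
>                                       r -> new JoinResponse(chosenProtocol, allMetadata)));
>     }
> }
>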
> As another example that is *not* a consumer, let's say you just want to
> have a single writer in the group which everyone will forward requests to.
> To accomplish this, you could use a very dumb assignment strategy: there is
> no metadata (an empty byte[]) and all we care about is who the first member
> in the group is (e.g. when IDs are sorted lexicographically). That member
> is selected as the writer. In that case we actually just care about the
> membership list; there's no additional info about each member required to
> determine who the writer is.
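>
> Purely as an illustration of that single-writer case (again a sketch with
> made-up names, not code from the proposal), every member can derive the
> writer from the membership list alone:
>
> import java.util.Collections;
> import java.util.List;
>
> // Illustration only: electing a single writer from the group membership list.
> public final class SingleWriterSketch {
>
>     // Every member receives the same member list in its join response, so every
>     // member independently reaches the same answer with no per-member metadata.
>     static String electWriter(List<String> memberIds) {
>         return Collections.min(memberIds); // lexicographically first member id
>     }
>
>     public static void main(String[] args) {
>         List<String> members = List.of("member-b", "member-a", "member-c");
>         String writer = electWriter(members);
>         System.out.println("writer = " + writer);       // member-a
>         System.out.println("member-b".equals(writer));  // false: member-b just forwards to the writer
>     }
> }
>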
> > 3. Do you mean that the number of partitions in the JoinGroupResponse
> > will be the maximum partition count reported for a topic by any of the
> > consumers? Is there any reason not to just have the coordinator return
> > the number of partitions for a topic from its own metadata cache?
>
> Nothing from the embedded protocol is touched by the broker. The broker
> just collects the opaque metadata bytes, selects the strategy if the
> consumers support more than one, and then returns the opaque metadata for
> all the members back to every member. That way they all have a consistent
> view of the group. For regular consumers, that view of the group includes
> how many partitions each consumer currently thinks its subscribed topics
> have. These counts could be inconsistent due to out-of-date metadata, and
> it would be up to the assignment strategy on the *client* to resolve that.
> As you point out, in that case they could just take the max value that any
> consumer reported seeing and use that. A consumer that notices its own
> metadata showed a smaller number of partitions should also trigger a
> metadata update when it sees someone else observing a larger number of
> partitions.
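>
> That max-and-refresh reconciliation is easy to express on the client. A
> rough sketch (illustrative names, not the proposed API), merging the
> topic -> partition-count views reported by each member:
>
> import java.util.List;
> import java.util.Map;
> import java.util.stream.Collectors;
>
> // Illustration only: reconciling possibly stale partition counts reported by members.
> public final class PartitionCountSketch {
>
>     // Each member reports topic -> partition count as it currently sees it. Taking
>     // the max per topic gives every member the same, most up-to-date view; a member
>     // whose own count was smaller should also trigger a metadata refresh.
>     static Map<String, Integer> reconcile(List<Map<String, Integer>> reportedCounts) {
>         return reportedCounts.stream()
>             .flatMap(counts -> counts.entrySet().stream())
>             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::max));
>     }
>
>     public static void main(String[] args) {
>         System.out.println(reconcile(List.of(
>             Map.of("topic-a", 8, "topic-b", 4),      // consumer 1's (stale) view
>             Map.of("topic-a", 16, "topic-b", 4))));  // consumer 2's view
>         // -> {topic-a=16, topic-b=4} (map iteration order may vary)
>     }
> }
>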
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Aug 11, 2015 at 1:19 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> > >
> > > Hi Kafka Devs,
> > >
> > > One of the nagging issues in the current design of the new consumer has
> > > been the need to support a variety of assignment strategies. We've
> > > encountered this in particular in the design of copycat and the
> > > processing framework (KIP-28). From what I understand, Samza also has a
> > > number of use cases with custom assignment needs. The new consumer
> > > protocol supports new assignment strategies by hooking them into the
> > > broker. For many environments, this is a major pain and in some cases a
> > > non-starter. It also limits the validation that the coordinator can
> > > provide. For example, some assignment strategies call for partitions to
> > > be assigned multiple times, which means that the coordinator can only
> > > check that partitions have been assigned at least once.
> > >
> > > To solve these issues, we'd like to propose moving assignment to the
> > > client. I've written a wiki which outlines some protocol changes to
> > > achieve this:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > >
> > > To summarize briefly, instead of the coordinator assigning the
> > > partitions itself, all subscriptions are forwarded to each member of
> > > the group, which then decides independently which partitions it should
> > > consume. The protocol provides a mechanism for the coordinator to
> > > validate that all consumers use the same assignment strategy, but it
> > > does not ensure that the resulting assignment is "correct." This
> > > provides a powerful capability for users to control the full data flow
> > > on the client side. They control how data is written to partitions
> > > through the Partitioner interface, and they control how data is
> > > consumed through the assignment strategy, all without touching the
> > > server.
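> > >
> > > To sketch what that client-side hook could look like (interface and
> > > method names here are invented for illustration, not the proposed API),
> > > each member would receive the same view of the group and compute its
> > > own assignment locally:
> > >
> > > import java.util.List;
> > > import java.util.Map;
> > >
> > > // Illustration only: the general shape of a client-side assignment strategy.
> > > public interface ClientSideAssignorSketch {
> > >
> > >     // Opaque metadata this member contributes (e.g. its encoded subscription);
> > >     // the broker never interprets these bytes.
> > >     byte[] metadata();
> > >
> > >     // Called on every member with the same consistent view of the group
> > >     // (member id -> that member's metadata). Each member returns the topic
> > >     // partitions it will consume (topic -> partition numbers); nothing on
> > >     // the server validates the result.
> > >     Map<String, List<Integer>> assign(String myMemberId, Map<String, byte[]> groupMetadata);
> > > }
> > >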
> > > Of course nothing comes for free. In particular, this change removes
> > > the ability of the coordinator to validate that commits are made by
> > > consumers who were assigned the respective partition. This might not be
> > > too bad since we retain the ability to validate the generation id, but
> > > it is a potential concern. We have considered alternative protocols
> > > which add a second round-trip in order to give the coordinator the
> > > ability to confirm the assignment. As mentioned above, the coordinator
> > > is somewhat limited in what it can actually validate, but this would
> > > restore its ability to validate commits. The tradeoff is that it
> > > increases the protocol's complexity, which means more ways for the
> > > protocol to fail and consequently more edge cases in the code.
> > >
> > > It also misses an opportunity to generalize the group membership
> > > protocol for additional use cases. In fact, after you've gone to the
> > > trouble of moving assignment to the client, the main thing that is left
> > > in this protocol is basically a general group management capability.
> > > This is exactly what is needed for a few cases that are currently under
> > > discussion (e.g. copycat or the single-writer producer). We've taken
> > > this further step in the proposal and attempted to envision what that
> > > general protocol might look like and how it could be used both by the
> > > consumer and for some of these other cases.
> > >
> > > Anyway, since time is running out on the new consumer, we have perhaps
> > > one last chance to consider a significant change in the protocol like
> > > this, so have a look at the wiki and share your thoughts. I've no doubt
> > > that some ideas seem clearer in my mind than they do on paper, so ask
> > > questions if there is any confusion.
> > >
> > > Thanks!
> > > Jason
>
> --
> Thanks,
> Ewen