Hi Jay,

For sure, and a capability protocol adds even more. It's down to cost/benefit, a nice place to be.
- Rob

On Jul 22, 2014, at 10:46 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey Robert,

I think the issue is that you lose much of the simplification if you still need to heartbeat and you need a new commit notification message. I think that would make this proposal more complex than the current one.

-Jay

On Mon, Jul 21, 2014 at 6:44 PM, Robert Withers <robert.w.with...@gmail.com> wrote:

Thanks, Jay, for the good summary. Regarding point 2, I would think the heartbeat would still be desired, to give control over liveness-detection parameters and to directly inform clients when gaining or losing a partition (especially when gaining a partition). There would be no barrier, and the rebalancer would be an offline scheduler issuing SwitchPartition commands. The evaluation of a SwitchPartition command would wait for the consumer losing a partition to commit its offset and do any local commit work needed before confirming completion to the coordinator, which would then inform the new consumer and the ISR brokers about the partition gain. Broker security would be the master record of live assignments.

Thanks,
Rob

On Jul 21, 2014, at 6:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

This thread is a bit long, but let me see if I can restate it correctly (not sure I fully follow). There are two suggestions:

1. Allow partial rebalances that move just some partitions. I.e. if a consumer fails and has only one partition, only one other consumer should be affected (the one who picks up the new partition). If there are many partitions to be reassigned there will obviously be a tradeoff between impacting all consumers and balancing load evenly. I.e. if you moved all load to one other consumer that would cause little rebalancing interruption but poor load balancing.

2. Have the coordinator communicate the assignments to the brokers rather than to the client directly. This could potentially simplify the consumer. Perhaps it would be possible to have the leader track liveness using the fetch requests rather than needing an artificial heartbeat.

These are interesting ideas.

-Jay

On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Rob,

If I get your idea right, it is that if the rebalance only changes the ownership of a few consumers in the group, the coordinator can just sync with them and not interrupt the other consumers.

I think this approach may work. However, it will likely complicate the logic of the coordinator once we sketch out all the details, since the rebalance result basically depends on two variables: 1) the partitions of the subscribed topics, and 2) the consumers inside the group. Following this approach, by the time the coordinator decides to trigger a rebalance it must correctly keep track of which variable change triggered the current rebalance process. On the other hand, the normal rebalance process, even with a global barrier, should usually be very fast, within a few hundred millis. So I am not sure if this is a worthy optimization that we would want for now. What do you think?

Guozhang
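For concreteness, here is a rough sketch (in Java, with made-up names, not code from either proposal) of the "sticky" assignment a partial rebalance implies: every partition stays with its current owner when possible, and only orphaned partitions move.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: keep partitions with their current owners, move orphans
// to the least-loaded live consumer, so only the receiving consumer is
// disturbed. Assumes a non-empty group.
public class StickyAssignmentSketch {
    public static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> current,
                                                       Set<String> liveConsumers,
                                                       List<Integer> allPartitions) {
        Map<String, List<Integer>> next = new HashMap<String, List<Integer>>();
        for (String c : liveConsumers) {
            List<Integer> kept = current.containsKey(c)
                    ? new ArrayList<Integer>(current.get(c))
                    : new ArrayList<Integer>();
            next.put(c, kept);  // existing owners keep everything they had
        }
        Set<Integer> owned = new HashSet<Integer>();
        for (List<Integer> ps : next.values())
            owned.addAll(ps);
        for (Integer p : allPartitions) {
            if (owned.contains(p))
                continue;               // unmoved: its owner is still live
            String leastLoaded = null;  // orphaned: give to the least-loaded consumer
            for (String c : next.keySet())
                if (leastLoaded == null || next.get(c).size() < next.get(leastLoaded).size())
                    leastLoaded = c;
            next.get(leastLoaded).add(p);
        }
        return next;
    }
}

Note this never rebalances load away from a surviving consumer; that is exactly the interruption/evenness tradeoff Jay describes in point 1.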
On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers <robert.w.with...@gmail.com> wrote:

Lock is a bad way to say it; a barrier is better. I don't think what I am saying is even a barrier, since the rebalance would just need to recompute a rebalance schedule and submit it. The only processing delay is to allow a soft remove to let the client clean up before you turn on the new guy, so it lags a bit. Do you think this could work?

Thanks,
Rob

On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com> wrote:

Hi Guozhang,

Thank you for considering my suggestions. The security layer sounds like the right facet to design for these sorts of capabilities. Have you considered a chained ocap security model for the broker using hash tokens? This would provide per-partition read/write capabilities with QoS context including leases, revocation, debug level and monitoring. Overkill disappears, as no domain-specific info needs to be stored at the brokers, like consumer/partition assignments. The read ocap for consumer 7/topic bingo/partition 131 could be revoked at the brokers for a partition, and subsequent reads would fail the fetch for requests with that ocap token. You could also dynamically change the log level for a specific consumer/partition.

There are advantages we could discuss to having finer-grained control. Consider that scheduled partition rebalancing could be implemented with no pauses from the perspective of the consumer threads; it looks like single-partition lag, as the offset commit occurs before rotation, with no lag to non-rebalanced partitions: rebalance 1 partition per second so as to creep load to a newbie consumer. It would eliminate a global read lock, and even the internal Kafka consumer would never block on IO protocol other than the normal fetch request (and the initial join group request).

A global lock acquired through a pull protocol (HeartbeatRequest followed by a JoinGroupRequest) for all live consumers is a much bigger lock than a security-based push protocol, as I assume the coordinator will have open sockets to all brokers in order to reach out as needed. As well, each lock would be independent between partitions and would involve only those brokers in the ISR for a given partition. It is a much smaller lock.

I had some time to consider my suggestion that it be viewed as a relativistic frame of reference. Consider the model where each dimension of the frame of reference for each consumer is a partition (actually a sub-space with the dimensionality of the replication factor, but with a single leader election, so consider it 1 dimension). The total dimensionality of the consumer's frame of reference is the number of partitions, but only assigned partitions are open to a given consumer's viewpoint. The offset is the partition dimension's coordinate, and only consumers with an open dimension can translate the offset. A rebalance opens or closes a dimension for a given consumer and can be viewed as a rotation. Could Kafka consumption and rebalance (and ISR leader election) be reduced to matrix operations?

Rob
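To give a flavor of what a per-partition ocap check at a broker could look like, a toy sketch follows; everything here is hypothetical, nothing like it exists in Kafka. The broker derives the expected token from a shared secret and the (group, topic, partition) scope, so it stores no per-consumer assignment table, and revocation is a simple list lookup.

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Toy ocap-style fetch authorization. The token is an HMAC over the
// (group, topic, partition) scope, so the broker stores no per-consumer
// assignment state; revocation is a list lookup.
public class FetchCapabilityCheck {
    private final byte[] brokerSecret;  // distributed to brokers out of band
    private final Set<String> revoked =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    public FetchCapabilityCheck(byte[] brokerSecret) {
        this.brokerSecret = brokerSecret;
    }

    public boolean authorizeFetch(String group, String topic, int partition, byte[] token)
            throws Exception {
        String scope = group + "/" + topic + "/" + partition;
        if (revoked.contains(scope))
            return false;  // revoked ocap: fail the fetch
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(brokerSecret, "HmacSHA256"));
        byte[] expected = mac.doFinal(scope.getBytes("UTF-8"));
        return Arrays.equals(expected, token);  // use a constant-time compare in real code
    }

    public void revoke(String group, String topic, int partition) {
        revoked.add(group + "/" + topic + "/" + partition);
    }
}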
On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Rob,

Sorry for the late reply.

If I understand your approach correctly, it requires all brokers to remember the partition assignment of each consumer in order to decide whether or not to authorize the fetch request, correct? If we are indeed going to do such authorization for the security project then maybe it is a good way to go, but otherwise it might be overkill just to support finer-grained partition assignment. In addition, instead of requiring a round trip between the coordinator and the consumers for the synchronization barrier, now the coordinator needs to wait for a round trip between itself and the other brokers before it can return the join-group request, right?

Guozhang

On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <robert.w.with...@gmail.com> wrote:

Hi Guozhang,

Currently, the brokers do not know which high-level consumers are reading which partitions, and it is the rebalance between the consumers and the coordinator which would authorize a consumer to fetch a particular partition, I think. Does this mean that when a rebalance occurs, all consumers must send a JoinGroupRequest, and that the coordinator will not respond to any consumers until all consumers have sent the JoinGroupRequest, to enable the synchronization barrier? That has the potential to be a sizable global delay.

On the assumption that there is only one coordinator for a group, why couldn't the synchronization barrier be per partition and internal to Kafka, mostly not involving the consumers, other than a chance for offsetCommit by the consumer losing a partition? If the brokers have session state and know the new assignment before the consumer is notified with a HeartbeatResponse, they could fail the fetch request from that consumer for an invalidly assigned partition. The consumer could take an invalid-partition failure of the fetch request as if it were a HeartbeatResponse partition removal.

The only gap seems to be: when does a consumer that is losing a partition get a chance to commit its offset? If there were a PartitionCommittedNotification message that a consumer could send to the coordinator after committing its offsets, then the coordinator could send the add-partition HeartbeatResponse, after receiving the PartitionCommittedNotification, to the consumer gaining the partition, and the offset management is stable. The advantage is that none of the other consumers would be paused on any other partitions. Only partitions being rebalanced would see any consumption pause.

So, something like (a sketch of this flow follows the list):

1. 3 consumers running with 12 partitions balanced, 4 each

2. New consumer starts and sends JoinGroupRequest to the coordinator

3. Coordinator computes the rebalance with 4 consumers: each existing consumer will lose a partition assigned to the new consumer

4. Coordinator informs all live brokers of the partition reassignments

5. Brokers receive the reassignments, start failing unauthorized fetch requests and ack back to the coordinator

6. Coordinator receives all broker acks, sends HeartbeatResponses with partition removals to the existing consumers, and awaits PartitionCommittedNotifications from the consumers losing partitions

7. Existing consumers can continue to fetch messages from correctly assigned partitions

8. When an existing consumer fails a fetch for a partition or gets a HeartbeatResponse with a partition removal, it would commitOffsets for that partition and then send a PartitionCommittedNotification to the coordinator

9. As the coordinator receives the PartitionCommittedNotification for a particular partition from an existing consumer, it sends the addPartition to consumer 4 in a HeartbeatResponse, and the new consumer can start fetching that partition
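To make the flow above concrete, a minimal sketch of the coordinator-side bookkeeping, using the hypothetical message names proposed in this thread (PartitionCommittedNotification and friends are proposals here, not real Kafka requests):

import java.util.HashMap;
import java.util.Map;

// Sketch of the per-partition handoff state the coordinator would keep.
// Message names are the proposals from this thread, not real Kafka APIs.
public class PartitionHandoffSketch {
    enum State { AWAITING_BROKER_ACKS, AWAITING_COMMIT, REASSIGNED }

    private final Map<Integer, State> state = new HashMap<Integer, State>();
    private final Map<Integer, String> gainingConsumer = new HashMap<Integer, String>();

    // Step 4: reassignment sent to the brokers for this partition.
    void onReassignmentSent(int partition, String newOwner) {
        state.put(partition, State.AWAITING_BROKER_ACKS);
        gainingConsumer.put(partition, newOwner);
    }

    // Steps 5-6: every broker has acked, so fetches by the old owner now fail;
    // piggy-back a partition removal on the old owner's next HeartbeatResponse.
    void onAllBrokerAcks(int partition) {
        state.put(partition, State.AWAITING_COMMIT);
    }

    // Steps 8-9: the losing consumer committed its offset and notified us;
    // only now does the gaining consumer learn it may fetch this partition.
    void onPartitionCommittedNotification(int partition) {
        if (state.get(partition) == State.AWAITING_COMMIT) {
            state.put(partition, State.REASSIGNED);
            sendAddPartition(gainingConsumer.get(partition), partition);
        }
    }

    private void sendAddPartition(String consumer, int partition) {
        // would ride along on the gaining consumer's next HeartbeatResponse
    }
}

Each partition walks this state machine independently, which is what keeps the pause local to the partitions actually being moved.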
If a consumer drops HeartbeatRequests within a session timeout, the coordinator would inform the brokers and they would fail fetchRequests for those partitions from that consumer. There is no chance to send removePartitions, since no Heartbeat is occurring, but the addPartitions could be sent and the offset is what it is. This seems no different from this sort of failure today.

Instead of a global synchronization barrier, isn't it possible to have an incremental per-partition synchronization barrier? The brokers would have to be aware of this. I think of it as relativistic from each acceleration frame of reference, which is each consumer: event horizons.

Regards,
Rob

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com]
Sent: Wednesday, July 16, 2014 9:20 AM
To: users@kafka.apache.org
Subject: Re: New Consumer Design

Hi Rob,

Piggy-backing the rebalanced partition info in the HeartbeatResponse may cause inconsistency of the partition assignments to consumers under consecutive triggerings of rebalances, since the coordinator no longer has a synchronization barrier for re-computing the distribution with a consistent view.

Guozhang
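One way to see the hazard Guozhang describes: without a barrier, assignments from two overlapping rebalances can reach a consumer out of order. A common guard, sketched below with invented names, is to stamp each pushed assignment with a rebalance generation and have consumers drop stale ones:

import java.util.Collections;
import java.util.List;

// Sketch: stamp every pushed assignment with a rebalance generation so a
// consumer can drop updates from a superseded rebalance. Names invented.
public class GenerationGuardSketch {
    private long currentGeneration = -1;
    private List<Integer> assignedPartitions = Collections.emptyList();

    // Called for each HeartbeatResponse that carries an assignment.
    synchronized void onAssignment(long generation, List<Integer> partitions) {
        if (generation < currentGeneration)
            return;  // stale: from an earlier rebalance that overlapped a newer one
        currentGeneration = generation;
        assignedPartitions = partitions;
    }
}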
On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com> wrote:

> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>
>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hi All,
>>
>> We have written a wiki a few weeks back proposing a single-threaded ZK-free consumer client design for 0.9:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>
>> We want to share some of the ideas that came up for this design to get some early feedback. The below discussion assumes you have read the above wiki.
>>
>> *Offset Management*
>>
>> To make consumer clients ZK-free we need to move the offset management utility from ZooKeeper into the Kafka servers, which then store offsets as a special log. Key-based log compaction will be used to keep this ever-growing log's size bounded. The broker responsible for the offset management of a given consumer group will be the leader of this special offset log partition, where the partition key will be the consumer group name. On the consumer side, instead of talking to ZK to commit and read offsets, it talks to the servers with new offset commit and offset fetch request types. This work has been done and will be included in the 0.8.2 release. Details of the implementation can be found in KAFKA-1000 and this wiki:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>>
>> *Group Membership Management*
>>
>> The next step is to move the membership management and rebalancing utility out of ZooKeeper as well. To do this we introduced a consumer coordinator hosted on the Kafka servers which is responsible for keeping track of group membership and consumption partition changes, and the corresponding rebalancing process.
>>
>> *1. Failure Detection*
>>
>> More specifically, we will use a heartbeating protocol for consumer and coordinator failure detection. The heartbeating protocol would be similar to the one ZK used, i.e. the consumer will set a session timeout value with the coordinator on startup, and keep sending heartbeats to the coordinator afterwards every session-timeout / heartbeat-frequency. The coordinator will treat a consumer as failed when it has not heard from the consumer within the session timeout, and the consumer will treat its coordinator as failed when it has not heard heartbeat responses within the session timeout.
>>
>> One difference from the ZK heartbeat protocol is that instead of fixing the heartbeat-frequency at three, we make this value configurable in consumer clients. This is because the rebalancing process (which we will discuss later) has latency lower-bounded by the heartbeat frequency, so we want this value to be large while not DDoSing the servers.
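As a rough illustration of the liveness rule just described (field names invented), the timing math on both sides reduces to:

// Sketch of the heartbeat timing described above; names are illustrative.
public class HeartbeatTimingSketch {
    final long sessionTimeoutMs;   // set with the coordinator on startup
    final int heartbeatFrequency;  // configurable, unlike ZK's fixed value of 3
    long lastHeardMs;              // last time we heard from the peer

    HeartbeatTimingSketch(long sessionTimeoutMs, int heartbeatFrequency) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatFrequency = heartbeatFrequency;
        this.lastHeardMs = System.currentTimeMillis();
    }

    // Consumer side: heartbeat every session-timeout / heartbeat-frequency.
    long heartbeatIntervalMs() {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    // Both sides: a peer silent for a full session timeout is considered failed.
    boolean peerFailed(long nowMs) {
        return nowMs - lastHeardMs > sessionTimeoutMs;
    }
}

So with a 30s session timeout and a frequency of 3, heartbeats flow every 10s, which also bounds how long a rebalance notification can sit undelivered.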
>> *2. Consumer Rebalance*
>>
>> A bunch of events can trigger a rebalance process: 1) consumer failure, 2) a new consumer joining the group, 3) an existing consumer changing its consumed topic/partitions, 4) a partition change for the consumed topics. Once the coordinator decides to trigger a rebalance it will notify the consumers within the group to resend their topic/partition subscription information, and will then assign existing partitions to these consumers. On the consumers' end, they no longer run any rebalance logic in a distributed manner but follow the partition assignment they receive from the coordinator. Since the consumers can only be notified of a rebalance triggering via heartbeat responses, the rebalance latency is lower-bounded by the heartbeat frequency; that is why we allow consumers to negotiate this value with the coordinator upon registration, and we would like to hear people's thoughts about this protocol.

> One question on this topic: my planned usage pattern is to have a high number of consumers that are ephemeral. Imagine a consumer coming online, consuming messages for a short duration, then disappearing forever. Is this use case supported?

If consumers are coming and going, there will be a high rate of rebalances and that will really start throttling consumption at some point. You may have a use case that hits that detrimentally. A custom rebalance algorithm may be able to rebalance a sub-set of the consumers and not inform others of a need to rebalance them, which would ameliorate the cost. Still, the consumers that need to rebalance would pay.

If the initial JoinGroupRequest was preserved, would it be possible to send rebalanced partition information in the HeartbeatResponse, up front to all (or a sub-set of) consumers, and avoid the rejoin round trip? I mean, push the rebalance rather than pull it with a secondary JoinGroupRequest. This would reduce cost and lower the throttle point, but poorly timed fetch requests may fail. If the consumer were resilient to fetch failures, might it work?
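A sketch of the consumer-side resilience that this push model assumes (the error code and callbacks are invented for illustration): treat a fetch rejected for ownership reasons the same as a heartbeat-delivered partition removal.

import java.util.HashSet;
import java.util.Set;

// Sketch: a consumer that tolerates pushed reassignments by treating a
// fetch rejected for ownership reasons like a heartbeat partition removal.
// The error code and method names are invented for illustration.
public class PushRebalanceConsumerSketch {
    private final Set<Integer> owned = new HashSet<Integer>();

    void onHeartbeatResponse(Set<Integer> added, Set<Integer> removed) {
        for (int p : removed) {
            commitOffset(p);   // commit before giving the partition up
            owned.remove(p);
        }
        owned.addAll(added);
    }

    void onFetchError(int partition, String errorCode) {
        if ("NOT_ASSIGNED".equals(errorCode) && owned.contains(partition)) {
            // The brokers already know the new assignment; behave as if the
            // heartbeat removal had arrived first.
            commitOffset(partition);
            owned.remove(partition);
        }
    }

    private void commitOffset(int partition) {
        // commit the last consumed offset, then notify the coordinator
    }
}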
>> In addition, the rebalance logic in the new consumer will be customizable instead of hard-coded in a per-topic round-robin manner. Users will be able to write their own assignment class following a common interface, and consumers upon registering themselves will specify in their registration request the assigner class name they want to use. If consumers within the same group specify different algorithms the registration will be rejected. We are not sure if it is the best way of customizing rebalance logic yet, so any feedback is more than welcome.
>>
>> For wildcard consumption, the consumers will now capture new topics that are available for fetching through the topic metadata request. That is, periodically the consumers will update their topic metadata for fetching, and if new topics are returned in the metadata response matching the wildcard regex, the consumer will notify the coordinator to let it trigger a new rebalance to assign partitions for this new topic.
>>
>> There are some failure handling cases during the rebalancing process discussed in the consumer rewrite design wiki. We would encourage people to read it and let us know if there are any other corner cases not covered.
>>
>> Moving forward, we are thinking about making the coordinator handle both the group management and the offset management for a given group, i.e. the leader of the offset log partition will naturally become the coordinator of the corresponding consumer group. This allows the coordinator to reject offset commit requests if the sender is not part of the group. To do so, a consumer id and a generation id are applied to control the group member generations. Details of this design are included in the rewrite design wiki page, and again any feedback is appreciated.
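Going back to the customizable assignment interface mentioned above, a rough sketch of what such a common interface might look like, with a round-robin implementation as the default-style example; the real interface was still under design at this point, so these names are hypothetical:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;

// Hypothetical shape of the pluggable assignment interface; the real one
// was still under design at this point (see KAFKA-1328).
interface AssignmentStrategySketch {
    // Must be deterministic: the coordinator recomputes it on every rebalance.
    Map<String, List<Integer>> assign(SortedSet<String> consumers, List<Integer> partitions);
}

// A per-topic round-robin assignment, as one possible implementation.
class RoundRobinStrategySketch implements AssignmentStrategySketch {
    public Map<String, List<Integer>> assign(SortedSet<String> consumers,
                                             List<Integer> partitions) {
        Map<String, List<Integer>> out = new HashMap<String, List<Integer>>();
        for (String c : consumers)
            out.put(c, new ArrayList<Integer>());
        String[] members = consumers.toArray(new String[0]);
        int i = 0;
        for (Integer p : partitions)
            out.get(members[i++ % members.length]).add(p);
        return out;
    }
}

Requiring every member to name the same strategy class is what lets the coordinator run this computation centrally instead of having the consumers race each other in ZK.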
>> *3. Non-blocking Network IO*
>>
>> With a single-threaded consumer, we will instead use non-blocking network IO (i.e. java.nio) for the fetching loops, just as the new producer does for asynchronously sending data. In terms of implementation, we will base the new consumer/producer clients on a common non-blocking IO thread class. Details of this refactoring can be found in KAFKA-1316.
>>
>> *4. Consumer API*
>>
>> As a result of the single-threaded non-blocking network IO implementation, the new consumer API will no longer be based on a blocking stream iterator interface but on a poll(timeout) interface. In addition, the consumer APIs are designed to be flexible so that people can choose to either use the default offset and group membership management utilities mentioned above or implement their own offset/group management logic. For example, for scenarios that need fixed partition assignment or that prefer to store offsets locally in a data store, the new consumer APIs would make it much easier to customize. Details of the API design can be found in KAFKA-1328. A question to think about is whether there are any other potential use cases that cannot be easily supported by this API.
>>
>> Cheers,
>> Guozhang

--
-- Guozhang
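To picture the poll-based shape of that API, a sketch of a consumption loop follows; the class and method names are placeholders, since the actual 0.9 API was not yet released:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Placeholder sketch of a poll(timeout)-style loop; the real 0.9 API may
// name these types and methods differently (see KAFKA-1328).
public class PollLoopSketch {
    interface Record {
        int partition();
        long offset();
        byte[] value();
    }

    interface ConsumerLike {
        List<Record> poll(long timeoutMs);        // driven by non-blocking IO underneath
        void commit(Map<Integer, Long> offsets);  // server-side offset commit
    }

    static void run(ConsumerLike consumer) {
        Map<Integer, Long> offsets = new HashMap<Integer, Long>();
        while (true) {
            // one thread drives fetches for all assigned partitions
            for (Record r : consumer.poll(100)) {
                process(r);
                offsets.put(r.partition(), r.offset() + 1);
            }
            consumer.commit(offsets);  // or store offsets in a local data store instead
        }
    }

    private static void process(Record r) {
        // application logic
    }
}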