Hi Rob,

Piggy-backing the rebalanced partition info in the HeartbeatResponse may cause inconsistency in the partition assignments handed to consumers when rebalances are triggered back to back, since the coordinator would no longer have a synchronization barrier at which to re-compute the distribution from a consistent view of the group.
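To make the barrier point concrete, here is a rough Java sketch (the class, method, and round-robin policy are made up purely for illustration; this is not coordinator code). The assignment is computed once, behind the join barrier, from a single membership snapshot, so every member of that generation receives a slice of the same computation:

    import java.util.*;

    class RebalanceBarrierSketch {

        // Hypothetical: called only after *all* current members have re-sent their
        // JoinGroupRequest, so 'joinedMembers' is one consistent snapshot for this
        // generation.
        static Map<String, List<Integer>> assignOnBarrier(Set<String> joinedMembers,
                                                          List<Integer> partitions) {
            Map<String, List<Integer>> assignment = new HashMap<>();
            List<String> members = new ArrayList<>(joinedMembers);
            Collections.sort(members);                       // same order on every run
            if (members.isEmpty())
                return assignment;
            for (int i = 0; i < partitions.size(); i++) {
                String owner = members.get(i % members.size());
                assignment.computeIfAbsent(owner, k -> new ArrayList<>())
                          .add(partitions.get(i));
            }
            return assignment;
            // If instead each HeartbeatResponse carried a freshly recomputed assignment,
            // two consumers could simultaneously hold slices computed from two different
            // membership snapshots, i.e. overlapping or missing partitions.
        }
    }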
Guozhang

On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com> wrote:

> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>
> > On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> We wrote a wiki a few weeks back proposing a single-threaded, ZK-free consumer client design for 0.9:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >>
> >> We want to share some of the ideas that came up for this design to get some early feedback. The discussion below assumes you have read the above wiki.
> >>
> >> *Offset Management*
> >>
> >> To make consumer clients ZK-free we need to move the offset management utility from ZooKeeper into the Kafka servers, which then store offsets as a special log. Key-based log compaction will be used to keep this ever-growing log's size bounded. The broker responsible for offset management of a given consumer group will be the leader of this special offset log partition, where the partition key is the consumer group name. On the consumer side, instead of talking to ZK to commit and read offsets, the consumer talks to the servers with new offset commit and offset fetch request types. This work has been done and will be included in the 0.8.2 release. Details of the implementation can be found in KAFKA-1000 and this wiki:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
> >>
> >> *Group Membership Management*
> >>
> >> The next step is to move the membership management and rebalancing utility out of ZooKeeper as well. To do this we introduced a consumer coordinator hosted on the Kafka servers which is responsible for keeping track of group membership and consumed-partition changes, and for the corresponding rebalancing process.
> >>
> >> *1. Failure Detection*
> >>
> >> More specifically, we will use a heartbeating protocol for consumer and coordinator failure detection. The heartbeating protocol is similar to the one ZK uses, i.e. the consumer sets a session timeout value with the coordinator on startup, and afterwards keeps sending heartbeats to the coordinator every session-timeout / heartbeat-frequency. The coordinator treats a consumer as failed when it has not heard from the consumer within the session timeout, and the consumer treats its coordinator as failed when it has not received heartbeat responses within the session timeout.
> >>
> >> One difference from the ZK heartbeat protocol is that instead of fixing the heartbeat frequency at three, we make this value configurable in consumer clients. This is because the latency of the rebalancing process (discussed below) is lower bounded by the heartbeat frequency, so we want this value to be large while not DDoSing the servers.
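A minimal timing sketch of the failure-detection rule above, assuming hypothetical class and field names (the 30000 / 10 figures in the comment are only an example, not defaults from the design):

    class HeartbeatTimingSketch {
        final long sessionTimeoutMs;    // set by the consumer with the coordinator at startup
        final int heartbeatFrequency;   // configurable, not fixed at 3 as in ZK

        HeartbeatTimingSketch(long sessionTimeoutMs, int heartbeatFrequency) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.heartbeatFrequency = heartbeatFrequency;
        }

        // The consumer sends one heartbeat per interval.
        long heartbeatIntervalMs() {
            return sessionTimeoutMs / heartbeatFrequency;   // e.g. 30000 / 10 = 3000 ms
        }

        // Used symmetrically: the coordinator checks the last heartbeat it received from
        // the consumer; the consumer checks the last heartbeat response it received.
        boolean isFailed(long lastHeardMs, long nowMs) {
            return nowMs - lastHeardMs > sessionTimeoutMs;
        }
    }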
> >> *2. Consumer Rebalance*
> >>
> >> A number of events can trigger a rebalance process: 1) consumer failure, 2) a new consumer joining the group, 3) an existing consumer changing its consumed topics/partitions, 4) a partition change for the consumed topics. Once the coordinator decides to trigger a rebalance, it will notify the consumers in the group to resend their topic/partition subscription information, and then assign the existing partitions to these consumers. On the consumer end, consumers no longer run any rebalance logic in a distributed manner but simply follow the partition assignment they receive from the coordinator. Since the consumers can only be notified of a rebalance trigger via heartbeat responses, the rebalance latency is lower bounded by the heartbeat frequency; that is why we allow consumers to negotiate this value with the coordinator upon registration, and we would like to hear people's thoughts about this protocol.
>
> > One question on this topic: my planned usage pattern is to have a high number of consumers that are ephemeral. Imagine a consumer coming online, consuming messages for a short duration, then disappearing forever. Is this use case supported?
>
> If consumers are coming and going, there will be a high rate of rebalances, and that will really start throttling consumption at some point. You may have a use that hits that detrimentally. A custom rebalance algorithm may be able to rebalance a sub-set of the consumers and not inform the others of a need to rebalance, which would ameliorate the cost. Still, the consumers that need to rebalance would pay.
>
> If the initial JoinGroupRequest were preserved, would it be possible to send the rebalanced partition information in the HeartbeatResponse, up front to all (or a sub-set of) consumers, and avoid the rejoin round trip? I mean, push the rebalance rather than pull it with a secondary JoinGroupRequest. This would reduce cost and lower the throttle point, but poorly timed fetch requests might fail. If the consumer were resilient to fetch failures, might it work?
>
> >> In addition, the rebalance logic in the new consumer will be customizable instead of hard-coded in a per-topic round-robin manner. Users will be able to write their own assignment class following a common interface, and consumers, upon registering themselves, will specify in their registration request the assigner class name they want to use. If consumers within the same group specify different algorithms, the registration will be rejected. We are not sure this is the best way of customizing rebalance logic yet, so any feedback is more than welcome.
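To illustrate the shape of such a pluggable assignment strategy, here is a rough sketch; the interface name, its signature, and the round-robin policy are assumptions for illustration, not the common interface from the design:

    import java.util.*;

    // Hypothetical interface a user-supplied assigner class might implement.
    interface PartitionAssignor {
        // topicPartitions: all partitions of the group's subscribed topics (e.g. "topicA-0")
        // memberIds: the consumer ids in the group for this generation
        Map<String, List<String>> assign(List<String> topicPartitions, List<String> memberIds);
    }

    class RoundRobinAssignor implements PartitionAssignor {
        @Override
        public Map<String, List<String>> assign(List<String> topicPartitions, List<String> memberIds) {
            Map<String, List<String>> result = new HashMap<>();
            List<String> members = new ArrayList<>(memberIds);
            Collections.sort(members);                       // deterministic order for every member
            if (members.isEmpty())
                return result;
            int i = 0;
            for (String tp : topicPartitions) {
                result.computeIfAbsent(members.get(i++ % members.size()), k -> new ArrayList<>()).add(tp);
            }
            return result;
        }
    }

The registration request would then carry the assigner class name (e.g. RoundRobinAssignor here), and the coordinator would reject registrations from members of the same group that name different classes.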
> >> For wildcard consumption, the consumers will now discover new topics that become available for fetching through the topic metadata request. That is, the consumers will periodically refresh their topic metadata, and if new topics matching the wildcard regex are returned in the metadata response, they will notify the coordinator so that it can trigger a new rebalance to assign partitions for the new topics.
> >>
> >> There are some failure handling cases during the rebalancing process discussed in the consumer rewrite design wiki. We would encourage people to read it and let us know if there are any other corner cases not covered.
> >>
> >> Moving forward, we are thinking about making the coordinator handle both the group management and the offset management for its groups, i.e. the leader of the offset log partition will naturally become the coordinator of the corresponding consumer group. This allows the coordinator to reject offset commit requests whose sender is not part of the group. To do so, a consumer id and a generation id are used to control group member generations. Details of this design are included in the rewrite design wiki page, and again any feedback is appreciated.
> >>
> >> *3. Non-blocking Network IO*
> >>
> >> With a single-threaded consumer, we will instead use non-blocking network IO (i.e. java.nio) for the fetching loop, just as the new producer does for asynchronously sending data. In terms of implementation, the new consumer and producer clients will be based on a common non-blocking IO thread class. Details of this refactoring can be found in KAFKA-1316.
> >>
> >> *4. Consumer API*
> >>
> >> As a result of the single-threaded, non-blocking network IO implementation, the new consumer API will no longer be based on a blocking stream iterator interface but on a poll(timeout) interface. In addition, the consumer APIs are designed to be flexible so that people can choose either to use the default offset and group membership management utilities mentioned above or to implement their own offset/group management logic. For example, for scenarios that need fixed partition assignments or that prefer to store offsets locally in a data store, the new consumer APIs make it much easier to customize. Details of the API design can be found in KAFKA-1328. A question to think about: are there any other potential use cases that cannot be easily supported by this API?
> >>
> >> Cheers,
> >> Guozhang

--
-- Guozhang
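As a rough illustration of the poll(timeout) direction in point 4, the sketch below uses made-up types and method names (this is not the KAFKA-1328 API); it only shows the single-threaded poll loop and the option of keeping offsets in the application instead of relying on the inbuilt offset management:

    import java.util.*;

    class PollLoopSketch {

        // Minimal stand-in types, purely hypothetical.
        static class Record {
            final String topic; final int partition; final long offset; final byte[] value;
            Record(String topic, int partition, long offset, byte[] value) {
                this.topic = topic; this.partition = partition; this.offset = offset; this.value = value;
            }
        }

        interface SimpleConsumer {                   // hypothetical consumer handle
            List<Record> poll(long timeoutMs);       // return whatever is ready within the timeout
            void commit(Map<String, Long> offsets);  // or skip this and store offsets yourself
        }

        static volatile boolean running = true;

        static void run(SimpleConsumer consumer) {
            // Offsets tracked by the application, e.g. to be written to a local data store
            // for the fixed-assignment / self-managed scenarios mentioned above.
            Map<String, Long> nextOffsets = new HashMap<>();
            while (running) {
                for (Record r : consumer.poll(100)) {            // single-threaded, non-blocking loop
                    process(r);
                    nextOffsets.put(r.topic + "-" + r.partition, r.offset + 1);
                }
                consumer.commit(nextOffsets);
            }
        }

        static void process(Record r) { /* application logic */ }
    }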