Hi All,

A few weeks back we wrote a wiki page proposing a single-threaded, ZooKeeper-free consumer client design for 0.9:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

We want to share some of the ideas that came up in this design to get early feedback. The discussion below assumes you have read the wiki above.

*Offset Management*

To make consumer clients ZooKeeper-free, we need to move the offset management utility from ZooKeeper into the Kafka servers, which then store offsets as a special log. Key-based log compaction will be used to keep this ever-growing log bounded in size. The broker responsible for offset management of a given consumer group will be the leader of this special offset log partition, where the partition key is the consumer group name. On the consumer side, instead of talking to ZK to commit and read offsets, the client talks to the servers using new offset commit and offset fetch request types. This work has been done and will be included in the 0.8.2 release. Details of the implementation can be found in KAFKA-1000 and this wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

*Group Membership Management*

The next step is to move the membership management and rebalancing utilities out of ZooKeeper as well. To do this we introduce a consumer coordinator, hosted on the Kafka servers, which is responsible for tracking group membership and changes to the consumed partitions, and for driving the corresponding rebalance process.

*1. Failure Detection*

More specifically, we will use a heartbeat protocol for consumer and coordinator failure detection. The protocol is similar to the one ZK uses: the consumer sends a session timeout value to the coordinator on startup, and afterwards sends a heartbeat to the coordinator every session-timeout / heartbeat-frequency.
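To illustrate the interval arithmetic above, here is a minimal sketch (class and method names are hypothetical, not part of the proposed protocol):

```java
// Hypothetical sketch of the heartbeat timing described above.
class HeartbeatSchedule {
    private final long sessionTimeoutMs;     // sent by the consumer at startup
    private final int heartbeatFrequency;    // heartbeats per session timeout, configurable

    HeartbeatSchedule(long sessionTimeoutMs, int heartbeatFrequency) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatFrequency = heartbeatFrequency;
    }

    // Interval between consecutive heartbeats sent by the consumer.
    long heartbeatIntervalMs() {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    // The coordinator treats the consumer as failed once it has heard
    // nothing for longer than the session timeout (and vice versa for
    // the consumer judging its coordinator).
    boolean consideredFailed(long msSinceLastHeartbeat) {
        return msSinceLastHeartbeat > sessionTimeoutMs;
    }
}
```

For example, a 30s session timeout with a frequency of 10 yields one heartbeat every 3s; raising the frequency tightens rebalance latency at the cost of more requests to the servers.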
The coordinator treats a consumer as failed when it has not heard from the consumer within the session timeout, and the consumer treats its coordinator as failed when it has not received heartbeat responses within the session timeout. One difference from the ZK heartbeat protocol is that instead of fixing the heartbeat frequency at three, we make this value configurable in the consumer clients. This is because the latency of the rebalance process (discussed below) is lower-bounded by the heartbeat interval, so we want the frequency to be high while not overwhelming the servers.

*2. Consumer Rebalance*

A number of events can trigger a rebalance: 1) a consumer failure, 2) a new consumer joining the group, 3) an existing consumer changing the topics/partitions it consumes, and 4) a partition change for the consumed topics. Once the coordinator decides to trigger a rebalance, it notifies the consumers in the group to resend their topic/partition subscription information, and then assigns the existing partitions to these consumers. On the consumer end, the clients no longer run any rebalance logic in a distributed manner, but simply follow the partition assignment they receive from the coordinator.

Since consumers can only be notified of a rebalance via heartbeat responses, the rebalance latency is lower-bounded by the heartbeat interval; that is why we allow consumers to negotiate this value with the coordinator upon registration, and we would like to hear people's thoughts on this protocol.

In addition, the rebalance logic in the new consumer will be customizable instead of hard-coded as per-topic round-robin. Users will be able to write their own assignment class implementing a common interface, and consumers will specify the assigner class name they want to use in their registration request. If consumers within the same group specify different algorithms, the registration will be rejected.
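To make the pluggable-assignment idea concrete, here is a rough sketch of what such an interface could look like, together with a round-robin implementation. The interface and class names are hypothetical, not the actual proposed API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical common interface: a consumer registers the assignor class
// name, and the coordinator runs the assignor on each rebalance.
interface PartitionAssignor {
    // consumerIds: the current members of the group;
    // partitions: all partitions of the subscribed topics.
    Map<String, List<String>> assign(List<String> consumerIds, List<String> partitions);
}

// Example user-written implementation: simple round-robin over partitions.
class RoundRobinAssignor implements PartitionAssignor {
    @Override
    public Map<String, List<String>> assign(List<String> consumerIds, List<String> partitions) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String c : consumerIds) {
            assignment.put(c, new ArrayList<>());
        }
        // Deal partitions out to consumers in turn.
        for (int i = 0; i < partitions.size(); i++) {
            String consumer = consumerIds.get(i % consumerIds.size());
            assignment.get(consumer).add(partitions.get(i));
        }
        return assignment;
    }
}
```

Since the coordinator rejects registrations whose assignor class differs from the rest of the group, every member of a group is guaranteed to be rebalanced by the same algorithm.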
We are not sure this is the best way of customizing the rebalance logic yet, so any feedback is more than welcome.

For wildcard consumption, consumers will now discover new topics available for fetching through the topic metadata request. That is, consumers periodically refresh their topic metadata, and if new topics matching the wildcard regex are returned in the metadata response, the consumer notifies the coordinator to trigger a new rebalance to assign partitions for the new topics.

Some failure handling cases during the rebalance process are discussed in the consumer rewrite design wiki. We encourage people to read it and let us know if there are any other corner cases not covered.

Moving forward, we are thinking about making the coordinator handle both group management and offset management for its groups, i.e. the leader of the offset log partition naturally becomes the coordinator of the corresponding consumer group. This allows the coordinator to reject offset commit requests whose sender is not part of the group. To do so, a consumer id and a generation id are used to track group membership generations. Details of this design are included in the rewrite design wiki page, and again any feedback is appreciated.

*3. Non-blocking Network IO*

With a single-threaded consumer, we will use non-blocking network IO (i.e. java.nio) for the fetch loops, just as the new producer does for asynchronously sending data. In terms of implementation, the new consumer and producer clients will be based on a common non-blocking IO thread class. Details of this refactoring can be found in KAFKA-1316.

*4. Consumer API*

As a result of the single-threaded, non-blocking network IO implementation, the new consumer API will no longer be based on a blocking stream-iterator interface, but on a poll(timeout) interface.
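To give a feel for the shape of such a loop, here is a toy sketch using a stand-in consumer class; the real API proposal lives in KAFKA-1328, and everything below (class name, record type, commit semantics) is illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the proposed consumer, just to illustrate the poll-loop
// shape; this is NOT the actual API from KAFKA-1328.
class SketchConsumer {
    private final List<String> buffered;
    private int position = 0;
    long committed = 0;

    SketchConsumer(List<String> records) {
        this.buffered = new ArrayList<>(records);
    }

    // In the real design, poll() drives all network IO (java.nio) on the
    // caller's thread and returns whatever records arrived within the timeout.
    List<String> poll(long timeoutMs) {
        List<String> out = new ArrayList<>(buffered.subList(position, buffered.size()));
        position = buffered.size();
        return out;
    }

    // In the real design, committing sends an OffsetCommitRequest to the
    // coordinator, which stores it in the compacted offset log.
    void commit() {
        committed = position;
    }
}
```

A consumption loop then looks roughly like:

```java
SketchConsumer consumer = new SketchConsumer(List.of("m1", "m2", "m3"));
List<String> processed = new ArrayList<>();
List<String> records;
while (!(records = consumer.poll(100)).isEmpty()) {
    for (String r : records) {
        processed.add(r); // application processing goes here
    }
    consumer.commit();
}
```

Compared with the old blocking iterator, the caller owns the thread and decides when IO happens, which is what makes the single-threaded design possible.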
In addition, the consumer APIs are designed to be flexible, so that people can either use the default offset and group membership management utilities described above or implement their own offset/group management logic. For example, for scenarios that need fixed partition assignment, or that prefer to store offsets locally in a data store, the new consumer APIs make customization much easier. Details of the API design can be found in KAFKA-1328. A question to think about: are there any other potential use cases that cannot be easily supported by this API?

Cheers,
Guozhang