On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi All,
>
> We wrote a wiki a few weeks back proposing a single-threaded, ZooKeeper-free consumer client design for 0.9:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> We want to share some of the ideas that came up for this design to get some early feedback. The discussion below assumes you have read the above wiki.
>
> *Offset Management*
>
> To make consumer clients ZooKeeper-free, we need to move the offset management utility from ZooKeeper into the Kafka brokers, which then store offsets as a special log. Key-based log compaction will be used to keep this ever-growing log's size bounded. The broker responsible for offset management of a given consumer group will be the leader of the corresponding partition of this special offset log, where the partition key is the consumer group name. On the consumer side, instead of talking to ZooKeeper to commit and read offsets, the consumer talks to the brokers using new offset commit and offset fetch request types. This work has been done and will be included in the 0.8.2 release. Details of the implementation can be found in KAFKA-1000 and this wiki:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>
> *Group Membership Management*
>
> The next step is to move the membership management and rebalancing utility out of ZooKeeper as well. To do this we introduce a consumer coordinator hosted on the Kafka brokers, which is responsible for tracking group membership and consumed-partition changes, and for driving the corresponding rebalancing process.
>
> *1. Failure Detection*
>
> More specifically, we will use a heartbeat protocol for consumer and coordinator failure detection. The heartbeat protocol is similar to the one ZooKeeper uses, i.e.
> the consumer sets a session timeout value with the coordinator on startup, and afterwards sends a heartbeat to the coordinator every session-timeout / heartbeat-frequency. The coordinator treats a consumer as failed when it has not heard from the consumer for a full session timeout, and the consumer likewise treats its coordinator as failed when it has not received a heartbeat response for a full session timeout.
>
> One difference from the ZooKeeper heartbeat protocol is that instead of fixing the heartbeat frequency at three, we make this value configurable in consumer clients. This is because the latency of the rebalancing process (discussed below) is lower bounded by the heartbeat interval, so we want the frequency to be large while not DDoSing the servers.
>
> *2. Consumer Rebalance*
>
> Several events can trigger a rebalance: 1) a consumer fails, 2) a new consumer joins the group, 3) an existing consumer changes the topics/partitions it consumes, or 4) the partitions of a consumed topic change. Once the coordinator decides to trigger a rebalance, it notifies the consumers in the group to resend their topic/partition subscription information, and then assigns the existing partitions to those consumers. On the consumer side, consumers no longer run any rebalance logic in a distributed manner but instead follow the partition assignment they receive from the coordinator. Since consumers can only be notified of a rebalance via heartbeat responses, the rebalance latency is lower bounded by the heartbeat interval; that is why we allow consumers to negotiate this value with the coordinator upon registration, and we would like to hear people's thoughts on this protocol.

One question on this topic: my planned usage pattern is a high number of ephemeral consumers. Imagine a consumer coming online, consuming messages for a short duration, then disappearing forever. Is this use case supported?
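The heartbeat timing described above can be sketched as follows. The class and method names here are my own invention for illustration, not the actual client code; only the session-timeout / heartbeat-frequency arithmetic comes from the proposal.

```java
// Sketch of the heartbeat bookkeeping described in the proposal.
// HeartbeatTimer is a hypothetical name, not a real Kafka class.
class HeartbeatTimer {
    private final long sessionTimeoutMs;   // negotiated with the coordinator on startup
    private final int heartbeatFrequency;  // configurable here, fixed at 3 in ZooKeeper
    private long lastHeartbeatMs;          // time the last heartbeat was seen

    HeartbeatTimer(long sessionTimeoutMs, int heartbeatFrequency, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatFrequency = heartbeatFrequency;
        this.lastHeartbeatMs = nowMs;
    }

    // Consumer side: send a heartbeat every session-timeout / heartbeat-frequency ms.
    long heartbeatIntervalMs() {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    void heartbeatReceived(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    // Either side: the peer is considered failed once a full session
    // timeout elapses with no heartbeat (or heartbeat response).
    boolean isExpired(long nowMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }
}
```

For example, a 30-second session timeout with frequency 3 yields a heartbeat every 10 seconds; raising the frequency lowers the rebalance-notification latency at the cost of more requests to the coordinator, which is the trade-off the negotiation is meant to expose.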
> In addition, the rebalance logic in the new consumer will be customizable rather than hard-coded as per-topic round-robin. Users will be able to write their own assignment class implementing a common interface, and consumers will specify in their registration request the assigner class name they want to use. If consumers within the same group specify different algorithms, the registration will be rejected. We are not sure this is the best way of customizing the rebalance logic yet, so any feedback is more than welcome.
>
> For wildcard consumption, consumers will now discover new topics available for fetching through the topic metadata request. That is, a consumer periodically refreshes its topic metadata, and if new topics matching its wildcard regex are returned in the metadata response, it notifies the coordinator to trigger a new rebalance to assign partitions for the new topics.
>
> There are some failure handling cases during the rebalancing process discussed in the consumer rewrite design wiki. We would encourage people to read it and let us know if there are any other corner cases not covered.
>
> Moving forward, we are thinking about making the coordinator handle both the group management and the offset management for its groups, i.e. the leader of a group's offset log partition naturally becomes the coordinator of that consumer group. This allows the coordinator to reject offset commit requests whose sender is not part of the group. To do so, a consumer id and a generation id are used to track group membership generations. Details of this design are included in the rewrite design wiki page, and again any feedback is appreciated.
>
> *3. Non-blocking Network IO*
>
> With a single-threaded consumer, we will instead use non-blocking network IO (i.e.
> java.nio) for the fetch loop, just as the new producer does for asynchronously sending data. In terms of implementation, we will base the new consumer and producer clients on a common non-blocking IO thread class. Details of this refactoring can be found in KAFKA-1316.
>
> *4. Consumer API*
>
> As a result of the single-threaded, non-blocking network IO implementation, the new consumer API will no longer be based on a blocking stream iterator interface but on a poll(timeout) interface. In addition, the consumer APIs are designed to be flexible so that people can choose either to use the default offset and group membership management utilities mentioned above or to implement their own offset/group management logic. For example, for scenarios that need fixed partition assignment or that prefer to store offsets locally in a data store, the new consumer APIs make customization much easier. Details of the API design can be found in KAFKA-1328. A question to think about: are there any other potential use cases that cannot easily be supported by this API?
>
> Cheers,
> Guozhang
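On the customizable rebalance logic: to make the feedback concrete, here is a rough sketch of what a pluggable assigner interface and the default per-topic round-robin might look like. The interface name and signature are guesses on my part, not the proposed API; the point is that the coordinator would run whichever implementation the group registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of the pluggable assignment interface the wiki
// describes; the real interface name and signature may differ.
interface PartitionAssigner {
    // Map each consumer id in the group to the partitions it should own.
    Map<String, List<String>> assign(List<String> consumers, List<String> partitions);
}

// Round-robin over a topic's partitions, i.e. the default the email mentions.
class RoundRobinAssigner implements PartitionAssigner {
    public Map<String, List<String>> assign(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int i = 0; i < partitions.size(); i++) {
            // Deal partitions out to consumers in turn.
            String owner = consumers.get(i % consumers.size());
            assignment.get(owner).add(partitions.get(i));
        }
        return assignment;
    }
}
```

Having the registration carry only a class name (and rejecting mixed groups) keeps the coordinator simple, but it does mean every consumer's classpath must agree on the implementation.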
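On the offset management side, since the offset log is partitioned by consumer group name, locating a group's coordinator presumably reduces to finding the leader of the group's offset partition. A minimal sketch, assuming a hash-mod mapping (the actual scheme may differ):

```java
// Hypothetical mapping from a consumer group to its offsets-log partition;
// the leader of that partition would act as the group's coordinator.
// The hash-mod scheme here is an assumption for illustration only.
class CoordinatorLookup {
    static int offsetsPartitionFor(String groupId, int numOffsetsPartitions) {
        // Mask off the sign bit so the result is always a valid partition,
        // even when hashCode() is Integer.MIN_VALUE.
        return (groupId.hashCode() & 0x7fffffff) % numOffsetsPartitions;
    }
}
```

A mapping like this is what would let the merged coordinator-plus-offset-manager design work without an extra lookup: any broker can compute the partition from the group name and route the client to that partition's leader.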