On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi All,
> 
> We wrote a wiki page a few weeks back proposing a single-threaded ZK-free
> consumer client design for 0.9:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> 
> We want to share some of the ideas that came up for this design to get some
> early feedback. The below discussion assumes you have read the above wiki.
> 
> *Offset Management*
> 
> To make consumer clients ZK-free we need to move the offset management
> utility from ZooKeeper into the Kafka servers, which will store offsets as
> a special log. Key-based log compaction will be used to keep this
> ever-growing log's size bounded. The broker responsible for offset
> management for a given consumer group will be the leader of the special
> offset log partition whose partition key is the consumer group name.
> On the consumer side, instead of talking to ZK to commit and read offsets,
> the consumer talks to the servers using new offset commit and offset fetch
> request types. This work has been done and will be included in the 0.8.2
> release. Details of the implementation can be found in KAFKA-1000 and
> this wiki:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
> 
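The key-based compaction described above can be sketched with a toy in-memory model (illustrative only, not the broker implementation): appending a new commit for the same group/partition key supersedes the old entry, so the retained state is bounded by the number of distinct keys rather than the number of commits.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of key-based compaction over an offset log: only the latest
// value per key survives, which is what compaction converges to over time.
public class OffsetLogSketch {
    private final Map<String, Long> compacted = new LinkedHashMap<>();

    // Appending a commit for an existing key overwrites the old entry.
    public void append(String group, String topicPartition, long offset) {
        compacted.put(group + "/" + topicPartition, offset);
    }

    public Long fetch(String group, String topicPartition) {
        return compacted.get(group + "/" + topicPartition);
    }

    // Bounded by the number of distinct keys, not the number of commits.
    public int size() {
        return compacted.size();
    }
}
```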
> *Group Membership Management*
> 
> The next step is to move the membership management and rebalancing utility
> out of ZooKeeper as well. To do this we introduced a consumer coordinator
> hosted on the Kafka servers which is responsible for keeping track of group
> membership and consumption partition changes, and the corresponding
> rebalancing process.
> 
> *1. Failure Detection*
> 
> More specifically, we will use a heartbeating protocol for consumer and
> coordinator failure detection. The heartbeating protocol would be similar
> to the one ZK uses, i.e. the consumer will send a session timeout value to
> the coordinator on startup, and keep sending heartbeats to the coordinator
> afterwards every session-timeout / heartbeat-frequency. The coordinator
> will treat a consumer as failed when it has not heard from the consumer
> within the session timeout, and the consumer will treat its coordinator as
> failed when it has not received heartbeat responses within the session
> timeout.
> 
> One difference from the ZK heartbeat protocol is that instead of fixing
> the heartbeat frequency at three, we make this value configurable in the
> consumer clients. This is because the latency of the rebalancing process
> (which we will discuss later) is lower bounded by the heartbeat interval,
> so we want this value to be large while not DDoSing the servers.
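As a toy illustration of the timing relationship above (the method and parameter names here are made up for the example, not actual config names): with a 30-second session timeout and a heartbeat frequency of 3, the consumer heartbeats every 10 seconds, and the coordinator declares it dead only after a full session timeout of silence.

```java
// Illustrative arithmetic for the heartbeat protocol described above.
public class HeartbeatTiming {
    // The consumer heartbeats every session-timeout / heartbeat-frequency.
    public static long heartbeatIntervalMs(long sessionTimeoutMs, int heartbeatFrequency) {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    // Number of consecutive heartbeats a consumer can miss before the
    // coordinator's session timeout expires and it is treated as failed.
    public static int missableHeartbeats(int heartbeatFrequency) {
        return heartbeatFrequency - 1;
    }
}
```

A higher frequency means faster failure detection and lower rebalance latency, at the cost of more heartbeat traffic to the servers, which is exactly the trade-off the configurable value exposes.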
> 
> *2. Consumer Rebalance*
> 
> A number of events can trigger a rebalance process: 1) a consumer failure,
> 2) a new consumer joining the group, 3) an existing consumer changing its
> subscribed topics/partitions, 4) a partition change in the subscribed
> topics. Once the coordinator decides to trigger a rebalance it will notify
> the consumers in the group to resend their topic/partition subscription
> information, and then assign the existing partitions to these consumers.
> On the consumer side, consumers no longer run any rebalance logic in a
> distributed manner but simply follow the partition assignment they receive
> from the coordinator. Since the consumers can only be notified of a
> rebalance via heartbeat responses, the rebalance latency is lower bounded
> by the heartbeat interval; that is why we allow consumers to negotiate
> this value with the coordinator upon registration, and we would like to
> hear people's thoughts about this protocol.

One question on this topic, my planned usage pattern is to have a high number 
of consumers that are ephemeral.  Imagine a consumer coming online, consuming 
messages for a short duration, then disappearing forever.  Is this use case 
supported?

> In addition, the rebalance logic in the new consumer will be customizable
> instead of being hard-coded as per-topic round-robin. Users will be able
> to write their own assignment class following a common interface, and
> consumers will specify in their registration request the assigner class
> name they want to use. If consumers within the same group specify
> different algorithms, the registration will be rejected. We are not sure
> if this is the best way of customizing the rebalance logic yet, so any
> feedback is more than welcome.
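For concreteness, a pluggable assignment interface might look roughly like the sketch below (hypothetical names; the actual interface may differ). The round-robin implementation mirrors the default behavior the wiki describes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of a pluggable partition assignment interface: the
// coordinator hands the assigner the group members and the partitions of
// the subscribed topic, and gets back a complete assignment.
interface PartitionAssigner {
    Map<String, List<Integer>> assign(List<String> consumers, List<Integer> partitions);
}

// One possible implementation: per-topic round-robin assignment.
class RoundRobinAssigner implements PartitionAssigner {
    @Override
    public Map<String, List<Integer>> assign(List<String> consumers, List<Integer> partitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<Integer>());
        }
        // Deal partitions out to consumers in turn.
        for (int i = 0; i < partitions.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            assignment.get(owner).add(partitions.get(i));
        }
        return assignment;
    }
}
```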
> 
> For wildcard consumption, the consumers will now discover new topics that
> are available for fetching through the topic metadata request. That is,
> each consumer will periodically update its topic metadata, and if new
> topics matching its wildcard regex are returned in the metadata response,
> it will notify the coordinator to trigger a new rebalance to assign
> partitions for the new topics.
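The matching step on each metadata refresh can be sketched as follows (an illustrative helper, not actual consumer code): any topic in the response that matches the wildcard regex is a candidate, and a newly appearing match is what would prompt the consumer to ask the coordinator for a rebalance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative sketch: filter the topics from a metadata response against
// the consumer's wildcard regex.
public class WildcardMatcher {
    public static List<String> matchingTopics(List<String> metadataTopics, String wildcardRegex) {
        Pattern p = Pattern.compile(wildcardRegex);
        List<String> matched = new ArrayList<>();
        for (String topic : metadataTopics) {
            if (p.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```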
> 
> There are some failure handling cases during the rebalancing process
> discussed in the consumer rewrite design wiki. We would encourage people to
> read it and let us know if there are any other corner cases not covered.
> 
> Moving forward, we are thinking about having the coordinator handle both
> the group management and the offset management for its groups, i.e. the
> leader of the offset log partition will naturally become the coordinator
> of the corresponding consumer group. This allows the coordinator to reject
> offset commit requests whose sender is not part of the group. To do so, a
> consumer id and a generation id are used to track group member
> generations. Details of this design are included in the rewrite design
> wiki page, and again any feedback is appreciated.
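The consumer-id/generation-id check might work along the lines of the sketch below (a hypothetical illustration of the rejection logic, not the actual broker code): each completed rebalance bumps the generation, and a commit is accepted only from a known member carrying the current generation id.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the coordinator-side generation check.
public class GenerationCheck {
    private int currentGeneration = 1;
    private final Set<String> members = new HashSet<>();

    // Each completed rebalance starts a new generation with a fresh
    // member list.
    public void completeRebalance(Set<String> newMembers) {
        currentGeneration++;
        members.clear();
        members.addAll(newMembers);
    }

    // An offset commit is accepted only from a known member of the
    // current generation; stale or unknown senders are rejected.
    public boolean acceptCommit(String consumerId, int generationId) {
        return generationId == currentGeneration && members.contains(consumerId);
    }
}
```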
> 
> *3. Non-blocking Network IO*
> 
> With a single-threaded consumer, we will instead use non-blocking network
> IO (i.e. java.nio) for the fetching loops, just as the new producer does
> for asynchronously sending data. In terms of implementation, the new
> consumer and producer clients will be based on a common non-blocking IO
> thread class. Details of this refactoring can be found in KAFKA-1316.
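For readers less familiar with java.nio, here is a minimal, self-contained example of the non-blocking pattern (plain JDK code, unrelated to the Kafka client internals): a channel is registered with a Selector and the single thread polls for readiness rather than blocking on a read, which is what lets one thread multiplex many broker connections.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Minimal java.nio sketch: register a non-blocking channel with a Selector
// and wait for readiness instead of blocking on the read itself.
public class NioSketch {
    public static int readySelect() throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        pipe.source().register(selector, SelectionKey.OP_READ);

        // Write a byte so the source end becomes readable.
        pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));

        // Returns the number of channels that became ready.
        int ready = selector.select(1000);

        selector.close();
        pipe.sink().close();
        pipe.source().close();
        return ready;
    }
}
```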
> 
> *4. Consumer API*
> 
> As a result of the single-threaded non-blocking network IO implementation,
> the new consumer API will no longer be based on a blocking stream iterator
> interface but on a poll(timeout) interface. In addition, the consumer APIs
> are designed to be flexible so that people can choose either to use the
> default offset and group membership management utilities mentioned above
> or to implement their own offset/group management logic. For example, for
> scenarios that need fixed partition assignment or prefer to store offsets
> locally in a data store, the new consumer APIs would make it much easier
> to customize. Details of the API design can be found in KAFKA-1328. A
> question to think about is whether there are any other potential use cases
> that cannot be easily supported by this API.
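To make the contrast with the blocking iterator concrete, here is a toy sketch of a poll-style loop; the Consumer/poll names are illustrative placeholders, since the real API is still being designed in KAFKA-1328. The key difference is that poll returns whatever is ready (possibly nothing) within the timeout, so the caller's thread is never parked indefinitely.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical poll-based consumer shape, for illustration only.
interface Consumer<T> {
    List<T> poll(long timeoutMs); // returns whatever is ready, possibly empty
}

public class PollLoopSketch {
    // A stub that hands out one batch of records, then nothing.
    static class StubConsumer implements Consumer<String> {
        private final Iterator<List<String>> batches =
            Arrays.asList(Arrays.asList("a", "b"),
                          Collections.<String>emptyList()).iterator();

        @Override
        public List<String> poll(long timeoutMs) {
            return batches.hasNext() ? batches.next() : Collections.<String>emptyList();
        }
    }

    // The consuming loop: call poll repeatedly instead of blocking on an
    // iterator; an empty result just means nothing arrived in time.
    public static int drain(Consumer<String> consumer, int rounds) {
        int consumed = 0;
        for (int i = 0; i < rounds; i++) {
            consumed += consumer.poll(100).size();
        }
        return consumed;
    }
}
```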
> 
> Cheers,
> Guozhang
