Hi All,

A few weeks back we wrote a wiki proposing a single-threaded, ZK-free
consumer client design for 0.9:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

We want to share some of the ideas that came up for this design to get some
early feedback. The below discussion assumes you have read the above wiki.

*Offset Management*

To make consumer clients ZK-free we need to move the offset management
utility from ZooKeeper into the Kafka servers, which then store offsets as
a special log. Key-based log compaction will be used to keep this
ever-growing log's size bounded. The broker responsible for offset
management of a given consumer group will be the leader of the
corresponding offset log partition, where the partition key is the
consumer group name. On the consumer side, instead of talking to ZK to
commit and read offsets, the client talks to the servers using new offset
commit and offset fetch request types. This work has been done and will be
included
in the 0.8.2 release. Details of the implementation can be found in
KAFKA-1000 and this wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
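
To illustrate the compaction idea, here is a toy Java model (not the
actual broker code) of how key-based compaction bounds the offsets log:
only the latest value per (group, topic, partition) key is retained, so
the log's live state is one entry per consumed partition rather than one
entry per commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: the real offsets log is a Kafka log with key-based
// compaction; this map captures the "latest value per key wins" behavior.
public class OffsetLogCompaction {
    private final Map<String, Long> compacted = new LinkedHashMap<>();

    // Appending a commit with an existing key overwrites the retained entry.
    public void append(String group, String topic, int partition, long offset) {
        compacted.put(group + "/" + topic + "/" + partition, offset);
    }

    public Long latest(String group, String topic, int partition) {
        return compacted.get(group + "/" + topic + "/" + partition);
    }

    // Bounded by the number of (group, topic, partition) keys, not by the
    // number of commits ever made.
    public int size() {
        return compacted.size();
    }
}
```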

*Group Membership Management*

The next step is to move the membership management and rebalancing
utility out of ZooKeeper as well. To do this we introduce a consumer
coordinator hosted on the Kafka servers that is responsible for keeping
track of group membership and partition subscription changes, and for
driving the corresponding rebalance process.

*1. Failure Detection*

More specifically, we will use a heartbeat protocol for consumer and
coordinator failure detection. The protocol is similar to the one ZK
uses: the consumer registers a session timeout value with the coordinator
on startup, and afterwards sends a heartbeat every session-timeout /
heartbeat-frequency. The coordinator will treat a consumer as failed when
it has not heard from the consumer within the session timeout, and the
consumer will treat its coordinator as failed when it has not received a
heartbeat response within the session timeout.

One difference from ZK's heartbeat protocol is that instead of fixing the
heartbeat frequency at three, we make this value configurable in the
consumer clients. This is because the latency of the rebalance process
(discussed below) is lower bounded by the heartbeat interval, so we want
the heartbeat frequency to be high without DDoSing the servers.
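
The timeout arithmetic described above can be sketched as follows; the
class and field names here are ours, not the actual client's:

```java
// Sketch of the failure-detection arithmetic: the consumer heartbeats every
// sessionTimeout / heartbeatFrequency, and either side declares the other
// dead after a full session timeout of silence.
public class HeartbeatTimer {
    final long sessionTimeoutMs;
    final int heartbeatFrequency; // configurable, unlike ZK's fixed 3
    long lastHeardMs;

    HeartbeatTimer(long sessionTimeoutMs, int heartbeatFrequency, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatFrequency = heartbeatFrequency;
        this.lastHeardMs = nowMs;
    }

    // How often a heartbeat is sent; also the lower bound on how quickly a
    // rebalance notification can reach the consumer.
    long heartbeatIntervalMs() {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    void recordHeartbeat(long nowMs) {
        lastHeardMs = nowMs;
    }

    boolean sessionExpired(long nowMs) {
        return nowMs - lastHeardMs > sessionTimeoutMs;
    }
}
```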

*2. Consumer Rebalance*

A number of events can trigger a rebalance: 1) a consumer failure, 2) a
new consumer joining the group, 3) an existing consumer changing its
subscribed topics/partitions, 4) a partition change in the consumed
topics. Once the coordinator decides to trigger a rebalance, it will
notify the consumers in the group to resend their topic/partition
subscription information, and then assign the existing partitions to
these consumers. On the consumer end, the clients no longer run any
rebalance logic in a distributed manner but simply follow the partition
assignment they receive from the coordinator. Since consumers can only be
notified of a rebalance via heartbeat responses, the rebalance latency is
lower bounded by the heartbeat interval; that is why we allow consumers
to negotiate this value with the coordinator upon registration, and we
would like to hear people's thoughts on this protocol.

In addition, the rebalance logic in the new consumer will be customizable
instead of hard-coded in a per-topic round-robin manner. Users will be
able to write their own assignment class following a common interface,
and consumers, upon registering themselves, will specify in their
registration request the assigner class name they want to use. If
consumers within the same group specify different algorithms, the
registration will be rejected. We are not sure if this is the best way of
customizing the rebalance logic yet, so any feedback is more than welcome.
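
A hypothetical shape for such a common interface, with round-robin as the
default assignor, might look like the sketch below; the real interface
name and signature in the rewrite may well differ:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical pluggable assignment interface: maps each consumer id in the
// group to the partition ids it should fetch from, for a single topic.
interface PartitionAssignor {
    Map<String, List<Integer>> assign(List<String> consumers, int numPartitions);
}

// Round-robin over sorted consumer ids, standing in for the default
// per-topic round-robin behavior described above.
class RoundRobinAssignor implements PartitionAssignor {
    public Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // deterministic order across all members
        Map<String, List<Integer>> out = new TreeMap<>();
        for (String c : sorted)
            out.put(c, new ArrayList<Integer>());
        for (int p = 0; p < numPartitions; p++)
            out.get(sorted.get(p % sorted.size())).add(p);
        return out;
    }
}
```

The coordinator would run whichever implementation the group registered,
which is why all members must name the same algorithm.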

For wildcard consumption, consumers will now discover new topics that are
available for fetching through the topic metadata request. That is, each
consumer will periodically refresh its topic metadata, and if new topics
matching its wildcard regex are returned in the metadata response, it
will notify the coordinator so that a new rebalance is triggered to
assign partitions of the new topics.
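
The periodic check could look roughly like this toy sketch (class and
method names are ours): on each metadata refresh, any matching topic not
yet subscribed to would prompt the consumer to notify the coordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Toy illustration of the wildcard check run on each metadata refresh.
public class WildcardMatcher {
    // Returns topics from the latest metadata response that match the
    // wildcard regex but are not yet in the consumer's subscription.
    public static List<String> newMatches(String regex,
                                          List<String> metadataTopics,
                                          List<String> subscribed) {
        Pattern p = Pattern.compile(regex);
        List<String> fresh = new ArrayList<>();
        for (String t : metadataTopics)
            if (p.matcher(t).matches() && !subscribed.contains(t))
                fresh.add(t); // a non-empty result would trigger a rebalance
        return fresh;
    }
}
```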

There are some failure handling cases during the rebalancing process
discussed in the consumer rewrite design wiki. We would encourage people to
read it and let us know if there are any other corner cases not covered.

Moving forward, we are thinking about making the coordinator handle both
the group management and the offset management for its groups, i.e. the
leader of a group's offset log partition will naturally become the
coordinator of that group. This allows the coordinator to reject offset
commit requests whose sender is not part of the group. To do so, a
consumer id and a generation id are used to track group membership
generations. Details of this design are included in the rewrite design
wiki page, and again any feedback is appreciated.
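
A toy model of that check (class and field names hypothetical): the
coordinator bumps a generation id on every rebalance, and a commit
carrying a stale generation or an unknown consumer id is rejected.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Toy sketch of generation-based fencing of offset commits.
public class GroupGenerationCheck {
    private int generation = 0;
    private final Set<String> members = new HashSet<>();

    // Each completed rebalance produces a new membership generation.
    public void rebalance(Collection<String> newMembers) {
        generation++;
        members.clear();
        members.addAll(newMembers);
    }

    // A commit is accepted only from a current member of the current generation.
    public boolean acceptCommit(String consumerId, int commitGeneration) {
        return commitGeneration == generation && members.contains(consumerId);
    }

    public int generation() { return generation; }
}
```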

*3. Non-blocking Network IO*

With a single-threaded consumer, we will use non-blocking network IO
(i.e. java.nio) for the fetch loop, just as the new producer does for
sending data asynchronously. In terms of implementation, the new
consumer and producer clients will be based on a common non-blocking IO
thread class. Details of this refactoring can be found in KAFKA-1316.
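
The single-threaded style can be illustrated with a bare java.nio select
loop: one thread multiplexes readiness over many channels instead of
blocking per connection. Here a Pipe stands in for broker sockets; none
of this is the actual client code.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Minimal select loop: register channels, wait for readiness, then do
// non-blocking reads. The real IO thread also handles writes and connects.
public class SelectLoopSketch {
    public static String runOnce() throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        pipe.source().register(selector, SelectionKey.OP_READ);

        // Pretend a broker response arrived on the wire.
        pipe.sink().write(ByteBuffer.wrap("response".getBytes("UTF-8")));

        String result = "";
        if (selector.select(1000) > 0) { // blocks on readiness, not on data
            for (SelectionKey key : selector.selectedKeys()) {
                ByteBuffer buf = ByteBuffer.allocate(64);
                ((Pipe.SourceChannel) key.channel()).read(buf); // non-blocking
                buf.flip();
                result = new String(buf.array(), 0, buf.limit(), "UTF-8");
            }
            selector.selectedKeys().clear();
        }
        selector.close();
        pipe.sink().close();
        pipe.source().close();
        return result;
    }
}
```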

*4. Consumer API*

As a result of the single-threaded, non-blocking network IO
implementation, the new consumer API will no longer be based on a
blocking stream iterator interface but on a poll(timeout) interface. In
addition, the consumer APIs are designed to be flexible so that people
can choose to either use the default offset and group membership
management utilities mentioned above or implement their own offset/group
management logic. For example, for scenarios that need fixed partition
assignment or that prefer to store offsets locally in a data store, the
new consumer APIs would make customization much easier. Details of the
API design can be found in KAFKA-1328. A question to think about is
whether there are any other potential use cases that cannot be easily
supported by this API.
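
As a toy illustration of the poll-style contract (names are ours, not the
proposed API's): instead of blocking on a per-stream iterator, the caller
drives consumption and gets back whatever records are ready within the
timeout, possibly none.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of a poll(timeout)-style consumer facade over fetched records.
public class PollingConsumerSketch {
    private final BlockingQueue<String> fetched = new LinkedBlockingQueue<>();

    // Stands in for records arriving from the fetch loop.
    public void deliver(String record) {
        fetched.add(record);
    }

    // Returns the records available within timeoutMs; an empty list on timeout.
    public List<String> poll(long timeoutMs) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = fetched.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            fetched.drainTo(batch); // grab anything else already buffered
        }
        return batch;
    }
}
```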

Cheers,
Guozhang
