Hi Rob,

Piggy-backing the rebalanced partition info in the HeartbeatResponse may cause
the partition assignments seen by different consumers to become inconsistent
when rebalances are triggered consecutively, since the coordinator would no
longer have a synchronization barrier at which to re-compute the distribution
from a consistent view.

Guozhang


On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com>
wrote:

>
> > On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
> >
> >
> >> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Hi All,
> >>
> >> We wrote a wiki a few weeks back proposing a single-threaded, ZK-free
> >> consumer client design for 0.9:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >>
> >> We want to share some of the ideas that came up for this design to get
> >> some early feedback. The discussion below assumes you have read the above
> >> wiki.
> >>
> >> *Offset Management*
> >>
> >> To make consumer clients ZK-free we need to move the offset management
> >> utility from ZooKeeper into the Kafka servers, which then store offsets in
> >> a special log. Key-based log compaction will be used to keep this
> >> ever-growing log's size bounded. The broker responsible for offset
> >> management for a given consumer group will be the leader of the
> >> corresponding partition of this special offset log, where the partition
> >> key is the consumer group name. On the consumer side, instead of talking
> >> to ZK to commit and read offsets, the consumer talks to the servers using
> >> new offset commit and offset fetch request types. This work has been done
> >> and will be included in the 0.8.2 release. Details of the implementation
> >> can be found in KAFKA-1000 and this wiki:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
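
To illustrate the key-based compaction idea, here is a minimal, self-contained
Java sketch (the string keys and the map are stand-ins, not the broker code):
replaying the compacted offsets log yields exactly one committed offset per
(group, topic, partition) key.

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetLogCompactionSketch {
        public static void main(String[] args) {
            // Stand-in for the compacted offsets log: compaction keeps only
            // the latest value appended for each key.
            Map<String, Long> compacted = new HashMap<>();

            // Key is (consumer group, topic, partition); value is the offset.
            compacted.put("group-A/clicks/0", 100L);
            compacted.put("group-A/clicks/0", 250L);  // later commit supersedes 100
            compacted.put("group-B/clicks/0", 42L);

            // An offset fetch for group-A, clicks-0 returns the latest commit.
            System.out.println(compacted.get("group-A/clicks/0"));  // prints 250
        }
    }

Since the real log is partitioned by consumer group name, the leader of a
group's offset partition holds all of that group's latest committed offsets.
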
> >>
> >> *Group Membership Management*
> >>
> >> The next step is to move the membership management and rebalancing
> >> utility out of ZooKeeper as well. To do this we introduced a consumer
> >> coordinator hosted on the Kafka servers, which is responsible for keeping
> >> track of group membership and consumed-partition changes, and for driving
> >> the corresponding rebalancing process.
> >>
> >> *1. Failure Detection*
> >>
> >> More specifically, we will use a heartbeating protocol for consumer and
> >> coordinator failure detection. The protocol is similar to the one ZK
> >> uses: the consumer sends a session timeout value to the coordinator on
> >> startup, and afterwards keeps sending heartbeats to the coordinator every
> >> session-timeout / heartbeat-frequency. The coordinator treats a consumer
> >> as failed when it has not heard from that consumer for a session timeout,
> >> and the consumer treats its coordinator as failed when it has not received
> >> a heartbeat response for a session timeout.
> >>
> >> One difference from the ZK heartbeat protocol is that instead of fixing
> >> the heartbeat-frequency at three, we make this value configurable in the
> >> consumer clients. This is because the latency of the rebalancing process
> >> (discussed below) is lower bounded by the heartbeat interval
> >> (session-timeout / heartbeat-frequency), so we want the frequency to be
> >> large enough for quick rebalances while not DDoSing the servers with
> >> heartbeat requests.
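
A minimal sketch of the coordinator-side timing described above (the class and
method names here are hypothetical, not the actual broker code): a consumer is
considered failed once no heartbeat has arrived within its session timeout, and
consumers are expected to heartbeat every session-timeout / heartbeat-frequency.

    import java.util.HashMap;
    import java.util.Map;

    public class FailureDetectorSketch {
        private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
        private final long sessionTimeoutMs;

        public FailureDetectorSketch(long sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
        }

        // Record a heartbeat received from a consumer.
        public void onHeartbeat(String consumerId) {
            lastHeartbeatMs.put(consumerId, System.currentTimeMillis());
        }

        // A consumer is failed if it has been silent longer than its session timeout.
        public boolean isFailed(String consumerId) {
            Long last = lastHeartbeatMs.get(consumerId);
            return last == null || System.currentTimeMillis() - last > sessionTimeoutMs;
        }

        public static void main(String[] args) throws InterruptedException {
            long sessionTimeoutMs = 300;                 // e.g. 30s in practice
            int heartbeatFrequency = 3;                  // configurable, unlike ZK
            long heartbeatIntervalMs = sessionTimeoutMs / heartbeatFrequency;

            FailureDetectorSketch detector = new FailureDetectorSketch(sessionTimeoutMs);
            detector.onHeartbeat("consumer-1");
            System.out.println(detector.isFailed("consumer-1"));   // false

            Thread.sleep(heartbeatIntervalMs * 4);                  // miss several heartbeats
            System.out.println(detector.isFailed("consumer-1"));   // true
        }
    }
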
> >>
> >> *2. Consumer Rebalance*
> >>
> >> A number of events can trigger a rebalance: 1) a consumer fails, 2) a new
> >> consumer joins the group, 3) an existing consumer changes the
> >> topics/partitions it consumes, or 4) the partitions of a consumed topic
> >> change. Once the coordinator decides to trigger a rebalance it will
> >> notify the consumers in the group to resend their topic/partition
> >> subscription information, and then assign the existing partitions to
> >> these consumers. On their end, the consumers no longer run any rebalance
> >> logic in a distributed manner but simply follow the partition assignment
> >> they receive from the coordinator. Since consumers can only be notified
> >> of a rebalance via heartbeat responses, the rebalance latency is lower
> >> bounded by the heartbeat interval; that is why we allow consumers to
> >> negotiate this value with the coordinator upon registration, and we would
> >> like to hear people's thoughts about this protocol.
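
Since a rebalance can only be signaled to a consumer through its next heartbeat
response, the start of a rebalance lags by up to one heartbeat interval. A tiny
hypothetical sketch of that flow (not the actual coordinator code):

    public class RebalanceNotificationSketch {
        private volatile boolean rebalanceInProgress = false;

        // Coordinator side: a membership or partition change triggers a rebalance.
        public void triggerRebalance() {
            rebalanceInProgress = true;
        }

        // Consulted when building each HeartbeatResponse: tells the consumer to
        // resend its topic/partition subscription and wait for a new assignment.
        public boolean needsRejoin() {
            return rebalanceInProgress;
        }

        public static void main(String[] args) {
            RebalanceNotificationSketch coordinator = new RebalanceNotificationSketch();
            coordinator.triggerRebalance();
            // A consumer only sees this flag on its next heartbeat, i.e. after
            // at most session-timeout / heartbeat-frequency milliseconds.
            if (coordinator.needsRejoin()) {
                System.out.println("resend subscription, then apply the new assignment");
            }
        }
    }
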
> >
> > One question on this topic: my planned usage pattern is to have a high
> > number of consumers that are ephemeral.  Imagine a consumer coming online,
> > consuming messages for a short duration, then disappearing forever.  Is
> > this use case supported?
> >
>
> If consumers are coming and going, there will be a high rate of rebalances,
> and that will really start throttling consumption at some point.  You may
> have a use case that hits that point detrimentally.  A custom rebalance
> algorithm may be able to rebalance a sub-set of the consumers and not inform
> the others of a need to rebalance, which would ameliorate the cost.  Still,
> the consumers that need to rebalance would pay.
>
> If the initial JoinGroupRequest were preserved, would it be possible to
> send rebalanced partition information in the HeartbeatResponse, up front to
> all (or a sub-set of) consumers, and avoid the rejoin round trip?  I mean,
> push the rebalance rather than pull it with a secondary JoinGroupRequest.
> This would reduce the cost and push out the point at which throttling kicks
> in, but poorly timed fetch requests may fail.  If the consumer were
> resilient to fetch failures, might it work?
>
>
> >> In addition, the rebalance logic in the new consumer will be customizable
> >> instead of hard-coded as a per-topic round-robin assignment. Users will
> >> be able to write their own assignment class following a common interface,
> >> and consumers, upon registering themselves, will specify in their
> >> registration request the assigner class name they want to use. If
> >> consumers within the same group specify different algorithms, the
> >> registration will be rejected. We are not sure yet whether this is the
> >> best way of customizing the rebalance logic, so any feedback is more than
> >> welcome.
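
As a rough illustration of what a pluggable assignment class might look like
(the interface name and shape below are hypothetical, not the one proposed in
the wiki), with a per-topic round-robin assignor in the spirit of the default:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class AssignorSketch {
        // Hypothetical common interface that custom assignment classes would implement.
        interface PartitionAssignor {
            // Maps each consumer id to the partitions it should own for one topic.
            Map<String, List<Integer>> assign(List<String> consumers, int numPartitions);
        }

        // Per-topic round-robin, in the spirit of the existing default logic.
        static class RoundRobinAssignor implements PartitionAssignor {
            public Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
                Map<String, List<Integer>> assignment = new HashMap<>();
                for (String c : consumers) {
                    assignment.put(c, new ArrayList<>());
                }
                for (int p = 0; p < numPartitions; p++) {
                    assignment.get(consumers.get(p % consumers.size())).add(p);
                }
                return assignment;
            }
        }

        public static void main(String[] args) {
            List<String> consumers = Arrays.asList("c1", "c2");
            // e.g. {c1=[0, 2, 4], c2=[1, 3]}
            System.out.println(new RoundRobinAssignor().assign(consumers, 5));
        }
    }

Consumers in the same group would name the same assignor class in their
registration requests; mismatches are rejected, as described above.
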
> >>
> >> For wildcard consumption, consumers will now discover new topics that are
> >> available for fetching through the topic metadata request. That is, each
> >> consumer will periodically refresh its topic metadata, and if new topics
> >> matching its wildcard regex are returned in the metadata response, it
> >> will notify the coordinator so that it can trigger a new rebalance to
> >> assign partitions of the new topics.
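
A small sketch of the consumer-side matching step, assuming the topic list
comes back in a periodic metadata response (the refresh and
coordinator-notification plumbing is omitted):

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class WildcardMatchSketch {
        public static void main(String[] args) {
            // Wildcard subscription, e.g. all topics starting with "clicks-".
            Pattern subscription = Pattern.compile("clicks-.*");

            // Topics returned by a periodic topic metadata request.
            List<String> topicsFromMetadata = Arrays.asList("clicks-us", "clicks-eu", "logs");

            // Newly matched topics would be reported to the coordinator so it
            // can trigger a rebalance that assigns their partitions.
            List<String> matched = topicsFromMetadata.stream()
                    .filter(t -> subscription.matcher(t).matches())
                    .collect(Collectors.toList());
            System.out.println(matched);  // [clicks-us, clicks-eu]
        }
    }
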
> >>
> >> Some failure handling cases during the rebalancing process are discussed
> >> in the consumer rewrite design wiki. We encourage people to read it and
> >> let us know if there are any other corner cases not covered.
> >>
> >> Moving forward, we are thinking about making the coordinator handle both
> >> group management and offset management for its groups, i.e. the leader of
> >> a group's offset log partition will naturally become the coordinator of
> >> the corresponding consumer group. This allows the coordinator to reject
> >> offset commit requests whose sender is not part of the group. To do so, a
> >> consumer id and a generation id are used to track group membership across
> >> generations. Details of this design are included in the rewrite design
> >> wiki page, and again any feedback is appreciated.
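
A hypothetical sketch of how the coordinator could use the consumer id and
generation id to reject offset commits from consumers that are not part of the
current group generation (names and shapes here are illustrative only):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class GenerationCheckSketch {
        private int generationId = 0;
        private final Set<String> members = new HashSet<>();

        // Each completed rebalance bumps the generation and records its members.
        public void completeRebalance(Set<String> newMembers) {
            generationId++;
            members.clear();
            members.addAll(newMembers);
        }

        // An offset commit is accepted only from a member of the current generation.
        public boolean acceptOffsetCommit(String consumerId, int requestGenerationId) {
            return requestGenerationId == generationId && members.contains(consumerId);
        }

        public static void main(String[] args) {
            GenerationCheckSketch coordinator = new GenerationCheckSketch();
            coordinator.completeRebalance(new HashSet<>(Arrays.asList("c1", "c2")));
            System.out.println(coordinator.acceptOffsetCommit("c1", 1));  // true
            System.out.println(coordinator.acceptOffsetCommit("c3", 1));  // false: not a member
            System.out.println(coordinator.acceptOffsetCommit("c1", 0));  // false: stale generation
        }
    }
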
> >>
> >> *3. Non-blocking Network IO*
> >>
> >> With a single-threaded consumer, we will instead use non-blocking network
> >> IO (i.e. java.nio) for the fetching loops, just as the new producer does
> >> for asynchronously sending data. In terms of implementation, the new
> >> consumer and producer clients will be based on a common non-blocking IO
> >> thread class. Details of this refactoring can be found in KAFKA-1316.
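
The fetch loop would sit on a java.nio Selector rather than on blocking
sockets; a bare-bones sketch of that loop shape (no broker channels are
actually registered here, and this is not the KAFKA-1316 code):

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;

    public class SelectorLoopSketch {
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            // Broker SocketChannels would be registered here for OP_CONNECT,
            // OP_READ and OP_WRITE, much like the new producer's network thread.

            long deadline = System.currentTimeMillis() + 500;
            while (System.currentTimeMillis() < deadline) {
                // Waits at most 100 ms; returns early when any channel is ready.
                int ready = selector.select(100);
                if (ready > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        // Complete in-flight sends and collect fetched responses here.
                    }
                    selector.selectedKeys().clear();
                }
            }
            selector.close();
        }
    }
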
> >>
> >> *4. Consumer API*
> >>
> >> As a result of the single-threaded, non-blocking network IO
> >> implementation, the new consumer API will no longer be based on a
> >> blocking stream iterator interface but on a poll(timeout) interface. In
> >> addition, the consumer APIs are designed to be flexible so that people
> >> can choose either to use the default offset and group membership
> >> management utilities mentioned above or to implement their own
> >> offset/group management logic. For example, for scenarios that need fixed
> >> partition assignments or that prefer to store offsets locally in a data
> >> store, the new consumer APIs make customization much easier. Details of
> >> the API design can be found in KAFKA-1328. A question to think about is
> >> whether there are any other potential use cases that cannot be easily
> >> supported by this API.
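
A rough sketch of what a poll(timeout)-driven loop could look like; the
Consumer interface below is a placeholder to show the call pattern, not the
actual API from KAFKA-1328:

    import java.util.Collections;
    import java.util.List;

    public class PollApiSketch {
        // Placeholder for the poll-based client; heartbeats, rebalances and
        // fetches would all be driven from inside poll() on the single thread.
        interface Consumer {
            List<String> poll(long timeoutMs);
        }

        public static void main(String[] args) {
            // Trivial stub so the sketch runs end to end.
            Consumer consumer = timeoutMs -> Collections.singletonList("record");

            for (int i = 0; i < 3; i++) {
                for (String record : consumer.poll(100)) {
                    // Process the record; offsets could then be committed to
                    // Kafka or written to an external store for custom offset
                    // management.
                    System.out.println(record);
                }
            }
        }
    }
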
> >>
> >> Cheers,
> >> Guozhang
> >
>



-- 
-- Guozhang
