Lock is a bad way to say it; a barrier is better. I don't think what I am describing is even a barrier, since the rebalance would just need to recompute a rebalance schedule and submit it. The only processing delay is to allow a soft remove so the client can clean up before you turn on the new consumer, so it lags a bit. Do you think this could work?
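Roughly, the schedule I have in mind looks like the sketch below; every name in it is hypothetical, just to show the soft-remove-then-add sequence, one partition at a time:

import java.util.List;

// Hypothetical sketch: the rebalance recomputes a schedule of single-partition
// moves and submits it. Each move soft-removes the partition from the old
// owner, lags until that client has cleaned up and committed, and only then
// turns the partition on for the new consumer. No other partition pauses.
class RebalanceSchedule {
    static class Move {
        final int partition;
        final String from;
        final String to;
        Move(int partition, String from, String to) {
            this.partition = partition;
            this.from = from;
            this.to = to;
        }
    }

    interface Coordinator {
        void softRemove(String consumer, int partition);   // stop fetches, ask for cleanup
        void awaitCommit(String consumer, int partition);  // block until the offset is committed
        void addPartition(String consumer, int partition); // open the partition to the new owner
    }

    private final List<Move> moves;
    private final Coordinator coordinator;

    RebalanceSchedule(List<Move> moves, Coordinator coordinator) {
        this.moves = moves;
        this.coordinator = coordinator;
    }

    void run() {
        for (Move m : moves) {
            coordinator.softRemove(m.from, m.partition);
            coordinator.awaitCommit(m.from, m.partition); // the only lag, per partition
            coordinator.addPartition(m.to, m.partition);
        }
    }
}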
Thanks,
Rob

> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>
> Hi Guozhang,
>
> Thank you for considering my suggestions. The security layer sounds like the right facet to design for these sorts of capabilities. Have you considered a chained ocap security model for the brokers, using hash tokens? This would provide per-partition read/write capabilities with QoS context, including leases, revocation, debug level and monitoring. The overkill disappears, as no domain-specific info needs to be stored at the brokers, like consumer/partition assignments. The read ocap for consumer 7/topic bingo/partition 131 could be revoked at the brokers for a partition, and subsequent reads would fail the fetch for requests carrying that ocap token. You could also dynamically change the log level for a specific consumer/partition.
>
> There are advantages we could discuss to having finer-grained control. Consider that scheduled partition rebalancing could be implemented with no pauses from the perspective of the consumer threads; it looks like single-partition lag, as the offset commit occurs before rotation, with no lag to non-rebalanced partitions: rebalance 1 partition per second so as to creep load over to a newbie consumer. It would eliminate a global read lock, and even the internal Kafka consumer would never block on IO protocol other than the normal fetch request (and the initial join group request).
>
> A global lock acquired through a pull protocol (HeartbeatRequest followed by a JoinGroupRequest) for all live consumers is a much bigger lock than a security-based push protocol, as I assume the coordinator will have open sockets to all brokers in order to reach out as needed. As well, each lock would be independent between partitions and would involve only those brokers in the ISR for a given partition. It is a much smaller lock.
>
> I had some time to consider my suggestion that it be viewed as a relativistic frame of reference. Consider the model where each dimension of the frame of reference for each consumer is a partition (actually a sub-space with the dimensionality of the replication factor, but with a single leader election, so consider it 1 dimension). The total dimensionality of a consumer's frame of reference is the number of partitions, but only assigned partitions are open to a given consumer's viewpoint. The offset is the partition dimension's coordinate, and only consumers with an open dimension can translate the offset. A rebalance opens or closes a dimension for a given consumer and can be viewed as a rotation. Could Kafka consumption and rebalance (and ISR leader election) be reduced to matrix operations?
>
> Rob
>
>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hi Rob,
>>
>> Sorry for the late reply.
>>
>> If I understand your approach correctly, it requires all brokers to remember the partition assignment of each consumer in order to decide whether or not to authorize the fetch request, correct? If we are indeed going to do such authorization for the security project then maybe it is a good way to go, but otherwise it might be overkill just to support finer-grained partition assignment. In addition, instead of requiring a round trip between the coordinator and the consumers for the synchronization barrier, the coordinator now needs to wait for a round trip between itself and the other brokers before it can return the join-group request, right?
>>
>> Guozhang
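To make the token idea above concrete, a minimal sketch of the broker-side check follows; every class and method name is hypothetical, and the hash scheme merely gestures at a chained-ocap style. The point is that the broker needs no consumer/partition assignment state, only its secret and a revocation set:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a broker-side ocap check: a fetch request carries a
// hash token minted from a broker secret, so the broker can authorize a read
// on (consumer, topic, partition) without storing assignment state.
class OcapFetchAuthorizer {
    private final byte[] brokerSecret;
    private final Set<String> revoked = ConcurrentHashMap.newKeySet();

    OcapFetchAuthorizer(byte[] brokerSecret) {
        this.brokerSecret = brokerSecret;
    }

    // Mint the read capability, e.g. for consumer 7 / topic bingo / partition 131.
    byte[] mintReadToken(String consumerId, String topic, int partition) throws Exception {
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        sha.update(brokerSecret);
        sha.update((consumerId + "/" + topic + "/" + partition).getBytes(StandardCharsets.UTF_8));
        return sha.digest();
    }

    // Revoking the token at the brokers makes subsequent fetches fail.
    void revoke(byte[] token) {
        revoked.add(Arrays.toString(token));
    }

    boolean authorizeFetch(byte[] token, String consumerId, String topic, int partition)
            throws Exception {
        if (revoked.contains(Arrays.toString(token))) return false;
        return MessageDigest.isEqual(token, mintReadToken(consumerId, topic, partition));
    }
}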
>>
>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <robert.w.with...@gmail.com> wrote:
>>
>>> Hi Guozhang,
>>>
>>> Currently, the brokers do not know which high-level consumers are reading which partitions, and it is the rebalance between the consumers and the coordinator which would authorize a consumer to fetch a particular partition, I think. Does this mean that when a rebalance occurs, all consumers must send a JoinGroupRequest, and that the coordinator will not respond to any consumers until all consumers have sent the JoinGroupRequest, to enable the synchronization barrier? That has the potential to be a sizable global delay.
>>>
>>> On the assumption that there is only one coordinator for a group, why couldn't the synchronization barrier be per partition and internal to Kafka, mostly not involving the consumers, other than a chance for an offsetCommit by the consumer losing a partition? If the brokers have session state and know the new assignment before the consumer is notified with a HeartbeatResponse, they could fail the fetch request from that consumer for an invalidly assigned partition. The consumer could treat an invalid-partition failure of the fetch request as if it were a HeartbeatResponse partition removal.
>>>
>>> The only gap seems to be: when does a consumer that is losing a partition get a chance to commit its offset? If there were a PartitionCommittedNotification message that a consumer could send to the coordinator after committing its offsets, then the coordinator could send the add-partition HeartbeatResponse to the consumer gaining the partition after receiving the PartitionCommittedNotification, and the offset management is stable. The advantage is that none of the other consumers would be paused on any other partitions. Only partitions being rebalanced would see any consumption pause.
>>>
>>> So, something like the following (a code sketch follows below):
>>>
>>> 1. 3 consumers are running with 12 partitions balanced, 4 each.
>>> 2. A new consumer starts and sends a JoinGroupRequest to the coordinator.
>>> 3. The coordinator computes a rebalance with 4 consumers: each existing consumer will lose a partition, assigned to the new consumer.
>>> 4. The coordinator informs all live brokers of the partition reassignments.
>>> 5. The brokers receive the reassignments, start failing unauthorized fetch requests, and ack back to the coordinator.
>>> 6. The coordinator receives all broker acks, sends HeartbeatResponses with partition removals to the existing consumers, and awaits PartitionCommittedNotifications from the consumers losing partitions.
>>> 7. The existing consumers can continue to fetch messages from correctly assigned partitions.
>>> 8. When an existing consumer fails a fetch for a partition, or gets a HeartbeatResponse with a partition removal, it commits offsets for that partition and then sends a PartitionCommittedNotification to the coordinator.
>>> 9. As the coordinator receives the PartitionCommittedNotification for a particular partition from an existing consumer, it sends the addPartition to consumer 4 in a HeartbeatResponse, and the new consumer can start fetching that partition.
>>>
>>> If a consumer drops HeartbeatRequests within a session timeout, the coordinator would inform the brokers and they would fail fetchRequests for those partitions from that consumer.
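A rough sketch of the coordinator side of steps 4 through 9 above; all names are hypothetical, and the broker and heartbeat channels are stubbed as interfaces:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of steps 4-9: the coordinator hands off one partition
// at a time, so only the partition being moved ever pauses.
class IncrementalHandoff {
    // partition -> consumer gaining it, filled in at step 3.
    private final Map<Integer, String> pendingGains = new HashMap<>();
    private final Brokers brokers;       // hypothetical broker-notification facade
    private final Heartbeats heartbeats; // hypothetical heartbeat channel

    IncrementalHandoff(Brokers brokers, Heartbeats heartbeats) {
        this.brokers = brokers;
        this.heartbeats = heartbeats;
    }

    // Steps 4-6: push reassignments to brokers, then tell the losing consumers.
    void start(Map<Integer, String> losses, Map<Integer, String> gains) {
        pendingGains.putAll(gains);
        brokers.applyReassignments(gains.keySet()); // brokers now fail unauthorized fetches
        losses.forEach((partition, loser) ->
            heartbeats.sendPartitionRemoval(loser, partition));
    }

    // Steps 8-9: once the losing consumer has committed, open the partition
    // to the gaining consumer; no other partition was ever paused.
    void onPartitionCommitted(int partition) {
        String gainer = pendingGains.remove(partition);
        if (gainer != null) {
            heartbeats.sendAddPartition(gainer, partition);
        }
    }

    interface Brokers {
        void applyReassignments(java.util.Set<Integer> partitions);
    }

    interface Heartbeats {
        void sendPartitionRemoval(String consumer, int partition);
        void sendAddPartition(String consumer, int partition);
    }
}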
>>> There is no chance to send removePartitions, since no Heartbeat is occurring, but the addPartitions could be sent and the offset is what it is. This seems no different from this sort of failure today.
>>>
>>> Instead of a global synchronization barrier, isn't it possible to have an incremental, per-partition synchronization barrier? The brokers would have to be aware of this. I think of it as relativistic from each acceleration frame of reference, which is each consumer: event horizons.
>>>
>>> Regards,
>>>
>>> Rob
>>>
>>> -----Original Message-----
>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>> To: users@kafka.apache.org
>>> Subject: Re: New Consumer Design
>>>
>>> Hi Rob,
>>>
>>> Piggy-backing the rebalance partition info in the HeartbeatResponse may cause inconsistency of the partition assignments to consumers when rebalances are triggered consecutively, since the coordinator would no longer have a synchronization barrier for recomputing the distribution with a consistent view.
>>>
>>> Guozhang
>>>
>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>>
>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>>>>
>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> We have written a wiki a few weeks back proposing a single-threaded, ZK-free consumer client design for 0.9:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>>
>>>>>> We want to share some of the ideas that came up for this design to get some early feedback. The discussion below assumes you have read the above wiki.
>>>>>>
>>>>>> *Offset Management*
>>>>>>
>>>>>> To make consumer clients ZK-free we need to move the offset management utility from ZooKeeper into the Kafka servers, which then store offsets as a special log. Key-based log compaction will be used to keep this ever-growing log's size bounded. The broker responsible for the offset management of a given consumer group will be the leader of this special offset log partition, where the partition key is the consumer group name. On the consumer side, instead of talking to ZK to commit and read offsets, the consumer talks to the servers with new offset commit and offset fetch request types. This work has been done and will be included in the 0.8.2 release. Details of the implementation can be found in KAFKA-1000 and this wiki:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
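As an aside, a minimal sketch of the key-based compaction semantics described above: the offsets log keeps only the latest committed offset per key, so its size stays bounded. The classes are illustrative, not Kafka's actual implementation, and keying by (group, topic, partition) is an assumption of the sketch:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: key-based compaction of the offsets log retains only
// the newest value per key, which is what bounds the log's size.
class CompactedOffsetsLog {
    // Key for an offset entry: consumer group, topic, partition.
    record OffsetKey(String group, String topic, int partition) {}

    private final Map<OffsetKey, Long> latest = new LinkedHashMap<>();

    // An offset commit appends to the log; compaction keeps only this entry.
    void append(String group, String topic, int partition, long offset) {
        latest.put(new OffsetKey(group, topic, partition), offset);
    }

    // An offset fetch returns the last committed offset, or -1 if none exists.
    long fetch(String group, String topic, int partition) {
        return latest.getOrDefault(new OffsetKey(group, topic, partition), -1L);
    }
}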
>>>>>> *Group Membership Management*
>>>>>>
>>>>>> The next step is to move the membership management and rebalancing utility out of ZooKeeper as well. To do this we introduce a consumer coordinator hosted on the Kafka servers, which is responsible for keeping track of group membership and consumed-partition changes, and for the corresponding rebalancing process.
>>>>>>
>>>>>> *1. Failure Detection*
>>>>>>
>>>>>> More specifically, we will use a heartbeating protocol for consumer and coordinator failure detection. The heartbeating protocol would be similar to the one ZK uses, i.e. the consumer sets a session timeout value with the coordinator on startup, and afterwards keeps sending heartbeats to the coordinator every session-timeout / heartbeat-frequency. The coordinator will treat a consumer as failed when it has not heard from the consumer within the session timeout, and the consumer will treat its coordinator as failed when it has not received heartbeat responses within the session timeout.
>>>>>>
>>>>>> One difference from the ZK heartbeat protocol is that instead of fixing the heartbeat frequency at three, we make this value configurable in the consumer clients. This is because the latency of the rebalancing process (discussed below) is lower-bounded by the heartbeat frequency, so we want this value to be large while not DDoSing the servers.
>>>>>>
>>>>>> *2. Consumer Rebalance*
>>>>>>
>>>>>> A number of events can trigger a rebalance process: 1) a consumer failure, 2) a new consumer joining the group, 3) an existing consumer changing its consumed topics/partitions, 4) a partition change for the consumed topics. Once the coordinator decides to trigger a rebalance, it will notify the consumers within the group to resend their topic/partition subscription information, and then assign the existing partitions to these consumers. On the consumer end, consumers no longer run any rebalance logic in a distributed manner but simply follow the partition assignment they receive from the coordinator. Since the consumers can only be notified of a rebalance trigger via heartbeat responses, the rebalance latency is lower-bounded by the heartbeat frequency; that is why we allow consumers to negotiate this value with the coordinator upon registration, and we would like to hear people's thoughts about this protocol.
>>>>>
>>>>> One question on this topic: my planned usage pattern is to have a high number of consumers that are ephemeral. Imagine a consumer coming online, consuming messages for a short duration, then disappearing forever. Is this use case supported?
>>>>
>>>> If consumers are coming and going, there will be a high rate of rebalances, and that will really start throttling consumption at some point. You may have a use case that hits that detrimentally. A custom rebalance algorithm may be able to rebalance a sub-set of the consumers and not inform the others of a need to rebalance, which would ameliorate the cost. Still, the consumers that need to rebalance would pay.
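For reference, a small sketch of the heartbeat timing from the failure-detection section above, with the configurable frequency; the names are illustrative, not from the design wiki:

// Illustrative sketch of the failure-detection timing: heartbeats are sent
// every sessionTimeout / heartbeatFrequency, and each side declares the other
// failed after a full session timeout of silence.
class HeartbeatTimer {
    private final long sessionTimeoutMs;
    private final int heartbeatFrequency; // configurable, unlike ZK's fixed 3
    private long lastHeardMs;

    HeartbeatTimer(long sessionTimeoutMs, int heartbeatFrequency, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatFrequency = heartbeatFrequency;
        this.lastHeardMs = nowMs;
    }

    // A larger frequency lowers rebalance latency but sends more heartbeats.
    long heartbeatIntervalMs() {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    void onResponse(long nowMs) {
        lastHeardMs = nowMs;
    }

    // True once a full session timeout has passed with no response; the
    // coordinator treats the consumer as failed, and vice versa.
    boolean peerFailed(long nowMs) {
        return nowMs - lastHeardMs > sessionTimeoutMs;
    }
}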
>>>> If the initial JoinGroupRequest were preserved, would it be possible to send rebalanced partition information in the HeartbeatResponse, up front to all (or a sub-set of) consumers, and avoid the rejoin round trip? I mean, push the rebalance rather than pull it with a secondary JoinGroupRequest. This would reduce cost and lower the throttle point, but poorly timed fetch requests may fail. If the consumer were resilient to fetch failures, might it work?
>>>>
>>>>>> In addition, the rebalance logic in the new consumer will be customizable instead of hard-coded in a per-topic round-robin manner. Users will be able to write their own assignment class following a common interface (sketched below), and consumers, upon registering themselves, will specify in their registration request the assigner class name they want to use. If consumers within the same group specify different algorithms, the registration will be rejected. We are not sure if this is the best way of customizing rebalance logic yet, so any feedback is more than welcome.
>>>>>>
>>>>>> For wildcard consumption, consumers will now discover new topics available for fetching through the topic metadata request. That is, each consumer will periodically update its topic metadata for fetching, and if new topics matching its wildcard regex are returned in the metadata response, it will notify the coordinator to let it trigger a new rebalance to assign partitions for the new topic.
>>>>>>
>>>>>> There are some failure-handling cases during the rebalancing process discussed in the consumer rewrite design wiki. We would encourage people to read it and let us know if there are any other corner cases not covered.
>>>>>>
>>>>>> Moving forward, we are thinking about making the coordinator handle both the group management and the offset management for its groups, i.e. the leader of the offset log partition will naturally become the coordinator of the corresponding consumer group. This allows the coordinator to reject offset commit requests if the sender is not part of the group. To do so, a consumer id and a generation id are applied to control the group member generations. Details of this design are included in the rewrite design wiki page, and again any feedback is appreciated.
>>>>>>
>>>>>> *3. Non-blocking Network IO*
>>>>>>
>>>>>> With a single-threaded consumer, we will instead use non-blocking network IO (i.e. java.nio) for the fetching loops, just as the new producer does for asynchronously sending data. In terms of implementation, the new consumer and producer clients will be based on a common non-blocking IO thread class. Details of this refactoring can be found in KAFKA-1316.
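A sketch of what the pluggable assignment interface mentioned above might look like, with a round-robin example; the interface shape here is an assumption, not the wiki's actual definition:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a pluggable assignment interface: consumers name an
// assigner class at registration, and the coordinator instantiates it.
interface PartitionAssigner {
    // Map each consumer id to the partitions it should own.
    Map<String, List<Integer>> assign(List<String> consumers, List<Integer> partitions);
}

// Example implementation: plain round-robin over all subscribed partitions.
class RoundRobinAssigner implements PartitionAssigner {
    @Override
    public Map<String, List<Integer>> assign(List<String> consumers, List<Integer> partitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        int i = 0;
        for (Integer p : partitions) {
            assignment.get(consumers.get(i++ % consumers.size())).add(p);
        }
        return assignment;
    }
}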
>>>>>> *4. Consumer API*
>>>>>>
>>>>>> As a result of the single-threaded, non-blocking network IO implementation, the new consumer API will no longer be based on a blocking stream-iterator interface but on a poll(timeout) interface. In addition, the consumer APIs are designed to be flexible, so that people can choose either to use the default offset and group membership management utilities mentioned above or to implement their own offset/group management logic. For example, for scenarios that need fixed partition assignment or that prefer to store offsets locally in a data store, the new consumer APIs make it much easier to customize. Details of the API design can be found in KAFKA-1328. A question to think about: are there any other potential use cases that cannot be easily supported by this API?
>>>>>>
>>>>>> Cheers,
>>>>>> Guozhang
>>>
>>> --
>>> -- Guozhang
>>
>> --
>> -- Guozhang
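For illustration, a minimal sketch of the poll(timeout) consumption style described in section 4 above. This is written against the consumer API as it eventually shipped, so the exact signatures postdate this thread; the topic and config values are placeholders:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("bingo"));
            while (true) {
                // In the design described above, a single thread drives
                // fetching and group membership; poll replaces the old
                // blocking stream iterator.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d offset=%d value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}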