Lock is a bad way to say it; a barrier is better.  I don't think what I am 
describing is even a barrier, since the rebalance would just need to recompute a 
rebalance schedule and submit it.  The only processing delay is to allow a soft 
remove to let the client clean up, before you turn on the new guy, so it lags a 
bit.  Do you think this could work?

Thanks,
Rob

> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com> 
> wrote:
> 
> Hi Guozhang,
> 
> Thank you for considering my suggestions.  The security layer sounds like the 
> right facet to design for these sorts of capabilities.  Have you considered a 
> chained ocap security model for the broker using hash tokens?  This would 
> provide per-partition read/write capabilities with QoS context, including 
> leases, revocation, debug level and monitoring.  The overkill disappears, as 
> no domain-specific info, like consumer/partition assignments, needs to be 
> stored at the brokers.  The read ocap for consumer 7/topic bingo/partition 
> 131 could be revoked at the brokers, and subsequent fetch requests with that 
> ocap token would fail.  You could also dynamically change the log level for a 
> specific consumer/partition.
> 
> There are advantages to finer-grained control that we could discuss.  
> Consider that scheduled partition rebalancing could be implemented with no 
> pauses from the perspective of the consumer threads; it looks like 
> single-partition lag, as the offset commit occurs before rotation, with no 
> lag for non-rebalanced partitions: rebalance 1 partition per second so as to 
> creep load onto a newbie consumer.  It would eliminate a global read lock, 
> and even the internal Kafka consumer would never block on IO protocol other 
> than the normal fetch request (and the initial join group request). 
> 
> A global lock acquired through a pull protocol (HeartbeatRequest followed by 
> a JoinGroupRequest) for all live consumers is a much bigger lock than a 
> security-based push protocol, as I assume the coordinator will have open 
> sockets to all brokers in order to reach out as needed.  As well, each lock 
> would be independent between partitions and involve only those brokers in the 
> ISR for a given partition.  It is a much smaller lock.
> 
> I had some time to consider my suggestion that it be viewed as a relativistic 
> frame of reference.  Consider the model where each dimension of a consumer's 
> frame of reference is a partition (actually a sub-space with the 
> dimensionality of the replication factor, but with a single leader election, 
> so consider it 1 dimension).  The total dimensionality of the consumer's 
> frame of reference is the number of partitions, but only assigned partitions 
> are open to a given consumer's viewpoint.  The offset is the partition 
> dimension's coordinate, and only consumers with an open dimension can 
> translate the offset.  A rebalance opens or closes a dimension for a given 
> consumer and can be viewed as a rotation.  Could Kafka consumption and 
> rebalance (and ISR leader election) be reduced to matrix operations?
> 
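The matrix view suggested above can be sketched concretely. This is an illustrative toy, not Kafka code: assignment as a 0/1 matrix over consumers and partitions, with a single-partition rebalance as one elementary row operation.

```python
# Toy model: A[c][p] == 1 iff consumer c is assigned partition p.
# A creeping rebalance moves one 1 between rows per step, leaving
# every other column (partition) untouched.

def make_assignment(num_consumers, num_partitions):
    """Round-robin assignment: partition p goes to consumer p mod n."""
    A = [[0] * num_partitions for _ in range(num_consumers)]
    for p in range(num_partitions):
        A[p % num_consumers][p] = 1
    return A

def move_partition(A, p, src, dst):
    """Rebalance one partition: clear it in src's row, set it in dst's."""
    assert A[src][p] == 1, "src must currently own the partition"
    A[src][p] = 0
    A[dst][p] = 1

A = make_assignment(3, 6)           # 3 consumers, 6 partitions, 2 each
move_partition(A, 5, src=2, dst=0)  # creep one partition to consumer 0
print([sum(row) for row in A])      # per-consumer partition counts
```

Each column always sums to 1 (exactly one owner per partition), which is the invariant a per-partition barrier would preserve step by step.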
> Rob
> 
>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> 
>> Hi Rob,
>> 
>> Sorry for the late reply.
>> 
>> If I understand your approach correctly, it requires all brokers to
>> remember the partition assignment of each consumer in order to decide
>> whether or not to authorize a fetch request, correct? If we are indeed
>> going to do such authorization for the security project then maybe it is a
>> good way to go, but otherwise it might be overkill just to support finer
>> grained partition assignment. In addition, instead of requiring a round
>> trip between the coordinator and the consumers for the synchronization
>> barrier, the coordinator now needs to wait for a round trip between itself
>> and the other brokers before it can return the join-group request, right?
>> 
>> Guozhang
>> 
>> 
>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <robert.w.with...@gmail.com>
>> wrote:
>> 
>>> Hi Guozhang,
>>> 
>>> 
>>> 
>>> Currently, the brokers do not know which high-level consumers are reading
>>> which partitions; it is the rebalance between the consumers and the
>>> coordinator which would authorize a consumer to fetch a particular
>>> partition, I think.  Does this mean that when a rebalance occurs, all
>>> consumers must send a JoinGroupRequest and that the coordinator will not
>>> respond to any consumer until all consumers have sent the
>>> JoinGroupRequest, to enable the synchronization barrier?  That has the
>>> potential to be a sizable global delay.
>>> 
>>> 
>>> 
>>> On the assumption that there is only one coordinator per group, why
>>> couldn't the synchronization barrier be per partition and internal to
>>> Kafka, mostly not involving the consumers, other than a chance for an
>>> offsetCommit by the consumer losing a partition?  If the brokers have
>>> session state and know the new assignment before the consumer is notified
>>> with a HeartbeatResponse, they could fail a fetch request from that
>>> consumer for an invalidly assigned partition.  The consumer could treat an
>>> invalid-partition failure of the fetch request as if it were a
>>> HeartbeatResponse partition removal.
>>> 
>>> 
>>> 
>>> The only gap seems to be: when does a consumer that is losing a partition
>>> get a chance to commit its offset?  If there were a
>>> PartitionCommittedNotification message that a consumer could send to the
>>> coordinator after committing its offsets, then the coordinator could send
>>> the add-partition HeartbeatResponse to the consumer gaining the partition
>>> after receiving the PartitionCommittedNotification, and the offset
>>> management would be stable.  The advantage is that none of the other
>>> consumers would be paused on any other partitions.  Only partitions being
>>> rebalanced would see any consumption pause.
>>> 
>>> 
>>> 
>>> So, something like:
>>> 
>>> 1.  3 consumers running with 12 partitions balanced, 4 each
>>> 
>>> 2.  A new consumer starts and sends a JoinGroupRequest to the coordinator
>>> 
>>> 3.  The coordinator computes a rebalance with 4 consumers: each existing
>>> consumer will lose one partition, which is assigned to the new consumer
>>> 
>>> 4.  The coordinator informs all live brokers of the partition
>>> reassignments
>>> 
>>> 5.  The brokers receive the reassignments, start failing unauthorized
>>> fetch requests and ack back to the coordinator
>>> 
>>> 6.  The coordinator receives all broker acks, sends HeartbeatResponses
>>> with partition removals to the existing consumers and awaits
>>> PartitionCommittedNotifications from the consumers losing partitions
>>> 
>>> 7.  Existing consumers can continue to fetch messages from correctly
>>> assigned partitions
>>> 
>>> 8.  When an existing consumer fails a fetch for a partition, or gets a
>>> HeartbeatResponse with a partition removal, it commits offsets for that
>>> partition and then sends a PartitionCommittedNotification to the
>>> coordinator
>>> 
>>> 9.  As the coordinator receives the PartitionCommittedNotification for a
>>> particular partition from an existing consumer, it sends the addPartition
>>> to consumer 4 in a HeartbeatResponse, and the new consumer can start
>>> fetching that partition
>>> 
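The coordinator-side handoff of a single partition, as proposed in the steps above, can be sketched as a tiny state machine. Names like PartitionCommittedNotification come from the proposal in this thread, not from the Kafka codebase, and this is only an illustrative sketch:

```python
# Hypothetical coordinator-side state for one partition moving from an
# old owner to a new one. Only this partition pauses; every other
# partition's fetches proceed untouched.

REVOKING, AWAITING_COMMIT, REASSIGNED = "revoking", "awaiting_commit", "reassigned"

class PartitionHandoff:
    def __init__(self, partition, old_owner, new_owner):
        self.partition, self.old_owner, self.new_owner = partition, old_owner, new_owner
        self.state = REVOKING  # steps 4-5: brokers told to fail old owner's fetches

    def on_brokers_acked(self):
        # step 6: all brokers acked; signal removal in old owner's heartbeat
        self.state = AWAITING_COMMIT
        return ("HeartbeatResponse:remove", self.old_owner, self.partition)

    def on_partition_committed(self, consumer):
        # step 9: old owner committed its offset; new owner may start fetching
        assert self.state == AWAITING_COMMIT and consumer == self.old_owner
        self.state = REASSIGNED
        return ("HeartbeatResponse:add", self.new_owner, self.partition)

h = PartitionHandoff(partition=7, old_owner="c1", new_owner="c4")
print(h.on_brokers_acked())
print(h.on_partition_committed("c1"))
```

Running one such machine per reassigned partition is what makes the barrier per-partition rather than global.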
>>> 
>>> 
>>> If a consumer drops HeartbeatRequests within a session timeout, the
>>> coordinator would inform the brokers and they would fail fetchRequests for
>>> those partitions from that consumer.  There is no chance to send
>>> removePartitions, since no Heartbeat is occurring, but the addPartitions
>>> could be sent and the offset is what it is.  This seems no different from
>>> this sort of failure today.
>>> 
>>> 
>>> 
>>> Instead of a global synchronization barrier, isn't it possible to have an
>>> incremental per-partition synchronization barrier?  The brokers would have
>>> to be aware of this.  I think of it as relativistic from each accelerating
>>> frame of reference, which is each consumer: event horizons.
>>> 
>>> 
>>> 
>>> Regards,
>>> 
>>> Rob
>>> 
>>> 
>>> 
>>> 
>>> 
>>> -----Original Message-----
>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>> To: users@kafka.apache.org
>>> Subject: Re: New Consumer Design
>>> 
>>> 
>>> 
>>> Hi Rob,
>>> 
>>> 
>>> 
>>> Piggy-backing the rebalanced partition info in the HeartbeatResponse may
>>> cause inconsistency of the partition assignments to consumers under
>>> consecutive triggerings of rebalances, since the coordinator no longer has
>>> a synchronization barrier for re-computing the distribution with a
>>> consistent view.
>>> 
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>> 
>>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>>>>> 
>>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> 
>>> 
>>>>>> Hi All,
>>>>>> 
>>>>>> We have written a wiki a few weeks back proposing a single-threaded
>>>>>> ZK-free consumer client design for 0.9:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>> 
>>>>>> We want to share some of the ideas that came up for this design to
>>>>>> get some early feedback. The below discussion assumes you have read
>>>>>> the above wiki.
>>> 
>>> 
>>>>>> *Offset Management*
>>>>>> 
>>>>>> To make consumer clients ZK-free we need to move the offset
>>>>>> management utility from ZooKeeper into the Kafka servers, which then
>>>>>> store offsets as a special log. Key-based log compaction will be used
>>>>>> to keep this ever-growing log's size bounded. Brokers who are
>>>>>> responsible for the offset management for a given consumer group will
>>>>>> be the leader of this special offset log partition, where the
>>>>>> partition key will be consumer group names. On the consumer side,
>>>>>> instead of talking to ZK for commit and read offsets, it talks to the
>>>>>> servers with new offset commit and offset fetch request types for
>>>>>> offset management. This work has been done and will be included in
>>>>>> the 0.8.2 release. Details of the implementation can be found in
>>>>>> KAFKA-1000 and this wiki:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>>> 
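The key-based compaction that keeps the offsets log bounded can be illustrated with a toy (this is not Kafka's compaction code, just the idea): later commits for the same (group, topic, partition) key supersede earlier ones, so the retained state is at most one entry per key.

```python
# Toy key-based log compaction: the log grows with every offset commit,
# but only the latest value per key needs to survive compaction.

def compact(log):
    """Keep only the last value seen for each key, in first-seen key order."""
    latest = {}
    for key, value in log:
        latest[key] = value          # later records overwrite earlier ones
    return list(latest.items())

offsets_log = [
    (("grp", "bingo", 0), 10),
    (("grp", "bingo", 1), 4),
    (("grp", "bingo", 0), 25),       # newer commit for partition 0
    (("grp", "bingo", 0), 31),
]
print(compact(offsets_log))          # one surviving entry per partition
```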
>>> 
>>>>>> *Group Membership Management*
>>>>>> 
>>>>>> The next step is to move the membership management and rebalancing
>>>>>> utility out of ZooKeeper as well. To do this we introduced a consumer
>>>>>> coordinator hosted on the Kafka servers which is responsible for
>>>>>> keeping track of group membership and consumption partition changes,
>>>>>> and the corresponding rebalancing process.
>>> 
>>> 
>>>>>> *1. Failure Detection*
>>>>>> 
>>>>>> More specifically, we will use a heartbeating protocol for consumer
>>>>>> and coordinator failure detection. The heartbeating protocol would be
>>>>>> similar to the one ZK uses, i.e. the consumer will set a session
>>>>>> timeout value with the coordinator on startup, and keep sending
>>>>>> heartbeats to the coordinator afterwards every session-timeout /
>>>>>> heartbeat-frequency. The coordinator will treat a consumer as failed
>>>>>> when it has not heard from the consumer for session-timeout, and the
>>>>>> consumer will treat its coordinator as failed when it has not
>>>>>> received heartbeat responses for session-timeout.
>>> 
>>> 
>>>>>> One difference from the ZK heartbeat protocol is that instead of
>>>>>> fixing the heartbeat-frequency at three, we make this value
>>>>>> configurable in consumer clients. This is because the rebalancing
>>>>>> process (which we will discuss later) has its latency lower bounded
>>>>>> by the heartbeat frequency, so we want this value to be large while
>>>>>> not DDoSing the servers.
>>> 
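The timing relationship described above can be put in numbers (the values here are made up for illustration, not Kafka defaults): heartbeats go out every session-timeout / heartbeat-frequency, and since a rebalance can only be signalled in a heartbeat response, the worst-case notification latency is one heartbeat interval.

```python
# Heartbeat interval arithmetic from the protocol description above.
# A higher heartbeat-frequency means a shorter interval: lower rebalance
# notification latency, but more requests hitting the servers.

def heartbeat_interval_ms(session_timeout_ms, heartbeat_frequency):
    return session_timeout_ms // heartbeat_frequency

# ZK-style fixed frequency of 3 with an assumed 30s session timeout:
print(heartbeat_interval_ms(30_000, 3))   # 10000 ms between heartbeats

# Raising the frequency trades server load for latency:
print(heartbeat_interval_ms(30_000, 10))  # 3000 ms
```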
>>> 
>>>>>> *2. Consumer Rebalance*
>>>>>> 
>>>>>> A bunch of events can trigger a rebalance process: 1) consumer
>>>>>> failure, 2) a new consumer joining the group, 3) an existing consumer
>>>>>> changing its consumed topic/partitions, 4) a partition change for the
>>>>>> consumed topics. Once the coordinator decides to trigger a rebalance
>>>>>> it will notify the consumers within the group to resend their
>>>>>> topic/partition subscription information, and then assign existing
>>>>>> partitions to these consumers. On the consumers' end, they no longer
>>>>>> run any rebalance logic in a distributed manner but follow the
>>>>>> partition assignment they receive from the coordinator. Since the
>>>>>> consumers can only be notified of a rebalance triggering via
>>>>>> heartbeat responses, the rebalance latency is lower bounded by the
>>>>>> heartbeat frequency; that is why we allow consumers to negotiate this
>>>>>> value with the coordinator upon registration, and we would like to
>>>>>> hear people's thoughts about this protocol.
>>> 
>>> 
>>>>> One question on this topic: my planned usage pattern is to have a
>>>>> high number of consumers that are ephemeral.  Imagine a consumer
>>>>> coming online, consuming messages for a short duration, then
>>>>> disappearing forever.  Is this use case supported?
>>> 
>>> 
>>> 
>>>> If consumers are coming and going, there will be a high rate of
>>>> rebalances and that will really start throttling consumption at some
>>>> point.  You may have a use that hits that detrimentally.  A custom
>>>> rebalance algorithm may be able to rebalance a sub-set of the
>>>> consumers and not inform others of a need to rebalance them, which
>>>> would ameliorate the cost.  Still, the consumers that need to
>>>> rebalance would pay.
>>> 
>>> 
>>>> If the initial JoinGroupRequest were preserved, would it be possible
>>>> to send rebalanced partition information in the HeartbeatResponse, up
>>>> front to all (or a sub-set of) consumers, and avoid the rejoin round
>>>> trip?  I mean, push the rebalance rather than pull it with a secondary
>>>> JoinGroupRequest.  This would reduce cost and lower the throttle
>>>> point, but poorly timed fetch requests may fail.  If the consumer were
>>>> resilient to fetch failures, might it work?
>>> 
>>> 
>>> 
>>>>>> In addition, the rebalance logic in the new consumer will be
>>>>>> customizable instead of hard-coded in a per-topic round-robin
>>>>>> manner. Users will be able to write their own assignment class
>>>>>> following a common interface, and consumers upon registering
>>>>>> themselves will specify in their registration request the assigner
>>>>>> class name they want to use. If consumers within the same group
>>>>>> specify different algorithms the registration will be rejected. We
>>>>>> are not sure if it is the best way of customizing rebalance logic
>>>>>> yet, so any feedback is more than welcome.
>>> 
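The pluggable-assignment idea above can be sketched as a common interface plus one concrete strategy. The interface name and shape here are hypothetical, not the actual interface proposed for the new consumer:

```python
# Sketch of a pluggable partition assignor: a common interface that
# custom strategies implement, with round-robin as the default example.

from abc import ABC, abstractmethod

class PartitionAssignor(ABC):
    @abstractmethod
    def assign(self, consumers, partitions):
        """Return {consumer: [partitions]} covering every partition once."""

class RoundRobinAssignor(PartitionAssignor):
    def assign(self, consumers, partitions):
        consumers = sorted(consumers)
        result = {c: [] for c in consumers}
        for i, p in enumerate(sorted(partitions)):
            result[consumers[i % len(consumers)]].append(p)
        return result

assignor = RoundRobinAssignor()
print(assignor.assign(["c1", "c2"], [0, 1, 2, 3, 4]))
```

A coordinator would run whichever assignor class the group registered; rejecting mixed assignor names within one group keeps every rebalance deterministic.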
>>> 
>>>>>> For wildcard consumption, the consumers will now capture new topics
>>>>>> that are available for fetching through the topic metadata request.
>>>>>> That is, the consumers will periodically update their topic metadata
>>>>>> for fetching, and if new topics matching the wildcard regex are
>>>>>> returned in the metadata response, the consumer will notify the
>>>>>> coordinator to let it trigger a new rebalance to assign partitions
>>>>>> for the new topic.
>>> 
>>> 
>>>>>> There are some failure handling cases during the rebalancing process
>>>>>> discussed in the consumer rewrite design wiki. We would encourage
>>>>>> people to read it and let us know if there are any other corner
>>>>>> cases not covered.
>>> 
>>> 
>>>>>> Moving forward, we are thinking about making the coordinator handle
>>>>>> both the group management and offset management for its groups, i.e.
>>>>>> the leader of the offset log partition will naturally become the
>>>>>> coordinator of the corresponding consumer group. This allows the
>>>>>> coordinator to reject offset commit requests if the sender is not
>>>>>> part of the group. To do so a consumer id and a generation id are
>>>>>> used to control the group member generations. Details of this design
>>>>>> are included in the rewrite design wiki page, and again any feedback
>>>>>> is appreciated.
>>> 
>>> 
>>>>>> *3. Non-blocking Network IO*
>>>>>> 
>>>>>> With a single-threaded consumer, we will instead use non-blocking
>>>>>> network IO (i.e. java.nio) to do fetching loops, just as the new
>>>>>> producer does for asynchronously sending data. In terms of
>>>>>> implementation, we will let the new consumer/producer clients be
>>>>>> based on a common non-blocking IO thread class. Details of this
>>>>>> refactoring can be found in KAFKA-1316.
>>> 
>>> 
>>>>>> *4. Consumer API*
>>>>>> 
>>>>>> As a result of the single-threaded non-blocking network IO
>>>>>> implementation, the new consumer API will no longer be based on a
>>>>>> blocking stream iterator interface but on a poll(timeout) interface.
>>>>>> In addition, the consumer APIs are designed to be flexible so that
>>>>>> people can choose either to use the default offset and group
>>>>>> membership management utilities mentioned above or to implement
>>>>>> their own offset/group management logic. For example, for scenarios
>>>>>> that need fixed partition assignment or that prefer to store offsets
>>>>>> locally in a data store, the new consumer APIs would make it much
>>>>>> easier to customize. Details of the API design can be found in
>>>>>> KAFKA-1328. A question to think about is whether there are any other
>>>>>> potential use cases that cannot be easily supported by this API.
>>> 
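What a poll(timeout)-style loop looks like from application code can be sketched with a stub consumer so the example is self-contained; the real API shape is in KAFKA-1328, and this stub only imitates the general pattern:

```python
# Stub poll-loop: the application drives consumption by calling
# poll(timeout) in a loop, instead of blocking on a stream iterator.

from collections import deque

class StubConsumer:
    def __init__(self, records):
        self._queue = deque(records)

    def poll(self, timeout_ms):
        """Return up to 2 buffered records; [] when none (stub never blocks)."""
        batch = []
        while self._queue and len(batch) < 2:
            batch.append(self._queue.popleft())
        return batch

consumer = StubConsumer([("bingo", 131, 0, b"a"), ("bingo", 131, 1, b"b"),
                         ("bingo", 131, 2, b"c")])
seen = []
while True:
    records = consumer.poll(timeout_ms=100)
    if not records:
        break
    for topic, partition, offset, value in records:
        seen.append(offset)
print(seen)   # offsets consumed in order
```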
>>> 
>>>>>> Cheers,
>>>>>> Guozhang
>>> 
>>> 
>>> -- 
>>> -- Guozhang
>> 
>> 
>> -- 
>> -- Guozhang
