Hey Robert,

I think the issue is that you lose much of the simplification if you
still need to heart beat and you need a new commit notification
message. I think that would make this proposal more complex than the
current one.

-Jay

On Mon, Jul 21, 2014 at 6:44 PM, Robert Withers
<robert.w.with...@gmail.com> wrote:
> Thanks, Jay, for the good summary.  Regarding point 2, I would think the 
> heartbeat would still be desired, to give control over liveness detection 
> parameters and to directly inform clients when gaining or losing a partition 
> (especially when gaining a partition).  There would be no barrier and the 
> rebalancer would be an offline scheduler, issuing SwitchPartition commands.  
> The evaluation of a SwitchPartition command would wait for the consumer losing a
> partition to commit its offset and finish any local commit work before confirming
> completion to the co-ordinator, which would then inform the new consumer and
> ISR brokers about the partition gain.  Broker security would be the master 
> record of lease assignments.
>
> Thanks,
> Rob
>
>> On Jul 21, 2014, at 6:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> This thread is a bit long, but let me see if I can restate it
>> correctly (not sure I fully follow).
>>
>> There are two suggestions:
>> 1. Allow partial rebalances that move just some partitions. I.e. if a
>> consumer fails and has only one partition, only one other consumer
>> should be affected (the one who picks up the new partition). If there
>> are many partitions to be reassigned there will obviously be a
>> tradeoff between impacting all consumers and balancing load evenly.
>> I.e. if you moved all load to one other consumer that would cause
>> little rebalancing interruption but poor load balancing.
>> 2. Have the co-ordinator communicate the assignments to the brokers
>> rather than to the client directly. This could potentially simplify
>> the consumer. Perhaps it would be possible to have the leader track
>> liveness using the fetch requests rather than needing an artificial
>> heartbeat.
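Jay's point 2 (the leader tracking liveness from ordinary fetch requests instead of an artificial heartbeat) could be sketched roughly as follows; this is purely illustrative, and all names are hypothetical:

```python
import time

# Sketch: the partition leader marks a consumer dead when no fetch request
# arrives within the session timeout, so every ordinary fetch doubles as a
# liveness signal and no separate heartbeat message is needed.
class FetchLivenessTracker:
    def __init__(self, session_timeout_s):
        self.session_timeout_s = session_timeout_s
        self.last_fetch = {}  # consumer_id -> time of last fetch request

    def on_fetch_request(self, consumer_id, now=None):
        self.last_fetch[consumer_id] = time.monotonic() if now is None else now

    def failed_consumers(self, now=None):
        now = time.monotonic() if now is None else now
        return [c for c, t in self.last_fetch.items()
                if now - t > self.session_timeout_s]
```

The open question Jay raises is whether this suffices, since a consumer that is alive but not fetching (e.g. paused) would look dead to the leader.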
>>
>> These are interesting ideas.
>>
>> -Jay
>>
>>
>>
>>> On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> Hello Rob,
>>>
>>> If I get your idea right, the idea is that if the rebalance only changes
>>> the ownership of a few consumers in the group, the coordinator can just
>>> sync with them and not interrupt the other consumers.
>>>
>>> I think this approach may work. However, it will likely complicate the
>>> logic of the coordinator once we sketch out all the details, since the
>>> rebalance result basically depends on two variables: 1) partitions of
>>> the subscribed topics, 2) consumers inside the group. Hence, following
>>> this approach, by the time the coordinator decides to trigger a
>>> rebalance, it must correctly keep track of which variable change
>>> triggered the current rebalance process; on the other hand, the normal
>>> rebalance process even with a global barrier should usually be very
>>> fast, within a few hundred milliseconds. So I am not sure if this is a
>>> worthy optimization that we would want for now. What do you think?
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers <robert.w.with...@gmail.com
>>>> wrote:
>>>
>>>> Lock is a bad way to say it; a barrier is better.  I don't think what I am
>>>> saying is even a barrier, since the rebalance would just need to recompute
>>>> a rebalance schedule and submit it.  The only processing delay is to allow
>>>> a soft remove to let the client clean up, before you turn on the new guy, so
>>>> it lags a bit.  Do you think this could work?
>>>>
>>>> Thanks,
>>>> Rob
>>>>
>>>>>> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Guozhang,
>>>>>
>>>>> Thank you for considering my suggestions.  The security layer sounds
>>>> like the right facet to design for these sorts of capabilities.  Have you
>>>> considered a chained ocap security model for the broker using hash tokens?
>>>> This would provide for per-partition read/write capabilities with QoS
>>>> context including leases, revocation, debug level and monitoring.  Overkill
>>>> disappears, as no domain-specific info needs to be stored at the brokers,
>>>> like consumer/partition assignments.  The read ocap for consumer 7/topic
>>>> bingo/partition 131 could be revoked at the brokers for a partition and
>>>> subsequent reads would fail the fetch for requests with that ocap token.
>>>> You could also dynamically change the log level for a specific
>>>> consumer/partition.
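The per-partition ocap revocation Rob describes could be sketched like this (a toy model, not a real broker implementation; the hashing scheme and names are assumptions):

```python
import hashlib

# Sketch: a broker keeps a set of valid capability-token hashes per
# (topic, partition); revoking a token makes subsequent fetches carrying
# that token fail authorization. No consumer/partition assignment is stored.
class OcapBroker:
    def __init__(self):
        self.valid = {}  # (topic, partition) -> set of token hashes

    @staticmethod
    def _h(token):
        return hashlib.sha256(token.encode()).hexdigest()

    def grant(self, topic, partition, token):
        self.valid.setdefault((topic, partition), set()).add(self._h(token))

    def revoke(self, topic, partition, token):
        self.valid.get((topic, partition), set()).discard(self._h(token))

    def authorize_fetch(self, topic, partition, token):
        return self._h(token) in self.valid.get((topic, partition), set())
```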
>>>>>
>>>>> There are advantages we could discuss to having finer grained control.
>>>> Consider that scheduled partition rebalancing could be implemented with no
>>>> pauses from the perspective of the consumer threads; it looks like single
>>>> partition lag, as the offset commit occurs before rotation, with no lag to
>>>> non-rebalanced partitions: rebalance 1 partition per second so as to creep
>>>> load to a newbie consumer.  It would eliminate a global read lock and even
>>>> the internal Kafka consumer would never block on IO protocol other than the
>>>> normal fetch request (and the initial join group request).
>>>>>
>>>>> A global lock acquired through a pull protocol (HeartbeatRequest
>>>> followed by a JoinGroupRequest) for all live consumers is a much bigger
>>>> lock than a security-based push protocol, as I assume the coordinator will
>>>> have open sockets to all brokers in order to reach out as needed.  As well,
>>>> each lock would be independent between partitions and be with only those
>>>> brokers in ISR for a given partition.  It is a much smaller lock.
>>>>>
>>>>> I had some time to consider my suggestion that it be viewed as a
>>>> relativistic frame of reference.  Consider the model where each dimension
>>>> of the frame of reference for each consumer is each partition, actually a
>>>> sub-space with the dimensionality of the replication factor, but with a
>>>> single leader election, so consider it 1 dimension.  The total
>>>> dimensionality of the consumer's frame of reference is the number of
>>>> partitions, but only assigned partitions are open to a given consumer's
>>>> viewpoint.  The offset is the partition dimension's coordinate and only
>>>> consumers with an open dimension can translate the offset.  A rebalance
>>>> opens or closes a dimension for a given consumer and can be viewed as a
>>>> rotation.  Could Kafka consumption and rebalance (and ISR leader election)
>>>> be reduced to matrix operations?
>>>>>
>>>>> Rob
>>>>>
>>>>>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Rob,
>>>>>>
>>>>>> Sorry for the late reply.
>>>>>>
>>>>>> If I understand your approach correctly, it requires all brokers to
>>>>>> remember the partition assignment of each consumer in order to decide
>>>>>> whether or not to authorize the fetch request, correct? If we are indeed
>>>>>> going to do such authorization for the security project then maybe it
>>>> is a
>>>>>> good way to go, but otherwise might be an overkill to just support finer
>>>>>> grained partition assignment. In addition, instead of requiring a round
>>>>>> trip between the coordinator and the consumers for the synchronization
>>>>>> barrier, now the coordinator needs to wait for a round trip between
>>>> itself
>>>>>> and other brokers before it can return the join-group request, right?
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <
>>>> robert.w.with...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Currently, the brokers do not know which high-level consumers are
>>>> reading
>>>>>>> which partitions and it is the rebalance between the consumers and the
>>>>>>> coordinator which would authorize a consumer to fetch a particular
>>>>>>> partition, I think.  Does this mean that when a rebalance occurs, all
>>>>>>> consumers must send a JoinGroupRequest and that the coordinator will
>>>> not
>>>>>>> respond to any consumers until all consumers have sent the
>>>>>>> JoinGroupRequest, to enable the synchronization barrier?  That has the
>>>>>>> potential to be a sizable global delay.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On the assumption that there is only one coordinator for a group, why
>>>>>>> couldn't the synchronization barrier be per partition and internal to
>>>> kafka
>>>>>>> and mostly not involve the consumers, other than a chance for
>>>> offsetCommit
>>>>>>> by the consumer losing a partition?   If the brokers have session
>>>> state and
>>>>>>> know the new assignment before the consumer is notified with a
>>>>>>> HeartbeatResponse, it could fail the fetch request from that consumer
>>>> for
>>>>>>> an invalidly assigned partition.  The consumer could take an invalid
>>>>>>> partition failure of the fetch request as if it were a
>>>> HeartbeatResponse
>>>>>>> partition removal.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The only gap seems to be: when does a consumer that is losing a
>>>> partition
>>>>>>> get a chance to commit its offset?  If there were a
>>>>>>> PartitionCommittedNotification message that a consumer could send to
>>>> the
>>>>>>> coordinator after committing its offsets, then the coordinator could
>>>> send
>>>>>>> the add partition HeartbeatResponse, after receiving the
>>>>>>> PartitionCommittedNotification, to the consumer gaining the partition
>>>> and
>>>>>>> the offset management is stable.  The advantage is that none of the
>>>> other
>>>>>>> consumers would be paused on any other partitions.  Only partitions
>>>> being
>>>>>>> rebalanced would see any consumption pause.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> So, something like:
>>>>>>>
>>>>>>> 1.  3 consumers running with 12 partitions balanced, 4 each
>>>>>>>
>>>>>>> 2.  New consumer starts and sends JoinGroupRequest to coordinator
>>>>>>>
>>>>>>> 3.  Coordinator computes rebalance with 4 consumers: each existing
>>>>>>> consumer will lose a partition assigned to the new consumer
>>>>>>>
>>>>>>> 4.  Coordinator informs all live brokers of partition reassignments
>>>>>>>
>>>>>>> 5.  Brokers receive reassignments, start failing unauthorized fetch
>>>>>>> requests and ack back to the coordinator
>>>>>>>
>>>>>>> 6.  Coordinator receives all broker acks and sends HeartbeatResponses
>>>> with
>>>>>>> partition removals to existing consumers and awaits
>>>>>>> PartitionCommittedNotifications from consumers losing partitions.
>>>>>>>
>>>>>>> 7.  Existing consumers can continue to fetch messages from correctly
>>>>>>> assigned partitions
>>>>>>>
>>>>>>> 8.  When an existing consumer fails a fetch for a partition or gets a
>>>>>>> HeartbeatResponse with a partition removal, it would commitOffsets for
>>>> that
>>>>>>> partition and then send a PartitionCommittedNotification to the
>>>> coordinator.
>>>>>>>
>>>>>>> 9.  As the Coordinator receives the PartitionCommittedNotification for a
>>>>>>> particular partition from an existing consumer, it sends the
>>>> addPartition
>>>>>>> to consumer 4, in a HeartbeatResponse and the new consumer can start
>>>>>>> fetching that partition.
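Steps 4-9 above amount to a small per-partition handoff state machine at the coordinator. A toy sketch (Python; all names hypothetical, not the Kafka coordinator implementation):

```python
# Sketch of the proposed per-partition handoff: a partition is reassigned by
# telling the old owner to commit, and only after its commit confirmation is
# the new owner told to start fetching. Other partitions never pause.
class HandoffCoordinator:
    def __init__(self, assignment):  # partition -> consumer
        self.assignment = dict(assignment)
        self.pending = {}  # partition -> (old_consumer, new_consumer)

    def reassign(self, partition, new_consumer):
        # Steps 4-6: brokers start rejecting the old owner's fetches and the
        # old owner is asked to commit its offset for this partition only.
        old = self.assignment[partition]
        self.pending[partition] = (old, new_consumer)
        return ("remove_partition", old, partition)

    def on_partition_committed(self, consumer, partition):
        # Steps 8-9: once the losing consumer confirms its commit, the
        # gaining consumer is told to start fetching.
        old, new = self.pending.pop(partition)
        assert consumer == old
        self.assignment[partition] = new
        return ("add_partition", new, partition)
```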
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> If a consumer drops HeartbeatRequests within a session timeout, the
>>>>>>> coordinator would inform the brokers and they would fail fetchRequests
>>>> for
>>>>>>> those partitions from that consumer.   There is no chance to send
>>>>>>> removePartitions since no Heartbeat is occurring, but the addPartitions
>>>>>>> could be sent and the offset is what it is.  This seems no different
>>>> than
>>>>>>> this sort of failure, today.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Instead of a global synchronization barrier isn’t it possible to have
>>>> an
>>>>>>> incremental per-partition synchronization barrier?  The brokers would
>>>> have
>>>>>>> to be aware of this.  I think of it as relativistic from each
>>>> acceleration
>>>>>>> frame of reference, which is each consumer: event horizons.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Rob
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>>>>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>>>>>> To: users@kafka.apache.org
>>>>>>> Subject: Re: New Consumer Design
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi Rob,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Piggy-backing the rebalance partition info in the HeartbeatResponse may
>>>>>>> cause inconsistency of the partition assignments to consumers with
>>>>>>> consecutive triggering of rebalances, since the coordinator no longer
>>>>>>> has a synchronization barrier for re-computing the distribution with a
>>>>>>> consistent view.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers
>>>>>>> <robert.w.with...@gmail.com> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com>
>>>>>>>> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>
>>>>>>>
>>>>>>>>>> We have written a wiki a few weeks back proposing a single-threaded
>>>>>>>
>>>>>>>> ZK-free
>>>>>>>
>>>>>>>>>> consumer client design for 0.9:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>>>
>>>>>>>
>>>>>>>>>> We want to share some of the ideas that came up for this design to
>>>>>>>
>>>>>>>>>> get
>>>>>>>
>>>>>>>> some
>>>>>>>
>>>>>>>>>> early feedback. The below discussion assumes you have read the
>>>>>>>
>>>>>>>>>> above
>>>>>>>
>>>>>>>> wiki.
>>>>>>>
>>>>>>>
>>>>>>>>>> *Offset Management*
>>>>>>>
>>>>>>>
>>>>>>>>>> To make consumer clients ZK-free we need to move the offset
>>>>>>>
>>>>>>>>>> management utility from ZooKeeper into the Kafka servers, which
>>>>>>>
>>>>>>>>>> then store offsets
>>>>>>>
>>>>>>>> as
>>>>>>>
>>>>>>>>>> a special log. Key-based log compaction will be used to keep this
>>>>>>>
>>>>>>>>>> ever-growing log's size bounded. Brokers who are responsible for
>>>>>>>
>>>>>>>>>> the
>>>>>>>
>>>>>>>> offset
>>>>>>>
>>>>>>>>>> management for a given consumer group will be the leader of this
>>>>>>>
>>>>>>>>>> special offset log partition where the partition key will be
>>>>>>>
>>>>>>>>>> consumer group
>>>>>>>
>>>>>>>> names.
>>>>>>>
>>>>>>>>>> On the consumer side, instead of talking to ZK for commit and read
>>>>>>>
>>>>>>>> offsets,
>>>>>>>
>>>>>>>>>> it talks to the servers with new offset commit and offset fetch
>>>>>>>
>>>>>>>>>> request types for offset management. This work has been done and
>>>>>>>
>>>>>>>>>> will be
>>>>>>>
>>>>>>>> included
>>>>>>>
>>>>>>>>>> in the 0.8.2 release. Details of the implementation can be found in
>>>>>>>
>>>>>>>>>> KAFKA-1000 and this wiki:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
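The key-based compaction mentioned above is what keeps the offsets log bounded: the log of offset commits grows, but only the latest value per key survives compaction. A minimal sketch (illustrative only, keys shown as (group, topic, partition) tuples):

```python
# Sketch of key-based log compaction for the offsets log: later entries
# for the same key win, so the compacted log holds one entry per key.
def compact(offset_log):
    latest = {}
    for key, offset in offset_log:  # iterate in log order; later wins
        latest[key] = offset
    return list(latest.items())
```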
>>>>>>>
>>>>>>>
>>>>>>>>>> *Group Membership Management*
>>>>>>>
>>>>>>>
>>>>>>>>>> The next step is to move the membership management and rebalancing
>>>>>>>
>>>>>>>> utility
>>>>>>>
>>>>>>>>>> out of ZooKeeper as well. To do this we introduced a consumer
>>>>>>>
>>>>>>>> coordinator
>>>>>>>
>>>>>>>>>> hosted on the Kafka servers which is responsible for keeping track
>>>>>>>
>>>>>>>>>> of
>>>>>>>
>>>>>>>> group
>>>>>>>
>>>>>>>>>> membership and consumption partition changes, and the corresponding
>>>>>>>
>>>>>>>>>> rebalancing process.
>>>>>>>
>>>>>>>
>>>>>>>>>> *1. Failure Detection*
>>>>>>>
>>>>>>>
>>>>>>>>>> More specifically, we will use a heartbeating protocol for consumer
>>>>>>>
>>>>>>>>>> and coordinator failure detections. The heartbeating protocol would
>>>>>>>
>>>>>>>>>> be
>>>>>>>
>>>>>>>> similar
>>>>>>>
>>>>>>>>>> to the one ZK used, i.e. the consumer will set a session timeout
>>>>>>>
>>>>>>>>>> value
>>>>>>>
>>>>>>>> to
>>>>>>>
>>>>>>>>>> the coordinator on startup, and keep sending heartbeats to the
>>>>>>>
>>>>>>>> coordinator
>>>>>>>
>>>>>>>>>> afterwards every session-timeout / heartbeat-frequency. The
>>>>>>>
>>>>>>>>>> coordinator will treat a consumer as failed when it has not heard
>>>>>>>
>>>>>>>>>> from the consumer after session-timeout, and the consumer will
>>>>>>>
>>>>>>>>>> treat its coordinator as failed after it has not heard its
>>>>>>>
>>>>>>>>>> heartbeat responses after
>>>>>>>
>>>>>>>> session-timeout.
>>>>>>>
>>>>>>>
>>>>>>>>>> One difference from the ZK heartbeat protocol is that instead of fixing
>>>>>>>
>>>>>>>>>> the heartbeat-frequency as three, we make this value configurable
>>>>>>>
>>>>>>>>>> in
>>>>>>>
>>>>>>>> consumer
>>>>>>>
>>>>>>>>>> clients. This is because the rebalancing process (we will discuss
>>>>>>>
>>>>>>>>>> later) latency is lower bounded by the heartbeat frequency, so we
>>>>>>>
>>>>>>>>>> want this
>>>>>>>
>>>>>>>> value
>>>>>>>
>>>>>>>>>> to be large while not DDoSing the servers.
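The heartbeat arithmetic described in section 1 is simple enough to state directly (a sketch of the math as described, not of any actual client code):

```python
# A consumer sends one heartbeat every session_timeout / heartbeat_frequency,
# and each side declares the other failed after a silent session_timeout.
def heartbeat_interval_ms(session_timeout_ms, heartbeat_frequency):
    return session_timeout_ms / heartbeat_frequency

def is_failed(last_heard_ms, now_ms, session_timeout_ms):
    return now_ms - last_heard_ms > session_timeout_ms
```

With a 30 s session timeout and frequency 3 (the ZK-style fixed value), heartbeats go out every 10 s; raising the frequency shrinks the rebalance-notification latency at the cost of more requests to the server.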
>>>>>>>
>>>>>>>
>>>>>>>>>> *2. Consumer Rebalance*
>>>>>>>
>>>>>>>
>>>>>>>>>> A bunch of events can trigger a rebalance process: 1) consumer
>>>>>>>>>> failure, 2) a new consumer joining the group, 3) an existing
>>>>>>>>>> consumer changing its consumed topics/partitions, 4) a partition
>>>>>>>>>> change for the consumed topics.
>>>>>>>
>>>>>>>>>> Once the coordinator decides to trigger a rebalance it will notify
>>>>>>>
>>>>>>>>>> the consumers within to resend their topic/partition subscription
>>>>>>>
>>>>>>>>>> information, and
>>>>>>>
>>>>>>>> then
>>>>>>>
>>>>>>>>>> assign existing partitions to these consumers. On the consumers'
>>>>>>>>>> end, they no longer run any rebalance logic in a distributed manner
>>>>>>>>>> but follow the partition assignment they receive from the
>>>>>>>>>> coordinator. Since the consumers
>>>>>>>
>>>>>>>>>> can only be notified of a rebalance triggering via heartbeat
>>>>>>>
>>>>>>>>>> responses,
>>>>>>>
>>>>>>>> the
>>>>>>>
>>>>>>>>>> rebalance latency is lower bounded by the heartbeat frequency; that
>>>>>>>
>>>>>>>>>> is
>>>>>>>
>>>>>>>> why
>>>>>>>
>>>>>>>>>> we allow consumers to negotiate this value with the coordinator
>>>>>>>
>>>>>>>>>> upon registration, and we would like to hear people's thoughts
>>>>>>>
>>>>>>>>>> about this protocol.
>>>>>>>
>>>>>>>
>>>>>>>>> One question on this topic, my planned usage pattern is to have a
>>>>>>>
>>>>>>>>> high
>>>>>>>
>>>>>>>> number of consumers that are ephemeral.  Imagine a consumer coming
>>>>>>>
>>>>>>>> online, consuming messages for a short duration, then disappearing
>>>>>>>
>>>>>>>> forever.  Is this use case supported?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> If consumers are coming and going, there will be a high rate of
>>>>>>>
>>>>>>>> rebalances and that will really start throttling consumption at some
>>>>>>>
>>>>>>>> point.  You may have a use case that hits that detrimentally.  A custom
>>>>>>>
>>>>>>>> rebalance algorithm may be able to rebalance a sub-set of the
>>>>>>>
>>>>>>>> consumers and not inform others of a need to rebalance them, which
>>>>>>>
>>>>>>>> would ameliorate the cost.  Still the consumers that need to rebalance
>>>>>>> would pay.
>>>>>>>
>>>>>>>
>>>>>>>> If the initial JoinGroupRequest was preserved, would it be possible to
>>>>>>>
>>>>>>>> send rebalanced partition information in the HeartbeatResponse, up
>>>>>>>
>>>>>>>> front to all (or a sub-set) consumers, and avoid the rejoin round
>>>>>>>
>>>>>>>> trip?  I mean, push the rebalance and not pull it with a secondary
>>>>>>>
>>>>>>>> JoinGroupRequest.  This would reduce cost and lower the throttle
>>>>>>>
>>>>>>>> point, but poorly timed fetch requests may fail.  If the consumer were
>>>>>>>
>>>>>>>> resilient to fetch failures, might it work?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>>> In addition, the rebalance logic in the new consumer will be
>>>>>>>
>>>>>>>> customizable
>>>>>>>
>>>>>>>>>> instead of hard-coded in a per-topic round-robin manner. Users
>>>>>>>
>>>>>>>>>> will be able to write their own assignment class following a common
>>>>>>>
>>>>>>>>>> interface,
>>>>>>>
>>>>>>>> and
>>>>>>>
>>>>>>>>>> consumers upon registering themselves will specify in their
>>>>>>>
>>>>>>>>>> registration request the assigner class name they wanted to use. If
>>>>>>>
>>>>>>>>>> consumers within
>>>>>>>
>>>>>>>> the
>>>>>>>
>>>>>>>>>> same group specify different algorithms the registration will be
>>>>>>>
>>>>>>>> rejected.
>>>>>>>
>>>>>>>>>> We are not sure if it is the best way of customizing rebalance
>>>>>>>
>>>>>>>>>> logic
>>>>>>>
>>>>>>>> yet,
>>>>>>>
>>>>>>>>>> so any feedback is more than welcome.
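The pluggable-assignor idea above might take roughly this shape; the class and function names are assumptions for illustration, not the final client API:

```python
# Sketch: a common assignor interface with a default per-topic round-robin
# implementation, plus the group-level check that all members requested the
# same assignor (mismatched registrations are rejected, per the design).
class PartitionAssignor:
    name = "roundrobin"

    def assign(self, consumers, partitions):
        # Partition i (in sorted order) goes to consumer i mod n.
        out = {c: [] for c in consumers}
        for i, p in enumerate(sorted(partitions)):
            out[consumers[i % len(consumers)]].append(p)
        return out

def validate_group(requested_assignors):
    if len(set(requested_assignors)) > 1:
        raise ValueError("consumers in a group must use the same assignor")
```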
>>>>>>>
>>>>>>>
>>>>>>>>>> For wildcard consumption, the consumers now will capture new topics
>>>>>>>
>>>>>>>>>> that are available for fetching through the topic metadata request.
>>>>>>>
>>>>>>>>>> That is, periodically the consumers will update their topic metadata
>>>>>>>
>>>>>>>>>> for fetching,
>>>>>>>
>>>>>>>> and
>>>>>>>
>>>>>>>>>> if new topics are returned in the metadata response matching its
>>>>>>>
>>>>>>>> wildcard
>>>>>>>
>>>>>>>>>> regex, it will notify the coordinator to let it trigger a new
>>>>>>>
>>>>>>>>>> rebalance
>>>>>>>
>>>>>>>> to
>>>>>>>
>>>>>>>>>> assign partitions for this new topic.
>>>>>>>
>>>>>>>
>>>>>>>>>> There are some failure handling cases during the rebalancing
>>>>>>>
>>>>>>>>>> process discussed in the consumer rewrite design wiki. We would
>>>>>>>
>>>>>>>>>> encourage
>>>>>>>
>>>>>>>> people to
>>>>>>>
>>>>>>>>>> read it and let us know if there are any other corner cases not
>>>>>>> covered.
>>>>>>>
>>>>>>>
>>>>>>>>>> Moving forward, we are thinking about making the coordinator
>>>>>>>>>> handle
>>>>>>>
>>>>>>>> both
>>>>>>>
>>>>>>>>>> the group management and offset management for the given groups,
>>>> i.e.
>>>>>>>
>>>>>>>> the
>>>>>>>
>>>>>>>>>> leader of the offset log partition will naturally become the
>>>>>>>
>>>>>>>> coordinator of
>>>>>>>
>>>>>>>>>> the corresponding consumer. This allows the coordinator to reject
>>>>>>>
>>>>>>>>>> offset commit requests if its sender is not part of the group. To
>>>>>>>
>>>>>>>>>> do so a
>>>>>>>
>>>>>>>> consumer
>>>>>>>
>>>>>>>>>> id and generation id are applied to control the group member
>>>>>>>
>>>>>>>> generations.
>>>>>>>
>>>>>>>>>> Details of this design are included in the rewrite design wiki page,
>>>>>>>
>>>>>>>>>> and
>>>>>>>
>>>>>>>> again
>>>>>>>
>>>>>>>>>> any feedback is appreciated.
>>>>>>>
>>>>>>>
>>>>>>>>>> *3. Non-blocking Network IO*
>>>>>>>
>>>>>>>
>>>>>>>>>> With a single-threaded consumer, we will instead use a non-blocking
>>>>>>>
>>>>>>>> network
>>>>>>>
>>>>>>>>>> IO (i.e. java.nio) to do fetching loops just as the new producer
>>>>>>>
>>>>>>>>>> does
>>>>>>>
>>>>>>>> for
>>>>>>>
>>>>>>>>>> asynchronously sending data. In terms of implementation, we will let
>>>>>>>
>>>>>>>>>> the
>>>>>>>
>>>>>>>> new
>>>>>>>
>>>>>>>>>> consumer/producer clients be based on a common non-blocking IO
>>>>>>>
>>>>>>>>>> thread class. Details of this refactoring can be found in
>>>> KAFKA-1316.
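The single-threaded non-blocking loop described in section 3 (java.nio in the actual client) can be illustrated with Python's selectors module; this is an analogy for the structure only, not the KAFKA-1316 code:

```python
import selectors
import socket

# Sketch: one thread multiplexes all broker sockets; each readiness event
# dispatches to a per-socket callback instead of blocking per connection.
def run_once(sel, timeout=0.1):
    for key, _mask in sel.select(timeout=timeout):
        key.data(key.fileobj)  # key.data holds the per-socket callback
```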
>>>>>>>
>>>>>>>
>>>>>>>>>> *4. Consumer API*
>>>>>>>
>>>>>>>
>>>>>>>>>> As a result of the single-threaded non-blocking network IO
>>>>>>>
>>>>>>>> implementation,
>>>>>>>
>>>>>>>>>> the new consumer API will no longer be based on a blocking stream
>>>>>>>
>>>>>>>>>> iterator interface but a poll(timeout) interface. In addition, the
>>>>>>>
>>>>>>>>>> consumer APIs
>>>>>>>
>>>>>>>> are
>>>>>>>
>>>>>>>>>> designed to be flexible so that people can choose to either use the
>>>>>>>
>>>>>>>> default
>>>>>>>
>>>>>>>>>> offset and group membership management utilities mentioned above or
>>>>>>>
>>>>>>>>>> implement their own offset/group management logic. For example, for
>>>>>>>
>>>>>>>>>> scenarios that needs fixed partition assignment or that prefer to
>>>>>>>
>>>>>>>>>> store offsets locally in a data store, the new consumer APIs would
>>>>>>>
>>>>>>>>>> make it
>>>>>>>
>>>>>>>> much
>>>>>>>
>>>>>>>>>> easier to customize. Details of the API design can be found in
>>>>>>>
>>>>>>>> KAFKA-1328.
>>>>>>>
>>>>>>>>>> A question to think about is whether there are any other potential
>>>>>>>>>> use cases that cannot be easily supported by this API.
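The poll(timeout)-style API in section 4 implies a loop of roughly this shape; the method names here are assumptions based on the description, not the final API:

```python
# Sketch of the poll-based consumption loop: a single thread repeatedly
# polls for records, processes them, and commits offsets (or manages
# offsets externally, which the flexible API is meant to allow).
def consume(consumer, process, running):
    while running():
        records = consumer.poll(timeout_ms=100)  # returns at most after 100 ms
        for record in records:
            process(record)
        consumer.commit()
```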
>>>>>>>
>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> -- Guozhang
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>
>>>
>>>
>>> --
>>> -- Guozhang
