Hi Jay,

For sure, and a capability protocol adds even more.  It comes down to
cost/benefit, which is a nice place to be.

- Rob

> On Jul 22, 2014, at 10:46 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> 
> Hey Robert,
> 
> I think the issue is that you lose much of the simplification if you
> still need to heartbeat and you need a new commit notification
> message. I think that would make this proposal more complex than the
> current one.
> 
> -Jay
> 
> On Mon, Jul 21, 2014 at 6:44 PM, Robert Withers
> <robert.w.with...@gmail.com> wrote:
>> Thanks, Jay, for the good summary.  Regarding point 2, I would think the 
>> heartbeat would still be desired, to give control over liveness detection 
>> parameters and to directly inform clients when gaining or losing a partition 
>> (especially when gaining a partition).  There would be no barrier; the 
>> rebalancer would be an offline scheduler issuing SwitchPartition commands.  
>> Evaluating a SwitchPartition command would wait for the consumer losing the 
>> partition to commit its offset and finish any local commit work before 
>> confirming completion to the coordinator, which would then inform the new 
>> consumer and the ISR brokers about the partition gain.  Broker security 
>> would be the master record of live assignments.
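>> 
>> Roughly the handoff I have in mind, as a sketch (every name below is made up 
>> for illustration, not an existing API):
>> 
>>   // Sketch only: a scheduled handoff of one partition, consumer to consumer.
>>   import java.util.concurrent.ConcurrentHashMap;
>>   import java.util.concurrent.ConcurrentMap;
>> 
>>   final class SwitchPartitionSketch {
>>     record TopicPartition(String topic, int partition) {}
>>     record SwitchPartition(TopicPartition tp, String from, String to) {}
>> 
>>     // the authoritative ("master record") assignment; brokers would be told
>>     // the same thing so they can gate fetch requests
>>     private final ConcurrentMap<TopicPartition, String> owners = new ConcurrentHashMap<>();
>> 
>>     void execute(SwitchPartition cmd) {
>>       // 1. tell the losing consumer, piggy-backed on its next heartbeat response
>>       System.out.println("remove " + cmd.tp() + " from " + cmd.from());
>>       // 2. wait for it to commit its offset and confirm (elided in this sketch)
>>       // 3. record the new owner and inform the ISR brokers
>>       owners.put(cmd.tp(), cmd.to());
>>       // 4. only now tell the gaining consumer it owns the partition
>>       System.out.println("add " + cmd.tp() + " to " + cmd.to());
>>     }
>> 
>>     public static void main(String[] args) {
>>       new SwitchPartitionSketch().execute(new SwitchPartition(
>>           new TopicPartition("bingo", 131), "consumer-7", "consumer-8"));
>>     }
>>   }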
>> 
>> Thanks,
>> Rob
>> 
>>> On Jul 21, 2014, at 6:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>> 
>>> This thread is a bit long, but let me see if I can restate it
>>> correctly (not sure I fully follow).
>>> 
>>> There are two suggestions:
>>> 1. Allow partial rebalances that move just some partitions. I.e. if a
>>> consumer fails and has only one partition, only one other consumer
>>> should be affected (the one who picks up the new partition). If there
>>> are many partitions to be reassigned there will obviously be a
>>> tradeoff between impacting all consumers and balancing load evenly.
>>> I.e. if you moved all load to one other consumer that would cause
>>> little rebalancing interruption but poor load balancing.
>>> 2. Have the co-ordinator communicate the assignments to the brokers
>>> rather than to the client directly. This could potentially simplify
>>> the consumer. Perhaps it would be possible to have the leader track
>>> liveness using the fetch requests rather than needing an artificial
>>> heartbeat.
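>>> 
>>> (For 2, the leader-side bookkeeping could be as small as the sketch below;
>>> the names are invented, this is just the idea of liveness from fetches.)
>>> 
>>>   // Sketch: treat a consumer's fetch requests as its heartbeat.
>>>   import java.util.Map;
>>>   import java.util.concurrent.ConcurrentHashMap;
>>> 
>>>   final class FetchLivenessTracker {
>>>     private final Map<String, Long> lastFetchMs = new ConcurrentHashMap<>();
>>>     private final long sessionTimeoutMs;
>>> 
>>>     FetchLivenessTracker(long sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }
>>> 
>>>     // called by the partition leader for every fetch request it serves
>>>     void onFetch(String consumerId) { lastFetchMs.put(consumerId, System.currentTimeMillis()); }
>>> 
>>>     // consulted when deciding whether the consumer has gone away
>>>     boolean isAlive(String consumerId) {
>>>       Long last = lastFetchMs.get(consumerId);
>>>       return last != null && System.currentTimeMillis() - last < sessionTimeoutMs;
>>>     }
>>>   }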
>>> 
>>> These are interesting ideas.
>>> 
>>> -Jay
>>> 
>>> 
>>> 
>>>> On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>> Hello Rob,
>>>> 
>>>> If I get your idea right, it is that if the rebalance only changes
>>>> the ownership of a few consumers in the group, the coordinator can just
>>>> sync with those consumers and not interrupt the others.
>>>> 
>>>> I think this approach may work. However, it will likely complicate the
>>>> coordinator's logic once we sketch out all the details, since the
>>>> rebalance result basically depends on two variables: 1) the partitions of
>>>> the subscribed topics, and 2) the consumers inside the group. Hence, with
>>>> this approach, by the time the coordinator decides to trigger a rebalance
>>>> it must correctly keep track of which variable change triggered the
>>>> current rebalance process; on the other hand, the normal rebalance
>>>> process, even with a global barrier, should usually be very fast, within
>>>> a few hundred millis. So I am not sure this is an optimization worth
>>>> making for now. What do you think?
>>>> 
>>>> Guozhang
>>>> 
>>>> 
>>>> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers
>>>> <robert.w.with...@gmail.com> wrote:
>>>> 
>>>>> Lock is a bad way to say it; a barrier is better.  I don't think what I am
>>>>> saying is even a barrier, since the rebalancer would just need to recompute
>>>>> a rebalance schedule and submit it.  The only processing delay is to allow
>>>>> a soft remove to let the client clean up before you turn on the new guy,
>>>>> so it lags a bit.  Do you think this could work?
>>>>> 
>>>>> Thanks,
>>>>> Rob
>>>>> 
>>>>>> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>> Hi Guozhang,
>>>>>> 
>>>>>> Thank you for considering my suggestions.  The security layer sounds like
>>>>>> the right facet to design for these sorts of capabilities.  Have you
>>>>>> considered a chained ocap security model for the broker, using hash tokens?
>>>>>> This would provide per-partition read/write capabilities with QoS context,
>>>>>> including leases, revocation, debug level and monitoring.  The overkill
>>>>>> disappears because no domain-specific info, like consumer/partition
>>>>>> assignments, needs to be stored at the brokers.  The read ocap for
>>>>>> consumer 7/topic bingo/partition 131 could be revoked at the brokers, and
>>>>>> subsequent reads with that ocap token would fail the fetch.  You could
>>>>>> also dynamically change the log level for a specific consumer/partition.
>>>>>> 
>>>>>> There are advantages to having finer grained control that we could discuss.
>>>>>> Consider that scheduled partition rebalancing could be implemented with no
>>>>>> pauses from the perspective of the consumer threads; it looks like single
>>>>>> partition lag, as the offset commit occurs before rotation, with no lag on
>>>>>> non-rebalanced partitions: rebalance 1 partition per second so as to creep
>>>>>> load over to a newbie consumer.  It would eliminate the global read lock,
>>>>>> and even the internal Kafka consumer would never block on IO protocol other
>>>>>> than the normal fetch request (and the initial join group request).
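>>>>>> 
>>>>>> As a sketch of the schedule (names invented, nothing more than the idea):
>>>>>> 
>>>>>>   // Sketch: creep load by moving one partition per second to the new consumer.
>>>>>>   import java.util.List;
>>>>>>   import java.util.concurrent.Executors;
>>>>>>   import java.util.concurrent.ScheduledExecutorService;
>>>>>>   import java.util.concurrent.TimeUnit;
>>>>>> 
>>>>>>   final class CreepingRebalanceSketch {
>>>>>>     interface Mover { void switchPartition(int partition, String from, String to); }
>>>>>> 
>>>>>>     static void creep(List<Integer> partitions, List<String> owners,
>>>>>>                       String newConsumer, Mover mover) {
>>>>>>       ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
>>>>>>       for (int i = 0; i < partitions.size(); i++) {
>>>>>>         final int p = partitions.get(i);
>>>>>>         final String from = owners.get(i);
>>>>>>         // one SwitchPartition per second; untouched partitions never pause
>>>>>>         timer.schedule(() -> mover.switchPartition(p, from, newConsumer), i, TimeUnit.SECONDS);
>>>>>>       }
>>>>>>       timer.shutdown();
>>>>>>     }
>>>>>>   }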
>>>>>> 
>>>>>> A global lock acquired through a pull protocol (HeartbeatRequest followed
>>>>>> by a JoinGroupRequest) for all live consumers is a much bigger lock than a
>>>>>> security-based push protocol, as I assume the coordinator will have open
>>>>>> sockets to all brokers in order to reach out as needed.  As well, each lock
>>>>>> would be independent between partitions and involve only those brokers in
>>>>>> the ISR for a given partition.  It is a much smaller lock.
>>>>>> 
>>>>>> I had some time to consider my suggestion that this be viewed as a
>>>>>> relativistic frame of reference.  Consider a model where each dimension of
>>>>>> the frame of reference for each consumer is a partition; it is actually a
>>>>>> sub-space with the dimensionality of the replication factor, but with a
>>>>>> single leader election, so consider it 1 dimension.  The total
>>>>>> dimensionality of a consumer's frame of reference is the number of
>>>>>> partitions, but only assigned partitions are open to a given consumer's
>>>>>> viewpoint.  The offset is the partition dimension's coordinate, and only
>>>>>> consumers with an open dimension can translate the offset.  A rebalance
>>>>>> opens or closes a dimension for a given consumer and can be viewed as a
>>>>>> rotation.  Could Kafka consumption and rebalance (and ISR leader election)
>>>>>> be reduced to matrix operations?
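>>>>>> 
>>>>>> To make that half-serious (a toy illustration only): an assignment is a
>>>>>> consumers x partitions 0/1 matrix in which every column sums to 1, and a
>>>>>> SwitchPartition is the elementary operation that changes exactly one column:
>>>>>> 
>>>>>>   // Toy sketch: the assignment as a 0/1 matrix; a rebalance step flips one column.
>>>>>>   final class AssignmentMatrixSketch {
>>>>>>     private final boolean[][] assigned;  // assigned[c][p] == true iff consumer c owns partition p
>>>>>> 
>>>>>>     AssignmentMatrixSketch(int consumers, int partitions) {
>>>>>>       assigned = new boolean[consumers][partitions];
>>>>>>     }
>>>>>> 
>>>>>>     // the elementary "rotation": close partition p for one consumer, open it for another
>>>>>>     void switchPartition(int partition, int fromConsumer, int toConsumer) {
>>>>>>       assigned[fromConsumer][partition] = false;
>>>>>>       assigned[toConsumer][partition] = true;
>>>>>>     }
>>>>>>   }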
>>>>>> 
>>>>>> Rob
>>>>>> 
>>>>>>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Rob,
>>>>>>> 
>>>>>>> Sorry for the late reply.
>>>>>>> 
>>>>>>> If I understand your approach correctly, it requires all brokers to
>>>>>>> remember the partition assignment of each consumer in order to decide
>>>>>>> whether or not to authorize a fetch request, correct? If we are indeed
>>>>>>> going to do such authorization for the security project then maybe it is
>>>>>>> a good way to go, but otherwise it might be overkill just to support
>>>>>>> finer grained partition assignment. In addition, instead of requiring a
>>>>>>> round trip between the coordinator and the consumers for the
>>>>>>> synchronization barrier, now the coordinator needs to wait for a round
>>>>>>> trip between itself and the other brokers before it can respond to the
>>>>>>> join-group request, right?
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <robert.w.with...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Guozhang,
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Currently, the brokers do not know which high-level consumers are reading
>>>>>>>> which partitions, and it is the rebalance between the consumers and the
>>>>>>>> coordinator which would authorize a consumer to fetch a particular
>>>>>>>> partition, I think.  Does this mean that when a rebalance occurs, all
>>>>>>>> consumers must send a JoinGroupRequest and that the coordinator will not
>>>>>>>> respond to any consumers until all consumers have sent the
>>>>>>>> JoinGroupRequest, to enable the synchronization barrier?  That has the
>>>>>>>> potential to be a sizable global delay.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On the assumption that there is only one coordinator for a group, why
>>>>>>>> couldn't the synchronization barrier be per partition and internal to
>>>>>>>> Kafka, and mostly not involve the consumers, other than a chance for an
>>>>>>>> offsetCommit by the consumer losing a partition?  If the brokers have
>>>>>>>> session state and know the new assignment before the consumer is notified
>>>>>>>> with a HeartbeatResponse, they could fail the fetch request from that
>>>>>>>> consumer for an invalidly assigned partition.  The consumer could take an
>>>>>>>> invalid-partition failure of the fetch request as if it were a
>>>>>>>> HeartbeatResponse partition removal.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> The only gap seems to be: when does a consumer that is losing a partition
>>>>>>>> get a chance to commit its offset?  If there were a
>>>>>>>> PartitionCommittedNotification message that a consumer could send to the
>>>>>>>> coordinator after committing its offsets, then the coordinator could send
>>>>>>>> the add-partition HeartbeatResponse to the consumer gaining the partition
>>>>>>>> after receiving the PartitionCommittedNotification, and the offset
>>>>>>>> management stays stable.  The advantage is that none of the other
>>>>>>>> consumers would be paused on any other partitions.  Only partitions being
>>>>>>>> rebalanced would see any consumption pause.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> So, something like:
>>>>>>>> 
>>>>>>>> 1.  3 consumers running with 12 partitions balanced, 4 each
>>>>>>>> 
>>>>>>>> 2.  New consumer starts and sends JoinGroupRequest to coordinator
>>>>>>>> 
>>>>>>>> 3.  Coordinator computes rebalance with 4 consumers: each existing
>>>>>>>> consumer will lose a partition assigned to the new consumer
>>>>>>>> 
>>>>>>>> 4.  Coordinator informs all live brokers of partition reassignments
>>>>>>>> 
>>>>>>>> 5.  Brokers receive reassignments, start failing unauthorized fetch
>>>>>>>> requests and ack back to the coordinator
>>>>>>>> 
>>>>>>>> 6.  Coordinator receives all broker acks and sends HeartbeatResponses with
>>>>>>>> partition removals to existing consumers and awaits
>>>>>>>> PartitionCommittedNotifications from consumers losing partitions.
>>>>>>>> 
>>>>>>>> 7.  Existing consumers can continue to fetch messages from correctly
>>>>>>>> assigned partitions
>>>>>>>> 
>>>>>>>> 8.  When an existing consumer fails a fetch for a partition or gets a
>>>>>>>> HeartbeatResponse with a partition removal, it would commitOffsets for
>>>>>>>> that partition and then send a PartitionCommittedNotification to the
>>>>>>>> coordinator.
>>>>>>>> 
>>>>>>>> 9.  As the Coordinator receives the PartitionCommittedNotification for a
>>>>>>>> particular partition from an existing consumer, it sends the addPartition
>>>>>>>> to consumer 4 in a HeartbeatResponse, and the new consumer can start
>>>>>>>> fetching that partition.
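>>>>>>>> 
>>>>>>>> In code form, the key ordering constraint (steps 6-9) is just this; all
>>>>>>>> names are invented for the sketch:
>>>>>>>> 
>>>>>>>>   // Sketch: the gaining consumer is only told about the partition after the
>>>>>>>>   // losing consumer's PartitionCommittedNotification has arrived.
>>>>>>>>   import java.util.Map;
>>>>>>>>   import java.util.concurrent.ConcurrentHashMap;
>>>>>>>> 
>>>>>>>>   final class HandoffCoordinatorSketch {
>>>>>>>>     record Handoff(String topic, int partition, String losing, String gaining) {}
>>>>>>>> 
>>>>>>>>     // for simplicity, at most one pending handoff per losing consumer
>>>>>>>>     private final Map<String, Handoff> pendingByLosingConsumer = new ConcurrentHashMap<>();
>>>>>>>> 
>>>>>>>>     // step 6: the removal was sent to the losing consumer in a HeartbeatResponse
>>>>>>>>     void awaitCommit(Handoff h) { pendingByLosingConsumer.put(h.losing(), h); }
>>>>>>>> 
>>>>>>>>     // steps 8/9: the losing consumer committed its offset and notified us
>>>>>>>>     void onPartitionCommittedNotification(String losingConsumer) {
>>>>>>>>       Handoff h = pendingByLosingConsumer.remove(losingConsumer);
>>>>>>>>       if (h != null) {
>>>>>>>>         // only now may the new consumer start fetching this partition
>>>>>>>>         sendAddPartition(h.gaining(), h.topic(), h.partition());
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>> 
>>>>>>>>     private void sendAddPartition(String consumer, String topic, int partition) {
>>>>>>>>       System.out.println("addPartition " + topic + "-" + partition + " -> " + consumer);
>>>>>>>>     }
>>>>>>>>   }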
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> If a consumer drops HeartbeatRequests within a session timeout, the
>>>>>>>> coordinator would inform the brokers and they would fail fetchRequests for
>>>>>>>> those partitions from that consumer.  There is no chance to send
>>>>>>>> removePartitions since no Heartbeat is occurring, but the addPartitions
>>>>>>>> could be sent and the offset is what it is.  This seems no different from
>>>>>>>> how this sort of failure behaves today.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Instead of a global synchronization barrier, isn’t it possible to have an
>>>>>>>> incremental per-partition synchronization barrier?  The brokers would have
>>>>>>>> to be aware of this.  I think of it as relativistic from each acceleration
>>>>>>>> frame of reference, which is each consumer: event horizons.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> 
>>>>>>>> Rob
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -----Original Message-----
>>>>>>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>>>>>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>>>>>>> To: users@kafka.apache.org
>>>>>>>> Subject: Re: New Consumer Design
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Hi Rob,
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Piggy-backing the rebalance partition info in the HeartbeatResponse may
>>>>>>>> cause inconsistency of the partition assignments to consumers when
>>>>>>>> rebalances are triggered consecutively, since the coordinator no longer
>>>>>>>> has a synchronization barrier for re-computing the distribution with a
>>>>>>>> consistent view.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Guozhang
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> Hi All,
>>>>>>>>>>> 
>>>>>>>>>>> We have written a wiki a few weeks back proposing a single-threaded ZK-free
>>>>>>>>>>> consumer client design for 0.9:
>>>>>>>>>>> 
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>>>>>>> 
>>>>>>>>>>> We want to share some of the ideas that came up for this design to get some
>>>>>>>>>>> early feedback. The below discussion assumes you have read the above wiki.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> *Offset Management*
>>>>>>>>>>> 
>>>>>>>>>>> To make consumer clients ZK-free we need to move the offset management
>>>>>>>>>>> utility from ZooKeeper into the Kafka servers, which then store offsets as
>>>>>>>>>>> a special log. Key-based log compaction will be used to keep this
>>>>>>>>>>> ever-growing log's size bounded. Brokers who are responsible for the offset
>>>>>>>>>>> management for a given consumer group will be the leader of this special
>>>>>>>>>>> offset log partition where the partition key will be consumer group names.
>>>>>>>>>>> On the consumer side, instead of talking to ZK for commit and read offsets,
>>>>>>>>>>> it talks to the servers with new offset commit and offset fetch request
>>>>>>>>>>> types for offset management. This work has been done and will be included
>>>>>>>>>>> in the 0.8.2 release. Details of the implementation can be found in
>>>>>>>>>>> KAFKA-1000 and this wiki:
>>>>>>>>>>> 
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>>>>>>>> 
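>>>>>>>>>>> (Purely to illustrate the key-based compaction semantics above -- the
>>>>>>>>>>> offsets log behaves like a map whose latest value per key is all that has
>>>>>>>>>>> to be retained; the names below are invented:)
>>>>>>>>>>> 
>>>>>>>>>>>   // Illustration of key-based compaction for the offsets log: last write per key wins.
>>>>>>>>>>>   import java.util.Map;
>>>>>>>>>>>   import java.util.concurrent.ConcurrentHashMap;
>>>>>>>>>>> 
>>>>>>>>>>>   final class OffsetsLogSketch {
>>>>>>>>>>>     record OffsetKey(String group, String topic, int partition) {}
>>>>>>>>>>> 
>>>>>>>>>>>     private final Map<OffsetKey, Long> compacted = new ConcurrentHashMap<>();
>>>>>>>>>>> 
>>>>>>>>>>>     void append(String group, String topic, int partition, long offset) {
>>>>>>>>>>>       compacted.put(new OffsetKey(group, topic, partition), offset); // older entries compact away
>>>>>>>>>>>     }
>>>>>>>>>>> 
>>>>>>>>>>>     Long fetch(String group, String topic, int partition) {
>>>>>>>>>>>       return compacted.get(new OffsetKey(group, topic, partition));
>>>>>>>>>>>     }
>>>>>>>>>>>   }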
>>>>>>>> 
>>>>>>>>>>> *Group Membership Management*
>>>>>>>>>>> 
>>>>>>>>>>> The next step is to move the membership management and rebalancing utility
>>>>>>>>>>> out of ZooKeeper as well. To do this we introduced a consumer coordinator
>>>>>>>>>>> hosted on the Kafka servers which is responsible for keeping track of group
>>>>>>>>>>> membership and consumption partition changes, and the corresponding
>>>>>>>>>>> rebalancing process.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> *1. Failure Detection*
>>>>>>>>>>> 
>>>>>>>>>>> More specifically, we will use a heartbeating protocol for consumer and
>>>>>>>>>>> coordinator failure detections. The heartbeating protocol would be similar
>>>>>>>>>>> to the one ZK used, i.e. the consumer will set a session timeout value to
>>>>>>>>>>> the coordinator on startup, and keep sending heartbeats to the coordinator
>>>>>>>>>>> afterwards every session-timeout / heartbeat-frequency. The coordinator
>>>>>>>>>>> will treat a consumer as failed when it has not heard from the consumer
>>>>>>>>>>> after session-timeout, and the consumer will treat its coordinator as
>>>>>>>>>>> failed after it has not heard its heartbeat responses after session-timeout.
>>>>>>>>>>> 
>>>>>>>>>>> One difference with ZK heartbeat protocol is that instead of fixing the
>>>>>>>>>>> heartbeat-frequency as three, we make this value configurable in consumer
>>>>>>>>>>> clients. This is because the rebalancing process (we will discuss later)
>>>>>>>>>>> latency is lower bounded by the heartbeat frequency, so we want this value
>>>>>>>>>>> to be large while not DDoSing the servers.
>>>>>>>> 
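>>>>>>>>>>> (Concretely, the relationship is just the sketch below; the class and field
>>>>>>>>>>> names are made up for illustration:)
>>>>>>>>>>> 
>>>>>>>>>>>   // Sketch: heartbeat interval derived from the session timeout and a configurable frequency.
>>>>>>>>>>>   final class HeartbeatSettingsSketch {
>>>>>>>>>>>     final long sessionTimeoutMs;   // set by the consumer at startup
>>>>>>>>>>>     final int heartbeatFrequency;  // ZK fixes this at 3; here it is configurable
>>>>>>>>>>> 
>>>>>>>>>>>     HeartbeatSettingsSketch(long sessionTimeoutMs, int heartbeatFrequency) {
>>>>>>>>>>>       this.sessionTimeoutMs = sessionTimeoutMs;
>>>>>>>>>>>       this.heartbeatFrequency = heartbeatFrequency;
>>>>>>>>>>>     }
>>>>>>>>>>> 
>>>>>>>>>>>     long heartbeatIntervalMs() {           // how often the consumer sends a heartbeat
>>>>>>>>>>>       return sessionTimeoutMs / heartbeatFrequency;
>>>>>>>>>>>     }
>>>>>>>>>>> 
>>>>>>>>>>>     boolean consideredDead(long msSinceLastHeartbeat) {  // applied by both sides
>>>>>>>>>>>       return msSinceLastHeartbeat > sessionTimeoutMs;
>>>>>>>>>>>     }
>>>>>>>>>>>   }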
>>>>>>>> 
>>>>>>>>>>> *2. Consumer Rebalance*
>>>>>>>>>>> 
>>>>>>>>>>> A bunch of events can trigger a rebalance process: 1) consumer failure, 2)
>>>>>>>>>>> a new consumer joining the group, 3) an existing consumer changing its
>>>>>>>>>>> consumed topics/partitions, 4) a partition change for the consumed topics.
>>>>>>>>>>> Once the coordinator decides to trigger a rebalance it will notify the
>>>>>>>>>>> consumers within to resend their topic/partition subscription information,
>>>>>>>>>>> and then assign existing partitions to these consumers. On the consumers'
>>>>>>>>>>> end, they no longer run any rebalance logic in a distributed manner but
>>>>>>>>>>> follow the partition assignment they receive from the coordinator. Since
>>>>>>>>>>> the consumers can only be notified of a rebalance triggering via heartbeat
>>>>>>>>>>> responses, the rebalance latency is lower bounded by the heartbeat
>>>>>>>>>>> frequency; that is why we allow consumers to negotiate this value with the
>>>>>>>>>>> coordinator upon registration, and we would like to hear people's thoughts
>>>>>>>>>>> about this protocol.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>> One question on this topic, my planned usage pattern is to have a high
>>>>>>>>>> number of consumers that are ephemeral.  Imagine a consumer coming online,
>>>>>>>>>> consuming messages for a short duration, then disappearing forever.  Is
>>>>>>>>>> this use case supported?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> If consumers are coming and going, there will be a high rate of
>>>>>>>>> rebalances and that will really start throttling consumption at some
>>>>>>>>> point.  You may have a use case that hits that detrimentally.  A custom
>>>>>>>>> rebalance algorithm may be able to rebalance a sub-set of the
>>>>>>>>> consumers and not inform others of a need to rebalance them, which
>>>>>>>>> would ameliorate the cost.  Still, the consumers that need to rebalance
>>>>>>>>> would pay.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> If the initial JoinGroupRequest was preserved, would it be possible to
>>>>>>>>> send rebalanced partition information in the HeartbeatResponse, up
>>>>>>>>> front to all (or a sub-set of) consumers, and avoid the rejoin round
>>>>>>>>> trip?  I mean, push the rebalance and not pull it with a secondary
>>>>>>>>> JoinGroupRequest.  This would reduce cost and lower the throttle
>>>>>>>>> point, but poorly timed fetch requests may fail.  If the consumer were
>>>>>>>>> resilient to fetch failures, might it work?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> In addition, the rebalance logic in the new consumer will be customizable
>>>>>>>>>>> instead of hard-written in a per-topic round-robin manner. Users will be
>>>>>>>>>>> able to write their own assignment class following a common interface, and
>>>>>>>>>>> consumers upon registering themselves will specify in their registration
>>>>>>>>>>> request the assigner class name they want to use. If consumers within the
>>>>>>>>>>> same group specify different algorithms the registration will be rejected.
>>>>>>>>>>> We are not sure if this is the best way of customizing the rebalance logic
>>>>>>>>>>> yet, so any feedback is more than welcome.
>>>>>>>> 
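>>>>>>>>>>> (The common interface could be as small as the sketch below; this is only
>>>>>>>>>>> an illustration, not the actual interface being proposed:)
>>>>>>>>>>> 
>>>>>>>>>>>   // Sketch of a pluggable assignment interface; names are placeholders.
>>>>>>>>>>>   import java.util.List;
>>>>>>>>>>>   import java.util.Map;
>>>>>>>>>>> 
>>>>>>>>>>>   interface PartitionAssignorSketch {
>>>>>>>>>>>     record TopicPartition(String topic, int partition) {}
>>>>>>>>>>> 
>>>>>>>>>>>     // consumer id -> assigned partitions, given the full membership and topic layout
>>>>>>>>>>>     Map<String, List<TopicPartition>> assign(List<String> consumerIds,
>>>>>>>>>>>                                              Map<String, Integer> partitionsPerTopic);
>>>>>>>>>>>   }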
>>>>>>>> 
>>>>>>>>>>> For wildcard consumption, the consumers now will capture new topics that
>>>>>>>>>>> are available for fetching through the topic metadata request. That is,
>>>>>>>>>>> periodically the consumers will update their topic metadata for fetching,
>>>>>>>>>>> and if new topics are returned in the metadata response matching their
>>>>>>>>>>> wildcard regex, they will notify the coordinator to let it trigger a new
>>>>>>>>>>> rebalance to assign partitions for the new topics.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> There are some failure handling cases during the rebalancing process
>>>>>>>>>>> discussed in the consumer rewrite design wiki. We would encourage people
>>>>>>>>>>> to read it and let us know if there are any other corner cases not covered.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> Moving forward, we are thinking about making the coordinator handle both
>>>>>>>>>>> the group management and offset management for the given groups, i.e. the
>>>>>>>>>>> leader of the offset log partition will naturally become the coordinator of
>>>>>>>>>>> the corresponding consumer group. This allows the coordinator to reject
>>>>>>>>>>> offset commit requests if the sender is not part of the group. To do so a
>>>>>>>>>>> consumer id and generation id are applied to control the group member
>>>>>>>>>>> generations. Details of this design are included in the rewrite design wiki
>>>>>>>>>>> page, and again any feedback is appreciated.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> *3. Non-blocking Network IO*
>>>>>>>>>>> 
>>>>>>>>>>> With a single-threaded consumer, we will instead use non-blocking network
>>>>>>>>>>> IO (i.e. java.nio) to do fetching loops just as the new producer does for
>>>>>>>>>>> asynchronously sending data. In terms of implementation, we will let the
>>>>>>>>>>> new consumer/producer clients be based on a common non-blocking IO thread
>>>>>>>>>>> class. Details of this refactoring can be found in KAFKA-1316.
>>>>>>>> 
>>>>>>>> 
>>>>>>>>>>> *4. Consumer API*
>>>>>>>>>>> 
>>>>>>>>>>> As a result of the single-threaded non-blocking network IO implementation,
>>>>>>>>>>> the new consumer API will no longer be based on a blocking stream iterator
>>>>>>>>>>> interface but on a poll(timeout) interface. In addition, the consumer APIs
>>>>>>>>>>> are designed to be flexible so that people can choose to either use the
>>>>>>>>>>> default offset and group membership management utilities mentioned above or
>>>>>>>>>>> implement their own offset/group management logic. For example, for
>>>>>>>>>>> scenarios that need fixed partition assignment or that prefer to store
>>>>>>>>>>> offsets locally in a data store, the new consumer APIs would make it much
>>>>>>>>>>> easier to customize. Details of the API design can be found in KAFKA-1328.
>>>>>>>>>>> 
>>>>>>>>>>> A question to think about is whether there are any other potential use
>>>>>>>>>>> cases that cannot be easily supported by this API.
>>>>>>>> 
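>>>>>>>>>>> (For a feel of the poll(timeout) shape, a hypothetical client loop; the
>>>>>>>>>>> type and method names below are placeholders, not the proposed API:)
>>>>>>>>>>> 
>>>>>>>>>>>   // Hypothetical poll-loop shape against a poll(timeout) style consumer.
>>>>>>>>>>>   import java.util.List;
>>>>>>>>>>> 
>>>>>>>>>>>   final class PollLoopSketch {
>>>>>>>>>>>     interface Record { String topic(); int partition(); long offset(); byte[] value(); }
>>>>>>>>>>> 
>>>>>>>>>>>     interface Consumer extends AutoCloseable {
>>>>>>>>>>>       void subscribe(String... topics);
>>>>>>>>>>>       List<Record> poll(long timeoutMs);  // returns whatever is ready within the timeout
>>>>>>>>>>>       void commit();                      // commits offsets via the new offset commit request
>>>>>>>>>>>       void close();
>>>>>>>>>>>     }
>>>>>>>>>>> 
>>>>>>>>>>>     static void run(Consumer consumer) {
>>>>>>>>>>>       consumer.subscribe("bingo");
>>>>>>>>>>>       while (!Thread.currentThread().isInterrupted()) {
>>>>>>>>>>>         for (Record r : consumer.poll(100)) {
>>>>>>>>>>>           System.out.println(r.topic() + "-" + r.partition() + " @ " + r.offset());
>>>>>>>>>>>         }
>>>>>>>>>>>         consumer.commit();
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>   }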
>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Guozhang
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> 
>>>>>>>> -- Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
