...live assignments!  :)

> On Jul 21, 2014, at 7:44 PM, Robert Withers <robert.w.with...@gmail.com> 
> wrote:
> 
> Thanks, Jay, for the good summary.  Regarding point 2, I would think the 
> heartbeat would still be desired, to give control over liveness detection 
> parameters and to directly inform clients when gaining or losing a partition 
> (especially when gaining a partition).  There would be no barrier and the 
> rebalancer would be an offline scheduler, issuing SwitchPartition commands.  
> The evaluation of a SwitchPartition command would wait for the consumer losing 
> a partition to commit its offset and finish any local commit work before 
> confirming completion to the co-ordinator, which would then inform the new 
> consumer and the ISR brokers about the partition gain.  Broker security would 
> be the master record of live assignments.  
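> 
> A rough sketch of how a coordinator might evaluate one SwitchPartition 
> command under this scheme (every type and method name here is hypothetical, 
> not an existing Kafka API):
> 
>   // Hypothetical coordinator hooks; sketch only.
>   interface Coordinator {
>       void notifyLoss(String consumer, String topicPartition);       // via heartbeat
>       void awaitCommitConfirmation(String consumer, String topicPartition);
>       void notifyIsrBrokers(String topicPartition, String newOwner); // gate fetches
>       void notifyGain(String consumer, String topicPartition);       // via heartbeat
>   }
> 
>   class SwitchPartitionCommand {
>       final String topicPartition, losingConsumer, gainingConsumer;
>       SwitchPartitionCommand(String tp, String from, String to) {
>           topicPartition = tp; losingConsumer = from; gainingConsumer = to;
>       }
>       // The losing consumer commits its offset and local work first; only
>       // then are the gaining consumer and the ISR brokers told of the gain.
>       void evaluate(Coordinator c) {
>           c.notifyLoss(losingConsumer, topicPartition);
>           c.awaitCommitConfirmation(losingConsumer, topicPartition);
>           c.notifyIsrBrokers(topicPartition, gainingConsumer);
>           c.notifyGain(gainingConsumer, topicPartition);
>       }
>   }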
> 
> Thanks,
> Rob
> 
>> On Jul 21, 2014, at 6:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> 
>> This thread is a bit long, but let me see if I can restate it
>> correctly (not sure I fully follow).
>> 
>> There are two suggestions:
>> 1. Allow partial rebalances that move just some partitions. I.e. if a
>> consumer fails and has only one partition, only one other consumer
>> should be affected (the one who picks up the new partition). If there
>> are many partitions to be reassigned there will obviously be a
>> tradeoff between impacting all consumers and balancing load evenly.
>> I.e. if you moved all load to one other consumer that would cause
>> little rebalancing interruption but poor load balancing.
>> 2. Have the co-ordinator communicate the assignments to the brokers
>> rather than to the client directly. This could potentially simplify
>> the consumer. Perhaps it would be possible to have the leader track
>> liveness using the fetch requests rather than needing an artificial
>> heartbeat.
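>> 
>> A sketch of the partial reassignment in (1), illustrative only: keep every
>> surviving consumer's partitions where they are and hand just the orphaned
>> partitions to the least-loaded members.
>> 
>>   import java.util.*;
>> 
>>   class PartialRebalance {
>>       // current: consumerId -> owned partitions; live: surviving consumers
>>       static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> current,
>>                                                   Set<String> live, int numPartitions) {
>>           Map<String, List<Integer>> next = new HashMap<String, List<Integer>>();
>>           List<Integer> orphaned = new ArrayList<Integer>();
>>           for (int p = 0; p < numPartitions; p++) orphaned.add(p);
>>           for (Map.Entry<String, List<Integer>> e : current.entrySet())
>>               if (live.contains(e.getKey())) {          // survivors keep their load
>>                   next.put(e.getKey(), new ArrayList<Integer>(e.getValue()));
>>                   orphaned.removeAll(e.getValue());
>>               }
>>           for (String c : live)
>>               if (!next.containsKey(c)) next.put(c, new ArrayList<Integer>());
>>           for (Integer p : orphaned) {                  // only these moves interrupt anyone
>>               String least = null;
>>               for (String c : live)
>>                   if (least == null || next.get(c).size() < next.get(least).size())
>>                       least = c;
>>               next.get(least).add(p);
>>           }
>>           return next;
>>       }
>>   }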
>> 
>> These are interesting ideas.
>> 
>> -Jay
>> 
>> 
>> 
>>> On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> Hello Rob,
>>> 
>>> If I get your idea right, it is that if the rebalance only changes
>>> the ownership of a few consumers in the group, the coordinator can just
>>> sync with them and not interrupt the other consumers.
>>> 
>>> I think this approach may work. However, it will likely complicate the
>>> logic of the coordinator once we sketch out all the details, since the
>>> rebalance result basically depends on two variables: 1) the partitions of
>>> the subscribed topics, and 2) the consumers inside the group. Following
>>> this approach, by the time the coordinator decides to trigger a rebalance
>>> it must correctly keep track of which variable change triggered the
>>> current rebalance process. On the other hand, the normal rebalance process,
>>> even with a global barrier, should usually be very fast, within a few
>>> hundred millis. So I am not sure if this is a worthy optimization for now.
>>> What do you think?
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>> 
>>>> Lock is a bad way to say it; a barrier is better.  I don't think what I am
>>>> saying is even a barrier, since the rebalancer would just need to recompute
>>>> a rebalance schedule and submit it.  The only processing delay is to allow
>>>> a soft remove to let the client clean up, before you turn on the new guy, so
>>>> it lags a bit.  Do you think this could work?
>>>> 
>>>> Thanks,
>>>> Rob
>>>> 
>>>>> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>>>> 
>>>>> Hi Guozhang,
>>>>> 
>>>>> Thank you for considering my suggestions.  The security layer sounds
>>>>> like the right facet to design for these sorts of capabilities.  Have you
>>>>> considered a chained ocap security model for the broker using hash tokens?
>>>>> This would provide for per-partition read/write capabilities with QoS
>>>>> context including leases, revocation, debug level and monitoring.  Overkill
>>>>> disappears, as no domain-specific info needs to be stored at the brokers,
>>>>> such as consumer/partition assignments.  The read ocap for consumer 7/topic
>>>>> bingo/partition 131 could be revoked at the brokers, and subsequent fetch
>>>>> requests carrying that ocap token would fail.  You could also dynamically
>>>>> change the log level for a specific consumer/partition.
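>>>>> 
>>>>> A sketch of what such a hash token could look like, assuming an
>>>>> HMAC-based scheme; the token layout and all names are mine, not anything
>>>>> in Kafka today:
>>>>> 
>>>>>   import javax.crypto.Mac;
>>>>>   import javax.crypto.spec.SecretKeySpec;
>>>>>   import java.util.Arrays;
>>>>>   import java.util.Set;
>>>>> 
>>>>>   class FetchCapability {
>>>>>       // Mint a read token for (group, topic, partition) with a lease.
>>>>>       static byte[] mint(byte[] brokerKey, String group, String topic,
>>>>>                          int partition, long leaseExpiryMs) throws Exception {
>>>>>           Mac mac = Mac.getInstance("HmacSHA256");
>>>>>           mac.init(new SecretKeySpec(brokerKey, "HmacSHA256"));
>>>>>           String scope = group + "/" + topic + "/" + partition + "/" + leaseExpiryMs;
>>>>>           return mac.doFinal(scope.getBytes("UTF-8"));
>>>>>       }
>>>>>       // The broker recomputes the MAC per fetch, so it stores no
>>>>>       // assignment state; revocation is a small blacklist of scopes.
>>>>>       static boolean authorize(byte[] brokerKey, String group, String topic,
>>>>>                                int partition, long leaseExpiryMs, byte[] token,
>>>>>                                Set<String> revoked) throws Exception {
>>>>>           if (System.currentTimeMillis() > leaseExpiryMs) return false;
>>>>>           if (revoked.contains(group + "/" + topic + "/" + partition)) return false;
>>>>>           return Arrays.equals(token, mint(brokerKey, group, topic, partition, leaseExpiryMs));
>>>>>       }
>>>>>   }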
>>>>> 
>>>>> There are advantages we could discuss to having finer-grained control.
>>>>> Consider that scheduled partition rebalancing could be implemented with
>>>>> no pauses from the perspective of the consumer threads; it looks like
>>>>> single-partition lag, as the offset commit occurs before rotation, with no
>>>>> lag for non-rebalanced partitions: rebalance 1 partition per second so as
>>>>> to creep load to a newbie consumer.  It would eliminate a global read
>>>>> lock, and even the internal Kafka consumer would never block on IO protocol
>>>>> other than the normal fetch request (and the initial join group request).
>>>>> 
>>>>> A global lock acquired through a pull protocol (HeartbeatRequest followed
>>>>> by a JoinGroupRequest) for all live consumers is a much bigger lock than a
>>>>> security-based push protocol, as I assume the coordinator will have open
>>>>> sockets to all brokers in order to reach out as needed.  As well, each
>>>>> lock would be independent between partitions and involve only those
>>>>> brokers in the ISR for a given partition.  It is a much smaller lock.
>>>>> 
>>>>> I had some time to consider my suggestion that it be viewed as a
>>>>> relativistic frame of reference.  Consider the model where each dimension
>>>>> of the frame of reference for each consumer is a partition; actually a
>>>>> sub-space with the dimensionality of the replication factor, but with a
>>>>> single leader election, so consider it 1 dimension.  The total
>>>>> dimensionality of a consumer's frame of reference is the number of
>>>>> partitions, but only assigned partitions are open to a given consumer's
>>>>> viewpoint.  The offset is the partition dimension's coordinate, and only
>>>>> consumers with an open dimension can translate the offset.  A rebalance
>>>>> opens or closes a dimension for a given consumer and can be viewed as a
>>>>> rotation.  Could Kafka consumption and rebalance (and ISR leader election)
>>>>> be reduced to matrix operations?
>>>>> 
>>>>> Rob
>>>>> 
>>>>>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Rob,
>>>>>> 
>>>>>> Sorry for the late reply.
>>>>>> 
>>>>>> If I understand your approach correctly, it requires all brokers to
>>>>>> remember the partition assignment of each consumer in order to decide
>>>>>> whether or not to authorize a fetch request, correct? If we are indeed
>>>>>> going to do such authorization for the security project then maybe it is
>>>>>> a good way to go, but otherwise it might be an overkill just to support
>>>>>> finer-grained partition assignment. In addition, instead of requiring a
>>>>>> round trip between the coordinator and the consumers for the
>>>>>> synchronization barrier, now the coordinator needs to wait for a round
>>>>>> trip between itself and the other brokers before it can return the
>>>>>> join-group request, right?
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <robert.w.with...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Guozhang,
>>>>>>> 
>>>>>>> Currently, the brokers do not know which high-level consumers are reading
>>>>>>> which partitions, and it is the rebalance between the consumers and the
>>>>>>> coordinator which would authorize a consumer to fetch a particular
>>>>>>> partition, I think.  Does this mean that when a rebalance occurs, all
>>>>>>> consumers must send a JoinGroupRequest and that the coordinator will not
>>>>>>> respond to any consumers until all consumers have sent the
>>>>>>> JoinGroupRequest, to enable the synchronization barrier?  That has the
>>>>>>> potential to be a sizable global delay.
>>>>>>> 
>>>>>>> On the assumption that there is only one coordinator for a group, why
>>>>>>> couldn't the synchronization barrier be per partition, internal to Kafka,
>>>>>>> and mostly not involve the consumers, other than a chance for an
>>>>>>> offsetCommit by the consumer losing a partition?  If the brokers have
>>>>>>> session state and know the new assignment before the consumer is notified
>>>>>>> with a HeartbeatResponse, they could fail the fetch request from that
>>>>>>> consumer for an invalidly assigned partition.  The consumer could take an
>>>>>>> invalid-partition failure of the fetch request as if it were a
>>>>>>> HeartbeatResponse partition removal.
>>>>>>> 
>>>>>>> The only gap seems to be: when does a consumer that is losing a partition
>>>>>>> get a chance to commit its offset?  If there were a
>>>>>>> PartitionCommittedNotification message that a consumer could send to the
>>>>>>> coordinator after committing its offsets, then the coordinator could send
>>>>>>> the add-partition HeartbeatResponse, after receiving the
>>>>>>> PartitionCommittedNotification, to the consumer gaining the partition, and
>>>>>>> the offset management is stable.  The advantage is that none of the other
>>>>>>> consumers would be paused on any other partitions.  Only partitions being
>>>>>>> rebalanced would see any consumption pause.
>>>>>>> 
>>>>>>> So, something like:
>>>>>>> 
>>>>>>> 1.  3 consumers running with 12 partitions balanced, 4 each
>>>>>>> 
>>>>>>> 2.  New consumer starts and sends JoinGroupRequest to coordinator
>>>>>>> 
>>>>>>> 3.  Coordinator computes rebalance with 4 consumers: each existing
>>>>>>> consumer will lose one partition, which is assigned to the new consumer
>>>>>>> 
>>>>>>> 4.  Coordinator informs all live brokers of partition reassignments
>>>>>>> 
>>>>>>> 5.  Brokers receive reassignments, start failing unauthorized fetch
>>>>>>> requests and ack back to the coordinator
>>>>>>> 
>>>>>>> 6.  Coordinator receives all broker acks, sends HeartbeatResponses with
>>>>>>> partition removals to existing consumers, and awaits
>>>>>>> PartitionCommittedNotifications from the consumers losing partitions
>>>>>>> 
>>>>>>> 7.  Existing consumers can continue to fetch messages from correctly
>>>>>>> assigned partitions
>>>>>>> 
>>>>>>> 8.  When an existing consumer fails a fetch for a partition or gets a
>>>>>>> HeartbeatResponse with a partition removal, it would commitOffsets for
>>>>>>> that partition and then send a PartitionCommittedNotification to the
>>>>>>> coordinator
>>>>>>> 
>>>>>>> 9.  As the coordinator receives the PartitionCommittedNotification for a
>>>>>>> particular partition from an existing consumer, it sends the addPartition
>>>>>>> to consumer 4 in a HeartbeatResponse, and the new consumer can start
>>>>>>> fetching that partition
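>>>>>>> 
>>>>>>> A minimal sketch of the coordinator side of steps 6 and 9 (the message
>>>>>>> names follow the proposal above; everything else is illustrative):
>>>>>>> 
>>>>>>>   import java.util.Map;
>>>>>>>   import java.util.concurrent.ConcurrentHashMap;
>>>>>>> 
>>>>>>>   class PartitionHandoff {
>>>>>>>       final Map<String, String> oldOwner = new ConcurrentHashMap<String, String>();
>>>>>>>       final Map<String, String> newOwner = new ConcurrentHashMap<String, String>();
>>>>>>> 
>>>>>>>       // Step 6: all brokers acked, so fetches by the old owner now fail;
>>>>>>>       // tell the old owner to commit and release the partition.
>>>>>>>       void onAllBrokerAcks(String partition) {
>>>>>>>           send(oldOwner.get(partition), "HeartbeatResponse: remove " + partition);
>>>>>>>       }
>>>>>>> 
>>>>>>>       // Step 9: the old owner committed its offset, so the new owner can
>>>>>>>       // start fetching; no other consumer ever paused.
>>>>>>>       void onPartitionCommittedNotification(String partition) {
>>>>>>>           send(newOwner.get(partition), "HeartbeatResponse: add " + partition);
>>>>>>>       }
>>>>>>> 
>>>>>>>       void send(String consumer, String msg) {
>>>>>>>           System.out.println(consumer + " <- " + msg);  // stand-in for the wire
>>>>>>>       }
>>>>>>>   }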
>>>>>>> 
>>>>>>> If a consumer drops HeartbeatRequests within a session timeout, the
>>>>>>> coordinator would inform the brokers and they would fail fetchRequests
>>>>>>> for those partitions from that consumer.  There is no chance to send
>>>>>>> removePartitions since no Heartbeat is occurring, but the addPartitions
>>>>>>> could be sent and the offset is what it is.  This seems no different from
>>>>>>> this sort of failure today.
>>>>>>> 
>>>>>>> Instead of a global synchronization barrier, isn't it possible to have an
>>>>>>> incremental per-partition synchronization barrier?  The brokers would
>>>>>>> have to be aware of this.  I think of it as relativistic from each
>>>>>>> acceleration frame of reference, which is each consumer: event horizons.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> 
>>>>>>> Rob
>>>>>>> 
>>>>>>> -----Original Message-----
>>>>>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>>>>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>>>>>> To: users@kafka.apache.org
>>>>>>> Subject: Re: New Consumer Design
>>>>>>> 
>>>>>>> Hi Rob,
>>>>>>> 
>>>>>>> Piggy-backing the rebalance partition info in the HeartbeatResponse may
>>>>>>> cause inconsistency in the partition assignments of consumers when
>>>>>>> rebalances are triggered consecutively, since the coordinator would no
>>>>>>> longer have a synchronization barrier for re-computing the distribution
>>>>>>> with a consistent view.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>>>>>> 
>>>>>>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>>>>>>>>> 
>>>>>>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi All,
>>>>>>>>>> 
>>>>>>>>>> We have written a wiki a few weeks back proposing a single-threaded
>>>>>>>>>> ZK-free consumer client design for 0.9:
>>>>>>>>>> 
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>>>>>> 
>>>>>>>>>> We want to share some of the ideas that came up for this design to get
>>>>>>>>>> some early feedback. The below discussion assumes you have read the
>>>>>>>>>> above wiki.
>>>>>>>>>> 
>>>>>>>>>> *Offset Management*
>>>>>>>>>> 
>>>>>>>>>> To make consumer clients ZK-free we need to move the offset management
>>>>>>>>>> utility from ZooKeeper into the Kafka servers, which then store offsets
>>>>>>>>>> as a special log. Key-based log compaction will be used to keep this
>>>>>>>>>> ever-growing log's size bounded. The broker responsible for the offset
>>>>>>>>>> management of a given consumer group will be the leader of this special
>>>>>>>>>> offset log partition, where the partition key will be the consumer
>>>>>>>>>> group name.
>>>>>>>>>> 
>>>>>>>>>> On the consumer side, instead of talking to ZK to commit and read
>>>>>>>>>> offsets, it talks to the servers with new offset commit and offset
>>>>>>>>>> fetch request types. This work has been done and will be included in
>>>>>>>>>> the 0.8.2 release. Details of the implementation can be found in
>>>>>>>>>> KAFKA-1000 and this wiki:
>>>>>>>>>> 
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>>>>>>>>>> 
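>>>>>>>>>> A sketch of the offset log entries this implies; compaction keeps only
>>>>>>>>>> the latest value per key, which is what bounds the log (field names are
>>>>>>>>>> illustrative):
>>>>>>>>>> 
>>>>>>>>>>   class OffsetKey {
>>>>>>>>>>       final String group;   // also the routing key, so one broker leads
>>>>>>>>>>       final String topic;   //   all offsets of a given consumer group
>>>>>>>>>>       final int partition;
>>>>>>>>>>       OffsetKey(String g, String t, int p) { group = g; topic = t; partition = p; }
>>>>>>>>>>   }
>>>>>>>>>> 
>>>>>>>>>>   class OffsetValue {
>>>>>>>>>>       final long offset;    // latest committed offset wins after compaction
>>>>>>>>>>       OffsetValue(long o) { offset = o; }
>>>>>>>>>>   }
>>>>>>>>>> 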
>>>>>>>>>> *Group Membership Management*
>>>>>>>>>> 
>>>>>>>>>> The next step is to move the membership management and rebalancing
>>>>>>>>>> utility out of ZooKeeper as well. To do this we introduced a consumer
>>>>>>>>>> coordinator hosted on the Kafka servers which is responsible for
>>>>>>>>>> keeping track of group membership and consumption partition changes,
>>>>>>>>>> and the corresponding rebalancing process.
>>>>>>>>>> 
>>>>>>>>>> *1. Failure Detection*
>>>>>>>>>> 
>>>>>>>>>> More specifically, we will use a heartbeating protocol for consumer and
>>>>>>>>>> coordinator failure detection. The heartbeating protocol would be
>>>>>>>>>> similar to the one ZK uses, i.e. the consumer will set a session
>>>>>>>>>> timeout value with the coordinator on startup, and keep sending
>>>>>>>>>> heartbeats to the coordinator afterwards every
>>>>>>>>>> session-timeout / heartbeat-frequency. The coordinator will treat a
>>>>>>>>>> consumer as failed when it has not heard from the consumer within the
>>>>>>>>>> session timeout, and the consumer will treat its coordinator as failed
>>>>>>>>>> when it has not heard heartbeat responses within the session timeout.
>>>>>>>>>> 
>>>>>>>>>> One difference from the ZK heartbeat protocol is that instead of fixing
>>>>>>>>>> the heartbeat-frequency at three, we make this value configurable in
>>>>>>>>>> consumer clients. This is because the rebalancing process (which we
>>>>>>>>>> will discuss later) has latency lower bounded by the heartbeat
>>>>>>>>>> frequency, so we want this value to be large while not DDoSing the
>>>>>>>>>> servers.
>>>>>>>>>> 
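>>>>>>>>>> As a concrete example of the interval this yields (values made up):
>>>>>>>>>> 
>>>>>>>>>>   long sessionTimeoutMs = 30000; // set by the consumer at startup
>>>>>>>>>>   int heartbeatFrequency = 10;   // configurable here, fixed at 3 in ZK
>>>>>>>>>>   long heartbeatIntervalMs = sessionTimeoutMs / heartbeatFrequency; // 3000 ms
>>>>>>>>>>   // Higher frequency: rebalances are noticed sooner, but brokers see
>>>>>>>>>>   // proportionally more heartbeat requests.
>>>>>>>>>> 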
>>>>>>>>>> *2. Consumer Rebalance*
>>>>>>>>>> 
>>>>>>>>>> A bunch of events can trigger a rebalance process: 1) consumer failure,
>>>>>>>>>> 2) a new consumer joining the group, 3) an existing consumer changing
>>>>>>>>>> its consumed topic/partitions, 4) partition changes for the consumed
>>>>>>>>>> topics. Once the coordinator decides to trigger a rebalance it will
>>>>>>>>>> notify the consumers within the group to resend their topic/partition
>>>>>>>>>> subscription information, and then assign existing partitions to these
>>>>>>>>>> consumers. On the consumers' end, they no longer run any rebalance
>>>>>>>>>> logic in a distributed manner but follow the partition assignment they
>>>>>>>>>> receive from the coordinator. Since the consumers can only be notified
>>>>>>>>>> of a rebalance triggering via heartbeat responses, the rebalance
>>>>>>>>>> latency is lower bounded by the heartbeat frequency; that is why we
>>>>>>>>>> allow consumers to negotiate this value with the coordinator upon
>>>>>>>>>> registration, and we would like to hear people's thoughts about this
>>>>>>>>>> protocol.
>>>>>>>>>> 
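>>>>>>>>>> A sketch of the consumer's side of this protocol (all names here are
>>>>>>>>>> hypothetical); it shows why the heartbeat frequency bounds the
>>>>>>>>>> rebalance latency:
>>>>>>>>>> 
>>>>>>>>>>   void heartbeatLoop() throws InterruptedException {
>>>>>>>>>>       while (running) {
>>>>>>>>>>           HeartbeatResponse resp = coordinator.heartbeat(groupId, consumerId);
>>>>>>>>>>           if (resp.rebalanceRequired()) {
>>>>>>>>>>               // resend subscriptions; block until the new assignment arrives
>>>>>>>>>>               assigned = coordinator.joinGroup(groupId, subscriptions).partitions();
>>>>>>>>>>           }
>>>>>>>>>>           Thread.sleep(sessionTimeoutMs / heartbeatFrequency);
>>>>>>>>>>       }
>>>>>>>>>>   }
>>>>>>>>>> 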
>>>>>>>>> One question on this topic: my planned usage pattern is to have a high
>>>>>>>>> number of consumers that are ephemeral. Imagine a consumer coming
>>>>>>>>> online, consuming messages for a short duration, then disappearing
>>>>>>>>> forever. Is this use case supported?
>>>>>>>>> 
>>>>>>>> If consumers are coming and going, there will be a high rate of
>>>>>>>> rebalances and that will really start throttling consumption at some
>>>>>>>> point. You may have a use case that hits that detrimentally. A custom
>>>>>>>> rebalance algorithm may be able to rebalance a sub-set of the consumers
>>>>>>>> and not inform the others of a need to rebalance, which would ameliorate
>>>>>>>> the cost. Still, the consumers that need to rebalance would pay.
>>>>>>>> 
>>>>>>>> If the initial JoinGroupRequest were preserved, would it be possible to
>>>>>>>> send rebalanced partition information in the HeartbeatResponse, up front
>>>>>>>> to all (or a sub-set of) consumers, and avoid the rejoin round trip? I
>>>>>>>> mean, push the rebalance rather than pull it with a secondary
>>>>>>>> JoinGroupRequest. This would reduce cost and lower the throttle point,
>>>>>>>> but poorly timed fetch requests may fail. If the consumer were resilient
>>>>>>>> to fetch failures, might it work?
>>>>>>>> 
>>>>>>>>>> In addition, the rebalance logic in the new consumer will be
>>>>>>>>>> customizable instead of hard-coded in a per-topic round-robin manner.
>>>>>>>>>> Users will be able to write their own assignment class following a
>>>>>>>>>> common interface, and consumers, upon registering themselves, will
>>>>>>>>>> specify in their registration request the assigner class name they
>>>>>>>>>> want to use. If consumers within the same group specify different
>>>>>>>>>> algorithms the registration will be rejected. We are not sure if this
>>>>>>>>>> is the best way of customizing rebalance logic yet, so any feedback is
>>>>>>>>>> more than welcome.
>>>>>>>>>> 
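>>>>>>>>>> One possible shape for that common interface, as a sketch only (the
>>>>>>>>>> real interface is exactly what we want feedback on):
>>>>>>>>>> 
>>>>>>>>>>   import java.util.List;
>>>>>>>>>>   import java.util.Map;
>>>>>>>>>> 
>>>>>>>>>>   interface PartitionAssigner {
>>>>>>>>>>       // consumerIds: current group members; topicPartitions: partition
>>>>>>>>>>       // count per subscribed topic. Returns consumerId -> assigned
>>>>>>>>>>       // "topic-partition" strings (encoding simplified for the sketch).
>>>>>>>>>>       Map<String, List<String>> assign(List<String> consumerIds,
>>>>>>>>>>                                        Map<String, Integer> topicPartitions);
>>>>>>>>>>   }
>>>>>>>>>> 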
>>>>>>>>>> For wildcard consumption, the consumers will now discover new topics
>>>>>>>>>> that are available for fetching through the topic metadata request.
>>>>>>>>>> That is, periodically the consumers will update their topic metadata
>>>>>>>>>> for fetching, and if new topics returned in the metadata response match
>>>>>>>>>> their wildcard regex, they will notify the coordinator to let it
>>>>>>>>>> trigger a new rebalance to assign partitions for the new topic.
>>>>>>>>>> 
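>>>>>>>>>> Sketched out, the periodic wildcard check could look like this (the
>>>>>>>>>> metadata and notification helpers are placeholders, not real APIs):
>>>>>>>>>> 
>>>>>>>>>>   import java.util.List;
>>>>>>>>>>   import java.util.regex.Pattern;
>>>>>>>>>> 
>>>>>>>>>>   void checkForNewTopics(Pattern wildcard, List<String> subscribed) {
>>>>>>>>>>       for (String topic : fetchTopicMetadata()) {      // periodic refresh
>>>>>>>>>>           if (wildcard.matcher(topic).matches() && !subscribed.contains(topic)) {
>>>>>>>>>>               notifyCoordinatorOfNewTopic(topic);      // triggers a rebalance
>>>>>>>>>>           }
>>>>>>>>>>       }
>>>>>>>>>>   }
>>>>>>>>>> 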
>>>>>>>>>> There are some failure handling cases during the rebalancing process
>>>>>>>>>> discussed in the consumer rewrite design wiki. We would encourage
>>>>>>>>>> people to read it and let us know if there are any other corner cases
>>>>>>>>>> not covered.
>>>>>>>>>> 
>>>>>>>>>> Moving forward, we are thinking about making the coordinator handle
>>>>>>>>>> both the group management and offset management for its groups, i.e.
>>>>>>>>>> the leader of the offset log partition will naturally become the
>>>>>>>>>> coordinator of the corresponding consumer group. This allows the
>>>>>>>>>> coordinator to reject offset commit requests if the sender is not part
>>>>>>>>>> of the group. To do so, a consumer id and a generation id are applied
>>>>>>>>>> to control the group member generations. Details of this design are
>>>>>>>>>> included in the rewrite design wiki page, and again any feedback is
>>>>>>>>>> appreciated.
>>>>>>>>>> 
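>>>>>>>>>> A sketch of the check the generation id enables (names illustrative):
>>>>>>>>>> 
>>>>>>>>>>   boolean maybeAcceptOffsetCommit(String group, String consumerId,
>>>>>>>>>>                                   int generationId, long offset) {
>>>>>>>>>>       GroupState state = groups.get(group);       // hypothetical registry
>>>>>>>>>>       if (state == null || generationId != state.generation())
>>>>>>>>>>           return false;                           // missed a rebalance
>>>>>>>>>>       if (!state.members().contains(consumerId))
>>>>>>>>>>           return false;                           // not part of the group
>>>>>>>>>>       appendToOffsetLog(group, offset);           // hypothetical append
>>>>>>>>>>       return true;
>>>>>>>>>>   }
>>>>>>>>>> 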
>>>>>>>>>> *3. Non-blocking Network IO*
>>>>>>>>>> 
>>>>>>>>>> With a single-threaded consumer, we will instead use non-blocking
>>>>>>>>>> network IO (i.e. java.nio) for the fetching loop, just as the new
>>>>>>>>>> producer does for asynchronously sending data. In terms of
>>>>>>>>>> implementation, we will base the new consumer/producer clients on a
>>>>>>>>>> common non-blocking IO thread class. Details of this refactoring can be
>>>>>>>>>> found in KAFKA-1316.
>>>>>>>>>> 
>>>>>>>>>> *4. Consumer API*
>>>>>>>>>> 
>>>>>>>>>> As a result of the single-threaded non-blocking network IO
>>>>>>>>>> implementation, the new consumer API will no longer be based on a
>>>>>>>>>> blocking stream iterator interface but on a poll(timeout) interface.
>>>>>>>>>> In addition, the consumer APIs are designed to be flexible so that
>>>>>>>>>> people can choose to either use the default offset and group membership
>>>>>>>>>> management utilities mentioned above or implement their own
>>>>>>>>>> offset/group management logic. For example, for scenarios that need
>>>>>>>>>> fixed partition assignment or that prefer to store offsets locally in a
>>>>>>>>>> data store, the new consumer APIs would make it much easier to
>>>>>>>>>> customize. Details of the API design can be found in KAFKA-1328.
>>>>>>>>>> 
>>>>>>>>>> A question to think about: are there any other potential use cases
>>>>>>>>>> that cannot be easily supported by this API?
>>>>>>>>>> 
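>>>>>>>>>> For a feel of the poll(timeout) style, a sketch of a consume loop (the
>>>>>>>>>> actual API shape is being worked out in KAFKA-1328, so treat every
>>>>>>>>>> name here as provisional):
>>>>>>>>>> 
>>>>>>>>>>   KafkaConsumer consumer = new KafkaConsumer(config);
>>>>>>>>>>   consumer.subscribe("my-topic");
>>>>>>>>>>   while (running) {
>>>>>>>>>>       Map<String, ConsumerRecords> records = consumer.poll(1000); // ms
>>>>>>>>>>       process(records);        // application logic
>>>>>>>>>>       consumer.commit(true);   // or manage offsets in an external store
>>>>>>>>>>   }
>>>>>>>>>> 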
>>>>>>>>>> Cheers,
>>>>>>>>>> Guozhang
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> -- Guozhang
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
