Hey Robert, I think the issue is that you lose much of the simplification if you still need to heartbeat and you need a new commit notification message. I think that would make this proposal more complex than the current one.
-Jay

On Mon, Jul 21, 2014 at 6:44 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
> Thanks, Jay, for the good summary. Regarding point 2, I would think the heartbeat would still be desired, to give control over liveness detection parameters and to directly inform clients when gaining or losing a partition (especially when gaining a partition). There would be no barrier and the rebalancer would be an offline scheduler, issuing SwitchPartition commands. The evaluation of a SwitchPartition command would await the consumer losing a partition to commit its offset and do any local commit work needed before confirming completion to the co-ordinator, which would then inform the new consumer and ISR brokers about the partition gain. Broker security would be the master record of partition assignments.
>
> Thanks,
> Rob
>
>> On Jul 21, 2014, at 6:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> This thread is a bit long, but let me see if I can restate it correctly (not sure I fully follow).
>>
>> There are two suggestions:
>> 1. Allow partial rebalances that move just some partitions. I.e. if a consumer fails and has only one partition, only one other consumer should be affected (the one who picks up the new partition). If there are many partitions to be reassigned there will obviously be a tradeoff between impacting all consumers and balancing load evenly. I.e. if you moved all load to one other consumer that would cause little rebalancing interruption but poor load balancing.
>> 2. Have the co-ordinator communicate the assignments to the brokers rather than to the client directly. This could potentially simplify the consumer. Perhaps it would be possible to have the leader track liveness using the fetch requests rather than needing an artificial heartbeat.
>>
>> These are interesting ideas.
>>
>> -Jay
>>
>>
>>> On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> Hello Rob,
>>>
>>> If I get your idea right, the idea is that if the rebalance only changes the ownership of a few consumers in the group, the coordinator can just sync with them and not interrupt the other consumers.
>>>
>>> I think this approach may work. However, it will likely complicate the logic of the coordinator once we sketch out all the details, since the rebalance result basically depends on two variables: 1) partitions for the subscribed topics, 2) consumers inside the group. Hence, following this approach, by the time the coordinator decides to trigger a rebalance it must correctly keep track of which variable change triggered the current rebalance process; on the other hand, the normal rebalance process even with a global barrier should usually be very fast, within a few hundred millis. So I am not sure if this is a worthy optimization that we would want for now. What do you think?
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>>
>>>> Lock is a bad way to say it; a barrier is better. I don't think what I am saying is even a barrier, since the rebalance would just need to recompute a rebalance schedule and submit it. The only processing delay is to allow a soft remove to let the client clean up, before you turn on the new guy, so it lags a bit. Do you think this could work?
>>>>
>>>> Thanks,
>>>> Rob
>>>>
>>>>> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>>>>
>>>>> Hi Guozhang,
>>>>>
>>>>> Thank you for considering my suggestions. The security layer sounds like the right facet to design for these sorts of capabilities. Have you considered a chained ocap security model for the broker using hash tokens?
>>>>> This would provide for per-partition read/write capabilities with QoS context including leases, revocation, debug level and monitoring. Overkill disappears as no domain-specific info needs to be stored at the brokers, like consumer/partition assignments. The read ocap for consumer 7/topic bingo/partition 131 could be revoked at the brokers for that partition, and subsequent reads would fail the fetch for requests with that ocap token. You could also dynamically change the log level for a specific consumer/partition.
>>>>>
>>>>> There are advantages we could discuss to having finer-grained control. Consider that scheduled partition rebalancing could be implemented with no pauses from the perspective of the consumer threads; it looks like single-partition lag, as the offset commit occurs before rotation, with no lag to non-rebalanced partitions: rebalance 1 partition per second so as to creep load to a newbie consumer. It would eliminate a global read lock, and even the internal Kafka consumer would never block on IO protocol other than the normal fetch request (and the initial join group request).
>>>>>
>>>>> A global lock acquired through a pull protocol (HeartbeatRequest followed by a JoinGroupRequest) for all live consumers is a much bigger lock than a security-based push protocol, as I assume the coordinator will have open sockets to all brokers in order to reach out as needed. As well, each lock would be independent between partitions and be with only those brokers in ISR for a given partition. It is a much smaller lock.
>>>>>
>>>>> I had some time to consider my suggestion that it be viewed as a relativistic frame of reference.
>>>>> Consider the model where each dimension of the frame of reference for each consumer is a partition, actually a sub-space with the dimensionality of the replication factor, but with a single leader election, so consider it 1 dimension. The total dimensionality of the consumer's frame of reference is the number of partitions, but only assigned partitions are open to a given consumer's viewpoint. The offset is the partition dimension's coordinate and only consumers with an open dimension can translate the offset. A rebalance opens or closes a dimension for a given consumer and can be viewed as a rotation. Could Kafka consumption and rebalance (and ISR leader election) be reduced to matrix operations?
>>>>>
>>>>> Rob
>>>>>
>>>>>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Rob,
>>>>>>
>>>>>> Sorry for the late reply.
>>>>>>
>>>>>> If I understand your approach correctly, it requires all brokers to remember the partition assignment of each consumer in order to decide whether or not to authorize the fetch request, correct? If we are indeed going to do such authorization for the security project then maybe it is a good way to go, but otherwise it might be overkill just to support finer-grained partition assignment. In addition, instead of requiring a round trip between the coordinator and the consumers for the synchronization barrier, now the coordinator needs to wait for a round trip between itself and the other brokers before it can return the join-group request, right?
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers <robert.w.with...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>> Currently, the brokers do not know which high-level consumers are reading which partitions, and it is the rebalance between the consumers and the coordinator which would authorize a consumer to fetch a particular partition, I think. Does this mean that when a rebalance occurs, all consumers must send a JoinGroupRequest and that the coordinator will not respond to any consumers until all consumers have sent the JoinGroupRequest, to enable the synchronization barrier? That has the potential to be a sizable global delay.
>>>>>>>
>>>>>>> On the assumption that there is only one coordinator for a group, why couldn't the synchronization barrier be per partition and internal to Kafka, and mostly not involve the consumers, other than a chance for offsetCommit by the consumer losing a partition? If the brokers have session state and know the new assignment before the consumer is notified with a HeartbeatResponse, they could fail the fetch request from that consumer for an invalidly assigned partition. The consumer could take an invalid-partition failure of the fetch request as if it were a HeartbeatResponse partition removal.
>>>>>>>
>>>>>>> The only gap seems to be: when does a consumer that is losing a partition get a chance to commit its offset?
>>>>>>> If there were a PartitionCommittedNotification message that a consumer could send to the coordinator after committing its offsets, then the coordinator could send the add-partition HeartbeatResponse, after receiving the PartitionCommittedNotification, to the consumer gaining the partition, and the offset management is stable. The advantage is that none of the other consumers would be paused on any other partitions. Only partitions being rebalanced would see any consumption pause.
>>>>>>>
>>>>>>> So, something like:
>>>>>>>
>>>>>>> 1. 3 consumers running with 12 partitions balanced, 4 each
>>>>>>>
>>>>>>> 2. New consumer starts and sends JoinGroupRequest to coordinator
>>>>>>>
>>>>>>> 3. Coordinator computes rebalance with 4 consumers: each existing consumer will lose a partition assigned to the new consumer
>>>>>>>
>>>>>>> 4. Coordinator informs all live brokers of partition reassignments
>>>>>>>
>>>>>>> 5. Brokers receive reassignments, start failing unauthorized fetch requests and ack back to the coordinator
>>>>>>>
>>>>>>> 6. Coordinator receives all broker acks and sends HeartbeatResponses with partition removals to existing consumers and awaits PartitionCommittedNotifications from consumers losing partitions.
>>>>>>>
>>>>>>> 7. Existing consumers can continue to fetch messages from correctly assigned partitions
>>>>>>>
>>>>>>> 8. When an existing consumer fails a fetch for a partition or gets a HeartbeatResponse with a partition removal, it would commitOffsets for that partition and then send a PartitionCommittedNotification to the coordinator.
>>>>>>>
>>>>>>> 9.
>>>>>>> As the Coordinator receives the PartitionCommittedNotification for a particular partition from an existing consumer, it sends the addPartition to consumer 4 in a HeartbeatResponse, and the new consumer can start fetching that partition.
>>>>>>>
>>>>>>> If a consumer drops HeartbeatRequests within a session timeout, the coordinator would inform the brokers and they would fail fetchRequests for those partitions from that consumer. There is no chance to send removePartitions since no Heartbeat is occurring, but the addPartitions could be sent and the offset is what it is. This seems no different than this sort of failure today.
>>>>>>>
>>>>>>> Instead of a global synchronization barrier, isn't it possible to have an incremental per-partition synchronization barrier? The brokers would have to be aware of this. I think of it as relativistic from each acceleration frame of reference, which is each consumer: event horizons.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Rob
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>>>>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>>>>>> To: users@kafka.apache.org
>>>>>>> Subject: Re: New Consumer Design
>>>>>>>
>>>>>>> Hi Rob,
>>>>>>>
>>>>>>> Piggy-backing the rebalance partition info in the HeartbeatResponse may cause inconsistency of the partition assignments to consumers under consecutive triggerings of rebalances, since the coordinator no longer has a synchronization barrier for re-computing the distribution with a consistent view.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers <robert.w.with...@gmail.com> wrote:
>>>>>>>
>>>>>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com> wrote:
>>>>>>>>
>>>>>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> We have written a wiki a few weeks back proposing a single-threaded ZK-free consumer client design for 0.9:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>>>>>>
>>>>>>>>>> We want to share some of the ideas that came up for this design to get some early feedback. The below discussion assumes you have read the above wiki.
>>>>>>>>>>
>>>>>>>>>> *Offset Management*
>>>>>>>>>>
>>>>>>>>>> To make consumer clients ZK-free we need to move the offset management utility from ZooKeeper into the Kafka servers, which then store offsets as a special log. Key-based log compaction will be used to keep this ever-growing log's size bounded. Brokers who are responsible for the offset management for a given consumer group will be the leader of this special offset log partition, where the partition key will be consumer group names.
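The offset log described above can be sketched as a toy model: key-based compaction keeps only the latest entry per (group, topic, partition) key, and hashing the group name selects the offset-log partition whose leader manages that group. All names here are illustrative, not the actual broker code (KAFKA-1000 has the real implementation):

```python
def compact(log):
    """Key-based log compaction: keep only the latest value for each key."""
    latest = {}
    for key, value in log:  # later entries overwrite earlier ones
        latest[key] = value
    return latest

def offsets_partition(group, num_partitions):
    """Illustrative stand-in for hashing a group name to its offset-log
    partition; the leader of that partition manages the group's offsets."""
    return sum(ord(c) for c in group) % num_partitions

# Three commits, two distinct keys: compaction bounds the log size and
# the latest committed offset wins for each (group, topic, partition).
log = [
    (("group-a", "clicks", 0), 100),
    (("group-a", "clicks", 1), 42),
    (("group-a", "clicks", 0), 250),
]
compacted = compact(log)  # two entries; offset 250 survives for partition 0
```

Because all offsets for a group hash to one partition, a single broker (that partition's leader) can serve the group's commits and fetches without coordination.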
>>>>>>>>>> On the consumer side, instead of talking to ZK for commit and read offsets, it talks to the servers with new offset commit and offset fetch request types for offset management. This work has been done and will be included in the 0.8.2 release. Details of the implementation can be found in KAFKA-1000 and this wiki:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>>>>>>>>>>
>>>>>>>>>> *Group Membership Management*
>>>>>>>>>>
>>>>>>>>>> The next step is to move the membership management and rebalancing utility out of ZooKeeper as well. To do this we introduced a consumer coordinator hosted on the Kafka servers which is responsible for keeping track of group membership and consumption partition changes, and the corresponding rebalancing process.
>>>>>>>>>>
>>>>>>>>>> *1. Failure Detection*
>>>>>>>>>>
>>>>>>>>>> More specifically, we will use a heartbeating protocol for consumer and coordinator failure detection. The heartbeating protocol would be similar to the one ZK uses, i.e. the consumer will set a session timeout value with the coordinator on startup, and keep sending heartbeats to the coordinator afterwards every session-timeout / heartbeat-frequency.
>>>>>>>>>> The coordinator will treat a consumer as failed when it has not heard from the consumer after session-timeout, and the consumer will treat its coordinator as failed when it has not heard its heartbeat responses after session-timeout.
>>>>>>>>>>
>>>>>>>>>> One difference from the ZK heartbeat protocol is that instead of fixing the heartbeat-frequency at three, we make this value configurable in consumer clients. This is because the rebalancing process (which we will discuss later) has latency lower bounded by the heartbeat frequency, so we want this value to be large while not DDoSing the servers.
>>>>>>>>>>
>>>>>>>>>> *2. Consumer Rebalance*
>>>>>>>>>>
>>>>>>>>>> A bunch of events can trigger a rebalance process: 1) consumer failure, 2) new consumer joining the group, 3) existing consumer changing consumed topics/partitions, 4) partition change for the consumed topics. Once the coordinator decides to trigger a rebalance it will notify the consumers within the group to resend their topic/partition subscription information, and then assign existing partitions to these consumers. On the consumers' end, they no longer run any rebalance logic in a distributed manner but follow the partition assignment they receive from the coordinator.
>>>>>>>>>> Since the consumers can only be notified of a rebalance triggering via heartbeat responses, the rebalance latency is lower bounded by the heartbeat frequency; that is why we allow consumers to negotiate this value with the coordinator upon registration, and we would like to hear people's thoughts about this protocol.
>>>>>>>>>
>>>>>>>>> One question on this topic: my planned usage pattern is to have a high number of consumers that are ephemeral. Imagine a consumer coming online, consuming messages for a short duration, then disappearing forever. Is this use case supported?
>>>>>>>>
>>>>>>>> If consumers are coming and going, there will be a high rate of rebalances and that will really start throttling consumption at some point. You may have a use case that hits that detrimentally. A custom rebalance algorithm may be able to rebalance a sub-set of the consumers and not inform others of a need to rebalance them, which would ameliorate the cost. Still, the consumers that need to rebalance would pay.
>>>>>>>>
>>>>>>>> If the initial JoinGroupRequest was preserved, would it be possible to send rebalanced partition information in the HeartbeatResponse, up front to all (or a sub-set of) consumers, and avoid the rejoin round trip? I mean, push the rebalance and not pull it with a secondary JoinGroupRequest. This would reduce cost and lower the throttle point, but poorly timed fetch requests may fail. If the consumer were resilient to fetch failures, might it work?
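Rob's push-style idea, where a failed fetch doubles as the revocation signal, can be simulated with a small sketch. The class and method names here are hypothetical, not Kafka APIs:

```python
class UnauthorizedFetch(Exception):
    """Broker rejects a fetch for a partition no longer assigned to the caller."""

class Broker:
    def __init__(self, assignment):
        self.assignment = assignment  # partition -> consumer id

    def fetch(self, consumer_id, partition):
        if self.assignment.get(partition) != consumer_id:
            raise UnauthorizedFetch(partition)
        return b"record-batch"

class Consumer:
    def __init__(self, cid, partitions):
        self.cid, self.partitions = cid, set(partitions)
        self.offsets = {p: 0 for p in partitions}
        self.committed = {}

    def poll_once(self, broker):
        for p in list(self.partitions):
            try:
                broker.fetch(self.cid, p)
                self.offsets[p] += 1
            except UnauthorizedFetch:
                # Treat the failed fetch as the revocation signal:
                # commit the last offset, then drop the partition.
                self.committed[p] = self.offsets.pop(p)
                self.partitions.discard(p)

broker = Broker({0: "c1", 1: "c1", 2: "c1"})
c1 = Consumer("c1", [0, 1, 2])
broker.assignment[2] = "c2"   # coordinator pushes the reassignment to brokers
c1.poll_once(broker)          # c1 learns of the loss from the failed fetch
# c1 now owns {0, 1} and has committed its last offset for partition 2
```

The consumer never rejoins; the assignment change reaches it as a side effect of normal fetching, which is the "push, not pull" shape Rob describes.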
>>>>>>>>>> In addition, the rebalance logic in the new consumer will be customizable instead of hard-written in a per-topic round-robin manner. Users will be able to write their own assignment class following a common interface, and consumers upon registering themselves will specify in their registration request the assigner class name they want to use. If consumers within the same group specify different algorithms the registration will be rejected. We are not sure if it is the best way of customizing rebalance logic yet, so any feedback is more than welcome.
>>>>>>>>>>
>>>>>>>>>> For wildcard consumption, the consumers will now capture new topics that are available for fetching through the topic metadata request. That is, periodically the consumers will update their topic metadata for fetching, and if new topics are returned in the metadata response matching their wildcard regex, they will notify the coordinator to let it trigger a new rebalance to assign partitions for this new topic.
>>>>>>>>>>
>>>>>>>>>> There are some failure handling cases during the rebalancing process discussed in the consumer rewrite design wiki. We would encourage people to read it and let us know if there are any other corner cases not covered.
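As a sketch of the pluggable assignment idea described above, the default round-robin policy reduces to a small pure function (illustrative only, not the proposed interface):

```python
def round_robin_assign(partitions, consumers):
    """Spread partitions over consumers round-robin; deterministic given
    sorted inputs, so the coordinator can recompute it at any time."""
    consumers = sorted(consumers)
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Matches the example earlier in the thread: 12 partitions over 3
# consumers is 4 each; re-running with a 4th consumer yields 3 each,
# so every incumbent gives up exactly one partition.
before = round_robin_assign(range(12), ["c1", "c2", "c3"])
after = round_robin_assign(range(12), ["c1", "c2", "c3", "c4"])
```

Requiring every member of a group to name the same assigner class means the coordinator can run one such function centrally and every consumer knows what to expect, which is why mixed algorithms in one group are rejected.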
>>>>>>>>>> Moving forward, we are thinking about making the coordinator handle both the group management and offset management for the given groups, i.e. the leader of the offset log partition will naturally become the coordinator of the corresponding consumer group. This allows the coordinator to reject offset commit requests if the sender is not part of the group. To do so, a consumer id and generation id are applied to control the group member generations. Details of this design are included in the rewrite design wiki page, and again any feedback is appreciated.
>>>>>>>>>>
>>>>>>>>>> *3. Non-blocking Network IO*
>>>>>>>>>>
>>>>>>>>>> With a single-threaded consumer, we will instead use non-blocking network IO (i.e. java.nio) to do fetching loops, just as the new producer does for asynchronously sending data. In terms of implementation, we will let the new consumer/producer clients be based on a common non-blocking IO thread class. Details of this refactoring can be found in KAFKA-1316.
>>>>>>>>>>
>>>>>>>>>> *4. Consumer API*
>>>>>>>>>>
>>>>>>>>>> As a result of the single-threaded non-blocking network IO implementation, the new consumer API will no longer be based on a blocking stream iterator interface but on a poll(timeout) interface.
>>>>>>>>>> In addition, the consumer APIs are designed to be flexible so that people can choose to either use the default offset and group membership management utilities mentioned above or implement their own offset/group management logic. For example, for scenarios that need fixed partition assignment or that prefer to store offsets locally in a data store, the new consumer APIs would make it much easier to customize. Details of the API design can be found in KAFKA-1328. A question to think about is whether there are any other potential use cases that cannot be easily supported by this API.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>
>>>
>>> --
>>> -- Guozhang
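To make the shift from a blocking iterator to a poll(timeout) interface concrete, here is a minimal mock of the loop shape; the names are placeholders for the interface proposed in KAFKA-1328, not the real client:

```python
import collections

class MockConsumer:
    """Toy poll-based consumer: poll() returns whatever has been fetched
    so far and never blocks (the timeout argument is unused in this mock)."""
    def __init__(self):
        self._buffer = collections.deque()
        self.committed = {}

    def deliver(self, records):
        """Test hook standing in for the non-blocking fetch loop."""
        self._buffer.extend(records)

    def poll(self, timeout_ms):
        records = list(self._buffer)
        self._buffer.clear()
        return records

    def commit(self, offsets):
        self.committed.update(offsets)

consumer = MockConsumer()
consumer.deliver([("topic-a", 0, 41, b"payload")])
records = consumer.poll(timeout_ms=100)      # drains the fetched batch
for topic, partition, offset, value in records:
    consumer.commit({(topic, partition): offset + 1})
# a second poll returns [] until more data is fetched
```

The single-threaded design falls out of this shape: one loop alternates between driving network IO and handing batches to user code, with no blocking iterator thread in between.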