Thanks Becket. Quick comment - there seem to be a bunch of images that the wiki refers to, but none of them loaded for me. Just checking: is it just me, or can no one see the pictures?
On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> I agree with Ewen that with a single-threaded model it will be tricky to implement the conventional semantics of async calls or Futures. We just drafted the following wiki, which explains our thoughts at LinkedIn on the new consumer API and threading model.

> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal

> We were trying to see:
> 1. Whether we can use some kind of methodology to help us think about what API we want to provide to users for different use cases.
> 2. What the pros and cons of the current single-threaded model are, and whether there is a way to keep its benefits while solving the issues we are facing with it now.

> Thanks,
> Jiangjie (Becket) Qin

> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:

>>> I think Ewen has proposed these APIs for using callbacks along with returning futures in the commit calls, i.e. something similar to:

>>> public Future<Void> commit(ConsumerCommitCallback callback);

>>> public Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);

>>> At the time I was slightly inclined not to include the Future alongside the callback, mainly because of the implementation complexity I felt it could introduce, along with the retry settings, after looking through the code base. I would be happy to change my mind if we can propose a prototype implementation that is simple enough.

>> One of the reasons that interface ended up being difficult (or maybe impossible) to make work reasonably is that the consumer was thread-safe at the time. That made it impossible to know what should be done when Future.get() is called -- should the implementation call poll() itself, or does the fact that the user is calling get() imply that there's a background thread running the poll() loop and we just need to wait for it?

>> The consumer is no longer thread-safe, but I think the same problem remains, because the expectation with Futures is that they are thread-safe. That means that even if the consumer isn't thread-safe, I would expect to be able to hand a Future off to some other thread, have the second thread call get(), and then continue driving the poll loop in my thread (which in turn would eventually resolve the Future).

>> I quite dislike the sync/async enum. While both operations commit offsets, their semantics are so different that overloading a single method with both is messy. That said, I don't think we should consider this an inconsistency with the new producer API's use of Future, because the two APIs have a much more fundamental difference that justifies it: they have completely different threading and execution models.

>> I think a Future-based API only makes sense if you can guarantee that the operations the Futures are waiting on will continue to make progress regardless of what the thread using the Future does. The producer API makes that work by processing asynchronous requests in a background thread. The new consumer does not, and so it becomes difficult/impossible to implement the Future correctly. (Or you have to make assumptions which break other use cases: if you want to support the simple use case of making commit() synchronous by calling get(), the Future has to call poll() internally; but if you do that, then if any user ever wants to add synchronization to the consumer via some external mechanism, the implementation of the Future's get() method will not be subject to that synchronization, and things will break.)

>> -Ewen
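To make the hazard described here concrete, below is a minimal sketch that wraps the proposed callback-based commit in a Future. ConsumerCommitCallback follows Guozhang's snippet above; SketchConsumer and all other names are illustrative stand-ins, not the real client API:

    import java.util.concurrent.CompletableFuture;

    // Minimal stand-ins for the proposed API (assumed, not the real client):
    interface ConsumerCommitCallback { void onComplete(Exception exception); }

    interface SketchConsumer {
        void commit(ConsumerCommitCallback callback); // queues an OffsetCommitRequest
        void poll(long timeoutMs);                    // drives I/O; callbacks fire here
    }

    public class FutureCommitSketch {
        // Wrap the callback-based commit in a Future. The catch: the future can
        // only complete while some thread keeps calling poll(), because the
        // commit response is processed (and the callback fired) inside poll().
        static CompletableFuture<Void> commitAsFuture(SketchConsumer consumer) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            consumer.commit(e -> {
                if (e == null) result.complete(null);
                else result.completeExceptionally(e);
            });
            return result;
        }

        // A naive "synchronous" commit built on top. Having the waiter drive
        // poll() is only safe when this thread owns the consumer -- if the user
        // wraps the consumer in their own synchronization, this inner poll()
        // bypasses it, which is exactly the breakage described above.
        static void commitSync(SketchConsumer consumer) throws Exception {
            CompletableFuture<Void> future = commitAsFuture(consumer);
            while (!future.isDone())
                consumer.poll(100);
            future.get();
        }
    }

The commitSync variant shows the caveat concretely: get() alone never completes the future, and polling from inside the wait only works when the waiting thread is the one that owns the consumer.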
>>> Guozhang

>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io> wrote:

>>>> Hey Adi,

>>>> When we designed the initial version, the producer API was still changing. I thought about adding the Future and then just didn't get to it. I agree that we should look into adding it for consistency.

>>>> Thanks,
>>>> Neha

>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <aaurad...@linkedin.com> wrote:

>>>>> Great discussion everyone!

>>>>> One general comment on the sync/async APIs on the new consumer. I think the producer tackles sync vs. async APIs well. For APIs that can be either sync or async, can we simply return a future? That seems more elegant for the APIs that make sense in both flavors, and from the user's perspective it is more consistent with the new producer. One easy example is the commit call with the CommitType enum: we can make that call always async, and users can block on the future if they want to make sure their offsets are committed.

>>>>> Aditya

>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com> wrote:

>>>>>> Thanks for the great responses, everyone!

>>>>>> To expand a tiny bit on my initial post: while I did bring up the old high-level consumers, the teams we spoke to were actually not the types of services that simply wanted an easy way to get ConsumerRecords. We spoke to infrastructure teams that I would consider to be closer to the "power-user" end of the spectrum and that would want KafkaConsumer's level of granularity. Some would use auto group management. Some would use explicit group management. All of them would turn off auto offset commits. Yes, the Samza team had prior experience with the old SimpleConsumer, but this is the first Kafka consumer being used by the Databus team. So I don't really think the feedback we received was about the simpler times or about wanting additional higher-level clients.

>>>>>> - Onur

>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <ja...@confluent.io> wrote:

>>>>>>> I think if we recommend a longer session timeout, then we should expose the heartbeat frequency in configuration, since this generally controls how long normal rebalances will take. I think it's currently hard-coded to 3 heartbeats per session timeout. It could also be nice to have an explicit LeaveGroup request to implement clean shutdown of a consumer. Then the coordinator doesn't have to wait for the timeout to reassign partitions.

>>>>>>> -Jason

>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> wrote:

>>>>>>>> Hey Kartik,

>>>>>>>> Totally agree we don't want people tuning timeouts in the common case.

>>>>>>>> However, there are two ways to avoid this:
>>>>>>>> 1. Default the timeout high
>>>>>>>> 2. Put the heartbeat in a separate thread

>>>>>>>> When we were doing the consumer design we discussed this tradeoff, and I think the conclusion we came to was that defaulting to a high timeout was actually better. This means it takes a little longer to detect a failure, but usually that is not a big problem, and people who want faster failure detection can tune it down. This seemed better than having the failure detection not really cover the consumption and just be a background ping. The two reasons were that (a) you still have the GC problem even for the background thread, and (b) consumption is in some sense a better definition of an active, healthy consumer, and a lot of problems crop up when you have an inactive consumer with an active background thread (as today).

>>>>>>>> When we had the discussion, I think what we realized was that most people who were worried about the timeout were imagining a very low default (say, 500ms). But in fact just setting this to 60 seconds or higher as a default would be okay; this adds to the failure-detection time, but only apps that care about this need to tune it. This should largely eliminate false positives, since after all, if you disappear for 60 seconds that actually starts to be more of a true positive, even if you come back... :-)

>>>>>>>> -Jay
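In config terms, option 1 above is just a generous session timeout. A hedged sketch: session.timeout.ms is the new consumer's session-timeout key, while the broker address, group name, and the 60-second value are illustrative of the discussion, not shipped defaults:

    import java.util.Properties;

    public class ConsumerConfigSketch {
        public static Properties conservativeDefaults() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "my-group");                // placeholder group
            // A generous failure-detection window: fewer false-positive
            // rebalances from GC pauses or slow processing. Apps that need
            // faster failure detection tune this down instead.
            props.put("session.timeout.ms", "60000");
            return props;
        }
    }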
>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:

>>>>>>>>> Adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming in on only the aspect of poll() being the only mechanism for driving the heartbeats.

>>>>>>>>> Yes, the lag is the effect of the problem (not the problem). Monitoring the lag is important, as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes, the lag is at the consumer-group level, but you can tell that one of the consumers is messed up if, for example, one of the partitions in the application starts generating lag while the others are good.

>>>>>>>>> Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances, and most of the things they do in their app don't have an impact on the session timeout (i.e., the only thing that causes rebalances is when the GC is out of whack). For the handful of customers who are impacted by GC-related rebalances, I would imagine that all of them would really want us to make the system more resilient. I agree that the GC problem can't be solved easily in the Java client; however, it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances.

>>>>>>>>> The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout. If we had a notion of implicit heartbeats, then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even when the customers want to do a long poll.

>>>>>>>>> We could surely work around this at LinkedIn if we had either a pause() API or an explicit heartbeat() API on the consumer.

>>>>>>>>> Would love to hear how other people think about this subject.

>>>>>>>>> Thanks,
>>>>>>>>> Kartik
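One work-around for the long-poll concern above, assuming (per Jason's note earlier in the thread) that heartbeats fire roughly three times per session timeout: emulate a long poll with a loop of short polls, so each iteration still drives heartbeats. The Records and SketchConsumer types are stand-ins, not the real client:

    public class LongPollSketch {
        // Stand-ins for the client API under discussion (illustrative only).
        interface Records { boolean isEmpty(); }
        interface SketchConsumer { Records poll(long timeoutMs); }

        // Wait up to totalTimeoutMs for data, but never block in a single
        // poll() longer than a third of the session timeout, so heartbeats
        // keep flowing.
        static Records longPoll(SketchConsumer consumer, long totalTimeoutMs,
                                long sessionTimeoutMs) {
            long deadline = System.currentTimeMillis() + totalTimeoutMs;
            long slice = sessionTimeoutMs / 3; // roughly the heartbeat cadence
            Records records;
            do {
                long remaining = deadline - System.currentTimeMillis();
                records = consumer.poll(Math.min(slice, Math.max(0, remaining)));
            } while (records.isEmpty() && System.currentTimeMillis() < deadline);
            return records;
        }
    }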
>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io> wrote:

>>>>>>>>>> Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail, and we wouldn't be able to model all of those in the consumer's failure-detection mechanism. So we should try to model as much of it as we can, so that the consumer's failure detection is meaningful.

>>>>>>>>>>> Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong.

>>>>>>>>>> The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer-group-level concept, and the problem we have is being able to detect failures at the level of individual consumer instances.

>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger indicator of whether the consumer is alive or not. The dilemma then is who defines what a healthy poll() frequency is. No one but the application owner can define what a "normal" processing latency is for their application. Now the question is what's the easiest way for the user to define this without having to tune and fine-tune it too often. The heartbeat interval certainly does not have to be *exactly* the 99th percentile of processing latency, but could be in the ballpark plus an error delta. The error delta is the application owner's acceptable risk threshold, during which they would be OK with the application remaining part of the group despite being dead. It is ultimately a tradeoff between operational ease and more accurate failure detection.

>>>>>>>>>>> With quotas the write latencies to Kafka could range from a few milliseconds all the way to tens of seconds.

>>>>>>>>>> This is actually no different from the GC problem. Most of the time, normal GC pauses fall in the few-millisecond range, and there are many applications, even at LinkedIn, for which the max GC pause falls in the multiple-seconds range. Note that it also can't be predicted, so it has to be an observed value. One way or the other, you have to observe what this acceptable "max" is for your application and then set the appropriate timeouts.

>>>>>>>>>> Since this is not something that can be automated, this is a config that the application owner has to set based on the expected behavior of their application. Not wanting to do that leads to bad consumption semantics, where the application process continues to be part of a group, owning partitions but not consuming, because it has halted due to a problem. Whether or not the design requires them to express it as a poll() frequency doesn't change the fact that the application owner has to go through the process of measuring and then defining this "max".

>>>>>>>>>> The reverse, where they don't do this and the application remains in the group despite being dead, is super confusing and frustrating too. So the due diligence up front is actually worth it. And as long as the poll() latency and processing latency can be monitored, it should be easy to tell the reason for a rebalance, whether it is valid or not, and how the timeout should be tuned.

>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that will hide this complexity, and I agree that LI is unblocked since you can do this in TrackerConsumer in the meantime.

>>>>>>>>>> Thanks,
>>>>>>>>>> Neha
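The "observe, then configure" step might look like the following sketch, which times the work done between polls so the owner can pick a session timeout with headroom. All names here are illustrative; only the idea (observed max plus an error delta) comes from the discussion above:

    public class ProcessingLatencyProbe {
        private long maxBatchMs = 0;

        // Time one batch of per-record processing between polls.
        public void recordBatch(Runnable processBatch) {
            long start = System.currentTimeMillis();
            processBatch.run();
            long elapsed = System.currentTimeMillis() - start;
            maxBatchMs = Math.max(maxBatchMs, elapsed);
        }

        // Suggested session timeout = observed max plus an error delta -- the
        // owner's acceptable window for a dead consumer to stay in the group.
        public long suggestedSessionTimeoutMs(long errorDeltaMs) {
            return maxBatchMs + errorDeltaMs;
        }
    }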
>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:

>>>>>>>>>>> For commit(), I think it should hopefully be an easier discussion, so maybe we can follow up when we next meet.

>>>>>>>>>>> As far as the heartbeat is concerned, I think the points you discuss are all very valid.

>>>>>>>>>>> GC pauses impacting the heartbeats are a real issue, though only a smaller percentage of memory-hungry apps get hit by it.

>>>>>>>>>>> The broader issue, whereby even if the heartbeats are healthy the app might not be behaving correctly, is also real. If the app is calling poll(), then the probability that the app is healthy is surely higher, but this again isn't an absolute measure that the app is processing correctly. In other cases the app might have even died, in which case this discussion is moot.

>>>>>>>>>>> Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases, then for sure something is wrong.

>>>>>>>>>>> The proposal seems to be that applications need to tune their session timeout based on the 99th percentile of the time they take to process events after every poll. This, it turns out, is a nontrivial thing for an application to do. To start with, when an application is new, its numbers are going to be based on tests done on synthetic data, which often doesn't represent what it will see in production. Once the app is in production, its processing latencies will potentially vary over time. It is extremely unlikely that the application owner does a careful job of monitoring the 99th percentile of latencies over time and readjusting the settings. Often the latencies vary because of variance in other services that are called by the consumer as part of processing the events.

>>>>>>>>>>> A case in point would be a simple app which reads events and writes to Kafka. With quotas, the write latencies to Kafka could range from a few milliseconds all the way to tens of seconds. As the scale of processing for an app increases, the app (or that 'user') could get quotaed. Instead of slowing down gracefully, unless the application owner has carefully tuned the timeout, we are now looking at a potential outage where the app gets hit by constant rebalances.

>>>>>>>>>>> If we expose the pause() API, then it is possible for us to take care of this in the LinkedIn wrapper, whereby we would keep calling poll on a separate thread periodically and enqueue the messages. When the queue is full, we would call pause().

>>>>>>>>>>> In essence, we can work around it at LinkedIn. However, I think it is vastly better if we address this in the API, as every major customer will eventually be pained by it.

>>>>>>>>>>> Kartik
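A sketch of the wrapper outlined above, assuming the proposed pause()/resume() calls existed. The stand-in consumer interface and queue size are illustrative; the point is that the background thread keeps driving poll() (and therefore heartbeats) even while fetching is paused:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PausingFetcherSketch<R> implements Runnable {
        // Stand-in for the consumer, with the *proposed* pause()/resume() calls.
        interface SketchConsumer<R> {
            R poll(long timeoutMs); // returns one record or null; illustrative
            void pause();           // proposed: keep heartbeating, stop fetching
            void resume();          // proposed: start fetching again
        }

        private final SketchConsumer<R> consumer;
        private final BlockingQueue<R> queue = new ArrayBlockingQueue<>(1000);
        private volatile boolean running = true;

        public PausingFetcherSketch(SketchConsumer<R> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            while (running) {
                // Back-pressure: when the app falls behind, stop fetching
                // instead of letting heartbeats lapse and forcing a rebalance.
                if (queue.remainingCapacity() == 0) {
                    consumer.pause();
                } else {
                    consumer.resume();
                }
                R record = consumer.poll(100); // drives heartbeats even while paused
                if (record != null) {
                    try {
                        queue.put(record); // at most one record past the capacity check
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        // The application thread consumes at its own pace from here.
        public R take() throws InterruptedException { return queue.take(); }

        public void shutdown() { running = false; }
    }

With this shape, a slow downstream dependency shows up as a paused fetcher rather than a missed session timeout.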
>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:

>>>>>>>>>>> Hey guys,

>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges, and now is definitely the time to clean them up.

>>>>>>>>>>> I'm pretty reluctant to change the threading model or undergo a big API redesign at this point, beyond the group management stuff we've discussed in the context of Samza/copycat, which is already a big effort.

>>>>>>>>>>> Overall, I agree that we have done a poor job of documenting which APIs block and which don't, and people are surprised when we haven't labeled something that will be unintuitive. But the overall style of poll/select-based APIs is quite common in programming, going back to Unix select, so I don't think it's beyond people if explained well (after all, we need to mix sync and async APIs, and if we don't say which is which, any scheme will be confusing).

>>>>>>>>>>> For what it's worth, the experience with this API has actually been about 1000x better than the issues people had around intuitiveness with the high-level API. The crazy blocking iterator, impossible internal queue sizing, baroque threading model, etc. have all caused endless amounts of anger. Not to mention that that client effectively disqualifies about 50% of the use cases people want to try to use it for (plus I regularly hear people tell me they've heard not to use it at all, for various reasons ranging from data loss to lack of features). It's important to have that context when people need to switch and they say "oh, the old way was so simple and the new way complex!" :-)

>>>>>>>>>>> Let me give some context related to your points, based on our previous discussions:

>>>>>>>>>>> For commit, let's discuss; that is easy either way.

>>>>>>>>>>> The motivation for avoiding additional threading was two-fold. First, this client is really intended to be the lowest-level client. There are many, many possible higher-level processing abstractions. One thing we found to be a big problem with the high-level client was that it coupled things everyone must have--failover, etc.--with things that are different in each use case, like the appropriate threading model. If you do this, you also need to maintain a thread-free low-level consumer API for people to get around whatever you have done.

>>>>>>>>>>> The second reason was that the internal threading in the client became quite complex. The answer with threading is always that "it won't be complex this time", but it always is.

>>>>>>>>>>> For the heartbeat, you correctly describe the downside of coupling the heartbeat with poll--the contract is that the application must regularly consume to be considered an active consumer. This allows the possibility of false-positive failure detections. However, it's important to understand the downside of the alternative. If you do background polling, a consumer is considered active as long as it isn't shut down. This leads to all kinds of active consumers that aren't consuming because they have leaked or otherwise stopped but are still claiming partitions and heartbeating. This failure mode is actually far, far worse. If you allow false positives, the user sees the frequent rebalances and knows they aren't consuming frequently enough to be considered active; but if you allow false negatives, you end up having weeks go by before someone notices that a partition has gone unconsumed the whole time, at which point the data is gone. Plus, of course, even if you do this you still have regular false positives anyway from GC pauses (as now). We discussed this in some depth at the time and decided that it is better to have the liveness notion tied to *actual* consumption, which is the actual definition of liveness.

>>>>>>>>>>> Cheers,

>>>>>>>>>>> -Jay
>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com> wrote:

>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>> There has recently been a lot of open source activity regarding the new KafkaConsumer:

>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123

>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350

>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359

>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E

>>>>>>>>>>>> We've explained the KafkaConsumer API to the Databus, Samza, and some other teams, and we got similar feedback.

>>>>>>>>>>>> To summarize the feedback we received from other teams:

>>>>>>>>>>>> 1. The current behavior is not intuitive. For example, KafkaConsumer.poll drives everything. The other methods like subscribe, unsubscribe, seek, and commit(async) don't do anything without a KafkaConsumer.poll call.

>>>>>>>>>>>> 2. The semantics of a commit() call should be consistent between sync and async operations. Currently, a sync commit is a blocking call which actually sends out an OffsetCommitRequest and waits for the response upon the user's KafkaConsumer.commit call. However, an async commit is a nonblocking call which just queues up the OffsetCommitRequest; the request itself is only sent out in the next poll. The teams we talked to found this misleading.

>>>>>>>>>>>> 3. Heartbeats are dependent on user application behavior (i.e., user applications calling poll). This can be a big problem, as we don't control how different applications behave. For example, we might have an application which reads from Kafka and writes to Espresso. If Espresso is slow for whatever reason, then rebalances could happen.

>>>>>>>>>>>> Generally speaking, we feel that the current KafkaConsumer API design is more of a wrapper around the old simple consumer; i.e., in the old consumer we ask users to deal with raw protocols and error handling, while in KafkaConsumer we do that for users. However, for old high-level consumer users (who are the majority of users), the experience is a noticeable regression. The old high-level consumer interface is simple and easy to use for the end user, while KafkaConsumer requires users to be aware of many underlying details and is becoming prohibitive for users to adopt. This is hinted at by the javadoc growing bigger and bigger.

>>>>>>>>>>>> We think it's getting to the point where we should take a step back and look at the big picture.

>>>>>>>>>>>> The current state of KafkaConsumer is that it's single-threaded. There's one big KafkaConsumer.poll called by the user which pretty much drives everything (see the sketch after this list):

>>>>>>>>>>>> - data fetches

>>>>>>>>>>>> - heartbeats

>>>>>>>>>>>> - join groups (a new consumer joining a group, topic subscription changes, reacting to a group rebalance)

>>>>>>>>>>>> - async offset commits

>>>>>>>>>>>> - executing callbacks
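For reference, that single-threaded contract reads roughly like this in application code. A sketch against the API as it stood at the time, with process() as a placeholder for the application's per-record work:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoopSketch {
        static void runLoop(KafkaConsumer<String, String> consumer) {
            while (true) {
                // This one call does it all: fetches data, sends heartbeats,
                // completes group joins/rebalances, sends queued async
                // commits, and fires callbacks.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    process(record); // run longer than the session timeout and
                                     // no heartbeat is sent: the coordinator
                                     // marks this consumer dead
            }
        }

        static void process(ConsumerRecord<String, String> record) {
            // placeholder for application work
        }
    }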
>>>>>>>>>>>> Given that the selector's poll is being driven by the end user, this ends up making us educate users on NIO and the consequences of not calling KafkaConsumer.poll frequently enough:

>>>>>>>>>>>> - the coordinator will mark the consumer dead

>>>>>>>>>>>> - async commits won't be sent

>>>>>>>>>>>> - callbacks won't fire

>>>>>>>>>>>> More generally speaking, there are many surprises in the current KafkaConsumer implementation.

>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:

>>>>>>>>>>>> - NIO

>>>>>>>>>>>> - the ability to commit, manipulate offsets, and consume messages

>>>>>>>>>>>> - a way to subscribe to topics (auto group management) or partitions (explicit group management)

>>>>>>>>>>>> - no surprises in the user experience

>>>>>>>>>>>> The last point is the big one that we think we aren't hitting. The most important example is that there should be no requirement for the end user to consistently call KafkaConsumer.poll in order for all of the above tasks to happen. We think it would be better to split those tasks into tasks that should not rely on KafkaConsumer.poll and tasks that should.

>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:

>>>>>>>>>>>> - heartbeats

>>>>>>>>>>>> - join groups

>>>>>>>>>>>> - commits

>>>>>>>>>>>> - executing callbacks

>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll.

>>>>>>>>>>>> This would help reduce the number of surprises for the end user.

>>>>>>>>>>>> We've sketched out a proposal and we'll send it out to you early next week. We'd like to meet up with you at LinkedIn on *July 31, 2015* so we can talk about it before proposing it to open source.

>>>>>>>>>>>> Regards,
>>>>>>>>>>>> LinkedIn Kafka Dev Team

--
Thanks,
Neha