The images load for me well.

On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> wrote:
> Thanks Becket. Quick comment - there seem to be a bunch of images that the wiki refers to, but none loaded for me. Just making sure: is it just me, or can everyone not see the pictures?
>
> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
>> I agree with Ewen that a single-threaded model will make it tricky to implement the conventional semantics of async or Future. We just drafted the following wiki, which explains our thoughts at LinkedIn on the new consumer API and threading model.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>
>> We were trying to see:
>> 1. Whether we can use some kind of methodology to help us think about what API we want to provide to users for different use cases.
>> 2. What the pros and cons of the current single-threaded model are, and whether there is a way to keep its benefits while solving the issues we are facing with it now.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>>
>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> I think Ewen has proposed these APIs for using callbacks along with returning a future in the commit calls, i.e. something similar to:
>>>>
>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>
>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
>>>>
>>>> At that time I was slightly inclined not to include the Future besides adding the callback, mainly because of the implementation complexity I felt it could introduce along with the retry settings, after looking through the code base. I would be happy to change my mind if we could propose a prototype implementation that is simple enough.
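[Editor's note: Guozhang's proposed signatures, written out as compilable Java for reference. Note `Future<Void>` with a capital V (`Future<void>` does not compile). The `ConsumerCommitCallback` and `TopicPartition` stand-ins below, and the trivial in-memory `ToyConsumer`, are illustrative assumptions for this sketch, not the real client.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-ins for the types in this thread; names and shapes
// are assumptions for illustration, not the final Kafka API.
interface ConsumerCommitCallback {
    void onComplete(Exception error); // null error means the commit succeeded
}

final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition t = (TopicPartition) o;
        return partition == t.partition && topic.equals(t.topic);
    }
    @Override public int hashCode() { return Objects.hash(topic, partition); }
}

// Toy implementation that "commits" into a local map and completes the
// Future immediately, just to show how the two completion signals
// (Future and callback) relate. A real consumer would complete both
// from its network I/O loop after the broker responds.
final class ToyConsumer {
    final Map<TopicPartition, Long> committed = new HashMap<>();

    Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
        committed.putAll(offsets);
        if (callback != null) callback.onComplete(null); // success
        return CompletableFuture.completedFuture(null);
    }
}
```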
>>>>
>>> One of the reasons that interface ended up being difficult (or maybe impossible) to make work reasonably is that the consumer was thread-safe at the time. That made it impossible to know what should be done when Future.get() is called -- should the implementation call poll() itself, or would the fact that the user is calling get() imply that there's a background thread running the poll() loop and we just need to wait for it?
>>>
>>> The consumer is no longer thread-safe, but I think the same problem remains because the expectation with Futures is that they are thread-safe. Which means that even if the consumer isn't thread-safe, I would expect to be able to hand that Future off to some other thread, have the second thread call get(), and then continue driving the poll loop in my thread (which in turn would eventually resolve the Future).
>>>
>>> I quite dislike the sync/async enum. While both operations commit offsets, their semantics are so different that overloading a single method with both is messy. That said, I don't think we should consider this an inconsistency wrt the new producer API's use of Future, because the two APIs have a much more fundamental difference that justifies it: they have completely different threading and execution models.
>>>
>>> I think a Future-based API only makes sense if you can guarantee that the operations the Futures are waiting on will continue to make progress regardless of what the thread using the Future does. The producer API makes that work by processing asynchronous requests in a background thread. The new consumer does not, and so it becomes difficult/impossible to implement the Future correctly.
>>> (Or, you have to make assumptions which break other use cases: if you want to support the simple use case of just making a commit() synchronous by calling get(), the Future has to call poll() internally; but if you do that, then if any user ever wants to add synchronization to the consumer via some external mechanism, the implementation of the Future's get() method will not be subject to that synchronization and things will break.)
>>>
>>> -Ewen
>>>
>>>> Guozhang
>>>>
>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io> wrote:
>>>>
>>>>> Hey Adi,
>>>>>
>>>>> When we designed the initial version, the producer API was still changing. I thought about adding the Future and then just didn't get to it. I agree that we should look into adding it for consistency.
>>>>>
>>>>> Thanks,
>>>>> Neha
>>>>>
>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <aaurad...@linkedin.com> wrote:
>>>>>
>>>>>> Great discussion everyone!
>>>>>>
>>>>>> One general comment on the sync/async APIs on the new consumer. I think the producer tackles sync vs async APIs well. For APIs that can be either sync or async, can we simply return a future? That seems more elegant for the APIs that make sense in both flavors, and from the user's perspective it is more consistent with the new producer. One easy example is the commit call with the CommitType enum: we can make that call always async, and users can block on the future if they want to make sure their offsets are committed.
>>>>>>
>>>>>> Aditya
>>>>>>
>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com> wrote:
>>>>>>
>>>>>>> Thanks for the great responses, everyone!
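[Editor's note: Ewen's objection earlier in the thread can be made concrete with a toy model, all names hypothetical. If the only way a commit Future can resolve is for get() to drive the consumer's own poll() loop, then get() touches the non-thread-safe consumer outside whatever external locking the caller set up.]

```java
// Made-up minimal consumer: poll() is the only thing that makes progress,
// and after a few polls the pending commit completes.
final class MiniConsumer {
    private boolean commitDone = false;
    private int pollsUntilDone;
    MiniConsumer(int pollsUntilDone) { this.pollsUntilDone = pollsUntilDone; }
    void poll() { if (--pollsUntilDone <= 0) commitDone = true; } // drives all I/O
    boolean commitDone() { return commitDone; }
}

final class PollDrivenCommitFuture {
    private final MiniConsumer consumer;
    PollDrivenCommitFuture(MiniConsumer consumer) { this.consumer = consumer; }
    // get() can only resolve by driving the consumer's poll loop itself.
    // This is exactly the assumption Ewen warns about: any synchronization
    // the user wraps around "their" consumer calls is bypassed here.
    void get() {
        while (!consumer.commitDone())
            consumer.poll();
    }
}
```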
>>>>>>>
>>>>>>> To expand a tiny bit on my initial post: while I did bring up the old high-level consumers, the teams we spoke to were actually not the types of services that simply wanted an easy way to get ConsumerRecords. We spoke to infrastructure teams that I would consider to be closer to the "power-user" end of the spectrum and would want KafkaConsumer's level of granularity. Some would use auto group management. Some would use explicit group management. All of them would turn off auto offset commits. Yes, the Samza team had prior experience with the old SimpleConsumer, but this is the first Kafka consumer being used by the Databus team. So I don't really think the feedback received was about the simpler times or wanting additional higher-level clients.
>>>>>>>
>>>>>>> - Onur
>>>>>>>
>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <ja...@confluent.io> wrote:
>>>>>>>
>>>>>>>> I think if we recommend a longer session timeout, then we should expose the heartbeat frequency in configuration, since this generally controls how long normal rebalances will take. I think it's currently hard-coded to 3 heartbeats per session timeout. It could also be nice to have an explicit LeaveGroup request to implement clean shutdown of a consumer. Then the coordinator doesn't have to wait for the timeout to reassign partitions.
>>>>>>>>
>>>>>>>> -Jason
>>>>>>>>
>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Hey Kartik,
>>>>>>>>>
>>>>>>>>> Totally agree we don't want people tuning timeouts in the common case.
>>>>>>>>>
>>>>>>>>> However, there are two ways to avoid this:
>>>>>>>>> 1. Default the timeout high
>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>
>>>>>>>>> When we were doing the consumer design we discussed this tradeoff, and I think the conclusion we came to was that defaulting to a high timeout was actually better. This means it takes a little longer to detect a failure, but usually that is not a big problem, and people who want faster failure detection can tune it down. This seemed better than having the failure detection not really cover the consumption and just be a background ping. The two reasons were (a) you still have the GC problem even for the background thread, and (b) consumption is in some sense a better definition of an active, healthy consumer, and a lot of problems crop up when you have an inactive consumer with an active background thread (as today).
>>>>>>>>>
>>>>>>>>> When we had the discussion, I think what we realized was that most people who were worried about the timeout were imagining a very low default (say 500ms). But in fact just setting this to 60 seconds or higher as a default would be okay; this adds to the failure detection time, but only apps that care about this need to tune. This should largely eliminate false positives, since after all if you disappear for 60 seconds that actually starts to be more of a true positive, even if you come back... :-)
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:
>>>>>>>>>
>>>>>>>>>> Adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming in on only the aspect of poll() being the only mechanism for driving the heartbeats.
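[Editor's note: Jay's first option is purely configuration. A sketch, assuming the new consumer's `session.timeout.ms` config key; the broker address and group id are placeholders.]

```java
import java.util.Properties;

public class HighTimeoutConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "example-group");           // placeholder group
        // Default high, per Jay's suggestion: 60s before the coordinator
        // declares the consumer dead. Apps that want faster failure
        // detection tune this down.
        props.put("session.timeout.ms", "60000");
        return props;
    }
}
```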
>>>>>>>>>>
>>>>>>>>>> Yes, the lag is the effect of the problem (not the problem). Monitoring the lag is important, as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes, the lag is at the consumer group level, but you can tell that one of the consumers is messed up if one of the partitions in the application starts generating lag and the others are good, for e.g.
>>>>>>>>>>
>>>>>>>>>> Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances, and most of the things that they do in their app don't have an impact on the session timeout (i.e. the only thing that causes rebalances is when the GC is out of whack). For the handful of customers who are impacted by GC-related rebalances, I would imagine that all of them would really want us to make the system more resilient. I agree that the GC problem can't be solved easily in the Java client; however, it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances.
>>>>>>>>>>
>>>>>>>>>> The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout.
>>>>>>>>>> If we had a notion of implicit heartbeats, then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even though the customers want to do a long poll.
>>>>>>>>>>
>>>>>>>>>> We could surely work around this in LinkedIn if we have either the pause() API or an explicit heartbeat() API on the consumer.
>>>>>>>>>>
>>>>>>>>>> Would love to hear how other people think about this subject.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Kartik
>>>>>>>>>>
>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail, and we wouldn't be able to model all of those in the consumer's failure detection mechanism. So we should try to model as much of it as we can, so the consumer's failure detection is meaningful.
>>>>>>>>>>>
>>>>>>>>>>>> Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong.
>>>>>>>>>>>
>>>>>>>>>>> The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer-group-level concept, and the problem we have is being able to detect failures at the level of individual consumer instances.
>>>>>>>>>>>
>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger indicator of whether the consumer is alive or not. The dilemma then is who defines what a healthy poll() frequency is.
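[Editor's note: Kartik's "implicit heartbeat" idea above can be sketched as a wrapper that turns one long user-facing poll into repeated short internal polls, so the heartbeat piggybacked on poll() keeps firing well inside the session timeout. The `PollingConsumer` interface is a made-up stand-in, not the real client.]

```java
// Hypothetical minimal consumer: poll(timeoutMs) returns how many records
// were fetched, and (per this thread) also drives the heartbeat.
interface PollingConsumer {
    int poll(long timeoutMs);
}

final class LongPollWrapper {
    private final PollingConsumer consumer;
    private final long maxSlice;     // each internal poll is << session timeout

    LongPollWrapper(PollingConsumer consumer, long sessionTimeoutMs) {
        this.consumer = consumer;
        this.maxSlice = sessionTimeoutMs / 3;  // ~3 heartbeats per session timeout
    }

    // Behaves like poll(totalTimeoutMs) to the caller, but never lets a
    // single underlying poll exceed maxSlice, so heartbeats keep flowing
    // even during a long wait for data.
    int poll(long totalTimeoutMs) {
        long remaining = totalTimeoutMs;
        while (remaining > 0) {
            long slice = Math.min(remaining, maxSlice);
            int records = consumer.poll(slice);
            if (records > 0) return records;   // data arrived; hand it back
            remaining -= slice;
        }
        return 0;                              // timed out with no data
    }
}
```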
>>>>>>>>>>> No one else but the application owner can define what a "normal" processing latency is for their application. Now the question is what's the easiest way for the user to define this without having to tune and fine-tune this too often. The heartbeat interval certainly does not have to be *exactly* the 99th percentile of processing latency, but could be in the ballpark plus an error delta. The error delta is the application owner's acceptable risk threshold, during which they would be ok if the application remains part of the group despite being dead. It is ultimately a tradeoff between operational ease and more accurate failure detection.
>>>>>>>>>>>
>>>>>>>>>>>> With quotas the write latencies to kafka could range from a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>
>>>>>>>>>>> This is actually no different from the GC problem. Most of the time, the normal GC falls in the few-ms range, and there are many applications even at LinkedIn for which the max GC falls in the multiple-seconds range. Note that it also can't be predicted, so it has to be an observed value. One way or the other, you have to observe what this acceptable "max" is for your application and then set the appropriate timeouts.
>>>>>>>>>>>
>>>>>>>>>>> Since this is not something that can be automated, this is a config that the application owner has to set based on the expected behavior of their application.
>>>>>>>>>>> Not wanting to do that leads to ending up with bad consumption semantics, where the application process continues to be part of a group owning partitions but not consuming, since it has halted due to a problem. Whether the design requires them to express that in poll() frequency or not doesn't change the fact that the application owner has to go through the process of measuring and then defining this "max".
>>>>>>>>>>>
>>>>>>>>>>> The reverse, where they don't do this and the application remains in the group despite being dead, is super confusing and frustrating too. So the due diligence up front is actually worth it. And as long as the poll() latency and processing latency can be monitored, it should be easy to tell the reason for a rebalance, whether that is valid or not, and how that should be tuned.
>>>>>>>>>>>
>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that will hide this complexity, and I agree that LI is unblocked since you can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Neha
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For commit(), I think it should hopefully be an easier discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>
>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points you discuss are all very valid.
>>>>>>>>>>>>
>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. However, there is a smaller percentage of memory-hungry apps that get hit by it.
>>>>>>>>>>>>
>>>>>>>>>>>> The broader issue, whereby even if the heartbeats are healthy the app might not be behaving correctly, is also real. If the app is calling poll(), then the probability that the app is healthy is surely higher. But this again isn't an absolute measure that the app is processing correctly. In other cases the app might have even died, in which case this discussion is moot.
>>>>>>>>>>>>
>>>>>>>>>>>> Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong.
>>>>>>>>>>>>
>>>>>>>>>>>> The proposal seems to be that the application needs to tune their session timeout based on the 99th percentile of the time they take to process events after every poll. This, it turns out, is a nontrivial thing for an application to do. To start with, when an application is new, their data is going to be based on tests that they have done on synthetic data. This often doesn't represent what they will see in production. Once the app is in production, their processing latencies will potentially vary over time. It is extremely unlikely that the application owner does a careful job of monitoring the 99th percentile of latencies over time and readjusts the settings. Often the latencies vary because of variance in other services that are called by the consumer as part of processing the events.
>>>>>>>>>>>>
>>>>>>>>>>>> Case in point would be a simple app which reads events and writes to Kafka.
>>>>>>>>>>>> With quotas, the write latencies to Kafka could range from a few milliseconds all the way to tens of seconds. As the scale of processing for an app increases, the app or that 'user' could now get quotaed. Instead of slowing down gracefully, unless the application owner has carefully tuned the timeout we are now looking at a potential outage where the app could get hit by constant rebalances.
>>>>>>>>>>>>
>>>>>>>>>>>> If we expose the pause() API then it is possible for us to take care of this in the LinkedIn wrapper, whereby we would keep calling poll on a separate thread periodically and enqueue the messages. When the queue is full we would call pause().
>>>>>>>>>>>>
>>>>>>>>>>>> In essence we can work around it in LinkedIn; however, I think it is vastly better if we address this in the API, as every major customer will eventually be pained by it.
>>>>>>>>>>>>
>>>>>>>>>>>> Kartik
>>>>>>>>>>>>
>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>
>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges, and now is definitely the time to clean them up.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm pretty reluctant to change the threading model or undergo a big API redesign at this point, beyond the group management stuff we've discussed in the context of Samza/copycat, which is already a big effort.
>>>>>>>>>>>>
>>>>>>>>>>>> Overall I agree that we have done a poor job of documenting which APIs block and which don't, and when people are surprised it's because we haven't labeled something that will be unintuitive.
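[Editor's note: the workaround Kartik describes (a background thread that keeps polling so heartbeats never stop, buffers records into a bounded queue, and calls pause() when the queue fills) could look roughly like this. The `PausableConsumer` interface is a made-up stand-in for the real client, and one record per poll is a simplification.]

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical consumer surface: poll() returns one record (or null),
// pause() stops fetching data while still heartbeating, resume() undoes it.
interface PausableConsumer {
    String poll(long timeoutMs);
    void pause();
    void resume();
}

final class BufferingConsumer {
    private final PausableConsumer consumer;
    private final BlockingQueue<String> queue;
    private boolean paused = false;

    BufferingConsumer(PausableConsumer consumer, int capacity) {
        this.consumer = consumer;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // One iteration of the background loop (a real wrapper would run this
    // continuously in its own thread).
    void pollOnce() {
        if (queue.remainingCapacity() == 0) {
            if (!paused) { consumer.pause(); paused = true; }
            consumer.poll(100);            // no data while paused; still drives heartbeats
            return;
        }
        if (paused) { consumer.resume(); paused = false; }
        String record = consumer.poll(100);
        if (record != null) queue.offer(record);
    }

    // What the application thread calls instead of poll(); returns null if
    // nothing is buffered yet (a real wrapper might block instead).
    String take() { return queue.poll(); }
}
```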
>>>>>>>>>>>> But the overall style of poll/select-based APIs is quite common in programming, going back to Unix select, so I don't think it's beyond people if explained well (after all, we need to mix sync and async APIs, and if we don't say which is which, any scheme will be confusing).
>>>>>>>>>>>>
>>>>>>>>>>>> For what it's worth, the experience with this API has actually been about 1000x better than the issues people had around intuitiveness with the high-level API. The crazy blocking iterator, impossible internal queue sizing, baroque threading model, etc. have all caused endless amounts of anger. Not to mention that that client effectively disqualifies about 50% of the use cases people want to try to use it for (plus I regularly hear people tell me they've heard not to use it at all, for various reasons ranging from data loss to lack of features). It's important to have that context when people need to switch and they say "oh, the old way was so simple and the new way complex!" :-)
>>>>>>>>>>>>
>>>>>>>>>>>> Let me give some context related to your points, based on our previous discussions:
>>>>>>>>>>>>
>>>>>>>>>>>> For commit, let's discuss; that is easy either way.
>>>>>>>>>>>>
>>>>>>>>>>>> The motivation for avoiding additional threading was two-fold. First, this client is really intended to be the lowest-level client. There are many, many possible higher-level processing abstractions.
>>>>>>>>>>>> One thing we found to be a big problem with the high-level client was that it coupled things everyone must have--failover, etc.--with things that are different in each use case, like the appropriate threading model. If you do this, you need to also maintain a thread-free low-level consumer API for people to get around whatever you have done.
>>>>>>>>>>>>
>>>>>>>>>>>> The second reason was that the internal threading in the client became quite complex. The answer with threading is always that "it won't be complex this time", but it always is.
>>>>>>>>>>>>
>>>>>>>>>>>> For the heartbeat, you correctly describe the downside to coupling heartbeat with poll--the contract is that the application must regularly consume to be considered an active consumer. This allows the possibility of false-positive failure detections. However, it's important to understand the downside of the alternative. If you do background polling, a consumer is considered active as long as it isn't shut down. This leads to all kinds of active consumers that aren't consuming because they have leaked or otherwise stopped but are still claiming partitions and heartbeating. This failure mode is actually far, far worse. If you allow false positives, the user sees the frequent rebalances and knows they aren't consuming frequently enough to be considered active; but if you allow false negatives, you end up having weeks go by before someone notices that a partition has been unconsumed the whole time, at which point the data is gone.
>>>>>>>>>>>> Plus of course even if you do this you still have regular false positives anyway from GC pauses (as now). We discussed this in some depth at the time and decided that it is better to have the liveness notion tied to *actual* consumption, which is the actual definition of liveness.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> -Jay
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There has recently been a lot of open source activity regarding the new KafkaConsumer:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>
>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza, and some other teams, and we got similar feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. The current behavior is not intuitive. For example, KafkaConsumer.poll drives everything. The other methods like subscribe, unsubscribe, seek, and commit(async) don’t do anything without a KafkaConsumer.poll call.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. The semantics of a commit() call should be consistent between sync and async operations.
>>>>>>>>>>>>> Currently, sync commit is a blocking call which actually sends out an OffsetCommitRequest and waits for the response upon the user’s KafkaConsumer.commit call. However, the async commit is a nonblocking call which just queues up the OffsetCommitRequest. The request itself is only sent out in the next poll. The teams we talked to found this misleading.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Heartbeats are dependent on user application behavior (i.e. user applications calling poll). This can be a big problem, as we don’t control how different applications behave. For example, we might have an application which reads from Kafka and writes to Espresso. If Espresso is slow for whatever reason, then rebalances could happen.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Generally speaking, we feel that the current KafkaConsumer API design is more of a wrapping around the old simple consumer, i.e. in the old consumer we ask users to deal with raw protocols and error handling, while in KafkaConsumer we do that for users. However, for old high-level consumer users (who are the majority of users), the experience is a noticeable regression. The old high-level consumer interface is simple and easy to use for the end user, while KafkaConsumer requires users to be aware of many underlying details and is becoming prohibitive for users to adopt. This is hinted at by the javadoc growing bigger and bigger.
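[Editor's note: a toy model of the sync/async inconsistency described above, purely to illustrate the observable difference the teams found misleading; the real client's internals are more involved.]

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: sync commit puts an OffsetCommitRequest on the wire
// immediately (and then blocks on the response), while async commit
// merely queues it until the next poll() flushes outstanding requests.
final class CommitModel {
    final List<String> wire = new ArrayList<>();       // "requests sent to broker"
    private final List<String> queued = new ArrayList<>();

    void commitSync(long offset) {
        wire.add("OffsetCommitRequest(" + offset + ")"); // sent right away
    }

    void commitAsync(long offset) {
        queued.add("OffsetCommitRequest(" + offset + ")"); // nothing on the wire yet
    }

    void poll() {                                       // next poll drains the queue
        wire.addAll(queued);
        queued.clear();
    }
}
```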
>>>>>>>>>>>>>
>>>>>>>>>>>>> We think it's getting to the point where we should take a step back and look at the big picture.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The current state of KafkaConsumer is that it's single-threaded. There's one big KafkaConsumer.poll called by the user which pretty much drives everything:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>> - join groups (new consumer joining a group, topic subscription changes, reacting to group rebalance)
>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given that the selector's poll is being driven by the end user, this ends up making us educate users on NIO and the consequences of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - the coordinator will mark the consumer dead
>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>
>>>>>>>>>>>>> More generally speaking, there are many surprises with the current KafkaConsumer implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>>>>>>>>>> - a way to subscribe to topics (auto group management) or partitions (explicit group management)
>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>
>>>>>>>>>>>>> The last point is the big one that we think we aren't hitting.
>>>>>>>>>>>>> We think the most important example is that there should be no requirement for the end user to consistently call KafkaConsumer.poll in order for all of the above tasks to happen. We think it would be better to split those tasks into tasks that should not rely on KafkaConsumer.poll and tasks that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>> - commits
>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>
>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This would help reduce the amount of surprises to the end user.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you guys early next week. We’d like to meet up with you at LinkedIn on *July 31, 2015* so we can talk about it before proposing it to open source.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Neha
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Neha
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>> --
>>> Thanks,
>>> Ewen
>
> --
> Thanks,
> Neha

--
-- Guozhang