Hey Adi,

When we designed the initial version, the producer API was still changing. I thought about adding the Future and then just didn't get to it. I agree that we should look into adding it for consistency.
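
For illustration, here is a rough sketch of what a future-returning commit could look like. It is not the KafkaConsumer API: `SketchConsumer`, its `commit(Map)` signature, the "topic-partition" string keys, and the in-memory offset map are all made up for this example, and the single-threaded executor only stands in for whatever would actually send the commit request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration only: SketchConsumer is hypothetical and is not the KafkaConsumer API.
public class FutureCommitSketch {

    static class SketchConsumer implements AutoCloseable {
        // Stand-in for the client's network I/O; a real client would send a request here.
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        // Stand-in for offsets stored by the coordinator, keyed by "topic-partition".
        private final Map<String, Long> committed = new ConcurrentHashMap<>();

        // Always asynchronous: returns at once; the future completes once the offsets are stored.
        CompletableFuture<Void> commit(Map<String, Long> offsets) {
            // Copy the map so later changes by the caller do not affect this commit.
            Map<String, Long> snapshot = new HashMap<>(offsets);
            return CompletableFuture.runAsync(() -> committed.putAll(snapshot), io);
        }

        Long committedOffset(String topicPartition) {
            return committed.get(topicPartition);
        }

        @Override
        public void close() {
            io.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        try (SketchConsumer consumer = new SketchConsumer()) {
            // Async style: start the commit and react in a callback.
            CompletableFuture<Void> pending = consumer
                    .commit(Collections.singletonMap("events-0", 41L))
                    .thenRun(() -> System.out.println("async commit finished"));

            // Sync style: the same call, but the caller blocks on the future.
            consumer.commit(Collections.singletonMap("events-0", 42L)).get(5, TimeUnit.SECONDS);

            pending.join();
            System.out.println("committed offset: " + consumer.committedOffset("events-0"));
        }
    }
}
```

The point of the sketch is that one method covers both flavors: callers who need the sync behavior call `get()` on the returned future, and everyone else attaches a callback or ignores it.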
Thanks,
Neha

On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <aaurad...@linkedin.com> wrote:

> Great discussion everyone!
>
> One general comment on the sync/async APIs on the new consumer. I think
> the producer tackles sync vs async APIs well. For APIs that can either be
> sync or async, can we simply return a future? That seems more elegant for
> the APIs that make sense in both flavors. From the user's
> perspective, it is more consistent with the new producer. One easy example
> is the commit call with the CommitType enum: we can make that call always
> async and users can block on the future if they want to make sure their
> offsets are committed.
>
> Aditya
>
>
> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com> wrote:
>
>> Thanks for the great responses, everyone!
>>
>> To expand a tiny bit on my initial post: while I did bring up old high
>> level consumers, the teams we spoke to were actually not the types of
>> services that simply wanted an easy way to get ConsumerRecords. We spoke to
>> infrastructure teams that I would consider to be closer to the "power-user"
>> end of the spectrum and would want KafkaConsumer's level of granularity.
>> Some would use auto group management. Some would use explicit group
>> management. All of them would turn off auto offset commits. Yes, the Samza
>> team had prior experience with the old SimpleConsumer, but this is the
>> first Kafka consumer being used by the Databus team. So I don't really
>> think the feedback received was about the simpler times or wanting
>> additional higher-level clients.
>>
>> - Onur
>>
>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <ja...@confluent.io> wrote:
>>
>>> I think if we recommend a longer session timeout, then we should expose
>>> the heartbeat frequency in configuration since this generally controls how
>>> long normal rebalances will take. I think it's currently hard-coded to 3
>>> heartbeats per session timeout.
>>> It could also be nice to have an explicit
>>> LeaveGroup request to implement clean shutdown of a consumer. Then the
>>> coordinator doesn't have to wait for the timeout to reassign partitions.
>>>
>>> -Jason
>>>
>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> Hey Kartik,
>>>>
>>>> Totally agree we don't want people tuning timeouts in the common case.
>>>>
>>>> However there are two ways to avoid this:
>>>> 1. Default the timeout high
>>>> 2. Put the heartbeat in a separate thread
>>>>
>>>> When we were doing the consumer design we discussed this tradeoff and I
>>>> think the conclusion we came to was that defaulting to a high timeout was
>>>> actually better. This means it takes a little longer to detect a failure,
>>>> but usually that is not a big problem and people who want faster failure
>>>> detection can tune it down. This seemed better than having the failure
>>>> detection not really cover the consumption and just be a background ping.
>>>> The two reasons were (a) you still have the GC problem even for the
>>>> background thread, (b) consumption is in some sense a better definition of
>>>> an active healthy consumer and a lot of problems crop up when you have an
>>>> inactive consumer with an active background thread (as today).
>>>>
>>>> When we had the discussion I think what we realized was that most
>>>> people who were worried about the timeout were imagining a very low
>>>> default (say, 500ms). But in fact just setting this to 60 seconds or higher
>>>> as a default would be okay; this adds to the failure detection time but
>>>> only apps that care about this need to tune. This should largely eliminate
>>>> false positives since after all if you disappear for 60 seconds that
>>>> actually starts to be more of a true positive, even if you come back...
>>>> :-)
>>>>
>>>> -Jay
>>>>
>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:
>>>>
>>>>> Adding the open source alias. This email started off as a broader
>>>>> discussion around the new consumer. I was zooming into only the aspect of
>>>>> poll() being the only mechanism for driving the heartbeats.
>>>>>
>>>>> Yes the lag is the effect of the problem (not the problem).
>>>>> Monitoring the lag is important as it is the primary way to tell if the
>>>>> application is wedged. There might be other metrics which can possibly
>>>>> capture the same essence. Yes the lag is at the consumer group level, but
>>>>> you can tell that one of the consumers is messed up if, for example, one
>>>>> of the partitions in the application starts generating lag while the
>>>>> others are fine.
>>>>>
>>>>> Monitoring aside, I think the main point of concern is that in the old
>>>>> consumer most customers don't have to worry about unnecessary rebalances
>>>>> and most of the things that they do in their app don't have an impact on
>>>>> the session timeout (i.e. the only thing that causes rebalances is when
>>>>> the GC is out of whack). For the handful of customers who are impacted
>>>>> by GC-related rebalances, I would imagine that all of them would really
>>>>> want us to make the system more resilient. I agree that the GC problem
>>>>> can't be solved easily in the Java client, however it appears that now we
>>>>> would be expecting the consuming applications to be even more careful with
>>>>> ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka
>>>>> applications don't have much of a clue about configuring the timeouts and
>>>>> just end up calling the Kafka team when their application sees rebalances.
>>>>>
>>>>> The other side effect of poll driving the heartbeats is that we have
>>>>> to make sure that people don't set a poll timeout that is larger than the
>>>>> session timeout.
>>>>> If we had a notion of implicit heartbeats then we could
>>>>> also automatically make this work for consumers by sending heartbeats at
>>>>> the appropriate interval even though the customers want to do a long poll.
>>>>>
>>>>> We could surely work around this in LinkedIn if either we have the
>>>>> Pause() api or an explicit HeartBeat() api on the consumer.
>>>>>
>>>>> Would love to hear how other people think about this subject.
>>>>>
>>>>> Thanks
>>>>> Kartik
>>>>>
>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io> wrote:
>>>>>
>>>>>> Agree with the dilemma you are pointing out, which is that there are
>>>>>> many ways the application's message processing could fail and we wouldn't
>>>>>> be able to model all of those in the consumer's failure detection
>>>>>> mechanism. So we should try to model as much of it as we can so the
>>>>>> consumer's failure detection is meaningful.
>>>>>>
>>>>>>> Point being that the only absolute way to really detect that an app
>>>>>>> is healthy is to monitor lag. If the lag increases then for sure
>>>>>>> something is wrong.
>>>>>>
>>>>>> The lag is merely the effect of the problem, not the problem itself.
>>>>>> Lag is also a consumer group level concept and the problem we have is
>>>>>> being able to detect failures at the level of individual consumer instances.
>>>>>>
>>>>>> As you pointed out, a consumer that calls poll() is a stronger indicator of
>>>>>> whether the consumer is alive or not. The dilemma then is who defines
>>>>>> what a healthy poll() frequency is. No one else but the application owner can
>>>>>> define what a "normal" processing latency is for their application. Now
>>>>>> the question is what's the easiest way for the user to define this without
>>>>>> having to tune and fine tune this too often.
>>>>>> The heartbeat interval
>>>>>> certainly does not have to be *exactly* the 99th percentile of processing
>>>>>> latency but could be in the ballpark plus an error delta. The error delta is the
>>>>>> application owner's acceptable risk threshold during which they would be
>>>>>> ok if the application remains part of the group despite being dead. It is
>>>>>> ultimately a tradeoff between operational ease and more accurate failure
>>>>>> detection.
>>>>>>
>>>>>>> With quotas the write latencies to Kafka could range from a few
>>>>>>> milliseconds all the way to tens of seconds.
>>>>>>
>>>>>> This is actually no different from the GC problem. Most of the
>>>>>> time, the normal GC falls in the few ms range and there are many
>>>>>> applications even at LinkedIn for which the max GC falls in the multiple
>>>>>> seconds range. Note that it also can't be predicted, so has to be an
>>>>>> observed value. One way or the other, you have to observe what this
>>>>>> acceptable "max" is for your application and then set the appropriate
>>>>>> timeouts.
>>>>>>
>>>>>> Since this is not something that can be automated, this is a config
>>>>>> that the application owner has to set based on the expected behavior of
>>>>>> their application. Not wanting to do that leads to ending up with bad
>>>>>> consumption semantics where the application process continues to be part
>>>>>> of a group owning partitions but not consuming since it has halted due to a
>>>>>> problem. The fact that the design requires them to express that in poll()
>>>>>> frequency or not doesn't change the fact that the application owner has
>>>>>> to go through the process of measuring and then defining this "max".
>>>>>>
>>>>>> The reverse where they don't do this and the application remains in
>>>>>> the group despite being dead is super confusing and frustrating too. So
>>>>>> the due diligence up front is actually worth it.
>>>>>> And as long as the poll() latency
>>>>>> and processing latency can be monitored, it should be easy to tell the
>>>>>> reason for a rebalance, whether that is valid or not and how that should
>>>>>> be tuned.
>>>>>>
>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that will
>>>>>> hide this complexity and I agree that LI is unblocked since you can do
>>>>>> this in TrackerConsumer in the meantime.
>>>>>>
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:
>>>>>>
>>>>>>> For commit(), I think it should hopefully be an easier discussion,
>>>>>>> so maybe we can follow up when we meet up next.
>>>>>>>
>>>>>>> As far as the heartbeat is concerned, I think the points you discuss
>>>>>>> are all very valid.
>>>>>>>
>>>>>>> GC pauses impacting the heartbeats is a real issue. However there
>>>>>>> is a smaller percentage of memory hungry apps that get hit by it.
>>>>>>>
>>>>>>> The broader issue whereby even if the heartbeats are healthy, the
>>>>>>> app might not be behaving correctly is also real. If the app is calling
>>>>>>> poll() then the probability that the app is healthy is surely higher.
>>>>>>> But this again isn't an absolute measure that the app is processing
>>>>>>> correctly. In other cases the app might have even died in which case this
>>>>>>> discussion is moot.
>>>>>>>
>>>>>>> Point being that the only absolute way to really detect that an app
>>>>>>> is healthy is to monitor lag. If the lag increases then for sure
>>>>>>> something is wrong.
>>>>>>>
>>>>>>> The proposal seems to be that the application needs to tune their
>>>>>>> session timeout based on the 99th percentile of the time they take to
>>>>>>> process events after every poll. This turns out to be a nontrivial thing
>>>>>>> for an application to do.
>>>>>>> To start with when an application is new their data is
>>>>>>> going to be based on tests that they have done on synthetic data. This
>>>>>>> often doesn't represent what they will see in production. Once
>>>>>>> the app is in production their processing latencies will potentially vary
>>>>>>> over time. It is extremely unlikely that the application owner does a
>>>>>>> careful job of monitoring the 99th percentile of latencies over time and
>>>>>>> readjusts the settings. Often the latencies vary because of variance in other
>>>>>>> services that are called by the consumer as part of processing the
>>>>>>> events.
>>>>>>>
>>>>>>> Case in point would be a simple app which reads events and writes to
>>>>>>> Kafka. With quotas the write latencies to Kafka could range from a few
>>>>>>> milliseconds all the way to tens of seconds. As the scale of
>>>>>>> processing for an app increases, the app or that 'user' could now get quotaed.
>>>>>>> Instead of slowing down gracefully unless the application owner has
>>>>>>> carefully tuned the timeout, now we are looking at a potential outage
>>>>>>> where the app could get hit by constant rebalances.
>>>>>>>
>>>>>>> If we expose the pause() API then it is possible for us to take care
>>>>>>> of this in the LinkedIn wrapper. There we would keep calling poll on
>>>>>>> a separate thread periodically and enqueue the messages. When the queue is
>>>>>>> full we would call pause().
>>>>>>>
>>>>>>> In essence we can work around it in LinkedIn, however I think it is
>>>>>>> vastly better if we address this in the API as every major customer will
>>>>>>> eventually be pained by it.
>>>>>>>
>>>>>>> Kartik
>>>>>>>
>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>>
>>>>>>> Hey guys,
>>>>>>>
>>>>>>> Happy to discuss. I agree there may be some rough edges and now is
>>>>>>> definitely the time to clean them up.
>>>>>>>
>>>>>>> I'm pretty reluctant to change the threading model or undergo a big
>>>>>>> api redesign at this point beyond the group management stuff we've
>>>>>>> discussed in the context of Samza/copycat which is already a big effort.
>>>>>>>
>>>>>>> Overall I agree that we have done a poor job of documenting which
>>>>>>> apis block and which don't and when people are surprised because we
>>>>>>> haven't labeled something that will be unintuitive. But the overall style of
>>>>>>> poll/select-based apis is quite common in programming going back to Unix
>>>>>>> select so I don't think it's beyond people if explained well (after all
>>>>>>> we need to mix sync and async apis and if we don't say which is which any
>>>>>>> scheme will be confusing).
>>>>>>>
>>>>>>> For what it's worth the experience with this api has actually been
>>>>>>> about 1000x better than the issues people had around intuitiveness with
>>>>>>> the high-level api. The crazy blocking iterator, impossible internal queue
>>>>>>> sizing, baroque threading model, etc have all caused endless amounts of
>>>>>>> anger. Not to mention that that client effectively disqualifies about
>>>>>>> 50% of the use cases people want to try to use it for (plus I regularly hear
>>>>>>> people tell me they've heard not to use it at all for various reasons
>>>>>>> ranging from data loss to lack of features). It's important to have that
>>>>>>> context when people need to switch and they say "oh the old way was so
>>>>>>> simple and the new way complex!" :-)
>>>>>>>
>>>>>>> Let me give some context related to your points, based on our
>>>>>>> previous discussions:
>>>>>>>
>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>
>>>>>>> The motivation for avoiding additional threading was two-fold. First
>>>>>>> this client is really intended to be the lowest level client. There are
>>>>>>> many, many possible higher level processing abstractions.
>>>>>>> One thing we
>>>>>>> found to be a big problem with the high-level client was that it coupled
>>>>>>> things everyone must have--failover, etc--with things that are
>>>>>>> different in each use case like the appropriate threading model. If you do
>>>>>>> this you need to also maintain a thread free low-level consumer api for
>>>>>>> people to get around whatever you have done.
>>>>>>>
>>>>>>> The second reason was that the internal threading in the client
>>>>>>> became quite complex. The answer with threading is always that "it
>>>>>>> won't be complex this time", but it always is.
>>>>>>>
>>>>>>> For the heartbeat you correctly describe the downside to coupling
>>>>>>> heartbeat with poll--the contract is that the application must regularly
>>>>>>> consume to be considered an active consumer. This allows the
>>>>>>> possibility of false positive failure detections. However it's important to
>>>>>>> understand the downside of the alternative. If you do background polling a
>>>>>>> consumer is considered active as long as it isn't shut down. This leads to all
>>>>>>> kinds of active consumers that aren't consuming because they have leaked or
>>>>>>> otherwise stopped but are still claiming partitions and heart-beating.
>>>>>>> This failure mode is actually far far worse. If you allow false positives the
>>>>>>> user sees the frequent rebalances and knows they aren't consuming
>>>>>>> frequently enough to be considered active but if you allow false
>>>>>>> negatives you end up having weeks go by before someone notices that a partition
>>>>>>> has been unconsumed the whole time at which point the data is gone. Plus of
>>>>>>> course even if you do this you still have regular false positives anyway
>>>>>>> from GC pauses (as now).
>>>>>>> We discussed this in some depth at the time and
>>>>>>> decided that it is better to have the liveness notion tied to
>>>>>>> *actual* consumption which is the actual definition of liveness.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com> wrote:
>>>>>>>
>>>>>>>> Hi Confluent Team.
>>>>>>>>
>>>>>>>> There has recently been a lot of open source activity regarding the
>>>>>>>> new KafkaConsumer:
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>
>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>
>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza, and
>>>>>>>> some other teams and we got similar feedback.
>>>>>>>>
>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>
>>>>>>>> 1. The current behavior is not intuitive. For example,
>>>>>>>> KafkaConsumer.poll drives everything. The other methods like
>>>>>>>> subscribe, unsubscribe, seek, commit(async) don’t do anything without a
>>>>>>>> KafkaConsumer.poll call.
>>>>>>>>
>>>>>>>> 1. The semantics of a commit() call should be consistent between
>>>>>>>> sync and async operations. Currently, sync commit is a blocking
>>>>>>>> call which actually sends out an OffsetCommitRequest and waits for the
>>>>>>>> response upon the user’s KafkaConsumer.commit call. However, the async
>>>>>>>> commit is a nonblocking call which just queues up the OffsetCommitRequest.
>>>>>>>> The request itself is later sent out in the next poll. The teams we talked
>>>>>>>> to found this misleading.
>>>>>>>>
>>>>>>>> 1. Heartbeats are dependent on user application behavior (i.e.
>>>>>>>> user applications calling poll). This can be a big problem as we
>>>>>>>> don’t control how different applications behave. For example, we might
>>>>>>>> have an application which reads from Kafka and writes to Espresso. If
>>>>>>>> Espresso is slow for whatever reason, then rebalances could happen.
>>>>>>>>
>>>>>>>> Generally speaking, we feel that the current KafkaConsumer API
>>>>>>>> design is more of a wrapping around the old simple consumer, i.e. in
>>>>>>>> the old consumer we ask users to deal with raw protocols and error handling
>>>>>>>> while in KafkaConsumer we do that for users. However, for old high level
>>>>>>>> consumer users (which are the majority of users), the experience is a noticeable
>>>>>>>> regression. The old high level consumer interface is simple and easy
>>>>>>>> to use for end users, while KafkaConsumer requires users to be aware of many
>>>>>>>> underlying details and is becoming prohibitive for users to adopt.
>>>>>>>> This is hinted at by the javadoc growing bigger and bigger.
>>>>>>>>
>>>>>>>> We think it's getting to the point where we should take a step back
>>>>>>>> and look at the big picture.
>>>>>>>>
>>>>>>>> The current state of KafkaConsumer is that it's single-threaded.
>>>>>>>> There's one big KafkaConsumer.poll called by the user which pretty much
>>>>>>>> drives everything:
>>>>>>>>
>>>>>>>> - data fetches
>>>>>>>>
>>>>>>>> - heartbeats
>>>>>>>>
>>>>>>>> - join groups (new consumer joining a group, topic subscription
>>>>>>>> changes, reacting to group rebalance)
>>>>>>>>
>>>>>>>> - async offset commits
>>>>>>>>
>>>>>>>> - executing callbacks
>>>>>>>>
>>>>>>>> Given that the selector's poll is being driven by the end user,
>>>>>>>> this ends up making us educate users on NIO and the consequences of not
>>>>>>>> calling KafkaConsumer.poll frequently enough:
>>>>>>>>
>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>
>>>>>>>> - async commits won't send
>>>>>>>>
>>>>>>>> - callbacks won't fire
>>>>>>>>
>>>>>>>> More generally speaking, there are many surprises with the current
>>>>>>>> KafkaConsumer implementation.
>>>>>>>>
>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>
>>>>>>>> - NIO
>>>>>>>>
>>>>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>>>>>
>>>>>>>> - a way to subscribe to topics (auto group management) or
>>>>>>>> partitions (explicit group management)
>>>>>>>>
>>>>>>>> - no surprises in the user experience
>>>>>>>>
>>>>>>>> The last point is the big one that we think we aren't hitting. We
>>>>>>>> think the most important example is that there should be no requirement
>>>>>>>> from the end user to consistently call KafkaConsumer.poll in order for all
>>>>>>>> of the above tasks to happen. We think it would be better to split those
>>>>>>>> tasks into tasks that should not rely on KafkaConsumer.poll and tasks that
>>>>>>>> should rely on KafkaConsumer.poll.
>>>>>>>>
>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>
>>>>>>>> - heartbeats
>>>>>>>>
>>>>>>>> - join groups
>>>>>>>>
>>>>>>>> - commits
>>>>>>>>
>>>>>>>> - executing callbacks
>>>>>>>>
>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>
>>>>>>>> This would help reduce the number of surprises to the end user.
>>>>>>>>
>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you guys
>>>>>>>> early next week. We’d like to meet up with you at LinkedIn on
>>>>>>>> *July 31, 2015* so we can talk about it before proposing it to open
>>>>>>>> source.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Neha

--
Thanks,
Neha