Kartik, on your second point about timeouts with poll() and heartbeats, the consumer now handles this properly. KAFKA-2123 introduced a DelayedTaskQueue, which is used internally to process scheduled events at the right time even if poll() is called with a large timeout. The same mechanism handles auto commit, so it also occurs in a timely fashion regardless of the poll() timeout.
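For illustration, here is a rough, hypothetical sketch of the delayed-task idea (this is not the actual KAFKA-2123 code; the class and method names are invented): scheduled work such as the next heartbeat or the next auto commit sits in a queue ordered by due time, and poll() never blocks on the network longer than the time until the next task is due.

import java.util.Comparator;
import java.util.PriorityQueue;

// Illustrative sketch only (not Kafka internals): pending time-based work is kept in a
// priority queue ordered by due time. The consumer runs everything that is due on each
// pass through poll() and caps its network wait at the time to the next due task.
public class DelayedTaskQueueSketch {
    interface Task { void run(long nowMs); }

    private static final class Entry {
        final long dueMs; final Task task;
        Entry(long dueMs, Task task) { this.dueMs = dueMs; this.task = task; }
    }

    private final PriorityQueue<Entry> tasks =
        new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.dueMs));

    public void schedule(Task task, long dueMs) { tasks.add(new Entry(dueMs, task)); }

    // How long the consumer may block on the network before some task is due.
    public long timeToNextTask(long nowMs) {
        return tasks.isEmpty() ? Long.MAX_VALUE : Math.max(0, tasks.peek().dueMs - nowMs);
    }

    // Run everything that is due; called on every pass through poll().
    public void runDueTasks(long nowMs) {
        while (!tasks.isEmpty() && tasks.peek().dueMs <= nowMs) {
            tasks.poll().task.run(nowMs);
        }
    }
}

The consumer's internal network wait then becomes min(user timeout, timeToNextTask(now)), which is why a poll() called with a very large timeout can still service heartbeats and auto commits on schedule.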
On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> wrote:
> Hey Kartik,
> Totally agree we don't want people tuning timeouts in the common case.
> However, there are two ways to avoid this:
> 1. Default the timeout high
> 2. Put the heartbeat in a separate thread
> When we were doing the consumer design we discussed this tradeoff, and I think the conclusion we came to was that defaulting to a high timeout was actually better. This means it takes a little longer to detect a failure, but usually that is not a big problem, and people who want faster failure detection can tune it down. This seemed better than having the failure detection not really cover the consumption and just be a background ping. The two reasons were (a) you still have the GC problem even for the background thread, and (b) consumption is in some sense a better definition of an active, healthy consumer, and a lot of problems crop up when you have an inactive consumer with an active background thread (as today).
> When we had the discussion, I think what we realized was that most people who were worried about the timeout were imagining a very low default, say 500ms. But in fact just setting this to 60 seconds or higher as a default would be okay; this adds to the failure detection time, but only apps that care about this need to tune it. This should largely eliminate false positives, since after all if you disappear for 60 seconds that actually starts to be more of a true positive, even if you come back... :-)
> -Jay
> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:
>> adding the open source alias. This email started off as a broader discussion around the new consumer. I was zooming into only the aspect of poll() being the only mechanism for driving the heartbeats.
>> Yes, the lag is the effect of the problem (not the problem). Monitoring the lag is important as it is the primary way to tell if the application is wedged. There might be other metrics which can possibly capture the same essence. Yes, the lag is at the consumer group level, but you can tell that one of the consumers is messed up if one of the partitions in the application starts generating lag while the others are fine, for example.
>> Monitoring aside, I think the main point of concern is that in the old consumer most customers don't have to worry about unnecessary rebalances, and most of the things that they do in their app don't have an impact on the session timeout (i.e. the only thing that causes rebalances is when the GC is out of whack). For the handful of customers who are impacted by GC-related rebalances, I would imagine that all of them would really want us to make the system more resilient. I agree that the GC problem can't be solved easily in the Java client; however, it appears that now we would be expecting the consuming applications to be even more careful with ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka applications don't have much of a clue about configuring the timeouts and just end up calling the Kafka team when their application sees rebalances.
>> The other side effect of poll driving the heartbeats is that we have to make sure that people don't set a poll timeout that is larger than the session timeout. If we had a notion of implicit heartbeats, then we could also automatically make this work for consumers by sending heartbeats at the appropriate interval even though the customers want to do a long poll.
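As a concrete illustration of the "default the timeout high" option Jay describes, here is a minimal configuration sketch. It uses property names from the released new consumer; the broker address, group id, and the 60-second value are placeholders taken from the discussion rather than shipped defaults.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HighSessionTimeoutExample {
    public static KafkaConsumer<byte[], byte[]> newConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");              // placeholder group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // A high session timeout tolerates GC pauses and slow processing at the cost of
        // slower failure detection; apps that want faster detection tune it down.
        props.put("session.timeout.ms", "60000");
        return new KafkaConsumer<>(props);
    }
}

With heartbeats tied to poll(), the work done between successive poll() calls has to finish within this timeout, which is exactly the tuning burden discussed in the rest of the thread.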
>> We could surely work around this in LinkedIn if either we have the Pause() API or an explicit HeartBeat() API on the consumer.
>> Would love to hear how other people think about this subject?
>> Thanks,
>> Kartik
>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io> wrote:
>>> Agree with the dilemma you are pointing out, which is that there are many ways the application's message processing could fail, and we wouldn't be able to model all of those in the consumer's failure detection mechanism. So we should try to model as much of it as we can so that the consumer's failure detection is meaningful.
>>>> Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong.
>>> The lag is merely the effect of the problem, not the problem itself. Lag is also a consumer-group-level concept, and the problem we have is being able to detect failures at the level of individual consumer instances.
>>> As you pointed out, a consumer that calls poll() is a stronger indicator of whether the consumer is alive or not. The dilemma then is who defines what a healthy poll() frequency is. No one but the application owner can define what a "normal" processing latency is for their application. Now the question is what's the easiest way for the user to define this without having to tune and fine-tune it too often. The heartbeat interval certainly does not have to be *exactly* the 99th percentile of processing latency, but could be in the ballpark plus an error delta. The error delta is the application owner's acceptable risk threshold during which they would be OK if the application remains part of the group despite being dead. It is ultimately a tradeoff between operational ease and more accurate failure detection.
>>>> With quotas the write latencies to Kafka could range from a few milliseconds all the way to tens of seconds.
>>> This is actually no different from the GC problem. Most of the time, the normal GC falls in the few-ms range, and there are many applications even at LinkedIn for which the max GC falls in the multiple-seconds range. Note that it also can't be predicted, so it has to be an observed value. One way or the other, you have to observe what this acceptable "max" is for your application and then set the appropriate timeouts.
>>> Since this is not something that can be automated, this is a config that the application owner has to set based on the expected behavior of their application. Not wanting to do that leads to bad consumption semantics where the application process continues to be part of a group, owning partitions, but not consuming since it has halted due to a problem. Whether the design requires them to express that in poll() frequency or not doesn't change the fact that the application owner has to go through the process of measuring and then defining this "max".
>>> The reverse, where they don't do this and the application remains in the group despite being dead, is super confusing and frustrating too. So the due diligence up front is actually worth it.
>>> And as long as the poll() latency and processing latency can be monitored, it should be easy to tell the reason for a rebalance, whether it was valid or not, and how the timeout should be tuned.
>>> As for the wrapper, KIP-28 is the wrapper in open source that will hide this complexity, and I agree that LI is unblocked since you can do this in TrackerConsumer in the meantime.
>>> Thanks,
>>> Neha
>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <kparamasi...@linkedin.com> wrote:
>>>> For commit(), I think it should hopefully be an easier discussion, so maybe we can follow up when we meet up next.
>>>> As far as the heartbeat is concerned, I think the points you discuss are all very valid.
>>>> GC pauses impacting the heartbeats is a real issue. However, it is a smaller percentage of memory-hungry apps that get hit by it.
>>>> The broader issue, whereby even if the heartbeats are healthy the app might not be behaving correctly, is also real. If the app is calling poll(), then the probability that the app is healthy is surely higher. But this again isn't an absolute measure that the app is processing correctly. In other cases the app might have even died, in which case this discussion is moot.
>>>> Point being that the only absolute way to really detect that an app is healthy is to monitor lag. If the lag increases then for sure something is wrong.
>>>> The proposal seems to be that the application needs to tune their session timeout based on the 99th percentile of the time they take to process events after every poll. This turns out to be a nontrivial thing for an application to do. To start with, when an application is new, their numbers are going to be based on tests that they have done on synthetic data. This oftentimes doesn't represent what they will see in production. Once the app is in production, their processing latencies will potentially vary over time. It is extremely unlikely that the application owner does a careful job of monitoring the 99th percentile of latencies over time and readjusting the settings. Oftentimes the latencies vary because of variance in other services that are called by the consumer as part of processing the events.
>>>> Case in point would be a simple app which reads events and writes to Kafka. With quotas, the write latencies to Kafka could range from a few milliseconds all the way to tens of seconds. As the scale of processing for an app increases, the app (or that 'user') could now get quotaed. Instead of slowing down gracefully, unless the application owner has carefully tuned the timeout we are now looking at a potential outage where the app could get hit by constant rebalances.
>>>> If we expose the pause() API then it is possible for us to take care of this in the LinkedIn wrapper, whereby we would keep calling poll on a separate thread periodically and enqueue the messages. When the queue is full we would call pause().
>>>> In essence we can work around it in LinkedIn; however, I think it is vastly better if we address this in the API, as every major customer will eventually be pained by it.
>>>> Kartik
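The LinkedIn wrapper Kartik sketches above could look roughly like the following. This is a hypothetical sketch that assumes the Collection-based pause()/resume() methods that later shipped on KafkaConsumer; the class name, watermarks, and queue sizes are invented for illustration.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch of a wrapper that keeps heartbeats flowing: a dedicated thread
// calls poll() in a tight loop and hands records to the application through a bounded
// queue. When the application falls behind, the wrapper pauses the assigned partitions
// (poll() keeps heartbeating but returns no data) instead of letting the session expire.
// Only this thread ever touches the consumer, since KafkaConsumer is not thread-safe.
public class PausingPollLoop implements Runnable {
    private static final int HIGH_WATERMARK = 1000;
    private static final int LOW_WATERMARK = 100;

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final BlockingQueue<ConsumerRecord<byte[], byte[]>> queue =
        new ArrayBlockingQueue<>(2 * HIGH_WATERMARK);
    private boolean paused = false;

    public PausingPollLoop(Properties props, String topic) {
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                if (!paused && queue.size() >= HIGH_WATERMARK) {
                    consumer.pause(consumer.assignment());   // stop fetching, keep heartbeating
                    paused = true;
                } else if (paused && queue.size() <= LOW_WATERMARK) {
                    consumer.resume(consumer.assignment());
                    paused = false;
                }
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                    queue.put(record);   // bounded queue; a large in-flight batch may block briefly
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close();
        }
    }

    // The application thread drains records from here at its own pace.
    public BlockingQueue<ConsumerRecord<byte[], byte[]>> records() { return queue; }
}

Because the polling thread never stops calling poll(), heartbeats keep flowing even while the application drains the queue slowly; the explicit HeartBeat() API Kartik also mentions does not exist on the consumer, which is why this sketch relies on pause()/resume() alone.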
>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:
>>>> Hey guys,
>>>> Happy to discuss. I agree there may be some rough edges, and now is definitely the time to clean them up.
>>>> I'm pretty reluctant to change the threading model or undergo a big API redesign at this point, beyond the group management stuff we've discussed in the context of Samza/copycat, which is already a big effort.
>>>> Overall I agree that we have done a poor job of documenting which APIs block and which don't, and people are surprised when we haven't labeled something that will be unintuitive. But the overall style of poll/select-based APIs is quite common in programming, going back to Unix select, so I don't think it's beyond people if explained well (after all, we need to mix sync and async APIs, and if we don't say which is which, any scheme will be confusing).
>>>> For what it's worth, the experience with this API has actually been about 1000x better than the issues people had around intuitiveness with the high-level API. The crazy blocking iterator, impossible internal queue sizing, baroque threading model, etc. have all caused endless amounts of anger. Not to mention that that client effectively disqualifies about 50% of the use cases people want to try to use it for (plus I regularly hear people tell me they've heard not to use it at all for various reasons ranging from data loss to lack of features). It's important to have that context when people need to switch and they say "oh, the old way was so simple and the new way complex!" :-)
>>>> Let me give some context related to your points, based on our previous discussions:
>>>> For commit, let's discuss; that is easy either way.
>>>> The motivation for avoiding additional threading was two-fold. First, this client is really intended to be the lowest-level client. There are many, many possible higher-level processing abstractions. One thing we found to be a big problem with the high-level client was that it coupled things everyone must have--failover, etc.--with things that are different in each use case, like the appropriate threading model. If you do this you also need to maintain a thread-free low-level consumer API for people to get around whatever you have done.
>>>> The second reason was that the internal threading in the client became quite complex. The answer with threading is always that "it won't be complex this time", but it always is.
>>>> For the heartbeat, you correctly describe the downside to coupling heartbeat with poll--the contract is that the application must regularly consume to be considered an active consumer. This allows the possibility of false-positive failure detections. However, it's important to understand the downside of the alternative. If you do background polling, a consumer is considered active as long as it isn't shut down. This leads to all kinds of active consumers that aren't consuming because they have leaked or otherwise stopped but are still claiming partitions and heartbeating. This failure mode is actually far, far worse. If you allow false positives, the user sees the frequent rebalances and knows they aren't consuming frequently enough to be considered active; but if you allow false negatives, you end up having weeks go by before someone notices that a partition has been unconsumed the whole time, at which point the data is gone. Plus of course, even if you do this, you still have regular false positives anyway from GC pauses (as now).
>>>> We discussed this in some depth at the time and decided that it is better to have the liveness notion tied to *actual* consumption, which is the actual definition of liveness.
>>>> Cheers,
>>>> -Jay
>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com> wrote:
>>>>> Hi Confluent Team.
>>>>> There has recently been a lot of open source activity regarding the new KafkaConsumer:
>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>> We've explained the KafkaConsumer API to the Databus, Samza, and some other teams, and we got similar feedback.
>>>>> To summarize the feedback we received from other teams:
>>>>> 1. The current behavior is not intuitive. For example, KafkaConsumer.poll drives everything. The other methods like subscribe, unsubscribe, seek, and commit(async) don't do anything without a KafkaConsumer.poll call.
>>>>> 2. The semantics of a commit() call should be consistent between sync and async operations. Currently, sync commit is a blocking call which actually sends out an OffsetCommitRequest and waits for the response upon the user's KafkaConsumer.commit call. However, the async commit is a nonblocking call which just queues up the OffsetCommitRequest. The request itself is later sent out in the next poll. The teams we talked to found this misleading.
>>>>> 3. Heartbeats are dependent on user application behavior (i.e. user applications calling poll). This can be a big problem as we don't control how different applications behave. For example, we might have an application which reads from Kafka and writes to Espresso. If Espresso is slow for whatever reason, then rebalances could happen.
>>>>> Generally speaking, we feel that the current KafkaConsumer API design is more of a wrapper around the old simple consumer, i.e. in the old consumer we ask users to deal with raw protocols and error handling, while in KafkaConsumer we do that for users. However, for old high-level consumer users (who are the majority of users), the experience is a noticeable regression. The old high-level consumer interface is simple and easy to use for the end user, while KafkaConsumer requires users to be aware of many underlying details and is becoming prohibitive for users to adopt. This is hinted at by the javadoc growing bigger and bigger.
>>>>> We think it's getting to the point where we should take a step back and look at the big picture.
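For point 2 above (sync vs. async commit semantics), here is a small hypothetical sketch using the commitSync()/commitAsync() names from the released new consumer API; the broker address, group id, and topic are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitSemanticsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "example-group");              // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        boolean useSyncCommit = true;   // flip to contrast the two behaviors

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));   // placeholder topic
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                    // application-specific processing would happen here
                }
                if (useSyncCommit) {
                    // Blocking: sends the OffsetCommitRequest and waits for the broker's
                    // response before returning to the caller.
                    consumer.commitSync();
                } else {
                    // Non-blocking: returns immediately. Per the discussion above, the request
                    // is only sent out (and the callback only invoked) when a later poll() runs.
                    consumer.commitAsync((offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("async commit failed: " + exception);
                        }
                    });
                }
            }
        }
    }
}

The contrast is that commitSync() blocks until the broker acknowledges the commit, while commitAsync() returns immediately and its completion is only serviced by subsequent client activity, which is the asymmetry the feedback above calls misleading.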
>>>>> The current state of KafkaConsumer is that it's single-threaded. There's one big KafkaConsumer.poll called by the user which pretty much drives everything:
>>>>> - data fetches
>>>>> - heartbeats
>>>>> - join groups (new consumer joining a group, topic subscription changes, reacting to group rebalance)
>>>>> - async offset commits
>>>>> - executing callbacks
>>>>> Given that the selector's poll is being driven by the end user, this ends up making us educate users on NIO and the consequences of not calling KafkaConsumer.poll frequently enough:
>>>>> - the coordinator will mark the consumer dead
>>>>> - async commits won't send
>>>>> - callbacks won't fire
>>>>> More generally speaking, there are many surprises with the current KafkaConsumer implementation.
>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>> - NIO
>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>> - a way to subscribe to topics (auto group management) or partitions (explicit group management)
>>>>> - no surprises in the user experience
>>>>> The last point is the big one that we think we aren't hitting. We think the most important example is that there should be no requirement for the end user to consistently call KafkaConsumer.poll in order for all of the above tasks to happen. We think it would be better to split those tasks into tasks that should not rely on KafkaConsumer.poll and tasks that should rely on KafkaConsumer.poll.
>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>> - heartbeats
>>>>> - join groups
>>>>> - commits
>>>>> - executing callbacks
>>>>> Only data fetches should rely on KafkaConsumer.poll.
>>>>> This would help reduce the amount of surprises for the end user.
>>>>> We've sketched out a proposal and we'll send it out to you guys early next week. We'd like to meet up with you at LinkedIn on *July 31, 2015* so we can talk about it before proposing it to open source.
>>>>> Regards,
>>>>> LinkedIn Kafka Dev Team
>>> --
>>> Thanks,
>>> Neha
--
Thanks,
Ewen
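As a closing illustration of the contract Onur's summary describes, here is a minimal, hypothetical poll loop: every task in his first list only makes progress when the application calls poll(), so the per-iteration processing has to stay comfortably under the session timeout. The broker address, group id, topic, and the 30-second timeout are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollDrivenLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "example-group");              // placeholder
        props.put("session.timeout.ms", "30000");            // illustrative value
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));   // placeholder topic
            while (true) {
                // This single call is what drives data fetches, heartbeats, group
                // joins/rebalances, sending queued async commits, and invoking callbacks.
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                    // All processing time here counts against session.timeout.ms, because no
                    // heartbeat is sent until the loop gets back to poll(). If a batch takes
                    // longer than the timeout, the coordinator marks the consumer dead and
                    // triggers a rebalance.
                }
            }
        }
    }
}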