Works now. Thanks Becket! On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> Ah... My bad, forgot to change the URL link for pictures. > Thanks for the quick response, Neha. It should be fixed now, can you try > again? > > Jiangjie (Becket) Qin > > On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> wrote: > >> Thanks Becket. Quick comment - there seem to be a bunch of images that >> the wiki refers to, but none loaded for me. Just making sure if it's just me >> or can everyone not see the pictures? >> >> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> wrote: >> >>> I agree with Ewen that with a single threaded model it will be tricky to >>> implement the conventional semantics of async or Future. We just >>> drafted the following wiki which explains our thoughts at LinkedIn on the >>> new consumer API and threading model. >>> >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal >>> >>> We were trying to see: >>> 1. If we can use some kind of methodology to help us think about what >>> API we want to provide to users for different use cases. >>> 2. What are the pros and cons of the current single threaded model. Is there >>> a way that we can maintain the benefits while solving the issues we are >>> facing now with the single threaded model. >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava < >>> e...@confluent.io> wrote: >>> >>>> >>>> >>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> >>>> wrote: >>>> >>>>> I think Ewen has proposed these APIs for using callbacks along with >>>>> returning a future in the commit calls, i.e.
something similar to: >>>>> >>>>> public Future<Void> commit(ConsumerCommitCallback callback); >>>>> >>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets, >>>>> ConsumerCommitCallback callback); >>>>> >>>>> At that time I was slightly inclined not to include the Future >>>>> besides adding the callback, mainly because of the implementation >>>>> complexity >>>>> I feel it could introduce along with the retry settings after looking >>>>> through the code base. I would be happy to change my mind if we could propose >>>>> a prototype implementation that is simple enough. >>>>> >>>>> >>>> One of the reasons that interface ended up being difficult (or maybe >>>> impossible) to make work reasonably is that the consumer was thread-safe >>>> at the time. That made it impossible to know what should be done when >>>> Future.get() is called -- should the implementation call poll() itself, or >>>> would the fact that the user is calling get() imply that there's a >>>> background thread running the poll() loop and we just need to wait for it? >>>> >>>> The consumer is no longer thread safe, but I think the same problem >>>> remains because the expectation with Futures is that they are thread safe. >>>> Which means that even if the consumer isn't thread safe, I would expect to >>>> be able to hand that Future off to some other thread, have the second >>>> thread call get(), and then continue driving the poll loop in my thread >>>> (which in turn would eventually resolve the Future). >>>> >>>> I quite dislike the sync/async enum. While both operations commit >>>> offsets, their semantics are so different that overloading a single method >>>> with both is messy. That said, I don't think we should consider this an >>>> inconsistency wrt the new producer API's use of Future because the two APIs >>>> have a much more fundamental difference that justifies it: they have >>>> completely different threading and execution models.
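[Editorial note: the problem Ewen describes can be seen in a minimal sketch. All names below are illustrative stand-ins, not the real KafkaConsumer API: the Future returned by commit() can only complete when the same thread later drives poll(), so a plain get() with nothing driving the poll loop would block forever.]

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Minimal sketch of a single-threaded consumer where commit() returns a
// Future that is only completed inside poll() -- i.e. no background thread.
class SketchConsumer {
    private final Queue<CompletableFuture<Void>> pendingCommits = new ArrayDeque<>();

    // Only queues the commit; nothing is sent yet.
    public CompletableFuture<Void> commit() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        pendingCommits.add(f);
        return f;
    }

    // poll() drives all network I/O, including flushing queued commits.
    public void poll() {
        CompletableFuture<Void> f;
        while ((f = pendingCommits.poll()) != null) {
            f.complete(null); // pretend the OffsetCommitResponse arrived
        }
    }

    public static void main(String[] args) {
        SketchConsumer c = new SketchConsumer();
        CompletableFuture<Void> f = c.commit();
        System.out.println("done before poll: " + f.isDone()); // false
        c.poll();
        System.out.println("done after poll: " + f.isDone());  // true
    }
}
```

Handing this Future to another thread and calling get() there deadlocks unless the original thread keeps calling poll(), which is exactly the coupling being debated.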
>>>> >>>> I think a Future-based API only makes sense if you can guarantee the >>>> operations that Futures are waiting on will continue to make progress >>>> regardless of what the thread using the Future does. The producer API makes >>>> that work by processing asynchronous requests in a background thread. The >>>> new consumer does not, and so it becomes difficult/impossible to implement >>>> the Future correctly. (Or, you have to make assumptions which break other >>>> use cases; if you want to support the simple use case of just making a >>>> commit() synchronous by calling get(), the Future has to call poll() >>>> internally; but if you do that, then if any user ever wants to add >>>> synchronization to the consumer via some external mechanism, then the >>>> implementation of the Future's get() method will not be subject to that >>>> synchronization and things will break). >>>> >>>> -Ewen >>>> >>>> >>>>> Guozhang >>>>> >>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io> >>>>> wrote: >>>>> >>>>>> Hey Adi, >>>>>> >>>>>> When we designed the initial version, the producer API was still >>>>>> changing. I thought about adding the Future and then just didn't get to >>>>>> it. >>>>>> I agree that we should look into adding it for consistency. >>>>>> >>>>>> Thanks, >>>>>> Neha >>>>>> >>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar < >>>>>> aaurad...@linkedin.com> wrote: >>>>>> >>>>>>> Great discussion everyone! >>>>>>> >>>>>>> One general comment on the sync/async APIs on the new consumer. I >>>>>>> think the producer tackles sync vs async APIs well. For APIs that can >>>>>>> either be sync or async, can we simply return a future? That seems more >>>>>>> elegant for the APIs that make sense in both flavors. From the >>>>>>> user's perspective, it is more consistent with the new producer. One easy >>>>>>> example is the commit call with the CommitType enum..
we can make that >>>>>>> call >>>>>>> always async and users can block on the future if they want to make sure >>>>>>> their offsets are committed. >>>>>>> >>>>>>> Aditya >>>>>>> >>>>>>> >>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com >>>>>>> > wrote: >>>>>>> >>>>>>>> Thanks for the great responses, everyone! >>>>>>>> >>>>>>>> To expand a tiny bit on my initial post: while I did bring up old >>>>>>>> high level consumers, the teams we spoke to were actually not the >>>>>>>> types of >>>>>>>> services that simply wanted an easy way to get ConsumerRecords. We >>>>>>>> spoke to >>>>>>>> infrastructure teams that I would consider to be closer to the >>>>>>>> "power-user" >>>>>>>> end of the spectrum and would want KafkaConsumer's level of >>>>>>>> granularity. >>>>>>>> Some would use auto group management. Some would use explicit group >>>>>>>> management. All of them would turn off auto offset commits. Yes, the >>>>>>>> Samza >>>>>>>> team had prior experience with the old SimpleConsumer, but this is the >>>>>>>> first kafka consumer being used by the Databus team. So I don't really >>>>>>>> think the feedback received was about the simpler times or wanting >>>>>>>> additional higher-level clients. >>>>>>>> >>>>>>>> - Onur >>>>>>>> >>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson < >>>>>>>> ja...@confluent.io> wrote: >>>>>>>> >>>>>>>>> I think if we recommend a longer session timeout, then we should >>>>>>>>> expose the heartbeat frequency in configuration since this generally >>>>>>>>> controls how long normal rebalances will take. I think it's currently >>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be nice >>>>>>>>> to >>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown of a >>>>>>>>> consumer. Then the coordinator doesn't have to wait for the timeout to >>>>>>>>> reassign partitions. 
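[Editorial note: the hard-coded relationship Jason mentions, three heartbeats per session timeout, can be sketched as below. The names are illustrative, not the actual client internals.]

```java
// Sketch of deriving the heartbeat interval from the session timeout,
// per Jason's description that it is hard-coded to 3 heartbeats per
// session timeout in the current client.
class HeartbeatConfig {
    static final int HEARTBEATS_PER_SESSION_TIMEOUT = 3;

    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION_TIMEOUT;
    }

    public static void main(String[] args) {
        // With a 30s session timeout, a heartbeat goes out every 10s.
        System.out.println(heartbeatIntervalMs(30_000)); // 10000
        // With Jay's suggested 60s default, every 20s.
        System.out.println(heartbeatIntervalMs(60_000)); // 20000
    }
}
```

Exposing the interval as its own configuration, as Jason suggests, would decouple rebalance latency from failure-detection latency.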
>>>>>>>>> >>>>>>>>> -Jason >>>>>>>>> >>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hey Kartik, >>>>>>>>>> >>>>>>>>>> Totally agree we don't want people tuning timeouts in the common >>>>>>>>>> case. >>>>>>>>>> >>>>>>>>>> However there are two ways to avoid this: >>>>>>>>>> 1. Default the timeout high >>>>>>>>>> 2. Put the heartbeat in a separate thread >>>>>>>>>> >>>>>>>>>> When we were doing the consumer design we discussed this tradeoff >>>>>>>>>> and I think the conclusion we came to was that defaulting to a high >>>>>>>>>> timeout >>>>>>>>>> was actually better. This means it takes a little longer to detect a >>>>>>>>>> failure, but usually that is not a big problem and people who want >>>>>>>>>> faster >>>>>>>>>> failure detection can tune it down. This seemed better than having >>>>>>>>>> the >>>>>>>>>> failure detection not really cover the consumption and just be a >>>>>>>>>> background >>>>>>>>>> ping. The two reasons were (a) you still have the GC problem even >>>>>>>>>> for the >>>>>>>>>> background thread, (b) consumption is in some sense a better >>>>>>>>>> definition of >>>>>>>>>> an active healthy consumer and a lot of problems crop up when you >>>>>>>>>> have an >>>>>>>>>> inactive consumer with an active background thread (as today). >>>>>>>>>> >>>>>>>>>> When we had the discussion I think what we realized was that most >>>>>>>>>> people who were worried about the timeout were imagining a very low >>>>>>>>>> default (say, 500ms). But in fact just setting this to 60 seconds or >>>>>>>>>> higher >>>>>>>>>> as a default would be okay; this adds to the failure detection time >>>>>>>>>> but >>>>>>>>>> only apps that care about this need to tune. This should largely >>>>>>>>>> eliminate >>>>>>>>>> false positives since after all if you disappear for 60 seconds that >>>>>>>>>> actually starts to be more of a true positive, even if you come >>>>>>>>>> back...
:-) >>>>>>>>>> >>>>>>>>>> -Jay >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam < >>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>> >>>>>>>>>>> adding the open source alias. This email started off as a >>>>>>>>>>> broader discussion around the new consumer. I was zooming into >>>>>>>>>>> only the >>>>>>>>>>> aspect of poll() being the only mechanism for driving the >>>>>>>>>>> heartbeats. >>>>>>>>>>> >>>>>>>>>>> Yes the lag is the effect of the problem (not the problem). >>>>>>>>>>> Monitoring the lag is important as it is the primary way to tell if >>>>>>>>>>> the >>>>>>>>>>> application is wedged. There might be other metrics which can >>>>>>>>>>> possibly >>>>>>>>>>> capture the same essence. Yes the lag is at the consumer group >>>>>>>>>>> level, but >>>>>>>>>>> you can tell that one of the consumers is messed up if one of the >>>>>>>>>>> partitions in the application starts generating lag and the others are >>>>>>>>>>> good, for >>>>>>>>>>> example. >>>>>>>>>>> >>>>>>>>>>> Monitoring aside, I think the main point of concern is that in >>>>>>>>>>> the old consumer most customers don't have to worry about >>>>>>>>>>> unnecessary >>>>>>>>>>> rebalances and most of the things that they do in their app don't >>>>>>>>>>> have an >>>>>>>>>>> impact on the session timeout (i.e. the only thing that causes >>>>>>>>>>> rebalances is when the GC is out of whack). For the handful of >>>>>>>>>>> customers >>>>>>>>>>> who are impacted by GC related rebalances, I would imagine that all >>>>>>>>>>> of them >>>>>>>>>>> would really want us to make the system more resilient. I agree >>>>>>>>>>> that the >>>>>>>>>>> GC problem can't be solved easily in the Java client, however it >>>>>>>>>>> appears >>>>>>>>>>> that now we would be expecting the consuming applications to be >>>>>>>>>>> even more >>>>>>>>>>> careful with ongoing tuning of the timeouts.
At LinkedIn, we have >>>>>>>>>>> seen >>>>>>>>>>> that most Kafka applications don't have much of a clue about >>>>>>>>>>> configuring >>>>>>>>>>> the timeouts and just end up calling the Kafka team when their >>>>>>>>>>> application >>>>>>>>>>> sees rebalances. >>>>>>>>>>> >>>>>>>>>>> The other side effect of poll driving the heartbeats is that we >>>>>>>>>>> have to make sure that people don't set a poll timeout that is >>>>>>>>>>> larger than >>>>>>>>>>> the session timeout. If we had a notion of implicit heartbeats >>>>>>>>>>> then we >>>>>>>>>>> could also automatically make this work for consumers by sending >>>>>>>>>>> heartbeats >>>>>>>>>>> at the appropriate interval even though the customers want to do a >>>>>>>>>>> long >>>>>>>>>>> poll. >>>>>>>>>>> >>>>>>>>>>> We could surely work around this in LinkedIn if either we have >>>>>>>>>>> the pause() API or an explicit heartbeat() API on the consumer. >>>>>>>>>>> >>>>>>>>>>> Would love to hear how other people think about this subject? >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> Kartik >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede < >>>>>>>>>>> n...@confluent.io> wrote: >>>>>>>>>>> >>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that >>>>>>>>>>>> there are many ways the application's message processing could >>>>>>>>>>>> fail and we >>>>>>>>>>>> wouldn't be able to model all of those in the consumer's failure >>>>>>>>>>>> detection >>>>>>>>>>>> mechanism. So we should try to model as much of it as we can so the >>>>>>>>>>>> consumer's failure detection is meaningful. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> Point being that the only absolute way to really detect that >>>>>>>>>>>>> an app is healthy is to monitor lag. If the lag increases then >>>>>>>>>>>>> for sure >>>>>>>>>>>>> something is wrong. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> The lag is merely the effect of the problem, not the problem >>>>>>>>>>>> itself.
Lag is also a consumer group level concept and the problem >>>>>>>>>>>> we have >>>>>>>>>>>> is being able to detect failures at the level of individual >>>>>>>>>>>> consumer >>>>>>>>>>>> instances. >>>>>>>>>>>> >>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger >>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma >>>>>>>>>>>> then is who >>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but the >>>>>>>>>>>> application >>>>>>>>>>>> owner can define what a "normal" processing latency is for their >>>>>>>>>>>> application. Now the question is what's the easiest way for the >>>>>>>>>>>> user to >>>>>>>>>>>> define this without having to tune and fine tune this too often. >>>>>>>>>>>> The >>>>>>>>>>>> heartbeat interval certainly does not have to be *exactly* >>>>>>>>>>>> 99tile of processing latency but could be in the ballpark + an >>>>>>>>>>>> error delta. >>>>>>>>>>>> The error delta is the application owner's acceptable risk >>>>>>>>>>>> threshold during >>>>>>>>>>>> which they would be ok if the application remains part of the >>>>>>>>>>>> group despite >>>>>>>>>>>> being dead. It is ultimately a tradeoff between operational ease >>>>>>>>>>>> and more >>>>>>>>>>>> accurate failure detection. >>>>>>>>>>>> >>>>>>>>>>>> With quotas the write latencies to Kafka could range from a few >>>>>>>>>>>>> milliseconds all the way to tens of seconds. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> This is actually no different from the GC problem. Most of >>>>>>>>>>>> the time, the normal GC falls in the few ms range and there are >>>>>>>>>>>> many >>>>>>>>>>>> applications even at LinkedIn for which the max GC falls in the >>>>>>>>>>>> multiple >>>>>>>>>>>> seconds range. Note that it also can't be predicted, so it has to be >>>>>>>>>>>> an >>>>>>>>>>>> observed value.
One way or the other, you have to observe what this >>>>>>>>>>>> acceptable "max" is for your application and then set the >>>>>>>>>>>> appropriate >>>>>>>>>>>> timeouts. >>>>>>>>>>>> >>>>>>>>>>>> Since this is not something that can be automated, this is a >>>>>>>>>>>> config that the application owner has to set based on the expected >>>>>>>>>>>> behavior >>>>>>>>>>>> of their application. Not wanting to do that leads to ending up >>>>>>>>>>>> with bad >>>>>>>>>>>> consumption semantics where the application process continues to >>>>>>>>>>>> be part of >>>>>>>>>>>> a group owning partitions but not consuming since it has halted >>>>>>>>>>>> due to a >>>>>>>>>>>> problem. The fact that the design requires them to express that in >>>>>>>>>>>> poll() >>>>>>>>>>>> frequency or not doesn't change the fact that the application >>>>>>>>>>>> owner has to >>>>>>>>>>>> go through the process of measuring and then defining this "max". >>>>>>>>>>>> >>>>>>>>>>>> The reverse where they don't do this and the application >>>>>>>>>>>> remains in the group despite being dead is super confusing and >>>>>>>>>>>> frustrating >>>>>>>>>>>> too. So the due diligence up front is actually worth it. And as long >>>>>>>>>>>> as the >>>>>>>>>>>> poll() latency and processing latency can be monitored, it should >>>>>>>>>>>> be easy >>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or not >>>>>>>>>>>> and how >>>>>>>>>>>> that should be tuned. >>>>>>>>>>>> >>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that >>>>>>>>>>>> will hide this complexity and I agree that LI is unblocked since >>>>>>>>>>>> you can do >>>>>>>>>>>> this in TrackerConsumer in the meantime.
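[Editorial note: the "observe the max, then set the timeout" process Neha describes could be sketched as follows. All names are illustrative; nothing like this exists in the client.]

```java
// Sketch of measuring the worst-case gap between poll() calls (the
// processing latency) and deriving a session timeout from the observed
// max plus an acceptable-risk delta.
class SessionTimeoutTuner {
    private long maxObservedGapMs = 0;
    private long lastPollMs = -1;

    // Call at the top of every poll loop iteration.
    void onPoll(long nowMs) {
        if (lastPollMs >= 0) {
            maxObservedGapMs = Math.max(maxObservedGapMs, nowMs - lastPollMs);
        }
        lastPollMs = nowMs;
    }

    // Recommended session timeout = observed max + error delta, where the
    // delta is the owner's acceptable risk of a dead consumer lingering.
    long recommendedSessionTimeoutMs(long errorDeltaMs) {
        return maxObservedGapMs + errorDeltaMs;
    }

    public static void main(String[] args) {
        SessionTimeoutTuner t = new SessionTimeoutTuner();
        t.onPoll(0);      // first poll
        t.onPoll(1_200);  // 1.2s of processing
        t.onPoll(9_700);  // an 8.5s worst case
        System.out.println(t.recommendedSessionTimeoutMs(5_000)); // 13500
    }
}
```

This also illustrates Neha's point that the "max" must be an observed production value, not a number guessed up front.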
>>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Neha >>>>>>>>>>>> >>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam < >>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> For commit(), I think it should hopefully be an easier >>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next. >>>>>>>>>>>>> >>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points you >>>>>>>>>>>>> discuss are all very valid. >>>>>>>>>>>>> >>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. However >>>>>>>>>>>>> there are a smaller percentage of memory hungry apps that get hit >>>>>>>>>>>>> by it. >>>>>>>>>>>>> >>>>>>>>>>>>> The broader issue whereby even if the heartbeats are healthy, >>>>>>>>>>>>> the app might not be behaving correctly is also real. If the app >>>>>>>>>>>>> is >>>>>>>>>>>>> calling poll() then the probability that the app is healthy is >>>>>>>>>>>>> surely >>>>>>>>>>>>> higher. But this again isn't an absolute measure that the app is >>>>>>>>>>>>> processing correctly. >>>>>>>>>>>>> In other cases the app might have even died in which case this >>>>>>>>>>>>> discussion is moot. >>>>>>>>>>>>> >>>>>>>>>>>>> Point being that the only absolute way to really detect that >>>>>>>>>>>>> an app is healthy is to monitor lag. If the lag increases then >>>>>>>>>>>>> for sure >>>>>>>>>>>>> something is wrong. >>>>>>>>>>>>> >>>>>>>>>>>>> The proposal seems to be that the application needs to tune >>>>>>>>>>>>> their session timeout based on the 99tile of the time they take >>>>>>>>>>>>> to process >>>>>>>>>>>>> events after every poll. This turns out to be a nontrivial thing >>>>>>>>>>>>> for an application to do. To start with, when an application is new, >>>>>>>>>>>>> their data is >>>>>>>>>>>>> going to be based on tests that they have done on synthetic data. >>>>>>>>>>>>> This >>>>>>>>>>>>> often times doesn't represent what they will see in production.
>>>>>>>>>>>>> Once the >>>>>>>>>>>>> app is in production their processing latencies will potentially >>>>>>>>>>>>> vary over >>>>>>>>>>>>> time. It is extremely unlikely that the application owner does a >>>>>>>>>>>>> careful >>>>>>>>>>>>> job of monitoring the 99tile of latencies over time and readjusting >>>>>>>>>>>>> the >>>>>>>>>>>>> settings. Often times the latencies vary because of variance in >>>>>>>>>>>>> other >>>>>>>>>>>>> services that are called by the consumer as part of processing >>>>>>>>>>>>> the events. >>>>>>>>>>>>> >>>>>>>>>>>>> Case in point would be a simple app which reads events and >>>>>>>>>>>>> writes to Kafka. With quotas the write latencies to Kafka could >>>>>>>>>>>>> range from >>>>>>>>>>>>> a few milliseconds all the way to tens of seconds. As the >>>>>>>>>>>>> scale of >>>>>>>>>>>>> processing for an app increases, the app or that 'user' could now >>>>>>>>>>>>> get >>>>>>>>>>>>> quotaed. Unless the >>>>>>>>>>>>> application owner >>>>>>>>>>>>> has carefully tuned the timeout, instead of slowing down gracefully we are now looking at a >>>>>>>>>>>>> potential outage >>>>>>>>>>>>> where the app could get hit by constant rebalances. >>>>>>>>>>>>> >>>>>>>>>>>>> If we expose the pause() API then it is possible for us to >>>>>>>>>>>>> take care of this in the LinkedIn wrapper. Whereby we would keep >>>>>>>>>>>>> calling >>>>>>>>>>>>> poll on a separate thread periodically and enqueue the messages. >>>>>>>>>>>>> When the >>>>>>>>>>>>> queue is full we would call pause(). >>>>>>>>>>>>> >>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I think >>>>>>>>>>>>> it is vastly better if we address this in the API as every major >>>>>>>>>>>>> customer >>>>>>>>>>>>> will eventually be pained by it. >>>>>>>>>>>>> >>>>>>>>>>>>> Kartik >>>>>>>>>>>>> >>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hey guys, >>>>>>>>>>>>> >>>>>>>>>>>>> Happy to discuss.
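[Editorial note: the pause()-based wrapper Kartik sketches above could look roughly like this. The interface below is a hypothetical stand-in, not the real KafkaConsumer API; in practice pollOnce() would run on a dedicated thread so heartbeats keep flowing.]

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in consumer: pause() stops fetching data but the
// consumer keeps heartbeating as long as poll() is called.
interface PausableConsumer {
    String poll();  // one record, or null if nothing was fetched
    void pause();
    void resume();
}

class BufferingPoller {
    private final PausableConsumer consumer;
    private final Queue<String> queue = new ArrayDeque<>();
    private final int capacity;
    private boolean paused = false;

    BufferingPoller(PausableConsumer consumer, int capacity) {
        this.consumer = consumer;
        this.capacity = capacity;
    }

    // One iteration of the (conceptually background) poll loop.
    void pollOnce() {
        if (queue.size() >= capacity) {
            if (!paused) { consumer.pause(); paused = true; }
            consumer.poll(); // still poll so the heartbeat goes out
            return;
        }
        if (paused) { consumer.resume(); paused = false; }
        String record = consumer.poll();
        if (record != null) queue.add(record);
    }

    String take() { return queue.poll(); }
    boolean isPaused() { return paused; }

    public static void main(String[] args) {
        PausableConsumer fake = new PausableConsumer() {
            int i = 0;
            boolean fetchPaused = false;
            public String poll() { return fetchPaused ? null : ("record-" + i++); }
            public void pause() { fetchPaused = true; }
            public void resume() { fetchPaused = false; }
        };
        BufferingPoller poller = new BufferingPoller(fake, 2);
        poller.pollOnce();
        poller.pollOnce();
        poller.pollOnce(); // queue full -> pause, but poll() keeps heartbeating
        System.out.println("paused: " + poller.isPaused());      // true
        System.out.println("next record: " + poller.take());     // record-0
    }
}
```

The user application then drains the queue at its own pace, decoupling processing latency from the session timeout.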
I agree there may be some rough edges and >>>>>>>>>>>>> now is definitely the time to clean them up. >>>>>>>>>>>>> >>>>>>>>>>>>> I'm pretty reluctant to change the threading model or undergo >>>>>>>>>>>>> a big api redesign at this point beyond the group management >>>>>>>>>>>>> stuff we've >>>>>>>>>>>>> discussed in the context of Samza/copycat which is already a big >>>>>>>>>>>>> effort. >>>>>>>>>>>>> >>>>>>>>>>>>> Overall I agree that we have done a poor job of documenting >>>>>>>>>>>>> which apis block and which don't and when people are surprised >>>>>>>>>>>>> because we >>>>>>>>>>>>> haven't labeled something that will be unintuitive. But the >>>>>>>>>>>>> overall style >>>>>>>>>>>>> of poll/select-based apis is quite common in programming going >>>>>>>>>>>>> back to unix >>>>>>>>>>>>> select so I don't think it's beyond people if explained well >>>>>>>>>>>>> (after all we >>>>>>>>>>>>> need to mix sync and async apis and if we don't say which is >>>>>>>>>>>>> which any >>>>>>>>>>>>> scheme will be confusing). >>>>>>>>>>>>> >>>>>>>>>>>>> For what it's worth the experience with this api has actually >>>>>>>>>>>>> been about 1000x better than the issues people had around >>>>>>>>>>>>> intuitiveness >>>>>>>>>>>>> with the high-level api. The crazy blocking iterator, impossible >>>>>>>>>>>>> internal >>>>>>>>>>>>> queue sizing, baroque threading model, etc have all caused >>>>>>>>>>>>> endless amounts >>>>>>>>>>>>> of anger. Not to mention that that client effectively >>>>>>>>>>>>> disqualifies about >>>>>>>>>>>>> 50% of the use cases people want to try to use it for (plus I >>>>>>>>>>>>> regularly >>>>>>>>>>>>> hear people tell me they've heard not to use it at all for >>>>>>>>>>>>> various reasons >>>>>>>>>>>>> ranging from data loss to lack of features). It's important to >>>>>>>>>>>>> have that >>>>>>>>>>>>> context when people need to switch and they say "oh the old way >>>>>>>>>>>>> was so >>>>>>>>>>>>> simple and the new way complex!" 
:-) >>>>>>>>>>>>> >>>>>>>>>>>>> Let me give some context related to your points, based on our >>>>>>>>>>>>> previous discussions: >>>>>>>>>>>>> >>>>>>>>>>>>> For commit, let's discuss, that is easy either way. >>>>>>>>>>>>> >>>>>>>>>>>>> The motivation for avoiding additional threading was two-fold. >>>>>>>>>>>>> First this client is really intended to be the lowest level >>>>>>>>>>>>> client. There >>>>>>>>>>>>> are many, many possible higher level processing abstractions. One >>>>>>>>>>>>> thing we >>>>>>>>>>>>> found to be a big problem with the high-level client was that it >>>>>>>>>>>>> coupled >>>>>>>>>>>>> things everyone must have--failover, etc--with things that are >>>>>>>>>>>>> different in >>>>>>>>>>>>> each use case like the appropriate threading model. If you do >>>>>>>>>>>>> this you need >>>>>>>>>>>>> to also maintain a thread free low-level consumer api for people >>>>>>>>>>>>> to get >>>>>>>>>>>>> around whatever you have done. >>>>>>>>>>>>> >>>>>>>>>>>>> The second reason was that the internal threading in the >>>>>>>>>>>>> client became quite complex. The answer with threading is always >>>>>>>>>>>>> that "it >>>>>>>>>>>>> won't be complex this time", but it always is. >>>>>>>>>>>>> >>>>>>>>>>>>> For the heartbeat you correctly describe the downside to >>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the >>>>>>>>>>>>> application must >>>>>>>>>>>>> regularly consume to be considered an active consumer. This >>>>>>>>>>>>> allows the >>>>>>>>>>>>> possibility of false positive failure detections. However it's >>>>>>>>>>>>> important to >>>>>>>>>>>>> understand the downside of the alternative. If you do background >>>>>>>>>>>>> polling a >>>>>>>>>>>>> consumer is considered active as long as it isn't shutdown. 
This >>>>>>>>>>>>> leads to >>>>>>>>>>>>> all kinds of active consumers that aren't consuming because they >>>>>>>>>>>>> have >>>>>>>>>>>>> leaked or otherwise stopped but are still claiming partitions and >>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. If >>>>>>>>>>>>> you allow >>>>>>>>>>>>> false positives the user sees the frequent rebalances and knows >>>>>>>>>>>>> they aren't >>>>>>>>>>>>> consuming frequently enough to be considered active but if you >>>>>>>>>>>>> allow false >>>>>>>>>>>>> negatives you end up having weeks go by before someone notices >>>>>>>>>>>>> that a >>>>>>>>>>>>> partition has been unconsumed the whole time at which point the >>>>>>>>>>>>> data is >>>>>>>>>>>>> gone. Plus of course even if you do this you still have regular >>>>>>>>>>>>> false >>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this in >>>>>>>>>>>>> some depth >>>>>>>>>>>>> at the time and decided that it is better to have the liveness >>>>>>>>>>>>> notion tied >>>>>>>>>>>>> to *actual* consumption which is the actual definition of >>>>>>>>>>>>> liveness. >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> >>>>>>>>>>>>> -Jay >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman < >>>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>> >>>>>>>>>>>>>> There has recently been a lot of open source activity >>>>>>>>>>>>>> regarding the new KafkaConsumer: >>>>>>>>>>>>>> >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123 >>>>>>>>>>>>>> >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350 >>>>>>>>>>>>>> >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359 >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E >>>>>>>>>>>>>> >>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza, >>>>>>>>>>>>>> and some other teams and we got similar feedback. >>>>>>>>>>>>>> >>>>>>>>>>>>>> To summarize the feedback we received from other teams: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The current behavior is not intuitive. For example, >>>>>>>>>>>>>> KafkaConsumer.poll drives everything. The other methods like >>>>>>>>>>>>>> subscribe, >>>>>>>>>>>>>> unsubscribe, seek, commit(async) don’t do anything without a >>>>>>>>>>>>>> KafkaConsumer.poll call. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The semantics of a commit() call should be consistent >>>>>>>>>>>>>> between sync and async operations. Currently, sync commit is >>>>>>>>>>>>>> a blocking >>>>>>>>>>>>>> call which actually sends out an OffsetCommitRequest and >>>>>>>>>>>>>> waits for the >>>>>>>>>>>>>> response upon the user’s KafkaConsumer.commit call. However, >>>>>>>>>>>>>> the async >>>>>>>>>>>>>> commit is a nonblocking call which just queues up the >>>>>>>>>>>>>> OffsetCommitRequest. >>>>>>>>>>>>>> The request itself is later sent out in the next poll. The >>>>>>>>>>>>>> teams we talked >>>>>>>>>>>>>> to found this misleading. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Heartbeats are dependent on user application behavior >>>>>>>>>>>>>> (i.e. 
user applications calling poll). This can be a big >>>>>>>>>>>>>> problem as we >>>>>>>>>>>>>> don’t control how different applications behave. For example, >>>>>>>>>>>>>> we might have >>>>>>>>>>>>>> an application which reads from Kafka and writes to Espresso. >>>>>>>>>>>>>> If Espresso >>>>>>>>>>>>>> is slow for whatever reason, then rebalances could happen. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Generally speaking, we feel that the current KafkaConsumer >>>>>>>>>>>>>> API design is more of a wrapping around the old simple consumer, >>>>>>>>>>>>>> i.e. in the >>>>>>>>>>>>>> old consumer we ask users to deal with raw protocols and error >>>>>>>>>>>>>> handling >>>>>>>>>>>>>> while in KafkaConsumer we do that for users. However, for old >>>>>>>>>>>>>> high level >>>>>>>>>>>>>> consumer users (which are the majority of users), the experience >>>>>>>>>>>>>> is a >>>>>>>>>>>>>> noticeable regression. The old high level consumer interface is >>>>>>>>>>>>>> simple and >>>>>>>>>>>>>> easy to use for end users, while KafkaConsumer requires users to >>>>>>>>>>>>>> be aware of >>>>>>>>>>>>>> many underlying details and is becoming prohibitive for users to >>>>>>>>>>>>>> adopt. >>>>>>>>>>>>>> This is hinted by the javadoc growing bigger and bigger. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We think it's getting to the point where we should take a >>>>>>>>>>>>>> step back and look at the big picture. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The current state of KafkaConsumer is that it's >>>>>>>>>>>>>> single-threaded.
There's one big KafkaConsumer.poll called by >>>>>>>>>>>>>> the user >>>>>>>>>>>>>> which pretty much drives everything: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - data fetches >>>>>>>>>>>>>> >>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>> >>>>>>>>>>>>>> - join groups (new consumer joining a group, topic >>>>>>>>>>>>>> subscription changes, reacting to group rebalance) >>>>>>>>>>>>>> >>>>>>>>>>>>>> - async offset commits >>>>>>>>>>>>>> >>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>> >>>>>>>>>>>>>> Given that the selector's poll is being driven by the end >>>>>>>>>>>>>> user, this ends up making us educate users on NIO and the >>>>>>>>>>>>>> consequences of >>>>>>>>>>>>>> not calling KafkaConsumer.poll frequently enough: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - Coordinator will mark the consumer dead >>>>>>>>>>>>>> >>>>>>>>>>>>>> - async commits won't send >>>>>>>>>>>>>> >>>>>>>>>>>>>> - callbacks won't fire >>>>>>>>>>>>>> >>>>>>>>>>>>>> More generally speaking, there are many surprises with the >>>>>>>>>>>>>> current KafkaConsumer implementation. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - NIO >>>>>>>>>>>>>> >>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume messages >>>>>>>>>>>>>> >>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or >>>>>>>>>>>>>> partitions(explicit group management) >>>>>>>>>>>>>> >>>>>>>>>>>>>> - no surprises in the user experience >>>>>>>>>>>>>> >>>>>>>>>>>>>> The last point is the big one that we think we aren't >>>>>>>>>>>>>> hitting. We think the most important example is that there >>>>>>>>>>>>>> should be no >>>>>>>>>>>>>> requirement from the end user to consistently KafkaConsumer.poll >>>>>>>>>>>>>> in order >>>>>>>>>>>>>> for all of the above tasks to happen. 
We think it would be >>>>>>>>>>>>>> better to split >>>>>>>>>>>>>> those tasks into tasks that should not rely on >>>>>>>>>>>>>> KafkaConsumer.poll and tasks >>>>>>>>>>>>>> that should rely on KafkaConsumer.poll. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>> >>>>>>>>>>>>>> - join groups >>>>>>>>>>>>>> >>>>>>>>>>>>>> - commits >>>>>>>>>>>>>> >>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>> >>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll >>>>>>>>>>>>>> >>>>>>>>>>>>>> This would help reduce the amount of surprises to the end >>>>>>>>>>>>>> user. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you >>>>>>>>>>>>>> guys early next week. We’d like to meet up with you at LinkedIn >>>>>>>>>>>>>> on *July >>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it to >>>>>>>>>>>>>> open source. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>> LinkedIn Kafka Dev Team >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Neha >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Thanks, >>>>>> Neha >>>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> -- Guozhang >>>>> >>>> >>>> >>>> >>>> -- >>>> Thanks, >>>> Ewen >>>> >>> >>> >> >> >> -- >> Thanks, >> Neha >> > > -- Thanks, Neha