I like all these ideas. Our convention is to keep method names declarative, so it should probably be subscribe(List<String> topics, Callback c) and assign(List<TopicPartition> partitions).
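For illustration only, here is a rough sketch of how that shape might look from the application side, borrowing the RebalanceCallback from Jason's proposal below (none of this is a committed API; the names are just the ones floated in this thread):

consumer.subscribe(Arrays.asList("my-topic"), new RebalanceCallback() {
    public void onAssignment(List<TopicPartition> partitions) {
        // initialize state / seek for the partitions we just received
    }
    public void onRevocation(List<TopicPartition> partitions) {
        // commit or flush state before the partitions are taken away
    }
    public void onError(Exception e) {
        // e.g. subscribed to a non-existing topic
    }
});
// assignment() would stay empty until a poll() completes the group join
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    // process records ...
}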
The javadoc would obviously have to clarify the relationship between a subscribed topic and assigned partitions. Presumably unsubscribe/unassign are unnecessary since this is just a matter of subscribing to the empty list. -Jay On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <ja...@confluent.io> wrote: > I was thinking a little bit this morning about the subscription API and I > have a few ideas on how to address some of the concerns about intuitiveness > and exception handling. > > 1. Split the current notion of topic/partition subscription into > subscription of topics and assignment of partitions. These concepts are > pretty fundamentally different and I think at least some of the confusion > about when subscriptions() can be used is caused by the fact that we > overload the term. If instead that method is renamed to assignment(), then > we are communicating to users that it is possible to have a subscription > without an active assignment, which is not obvious with the current API. > The code in fact already separates these concepts internally, so this would > just expose it to the user. > > 2. Merge rebalance callback into a subscription callback and add method a > way to handle errors. The consumer's current rebalance callback is > basically invoked when a subscription "succeeds," so it seems a little > weird to also provide a callback on subscription. Perhaps we can just take > the rebalance callback out of configuration and have the user provide it on > subscribe(). We can add a method to the callback to handle errors (e.g. for > non-existing topics). Since the callback is provided at subscribe time, it > should be clearer to the user that the assignment will not be ready > immediately when subscribe returns. It's also arguably a little more > natural to set this callback at subscription time rather than when the > consumer is constructed. > > 3. Get rid of the additive subscribe methods and just use setSubscription > which would clear the old subscription. After you start providing callbacks > to subscribe, then the implementation starts to get tricky if each call to > subscribe provides a separate callback. Instead, as Jay suggested, we could > just provide a way to set the full list of subscriptions at once, and then > there is only one callback to maintain. > > With these points, the API might look something like this: > > void setSubscription(List<String> topics, RebalanceCallback callback); > void setAssignment(List<TopicPartition> partitions); > List<String> subscription(); > List<TopicPartition> assignment(); > > interface RebalanceCallback { > void onAssignment(List<TopicPartition> partitions); > void onRevocation(List<TopicPartition> partitions); > > // handle non-existing topics, etc. > void onError(Exception e); > } > > Any thoughts? > > -Jason > > > > On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote: > >> Hey Becket, >> >> Yeah the high-level belief here is that it is possible to give something >> as high level as the existing "high level" consumer, but this is not likely >> to be the end-all be-all of high-level interfaces for processing streams of >> messages. For example neither of these interfaces handles the threading >> model for the processing, which obviously is a fairly low-level >> implementation detail left to the user in you proposal, the current code, >> as well as the existing scala consumer. 
>> >> There will be many of these: the full-fledged stream processing >> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more >> traditional message queue like "processor" interface, not to mention the >> stuff we're trying to do with KIP-28. For these frameworks it will be quite >> weird to add a bunch of new threads since they will want to dictate the >> threading model. >> >> What will be a major failure though is if this client isn't low-level >> enough and we need to introduce another layer underneath. This would happen >> either because we dictate too much to make it usable for various >> applications, frameworks, or use cases. This is the concern with dictating >> threading and processing models. >> >> So to summarize the goal is to subsume the existing APIs, which I think >> we all agree this does, and be a foundation on which to build other >> abstractions. >> >> WRT KIP-28, I think it is quite general and if we do that right it will >> subsume a lot of the higher level processing and will give a full threaded >> processing model to the user. >> >> >> -Jay >> >> >> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com> wrote: >> >>> Thanks for the comments Jason and Jay. >>> >>> Jason, I had the same concern for producer's callback as well before, >>> but it seems to be fine from some callbacks I wrote - user can always pass >>> in object in the constructor if necessary for synchronization. >>> >>> Jay, I agree that the current API might be fine for people who wants to >>> wrap it up. But I thought the new consumer was supposed to be a combination >>> of old high and low level consumer, which means it should be able to be >>> used as is, just like producer. If KafkaConsumer is designed to be wrapped >>> up for use, then the question becomes whether Kafka will provide a decent >>> wrapper or not? Neha mentioned that KIP-28 will address the users who only >>> care about data. Would that be the wrapper provided by Kafka? I am not sure >>> if that is sufficient though because the processor is highly abstracted, >>> and might only meet the static data stream requirement as I listed in the >>> grid. For users who need something from the other grids, are we going to >>> have another wrapper? Or are we expecting all the user to write their own >>> wrapper for KafkaConsumer? Some other comments are in line. >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote: >>> >>>> Some comments on the proposal: >>>> >>>> I think we are conflating a number of things that should probably be >>>> addressed individually because they are unrelated. My past experience is >>>> that this always makes progress hard. The more we can pick apart these >>>> items the better: >>>> >>>> 1. threading model >>>> 2. blocking vs non-blocking semantics >>>> 3. missing apis >>>> 4. missing javadoc and other api surprises >>>> 5. Throwing exceptions. >>>> >>>> The missing APIs are getting added independently. Some like your >>>> proposed offsetByTime where things we agreed to hold off on for the first >>>> release and do when we'd thought it through. If there are uses for it now >>>> we can accelerate. I think each of these is really independent, we know >>>> there are things that need to be added but lumping them all into one >>>> discussion will be confusing. >>>> >>>> WRT throwing exceptions the policy is to throw exceptions that are >>>> unrecoverable and handle and log other exceptions that are transient. 
That >>>> policy makes sense if you go through the thought exercise of "what will the >>>> user do if i throw this exception to them" if they have no other rational >>>> response but to retry (and if failing to anticipate and retry with that >>>> exception will kill their program) . You can argue whether the topic not >>>> existing is transient or not, unfortunately the way we did auto-creation >>>> makes it transient if you are in "auto create mode" and non-transient >>>> otherwise (ick!). In any case this is an orthogonal discussion to >>>> everything else. I think the policy is right and if we don't conform to it >>>> in some way that is really an independent bug/discussion. >>>> >>> Agreed we can discuss about them separately. >>> >>>> >>>> I suggest we focus on threading and the current event-loop style of api >>>> design since I think that is really the crux. >>>> >>>> The analogy between the producer threading model and the consumer model >>>> actually doesn't work for me. The goal of the producer is actually to take >>>> requests from many many user threads and shove them into a single buffer >>>> for batching. So the threading model isn't the 1:1 threads you describe it >>>> is N:1.The goal of the consumer is to support single-threaded processing. >>>> This is what drives the difference. Saying that the producer has N:1 >>>> threads therefore for the consumer should have 1:1 threads instead of just >>>> 1 thread doesn't make sense any more then an analogy to the brokers >>>> threading model would--the problem we're solving is totally different. >>>> >>> I think the ultimate goal for producer and consumer are still allowing >>> user to send/receive data in parallel. In producer we picked the solution >>> of one-producer-serving-multiple-threads, and in consumer we picked >>> multiple-single-threaded-consumers instead of >>> single-consumer-serving-multiple threads. And we believe people can always >>> implement the latter with the former. I think this is a reasonable >>> decision. However, there are also reasonable concerns over the >>> multiple-single-threaded-consumers solution which is that the single-thread >>> might have to be a dedicate polling thread in many cases which pushes user >>> towards the other solution - i.e. implementing a >>> single-thread-consumer-serving-multiple-threads wrapper. From what we hear, >>> it seems to be a quite common concern for most of the users we talked to. >>> Plus the adoption bar of the consumer will be much higher because user will >>> have to understand some of the details of the things they don't care as >>> listed in the grid. >>> The analogy between producer/consumer is intended to show that a >>> separate polling thread will solve the concerns we have. >>> >>>> >>>> >>> I think ultimately though what you need to think about is, does an event >>>> loop style of API make sense? That is the source of all the issues you >>>> describe. This style of API is incredibly prevalent from unix select to >>>> GUIs to node.js. It's a great way to model multiple channels of messages >>>> coming in. It is a fantastic style for event processing. Programmers >>>> understand this style of api though I would agree it is unusual compared to >>>> blocking apis. But it is is a single threaded processing model. The current >>>> approach is basically a pure event loop with some convenience methods that >>>> are effectively "poll until X is complete". 
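>>>> 
>>>> As a minimal illustration (a sketch, not the literal client code), that contract looks like this from the application side, where process() stands in for whatever the app does with a record:
>>>> 
>>>> while (true) {
>>>>     // poll() is the event loop: it heartbeats, completes the group join,
>>>>     // sends queued async commits, fires callbacks and returns records
>>>>     ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>>>>     for (ConsumerRecord<byte[], byte[]> record : records)
>>>>         process(record); // must return promptly relative to the session timeout
>>>> }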
>>>> >>>> I think basically all the confusion you are describing comes from not >>>> documenting/expecting an event loop. The "if you don't call poll nothing >>>> happens" point is basically this. It's an event loop. You have to loop. You >>>> can't not call poll. The docs don't cover this right now, perhaps. I think >>>> if they do it's not unreasonable behavior. >>>> >>> I'm not sure if I understand the event-loop correctly and honestly I did >>> not think about it clearly before. My understanding is that an even-loop >>> model means a single listener thread, but there can be multiple event >>> generator threads. The downside is that the listener thread has to be fast >>> and very careful about blocking. If we look at the consumer, the current >>> model is the caller thread itself act as both event generator and listener. >>> As a generator, it generates different task by calling the convenience >>> methods. As a listener, it listens to the messages on broker and also the >>> tasks generated by itself. So in our proposal, we are not changing the >>> event-loop model here just separated the event generator and event >>> listener. It looks to me that the underlying execution thread follows the >>> event-loop model, the special thing might be it is not only listening to >>> the messages from broker, but also listening to the tasks from the user >>> thread. This is essentially the thing a consumer has to do - interact with >>> both server and user. >>> >>>> >>>> If we want to move away from an event loop I'm not sure *any* aspect >>>> of the current event loop style of api makes sense any more. I am not >>>> totally married to event loops, but i do think what we have gives an >>>> elegant way of implementing any higher level abstractions that would fully >>>> implement the user's parallelism model. I don't want to go rethink >>>> everything but I do think a half-way implementation that is event loop + >>>> background threads is likely going to be icky. >>>> >>> We brought this up before to change the consumer.poll() to >>> consumer.consume(). And did not do so simply because we wanted to less >>> change in API... I might be crazy but can we think of the proposed model as >>> processing thread + event-loop instead, rather than event-loop + background >>> thread? >>> >>>> >>>> WRT making it configurable whether liveness means "actually consuming" >>>> or "background thread running" I would suggest that that is really the >>>> worst outcome. These type of "modes" that are functionally totally >>>> different are just awful from a documentation, testing, usability, etc pov. >>>> I would strongly prefer we pick either of these, document it, and make it >>>> work well rather than trying to do both. >>>> >>> Previously I thought this was the major benefit we wanted from a single >>> threaded model, personally I don't have a strong preference on this. So I >>> am OK with either way. >>> >>>> >>>> -Jay >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io> >>>> wrote: >>>> >>>>> Works now. Thanks Becket! >>>>> >>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com> >>>>> wrote: >>>>> >>>>>> Ah... My bad, forgot to change the URL link for pictures. >>>>>> Thanks for the quick response, Neha. It should be fixed now, can you >>>>>> try again? >>>>>> >>>>>> Jiangjie (Becket) Qin >>>>>> >>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> >>>>>> wrote: >>>>>> >>>>>>> Thanks Becket. 
Quick comment - there seem to be a bunch of images >>>>>>> that the wiki refers to, but none loaded for me. Just making sure if its >>>>>>> just me or can everyone not see the pictures? >>>>>>> >>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> >>>>>>> wrote: >>>>>>> >>>>>>>> I agree with Ewen that a single threaded model will be tricky to >>>>>>>> implement the same conventional semantic of async or Future. We just >>>>>>>> drafted the following wiki which explains our thoughts in LinkedIn on >>>>>>>> the >>>>>>>> new consumer API and threading model. >>>>>>>> >>>>>>>> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal >>>>>>>> >>>>>>>> We were trying to see: >>>>>>>> 1. If we can use some kind of methodology to help us think about >>>>>>>> what API we want to provide to user for different use cases. >>>>>>>> 2. What is the pros and cons of current single threaded model. Is >>>>>>>> there a way that we can maintain the benefits while solve the issues >>>>>>>> we are >>>>>>>> facing now with single threaded model. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> Jiangjie (Becket) Qin >>>>>>>> >>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava < >>>>>>>> e...@confluent.io> wrote: >>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com >>>>>>>>> > wrote: >>>>>>>>> >>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along >>>>>>>>>> with returning future in the commit calls, i.e. something similar to: >>>>>>>>>> >>>>>>>>>> public Future<void> commit(ConsumerCommitCallback callback); >>>>>>>>>> >>>>>>>>>> public Future<void> commit(Map<TopicPartition, Long> offsets, >>>>>>>>>> ConsumerCommitCallback callback); >>>>>>>>>> >>>>>>>>>> At that time I was slightly intending not to include the Future >>>>>>>>>> besides adding the callback mainly because of the implementation >>>>>>>>>> complexity >>>>>>>>>> I feel it could introduce along with the retry settings after looking >>>>>>>>>> through the code base. I would happy to change my mind if we could >>>>>>>>>> propose >>>>>>>>>> a prototype implementation that is simple enough. >>>>>>>>>> >>>>>>>>>> >>>>>>>>> One of the reasons that interface ended up being difficult (or >>>>>>>>> maybe impossible) to make work reasonably is because the consumer was >>>>>>>>> thread-safe at the time. That made it impossible to know what should >>>>>>>>> be >>>>>>>>> done when Future.get() is called -- should the implementation call >>>>>>>>> poll() >>>>>>>>> itself, or would the fact that the user is calling get() imply that >>>>>>>>> there's >>>>>>>>> a background thread running the poll() loop and we just need to wait >>>>>>>>> for it? >>>>>>>>> >>>>>>>>> The consumer is no longer thread safe, but I think the same >>>>>>>>> problem remains because the expectation with Futures is that they are >>>>>>>>> thread safe. Which means that even if the consumer isn't thread safe, >>>>>>>>> I >>>>>>>>> would expect to be able to hand that Future off to some other thread, >>>>>>>>> have >>>>>>>>> the second thread call get(), and then continue driving the poll loop >>>>>>>>> in my >>>>>>>>> thread (which in turn would eventually resolve the Future). >>>>>>>>> >>>>>>>>> I quite dislike the sync/async enum. While both operations commit >>>>>>>>> offsets, their semantics are so different that overloading a single >>>>>>>>> method >>>>>>>>> with both is messy. 
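>>>>>>>>> 
>>>>>>>>> To make the comparison concrete (illustrative only -- the first two lines assume the current commit(CommitType) overload, and the Future form follows the signature sketched above, reading Future<void> as java.util.concurrent.Future<Void>):
>>>>>>>>> 
>>>>>>>>> // current shape: one method, two very different behaviors
>>>>>>>>> consumer.commit(CommitType.SYNC);  // blocks until the broker responds
>>>>>>>>> consumer.commit(CommitType.ASYNC); // only queues the request; it goes out on a later poll()
>>>>>>>>> 
>>>>>>>>> // Future-returning shape
>>>>>>>>> Future<Void> result = consumer.commit(offsets, callback);
>>>>>>>>> result.get(); // only completes if some thread keeps driving poll()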
That said, I don't think we should consider this >>>>>>>>> an >>>>>>>>> inconsistency wrt the new producer API's use of Future because the >>>>>>>>> two APIs >>>>>>>>> have a much more fundamental difference that justifies it: they have >>>>>>>>> completely different threading and execution models. >>>>>>>>> >>>>>>>>> I think a Future-based API only makes sense if you can guarantee >>>>>>>>> the operations that Futures are waiting on will continue to make >>>>>>>>> progress >>>>>>>>> regardless of what the thread using the Future does. The producer API >>>>>>>>> makes >>>>>>>>> that work by processing asynchronous requests in a background thread. >>>>>>>>> The >>>>>>>>> new consumer does not, and so it becomes difficult/impossible to >>>>>>>>> implement >>>>>>>>> the Future correctly. (Or, you have to make assumptions which break >>>>>>>>> other >>>>>>>>> use cases; if you want to support the simple use case of just making a >>>>>>>>> commit() synchronous by calling get(), the Future has to call poll() >>>>>>>>> internally; but if you do that, then if any user ever wants to add >>>>>>>>> synchronization to the consumer via some external mechanism, then the >>>>>>>>> implementation of the Future's get() method will not be subject to >>>>>>>>> that >>>>>>>>> synchronization and things will break). >>>>>>>>> >>>>>>>>> -Ewen >>>>>>>>> >>>>>>>>> >>>>>>>>>> Guozhang >>>>>>>>>> >>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io >>>>>>>>>> > wrote: >>>>>>>>>> >>>>>>>>>>> Hey Adi, >>>>>>>>>>> >>>>>>>>>>> When we designed the initial version, the producer API was still >>>>>>>>>>> changing. I thought about adding the Future and then just didn't >>>>>>>>>>> get to it. >>>>>>>>>>> I agree that we should look into adding it for consistency. >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Neha >>>>>>>>>>> >>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar < >>>>>>>>>>> aaurad...@linkedin.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Great discussion everyone! >>>>>>>>>>>> >>>>>>>>>>>> One general comment on the sync/async API's on the new consumer. >>>>>>>>>>>> I think the producer tackles sync vs async API's well. For >>>>>>>>>>>> API's that can either be sync or async, can we simply return a >>>>>>>>>>>> future? That >>>>>>>>>>>> seems more elegant for the API's that make sense either in both >>>>>>>>>>>> flavors. >>>>>>>>>>>> From the users perspective, it is more consistent with the new >>>>>>>>>>>> producer. >>>>>>>>>>>> One easy example is the commit call with the CommitType enum.. we >>>>>>>>>>>> can make >>>>>>>>>>>> that call always async and users can block on the future if they >>>>>>>>>>>> want to >>>>>>>>>>>> make sure their offsets are committed. >>>>>>>>>>>> >>>>>>>>>>>> Aditya >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman < >>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the great responses, everyone! >>>>>>>>>>>>> >>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up >>>>>>>>>>>>> old high level consumers, the teams we spoke to were actually not >>>>>>>>>>>>> the types >>>>>>>>>>>>> of services that simply wanted an easy way to get >>>>>>>>>>>>> ConsumerRecords. We spoke >>>>>>>>>>>>> to infrastructure teams that I would consider to be closer to the >>>>>>>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's >>>>>>>>>>>>> level of >>>>>>>>>>>>> granularity. Some would use auto group management. 
Some would use >>>>>>>>>>>>> explicit >>>>>>>>>>>>> group management. All of them would turn off auto offset commits. >>>>>>>>>>>>> Yes, the >>>>>>>>>>>>> Samza team had prior experience with the old SimpleConsumer, but >>>>>>>>>>>>> this is >>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I >>>>>>>>>>>>> don't really >>>>>>>>>>>>> think the feedback received was about the simpler times or wanting >>>>>>>>>>>>> additional higher-level clients. >>>>>>>>>>>>> >>>>>>>>>>>>> - Onur >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson < >>>>>>>>>>>>> ja...@confluent.io> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we >>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since >>>>>>>>>>>>>> this generally >>>>>>>>>>>>>> controls how long normal rebalances will take. I think it's >>>>>>>>>>>>>> currently >>>>>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be >>>>>>>>>>>>>> nice to >>>>>>>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown >>>>>>>>>>>>>> of a >>>>>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the >>>>>>>>>>>>>> timeout to >>>>>>>>>>>>>> reassign partitions. >>>>>>>>>>>>>> >>>>>>>>>>>>>> -Jason >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hey Kartik, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the >>>>>>>>>>>>>>> common case. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> However there are two ways to avoid this: >>>>>>>>>>>>>>> 1. Default the timeout high >>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> When we were doing the consumer design we discussed this >>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that >>>>>>>>>>>>>>> defaulting to a >>>>>>>>>>>>>>> high timeout was actually better. This means it takes a little >>>>>>>>>>>>>>> longer to >>>>>>>>>>>>>>> detect a failure, but usually that is not a big problem and >>>>>>>>>>>>>>> people who want >>>>>>>>>>>>>>> faster failure detection can tune it down. This seemed better >>>>>>>>>>>>>>> than having >>>>>>>>>>>>>>> the failure detection not really cover the consumption and just >>>>>>>>>>>>>>> be a >>>>>>>>>>>>>>> background ping. The two reasons where (a) you still have the >>>>>>>>>>>>>>> GC problem >>>>>>>>>>>>>>> even for the background thread, (b) consumption is in some >>>>>>>>>>>>>>> sense a better >>>>>>>>>>>>>>> definition of an active healthy consumer and a lot of problems >>>>>>>>>>>>>>> crop up when >>>>>>>>>>>>>>> you have an inactive consumer with an active background thread >>>>>>>>>>>>>>> (as today). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> When we had the discussion I think what we realized was that >>>>>>>>>>>>>>> most people who were worried about the timeout where imagining >>>>>>>>>>>>>>> a very low >>>>>>>>>>>>>>> default (500ms) say. But in fact just setting this to 60 >>>>>>>>>>>>>>> seconds or higher >>>>>>>>>>>>>>> as a default would be okay, this adds to the failure detection >>>>>>>>>>>>>>> time but >>>>>>>>>>>>>>> only apps that care about this need to tune. This should >>>>>>>>>>>>>>> largely eliminate >>>>>>>>>>>>>>> false positives since after all if you disappear for 60 seconds >>>>>>>>>>>>>>> that >>>>>>>>>>>>>>> actually starts to be more of a true positive, even if you come >>>>>>>>>>>>>>> back... 
:-) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Jay >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam < >>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> adding the open source alias. This email started off as a >>>>>>>>>>>>>>>> broader discussion around the new consumer. I was zooming >>>>>>>>>>>>>>>> into only the >>>>>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the >>>>>>>>>>>>>>>> heartbeats. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the >>>>>>>>>>>>>>>> problem). Monitoring the lag is important as it is the >>>>>>>>>>>>>>>> primary way to tell >>>>>>>>>>>>>>>> if the application is wedged. There might be other metrics >>>>>>>>>>>>>>>> which can >>>>>>>>>>>>>>>> possibly capture the same essence. Yes the lag is at the >>>>>>>>>>>>>>>> consumer group >>>>>>>>>>>>>>>> level, but you can tell that one of the consumers is messed up >>>>>>>>>>>>>>>> if one of >>>>>>>>>>>>>>>> the partitions in the application start generating lag and >>>>>>>>>>>>>>>> others are good >>>>>>>>>>>>>>>> for e.g. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is that >>>>>>>>>>>>>>>> in the old consumer most customers don't have to worry about >>>>>>>>>>>>>>>> unnecessary >>>>>>>>>>>>>>>> rebalances and most of the things that they do in their app >>>>>>>>>>>>>>>> doesn't have an >>>>>>>>>>>>>>>> impact on the session timeout.. (i.e. the only thing that >>>>>>>>>>>>>>>> causes >>>>>>>>>>>>>>>> rebalances is when the GC is out of whack). For the handful >>>>>>>>>>>>>>>> of customers >>>>>>>>>>>>>>>> who are impacted by GC related rebalances, i would imagine >>>>>>>>>>>>>>>> that all of them >>>>>>>>>>>>>>>> would really want us to make the system more resilient. I >>>>>>>>>>>>>>>> agree that the >>>>>>>>>>>>>>>> GC problem can't be solved easily in the java client, however >>>>>>>>>>>>>>>> it appears >>>>>>>>>>>>>>>> that now we would be expecting the consuming applications to >>>>>>>>>>>>>>>> be even more >>>>>>>>>>>>>>>> careful with ongoing tuning of the timeouts. At LinkedIn, we >>>>>>>>>>>>>>>> have seen >>>>>>>>>>>>>>>> that most kafka applications don't have much of a clue about >>>>>>>>>>>>>>>> configuring >>>>>>>>>>>>>>>> the timeouts and just end up calling the Kafka team when their >>>>>>>>>>>>>>>> application >>>>>>>>>>>>>>>> sees rebalances. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is >>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll timeout >>>>>>>>>>>>>>>> that is >>>>>>>>>>>>>>>> larger than the session timeout. If we had a notion of >>>>>>>>>>>>>>>> implicit >>>>>>>>>>>>>>>> heartbeats then we could also automatically make this work for >>>>>>>>>>>>>>>> consumers by >>>>>>>>>>>>>>>> sending hearbeats at the appropriate interval even though the >>>>>>>>>>>>>>>> customers >>>>>>>>>>>>>>>> want to do a long poll. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we >>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the >>>>>>>>>>>>>>>> consumer. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Would love to hear how other people think about this >>>>>>>>>>>>>>>> subject ? 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>> Kartik >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede < >>>>>>>>>>>>>>>> n...@confluent.io> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that >>>>>>>>>>>>>>>>> there are many ways the application's message processing >>>>>>>>>>>>>>>>> could fail and we >>>>>>>>>>>>>>>>> wouldn't be able to model all of those in the consumer's >>>>>>>>>>>>>>>>> failure detection >>>>>>>>>>>>>>>>> mechanism. So we should try to model as much of it as we can >>>>>>>>>>>>>>>>> so the >>>>>>>>>>>>>>>>> consumer's failure detection is meaningful. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect >>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag >>>>>>>>>>>>>>>>>> increases then for >>>>>>>>>>>>>>>>>> sure something is wrong. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the >>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept >>>>>>>>>>>>>>>>> and the problem >>>>>>>>>>>>>>>>> we have is being able to detect failures at the level of >>>>>>>>>>>>>>>>> individual >>>>>>>>>>>>>>>>> consumer instances. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> As you pointed out, a consumer that poll() is a stronger >>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The >>>>>>>>>>>>>>>>> dilemma then is who >>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but >>>>>>>>>>>>>>>>> the application >>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for >>>>>>>>>>>>>>>>> their >>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for >>>>>>>>>>>>>>>>> the user to >>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too >>>>>>>>>>>>>>>>> often. The >>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be *exactly* >>>>>>>>>>>>>>>>> 99tile of processing latency but could be in the ballpark + >>>>>>>>>>>>>>>>> an error delta. >>>>>>>>>>>>>>>>> The error delta is the application owner's acceptable risk >>>>>>>>>>>>>>>>> threshold during >>>>>>>>>>>>>>>>> which they would be ok if the application remains part of the >>>>>>>>>>>>>>>>> group despite >>>>>>>>>>>>>>>>> being dead. It is ultimately a tradeoff between operational >>>>>>>>>>>>>>>>> ease and more >>>>>>>>>>>>>>>>> accurate failure detection. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range from >>>>>>>>>>>>>>>>>> a few milliseconds all the way to a tens of seconds. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most >>>>>>>>>>>>>>>>> most of the times, the normal GC falls in the few ms range >>>>>>>>>>>>>>>>> and there are >>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC falls >>>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be predicted, >>>>>>>>>>>>>>>>> so has to be >>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to observe >>>>>>>>>>>>>>>>> what this >>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the >>>>>>>>>>>>>>>>> appropriate >>>>>>>>>>>>>>>>> timeouts. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Since this is not something that can be automated, this is >>>>>>>>>>>>>>>>> a config that the application owner has to set based on the >>>>>>>>>>>>>>>>> expected >>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads >>>>>>>>>>>>>>>>> to ending up >>>>>>>>>>>>>>>>> with bad consumption semantics where the application process >>>>>>>>>>>>>>>>> continues to >>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming since >>>>>>>>>>>>>>>>> it has halted >>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to >>>>>>>>>>>>>>>>> express that in >>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the >>>>>>>>>>>>>>>>> application owner >>>>>>>>>>>>>>>>> has to go through the process of measuring and then defining >>>>>>>>>>>>>>>>> this "max". >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The reverse where they don't do this and the application >>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing >>>>>>>>>>>>>>>>> and frustrating >>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth. And as >>>>>>>>>>>>>>>>> long as the >>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it >>>>>>>>>>>>>>>>> should be easy >>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or >>>>>>>>>>>>>>>>> not and how >>>>>>>>>>>>>>>>> that should be tuned. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source >>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is >>>>>>>>>>>>>>>>> unblocked since you >>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Neha >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam < >>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier >>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points >>>>>>>>>>>>>>>>>> you discuss are all very valid. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. >>>>>>>>>>>>>>>>>> However there are a smaller percentage of memory hungry apps >>>>>>>>>>>>>>>>>> that get hit >>>>>>>>>>>>>>>>>> by it. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are >>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also >>>>>>>>>>>>>>>>>> real. If the app >>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is >>>>>>>>>>>>>>>>>> healthy is surely >>>>>>>>>>>>>>>>>> higher. But this again isn't an absolute measure that the >>>>>>>>>>>>>>>>>> app is >>>>>>>>>>>>>>>>>> processing correctly. >>>>>>>>>>>>>>>>>> In other cases the app might have even died in which case >>>>>>>>>>>>>>>>>> this discussion is moot. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect >>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag >>>>>>>>>>>>>>>>>> increases then for >>>>>>>>>>>>>>>>>> sure something is wrong. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to >>>>>>>>>>>>>>>>>> tune their session timeout based on the 99tile of the time >>>>>>>>>>>>>>>>>> they take to >>>>>>>>>>>>>>>>>> process events after every poll. 
This turns out is a >>>>>>>>>>>>>>>>>> nontrivial thing to >>>>>>>>>>>>>>>>>> do for an application todo. To start with when an >>>>>>>>>>>>>>>>>> application is new their >>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on >>>>>>>>>>>>>>>>>> synthetic data. >>>>>>>>>>>>>>>>>> This often times doesn't represent what they will see in >>>>>>>>>>>>>>>>>> production. Once >>>>>>>>>>>>>>>>>> the app is in production their processing latencies will >>>>>>>>>>>>>>>>>> potentially vary >>>>>>>>>>>>>>>>>> over time. It is extremely unlikely that the application >>>>>>>>>>>>>>>>>> owner does a >>>>>>>>>>>>>>>>>> careful job of monitoring the 99tile of latencies over time >>>>>>>>>>>>>>>>>> and readjust >>>>>>>>>>>>>>>>>> the settings. Often times the latencies vary because of >>>>>>>>>>>>>>>>>> variance is other >>>>>>>>>>>>>>>>>> services that are called by the consumer as part of >>>>>>>>>>>>>>>>>> processing the events. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events >>>>>>>>>>>>>>>>>> and writes to Kafka. With quotas the write latencies to >>>>>>>>>>>>>>>>>> kafka could range >>>>>>>>>>>>>>>>>> from a few milliseconds all the way to a tens of seconds. >>>>>>>>>>>>>>>>>> As the scale of >>>>>>>>>>>>>>>>>> processing for an app increasing the app or that 'user' >>>>>>>>>>>>>>>>>> could now get >>>>>>>>>>>>>>>>>> quotaed. Instead of slowing down gracefully unless the >>>>>>>>>>>>>>>>>> application owner >>>>>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a >>>>>>>>>>>>>>>>>> potential outage >>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> If we expose the pause() Api then It is possible for us >>>>>>>>>>>>>>>>>> to take care of this in the linkedin wrapper. Whereby we >>>>>>>>>>>>>>>>>> would keep >>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue >>>>>>>>>>>>>>>>>> the messages. >>>>>>>>>>>>>>>>>> When the queue is full we would call pause(). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I >>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as >>>>>>>>>>>>>>>>>> every major >>>>>>>>>>>>>>>>>> customer will eventually be pained by it. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Kartik >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hey guys, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges >>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or >>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group >>>>>>>>>>>>>>>>>> management stuff >>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is >>>>>>>>>>>>>>>>>> already a big >>>>>>>>>>>>>>>>>> effort. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of >>>>>>>>>>>>>>>>>> documenting which apis block and which don't and when people >>>>>>>>>>>>>>>>>> are surprised >>>>>>>>>>>>>>>>>> because we haven't labeled something that will be >>>>>>>>>>>>>>>>>> unintuitive. 
But the >>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in >>>>>>>>>>>>>>>>>> programming >>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond >>>>>>>>>>>>>>>>>> people if explained >>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if we >>>>>>>>>>>>>>>>>> don't say >>>>>>>>>>>>>>>>>> which is which any scheme will be confusing). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has >>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people had >>>>>>>>>>>>>>>>>> around >>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking >>>>>>>>>>>>>>>>>> iterator, >>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, >>>>>>>>>>>>>>>>>> etc have all >>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that >>>>>>>>>>>>>>>>>> client >>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people >>>>>>>>>>>>>>>>>> want to try to >>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've >>>>>>>>>>>>>>>>>> heard not to use >>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to lack >>>>>>>>>>>>>>>>>> of features). >>>>>>>>>>>>>>>>>> It's important to have that context when people need to >>>>>>>>>>>>>>>>>> switch and they say >>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-) >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Let me give some context related to your points, based on >>>>>>>>>>>>>>>>>> our previous discussions: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was >>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the >>>>>>>>>>>>>>>>>> lowest level >>>>>>>>>>>>>>>>>> client. There are many, many possible higher level >>>>>>>>>>>>>>>>>> processing abstractions. >>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level >>>>>>>>>>>>>>>>>> client was that >>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with >>>>>>>>>>>>>>>>>> things that are >>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading >>>>>>>>>>>>>>>>>> model. If you do >>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level >>>>>>>>>>>>>>>>>> consumer api for >>>>>>>>>>>>>>>>>> people to get around whatever you have done. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The second reason was that the internal threading in the >>>>>>>>>>>>>>>>>> client became quite complex. The answer with threading is >>>>>>>>>>>>>>>>>> always that "it >>>>>>>>>>>>>>>>>> won't be complex this time", but it always is. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to >>>>>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the >>>>>>>>>>>>>>>>>> application must >>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This >>>>>>>>>>>>>>>>>> allows the >>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However >>>>>>>>>>>>>>>>>> it's important to >>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do >>>>>>>>>>>>>>>>>> background polling a >>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't shutdown. 
>>>>>>>>>>>>>>>>>> This leads to >>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because >>>>>>>>>>>>>>>>>> they have >>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming >>>>>>>>>>>>>>>>>> partitions and >>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. >>>>>>>>>>>>>>>>>> If you allow >>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and >>>>>>>>>>>>>>>>>> knows they aren't >>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if >>>>>>>>>>>>>>>>>> you allows false >>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone >>>>>>>>>>>>>>>>>> notices that a >>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point >>>>>>>>>>>>>>>>>> the data is >>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have >>>>>>>>>>>>>>>>>> regular false >>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this >>>>>>>>>>>>>>>>>> in some depth >>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the >>>>>>>>>>>>>>>>>> liveness notion tied >>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition >>>>>>>>>>>>>>>>>> of liveness. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Jay >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman < >>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Confluent Team. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity >>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, >>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The current behavior is not intuitive. For example, >>>>>>>>>>>>>>>>>>> KafkaConsumer.poll drives everything. The other methods >>>>>>>>>>>>>>>>>>> like subscribe, >>>>>>>>>>>>>>>>>>> unsubscribe, seek, commit(async) don’t do anything >>>>>>>>>>>>>>>>>>> without a >>>>>>>>>>>>>>>>>>> KafkaConsumer.poll call. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The semantics of a commit() call should be >>>>>>>>>>>>>>>>>>> consistent between sync and async operations. Currently, >>>>>>>>>>>>>>>>>>> sync commit is a >>>>>>>>>>>>>>>>>>> blocking call which actually sends out an >>>>>>>>>>>>>>>>>>> OffsetCommitRequest and waits for >>>>>>>>>>>>>>>>>>> the response upon the user’s KafkaConsumer.commit call. >>>>>>>>>>>>>>>>>>> However, the async >>>>>>>>>>>>>>>>>>> commit is a nonblocking call which just queues up the >>>>>>>>>>>>>>>>>>> OffsetCommitRequest. 
>>>>>>>>>>>>>>>>>>> The request itself is later sent out in the next poll. >>>>>>>>>>>>>>>>>>> The teams we talked >>>>>>>>>>>>>>>>>>> to found this misleading. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Heartbeats are dependent on user application >>>>>>>>>>>>>>>>>>> behavior (i.e. user applications calling poll). This can >>>>>>>>>>>>>>>>>>> be a big problem >>>>>>>>>>>>>>>>>>> as we don’t control how different applications behave. >>>>>>>>>>>>>>>>>>> For example, we >>>>>>>>>>>>>>>>>>> might have an application which reads from Kafka and >>>>>>>>>>>>>>>>>>> writes to Espresso. If >>>>>>>>>>>>>>>>>>> Espresso is slow for whatever reason, then in rebalances >>>>>>>>>>>>>>>>>>> could happen. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current >>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the >>>>>>>>>>>>>>>>>>> old simple >>>>>>>>>>>>>>>>>>> consumer, i.e. in old consumer we ask users to deal with >>>>>>>>>>>>>>>>>>> raw protocols and >>>>>>>>>>>>>>>>>>> error handlings while in KafkaConsumer we do that for >>>>>>>>>>>>>>>>>>> users. However, for >>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of >>>>>>>>>>>>>>>>>>> users), the >>>>>>>>>>>>>>>>>>> experience is a noticeable regression. The old high level >>>>>>>>>>>>>>>>>>> consumer >>>>>>>>>>>>>>>>>>> interface is simple and easy to use for end user, while >>>>>>>>>>>>>>>>>>> KafkaConsumer >>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and >>>>>>>>>>>>>>>>>>> is becoming >>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted by the >>>>>>>>>>>>>>>>>>> javadoc growing >>>>>>>>>>>>>>>>>>> bigger and bigger. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should take >>>>>>>>>>>>>>>>>>> a step back and look at the big picture. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's >>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called >>>>>>>>>>>>>>>>>>> by the user >>>>>>>>>>>>>>>>>>> which pretty much drives everything: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - data fetches >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic >>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - async offset commits >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the >>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and >>>>>>>>>>>>>>>>>>> the consequences >>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - async commits won't send >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - callbacks won't fire >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with >>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - NIO >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume >>>>>>>>>>>>>>>>>>> messages >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or >>>>>>>>>>>>>>>>>>> partitions(explicit group management) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - no surprises in the user experience >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't >>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that there >>>>>>>>>>>>>>>>>>> should be no >>>>>>>>>>>>>>>>>>> requirement from the end user to consistently >>>>>>>>>>>>>>>>>>> KafkaConsumer.poll in order >>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would be >>>>>>>>>>>>>>>>>>> better to split >>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on >>>>>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks >>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - join groups >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - commits >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This would help reduce the amount of surprises to the >>>>>>>>>>>>>>>>>>> end user. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to >>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you at >>>>>>>>>>>>>>>>>>> LinkedIn on *July >>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it >>>>>>>>>>>>>>>>>>> to open source. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Neha >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Thanks, >>>>>>>>>>> Neha >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> -- Guozhang >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Thanks, >>>>>>>>> Ewen >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Thanks, >>>>>>> Neha >>>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>>> -- >>>>> Thanks, >>>>> Neha >>>>> >>>> >>>> >>> >> >