No, the purpose of pause/resume was to be able to implement prioritization of processing by partition, i.e. if you want to prioritize a given subset of partitions without rebalancing them to another consumer, you pause the others and continue reading.
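(For illustration, a minimal sketch of that prioritization pattern. It is written against the collection-taking assign/pause/resume signatures the consumer eventually shipped with; at the time of this message the exact method shapes were still under discussion, so treat the signatures as an assumption rather than the agreed API.)

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PartitionPriorityExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "priority-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition urgent = new TopicPartition("events", 0);
                TopicPartition backlog = new TopicPartition("events", 1);
                consumer.assign(Arrays.asList(urgent, backlog));

                // Pausing stops fetching from "backlog" but keeps it assigned to this
                // consumer -- no rebalance, poll() simply returns nothing for it.
                consumer.pause(Collections.singleton(backlog));
                for (int i = 0; i < 10; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("urgent record: " + record.value());
                    }
                }
                // Once the urgent partition is drained, go back to reading both.
                consumer.resume(Collections.singleton(backlog));
            }
        }
    }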
-Jay On Fri, Jul 31, 2015 at 4:55 PM, Jun Rao <j...@confluent.io> wrote: > Jason, > > I guess that with the new setAssignment() api, we will also be getting > rid of pause() and resume()? > > Thanks, > > Jun > > On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <ja...@confluent.io> > wrote: > >> I was thinking a little bit this morning about the subscription API and I >> have a few ideas on how to address some of the concerns about intuitiveness >> and exception handling. >> >> 1. Split the current notion of topic/partition subscription into >> subscription of topics and assignment of partitions. These concepts are >> pretty fundamentally different and I think at least some of the confusion >> about when subscriptions() can be used is caused by the fact that we >> overload the term. If instead that method is renamed to assignment(), then >> we are communicating to users that it is possible to have a subscription >> without an active assignment, which is not obvious with the current API. >> The code in fact already separates these concepts internally, so this would >> just expose it to the user. >> >> 2. Merge rebalance callback into a subscription callback and add method a >> way to handle errors. The consumer's current rebalance callback is >> basically invoked when a subscription "succeeds," so it seems a little >> weird to also provide a callback on subscription. Perhaps we can just take >> the rebalance callback out of configuration and have the user provide it on >> subscribe(). We can add a method to the callback to handle errors (e.g. for >> non-existing topics). Since the callback is provided at subscribe time, it >> should be clearer to the user that the assignment will not be ready >> immediately when subscribe returns. It's also arguably a little more >> natural to set this callback at subscription time rather than when the >> consumer is constructed. >> >> 3. Get rid of the additive subscribe methods and just use setSubscription >> which would clear the old subscription. After you start providing callbacks >> to subscribe, then the implementation starts to get tricky if each call to >> subscribe provides a separate callback. Instead, as Jay suggested, we could >> just provide a way to set the full list of subscriptions at once, and then >> there is only one callback to maintain. >> >> With these points, the API might look something like this: >> >> void setSubscription(List<String> topics, RebalanceCallback callback); >> void setAssignment(List<TopicPartition> partitions); >> List<String> subscription(); >> List<TopicPartition> assignment(); >> >> interface RebalanceCallback { >> void onAssignment(List<TopicPartition> partitions); >> void onRevocation(List<TopicPartition> partitions); >> >> // handle non-existing topics, etc. >> void onError(Exception e); >> } >> >> Any thoughts? >> >> -Jason >> >> >> >> On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote: >> >>> Hey Becket, >>> >>> Yeah the high-level belief here is that it is possible to give something >>> as high level as the existing "high level" consumer, but this is not likely >>> to be the end-all be-all of high-level interfaces for processing streams of >>> messages. For example neither of these interfaces handles the threading >>> model for the processing, which obviously is a fairly low-level >>> implementation detail left to the user in you proposal, the current code, >>> as well as the existing scala consumer. 
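(To make Jason's subscribe/assign proposal above a bit more concrete, here is one hypothetical way an application could implement the proposed RebalanceCallback. None of this exists in the released client; setSubscription(), assignment() and the callback interface are only the sketch quoted above.)

    import java.util.List;

    import org.apache.kafka.common.TopicPartition;

    // The callback interface exactly as proposed above (hypothetical, not a released API).
    interface RebalanceCallback {
        void onAssignment(List<TopicPartition> partitions);
        void onRevocation(List<TopicPartition> partitions);
        void onError(Exception e);
    }

    // One possible application-side implementation: flush work on revocation, log errors.
    class LoggingRebalanceCallback implements RebalanceCallback {
        @Override
        public void onAssignment(List<TopicPartition> partitions) {
            // Fires once the coordinator hands this consumer its partitions, i.e. some
            // time after setSubscription(topics, this) returns -- the point at which
            // assignment() stops being empty.
            System.out.println("Assigned: " + partitions);
        }

        @Override
        public void onRevocation(List<TopicPartition> partitions) {
            // Last chance to commit offsets for these partitions before they move away.
            System.out.println("Revoked: " + partitions);
        }

        @Override
        public void onError(Exception e) {
            // e.g. a subscribed topic does not exist.
            System.err.println("Subscription error: " + e.getMessage());
        }
    }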
>>> >>> There will be many of these: the full-fledged stream processing >>> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more >>> traditional message queue like "processor" interface, not to mention the >>> stuff we're trying to do with KIP-28. For these frameworks it will be quite >>> weird to add a bunch of new threads since they will want to dictate the >>> threading model. >>> >>> What will be a major failure though is if this client isn't low-level >>> enough and we need to introduce another layer underneath. This would happen >>> either because we dictate too much to make it usable for various >>> applications, frameworks, or use cases. This is the concern with dictating >>> threading and processing models. >>> >>> So to summarize the goal is to subsume the existing APIs, which I think >>> we all agree this does, and be a foundation on which to build other >>> abstractions. >>> >>> WRT KIP-28, I think it is quite general and if we do that right it will >>> subsume a lot of the higher level processing and will give a full threaded >>> processing model to the user. >>> >>> >>> -Jay >>> >>> >>> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com> wrote: >>> >>>> Thanks for the comments Jason and Jay. >>>> >>>> Jason, I had the same concern for producer's callback as well before, >>>> but it seems to be fine from some callbacks I wrote - user can always pass >>>> in object in the constructor if necessary for synchronization. >>>> >>>> Jay, I agree that the current API might be fine for people who wants to >>>> wrap it up. But I thought the new consumer was supposed to be a combination >>>> of old high and low level consumer, which means it should be able to be >>>> used as is, just like producer. If KafkaConsumer is designed to be wrapped >>>> up for use, then the question becomes whether Kafka will provide a decent >>>> wrapper or not? Neha mentioned that KIP-28 will address the users who only >>>> care about data. Would that be the wrapper provided by Kafka? I am not sure >>>> if that is sufficient though because the processor is highly abstracted, >>>> and might only meet the static data stream requirement as I listed in the >>>> grid. For users who need something from the other grids, are we going to >>>> have another wrapper? Or are we expecting all the user to write their own >>>> wrapper for KafkaConsumer? Some other comments are in line. >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote: >>>> >>>>> Some comments on the proposal: >>>>> >>>>> I think we are conflating a number of things that should probably be >>>>> addressed individually because they are unrelated. My past experience is >>>>> that this always makes progress hard. The more we can pick apart these >>>>> items the better: >>>>> >>>>> 1. threading model >>>>> 2. blocking vs non-blocking semantics >>>>> 3. missing apis >>>>> 4. missing javadoc and other api surprises >>>>> 5. Throwing exceptions. >>>>> >>>>> The missing APIs are getting added independently. Some like your >>>>> proposed offsetByTime where things we agreed to hold off on for the first >>>>> release and do when we'd thought it through. If there are uses for it now >>>>> we can accelerate. I think each of these is really independent, we know >>>>> there are things that need to be added but lumping them all into one >>>>> discussion will be confusing. 
>>>>> >>>>> WRT throwing exceptions the policy is to throw exceptions that are >>>>> unrecoverable and handle and log other exceptions that are transient. That >>>>> policy makes sense if you go through the thought exercise of "what will >>>>> the >>>>> user do if i throw this exception to them" if they have no other rational >>>>> response but to retry (and if failing to anticipate and retry with that >>>>> exception will kill their program) . You can argue whether the topic not >>>>> existing is transient or not, unfortunately the way we did auto-creation >>>>> makes it transient if you are in "auto create mode" and non-transient >>>>> otherwise (ick!). In any case this is an orthogonal discussion to >>>>> everything else. I think the policy is right and if we don't conform to it >>>>> in some way that is really an independent bug/discussion. >>>>> >>>> Agreed we can discuss about them separately. >>>> >>>>> >>>>> I suggest we focus on threading and the current event-loop style of >>>>> api design since I think that is really the crux. >>>>> >>>>> The analogy between the producer threading model and the consumer >>>>> model actually doesn't work for me. The goal of the producer is actually >>>>> to >>>>> take requests from many many user threads and shove them into a single >>>>> buffer for batching. So the threading model isn't the 1:1 threads you >>>>> describe it is N:1.The goal of the consumer is to support single-threaded >>>>> processing. This is what drives the difference. Saying that the producer >>>>> has N:1 threads therefore for the consumer should have 1:1 threads instead >>>>> of just 1 thread doesn't make sense any more then an analogy to the >>>>> brokers >>>>> threading model would--the problem we're solving is totally different. >>>>> >>>> I think the ultimate goal for producer and consumer are still allowing >>>> user to send/receive data in parallel. In producer we picked the solution >>>> of one-producer-serving-multiple-threads, and in consumer we picked >>>> multiple-single-threaded-consumers instead of >>>> single-consumer-serving-multiple threads. And we believe people can always >>>> implement the latter with the former. I think this is a reasonable >>>> decision. However, there are also reasonable concerns over the >>>> multiple-single-threaded-consumers solution which is that the single-thread >>>> might have to be a dedicate polling thread in many cases which pushes user >>>> towards the other solution - i.e. implementing a >>>> single-thread-consumer-serving-multiple-threads wrapper. From what we hear, >>>> it seems to be a quite common concern for most of the users we talked to. >>>> Plus the adoption bar of the consumer will be much higher because user will >>>> have to understand some of the details of the things they don't care as >>>> listed in the grid. >>>> The analogy between producer/consumer is intended to show that a >>>> separate polling thread will solve the concerns we have. >>>> >>>>> >>>>> >>>> I think ultimately though what you need to think about is, does an >>>>> event loop style of API make sense? That is the source of all the issues >>>>> you describe. This style of API is incredibly prevalent from unix select >>>>> to >>>>> GUIs to node.js. It's a great way to model multiple channels of messages >>>>> coming in. It is a fantastic style for event processing. Programmers >>>>> understand this style of api though I would agree it is unusual compared >>>>> to >>>>> blocking apis. But it is is a single threaded processing model. 
The >>>>> current >>>>> approach is basically a pure event loop with some convenience methods that >>>>> are effectively "poll until X is complete". >>>>> >>>>> I think basically all the confusion you are describing comes from not >>>>> documenting/expecting an event loop. The "if you don't call poll nothing >>>>> happens" point is basically this. It's an event loop. You have to loop. >>>>> You >>>>> can't not call poll. The docs don't cover this right now, perhaps. I think >>>>> if they do it's not unreasonable behavior. >>>>> >>>> I'm not sure if I understand the event-loop correctly and honestly I >>>> did not think about it clearly before. My understanding is that an >>>> even-loop model means a single listener thread, but there can be multiple >>>> event generator threads. The downside is that the listener thread has to be >>>> fast and very careful about blocking. If we look at the consumer, the >>>> current model is the caller thread itself act as both event generator and >>>> listener. As a generator, it generates different task by calling the >>>> convenience methods. As a listener, it listens to the messages on broker >>>> and also the tasks generated by itself. So in our proposal, we are not >>>> changing the event-loop model here just separated the event generator and >>>> event listener. It looks to me that the underlying execution thread follows >>>> the event-loop model, the special thing might be it is not only listening >>>> to the messages from broker, but also listening to the tasks from the user >>>> thread. This is essentially the thing a consumer has to do - interact with >>>> both server and user. >>>> >>>>> >>>>> If we want to move away from an event loop I'm not sure *any* aspect >>>>> of the current event loop style of api makes sense any more. I am not >>>>> totally married to event loops, but i do think what we have gives an >>>>> elegant way of implementing any higher level abstractions that would fully >>>>> implement the user's parallelism model. I don't want to go rethink >>>>> everything but I do think a half-way implementation that is event loop + >>>>> background threads is likely going to be icky. >>>>> >>>> We brought this up before to change the consumer.poll() to >>>> consumer.consume(). And did not do so simply because we wanted to less >>>> change in API... I might be crazy but can we think of the proposed model as >>>> processing thread + event-loop instead, rather than event-loop + background >>>> thread? >>>> >>>>> >>>>> WRT making it configurable whether liveness means "actually consuming" >>>>> or "background thread running" I would suggest that that is really the >>>>> worst outcome. These type of "modes" that are functionally totally >>>>> different are just awful from a documentation, testing, usability, etc >>>>> pov. >>>>> I would strongly prefer we pick either of these, document it, and make it >>>>> work well rather than trying to do both. >>>>> >>>> Previously I thought this was the major benefit we wanted from a single >>>> threaded model, personally I don't have a strong preference on this. So I >>>> am OK with either way. >>>> >>>>> >>>>> -Jay >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io> >>>>> wrote: >>>>> >>>>>> Works now. Thanks Becket! >>>>>> >>>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com> >>>>>> wrote: >>>>>> >>>>>>> Ah... My bad, forgot to change the URL link for pictures. >>>>>>> Thanks for the quick response, Neha. 
It should be fixed now, can you >>>>>>> try again? >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images >>>>>>>> that the wiki refers to, but none loaded for me. Just making sure if >>>>>>>> its >>>>>>>> just me or can everyone not see the pictures? >>>>>>>> >>>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> I agree with Ewen that a single threaded model will be tricky to >>>>>>>>> implement the same conventional semantic of async or Future. We just >>>>>>>>> drafted the following wiki which explains our thoughts in LinkedIn on >>>>>>>>> the >>>>>>>>> new consumer API and threading model. >>>>>>>>> >>>>>>>>> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal >>>>>>>>> >>>>>>>>> We were trying to see: >>>>>>>>> 1. If we can use some kind of methodology to help us think about >>>>>>>>> what API we want to provide to user for different use cases. >>>>>>>>> 2. What is the pros and cons of current single threaded model. Is >>>>>>>>> there a way that we can maintain the benefits while solve the issues >>>>>>>>> we are >>>>>>>>> facing now with single threaded model. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> >>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>> >>>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava < >>>>>>>>> e...@confluent.io> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang < >>>>>>>>>> wangg...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along >>>>>>>>>>> with returning future in the commit calls, i.e. something similar >>>>>>>>>>> to: >>>>>>>>>>> >>>>>>>>>>> public Future<void> commit(ConsumerCommitCallback callback); >>>>>>>>>>> >>>>>>>>>>> public Future<void> commit(Map<TopicPartition, Long> offsets, >>>>>>>>>>> ConsumerCommitCallback callback); >>>>>>>>>>> >>>>>>>>>>> At that time I was slightly intending not to include the Future >>>>>>>>>>> besides adding the callback mainly because of the implementation >>>>>>>>>>> complexity >>>>>>>>>>> I feel it could introduce along with the retry settings after >>>>>>>>>>> looking >>>>>>>>>>> through the code base. I would happy to change my mind if we could >>>>>>>>>>> propose >>>>>>>>>>> a prototype implementation that is simple enough. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> One of the reasons that interface ended up being difficult (or >>>>>>>>>> maybe impossible) to make work reasonably is because the consumer was >>>>>>>>>> thread-safe at the time. That made it impossible to know what should >>>>>>>>>> be >>>>>>>>>> done when Future.get() is called -- should the implementation call >>>>>>>>>> poll() >>>>>>>>>> itself, or would the fact that the user is calling get() imply that >>>>>>>>>> there's >>>>>>>>>> a background thread running the poll() loop and we just need to wait >>>>>>>>>> for it? >>>>>>>>>> >>>>>>>>>> The consumer is no longer thread safe, but I think the same >>>>>>>>>> problem remains because the expectation with Futures is that they are >>>>>>>>>> thread safe. 
Which means that even if the consumer isn't thread >>>>>>>>>> safe, I >>>>>>>>>> would expect to be able to hand that Future off to some other >>>>>>>>>> thread, have >>>>>>>>>> the second thread call get(), and then continue driving the poll >>>>>>>>>> loop in my >>>>>>>>>> thread (which in turn would eventually resolve the Future). >>>>>>>>>> >>>>>>>>>> I quite dislike the sync/async enum. While both operations commit >>>>>>>>>> offsets, their semantics are so different that overloading a single >>>>>>>>>> method >>>>>>>>>> with both is messy. That said, I don't think we should consider this >>>>>>>>>> an >>>>>>>>>> inconsistency wrt the new producer API's use of Future because the >>>>>>>>>> two APIs >>>>>>>>>> have a much more fundamental difference that justifies it: they have >>>>>>>>>> completely different threading and execution models. >>>>>>>>>> >>>>>>>>>> I think a Future-based API only makes sense if you can guarantee >>>>>>>>>> the operations that Futures are waiting on will continue to make >>>>>>>>>> progress >>>>>>>>>> regardless of what the thread using the Future does. The producer >>>>>>>>>> API makes >>>>>>>>>> that work by processing asynchronous requests in a background >>>>>>>>>> thread. The >>>>>>>>>> new consumer does not, and so it becomes difficult/impossible to >>>>>>>>>> implement >>>>>>>>>> the Future correctly. (Or, you have to make assumptions which break >>>>>>>>>> other >>>>>>>>>> use cases; if you want to support the simple use case of just making >>>>>>>>>> a >>>>>>>>>> commit() synchronous by calling get(), the Future has to call poll() >>>>>>>>>> internally; but if you do that, then if any user ever wants to add >>>>>>>>>> synchronization to the consumer via some external mechanism, then the >>>>>>>>>> implementation of the Future's get() method will not be subject to >>>>>>>>>> that >>>>>>>>>> synchronization and things will break). >>>>>>>>>> >>>>>>>>>> -Ewen >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> Guozhang >>>>>>>>>>> >>>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede < >>>>>>>>>>> n...@confluent.io> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hey Adi, >>>>>>>>>>>> >>>>>>>>>>>> When we designed the initial version, the producer API was >>>>>>>>>>>> still changing. I thought about adding the Future and then just >>>>>>>>>>>> didn't get >>>>>>>>>>>> to it. I agree that we should look into adding it for consistency. >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Neha >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar < >>>>>>>>>>>> aaurad...@linkedin.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Great discussion everyone! >>>>>>>>>>>>> >>>>>>>>>>>>> One general comment on the sync/async API's on the new >>>>>>>>>>>>> consumer. I think the producer tackles sync vs async API's >>>>>>>>>>>>> well. For API's that can either be sync or async, can we simply >>>>>>>>>>>>> return a >>>>>>>>>>>>> future? That seems more elegant for the API's that make sense >>>>>>>>>>>>> either in >>>>>>>>>>>>> both flavors. From the users perspective, it is more consistent >>>>>>>>>>>>> with the >>>>>>>>>>>>> new producer. One easy example is the commit call with the >>>>>>>>>>>>> CommitType >>>>>>>>>>>>> enum.. we can make that call always async and users can block on >>>>>>>>>>>>> the future >>>>>>>>>>>>> if they want to make sure their offsets are committed. 
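(A sketch of what Aditya is suggesting. The Future-returning commit() below is hypothetical -- the released consumer ultimately went with separate commitSync()/commitAsync(callback) methods -- but it shows how a single async method could serve both flavors, and why Ewen's objection matters: nothing completes that Future unless something keeps driving poll().)

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.Future;

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical single commit method returning a Future, as discussed above.
    interface FutureCommitConsumer {
        Future<Void> commit(Map<TopicPartition, Long> offsets);
    }

    class CommitStyles {
        // "Async" flavor: fire the commit and move on.
        static void commitAndForget(FutureCommitConsumer consumer, TopicPartition tp, long offset) {
            consumer.commit(Collections.singletonMap(tp, offset));
        }

        // "Sync" flavor: the same call, but the caller blocks until the broker acknowledges.
        static void commitAndWait(FutureCommitConsumer consumer, TopicPartition tp, long offset)
                throws Exception {
            consumer.commit(Collections.singletonMap(tp, offset)).get();
        }
    }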
>>>>>>>>>>>>> >>>>>>>>>>>>> Aditya >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman < >>>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the great responses, everyone! >>>>>>>>>>>>>> >>>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up >>>>>>>>>>>>>> old high level consumers, the teams we spoke to were actually >>>>>>>>>>>>>> not the types >>>>>>>>>>>>>> of services that simply wanted an easy way to get >>>>>>>>>>>>>> ConsumerRecords. We spoke >>>>>>>>>>>>>> to infrastructure teams that I would consider to be closer to the >>>>>>>>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's >>>>>>>>>>>>>> level of >>>>>>>>>>>>>> granularity. Some would use auto group management. Some would >>>>>>>>>>>>>> use explicit >>>>>>>>>>>>>> group management. All of them would turn off auto offset >>>>>>>>>>>>>> commits. Yes, the >>>>>>>>>>>>>> Samza team had prior experience with the old SimpleConsumer, but >>>>>>>>>>>>>> this is >>>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I >>>>>>>>>>>>>> don't really >>>>>>>>>>>>>> think the feedback received was about the simpler times or >>>>>>>>>>>>>> wanting >>>>>>>>>>>>>> additional higher-level clients. >>>>>>>>>>>>>> >>>>>>>>>>>>>> - Onur >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson < >>>>>>>>>>>>>> ja...@confluent.io> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we >>>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since >>>>>>>>>>>>>>> this generally >>>>>>>>>>>>>>> controls how long normal rebalances will take. I think it's >>>>>>>>>>>>>>> currently >>>>>>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also >>>>>>>>>>>>>>> be nice to >>>>>>>>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown >>>>>>>>>>>>>>> of a >>>>>>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the >>>>>>>>>>>>>>> timeout to >>>>>>>>>>>>>>> reassign partitions. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Jason >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io >>>>>>>>>>>>>>> > wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hey Kartik, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the >>>>>>>>>>>>>>>> common case. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> However there are two ways to avoid this: >>>>>>>>>>>>>>>> 1. Default the timeout high >>>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> When we were doing the consumer design we discussed this >>>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that >>>>>>>>>>>>>>>> defaulting to a >>>>>>>>>>>>>>>> high timeout was actually better. This means it takes a little >>>>>>>>>>>>>>>> longer to >>>>>>>>>>>>>>>> detect a failure, but usually that is not a big problem and >>>>>>>>>>>>>>>> people who want >>>>>>>>>>>>>>>> faster failure detection can tune it down. This seemed better >>>>>>>>>>>>>>>> than having >>>>>>>>>>>>>>>> the failure detection not really cover the consumption and >>>>>>>>>>>>>>>> just be a >>>>>>>>>>>>>>>> background ping. 
The two reasons where (a) you still have the >>>>>>>>>>>>>>>> GC problem >>>>>>>>>>>>>>>> even for the background thread, (b) consumption is in some >>>>>>>>>>>>>>>> sense a better >>>>>>>>>>>>>>>> definition of an active healthy consumer and a lot of problems >>>>>>>>>>>>>>>> crop up when >>>>>>>>>>>>>>>> you have an inactive consumer with an active background thread >>>>>>>>>>>>>>>> (as today). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> When we had the discussion I think what we realized was >>>>>>>>>>>>>>>> that most people who were worried about the timeout where >>>>>>>>>>>>>>>> imagining a very >>>>>>>>>>>>>>>> low default (500ms) say. But in fact just setting this to 60 >>>>>>>>>>>>>>>> seconds or >>>>>>>>>>>>>>>> higher as a default would be okay, this adds to the failure >>>>>>>>>>>>>>>> detection time >>>>>>>>>>>>>>>> but only apps that care about this need to tune. This should >>>>>>>>>>>>>>>> largely >>>>>>>>>>>>>>>> eliminate false positives since after all if you disappear for >>>>>>>>>>>>>>>> 60 seconds >>>>>>>>>>>>>>>> that actually starts to be more of a true positive, even if >>>>>>>>>>>>>>>> you come >>>>>>>>>>>>>>>> back... :-) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Jay >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam < >>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> adding the open source alias. This email started off as a >>>>>>>>>>>>>>>>> broader discussion around the new consumer. I was zooming >>>>>>>>>>>>>>>>> into only the >>>>>>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the >>>>>>>>>>>>>>>>> heartbeats. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the >>>>>>>>>>>>>>>>> problem). Monitoring the lag is important as it is the >>>>>>>>>>>>>>>>> primary way to tell >>>>>>>>>>>>>>>>> if the application is wedged. There might be other metrics >>>>>>>>>>>>>>>>> which can >>>>>>>>>>>>>>>>> possibly capture the same essence. Yes the lag is at the >>>>>>>>>>>>>>>>> consumer group >>>>>>>>>>>>>>>>> level, but you can tell that one of the consumers is messed >>>>>>>>>>>>>>>>> up if one of >>>>>>>>>>>>>>>>> the partitions in the application start generating lag and >>>>>>>>>>>>>>>>> others are good >>>>>>>>>>>>>>>>> for e.g. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is >>>>>>>>>>>>>>>>> that in the old consumer most customers don't have to worry >>>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>> unnecessary rebalances and most of the things that they do in >>>>>>>>>>>>>>>>> their app >>>>>>>>>>>>>>>>> doesn't have an impact on the session timeout.. (i.e. the >>>>>>>>>>>>>>>>> only thing that >>>>>>>>>>>>>>>>> causes rebalances is when the GC is out of whack). For the >>>>>>>>>>>>>>>>> handful of >>>>>>>>>>>>>>>>> customers who are impacted by GC related rebalances, i would >>>>>>>>>>>>>>>>> imagine that >>>>>>>>>>>>>>>>> all of them would really want us to make the system more >>>>>>>>>>>>>>>>> resilient. I >>>>>>>>>>>>>>>>> agree that the GC problem can't be solved easily in the java >>>>>>>>>>>>>>>>> client, >>>>>>>>>>>>>>>>> however it appears that now we would be expecting the >>>>>>>>>>>>>>>>> consuming >>>>>>>>>>>>>>>>> applications to be even more careful with ongoing tuning of >>>>>>>>>>>>>>>>> the timeouts. 
>>>>>>>>>>>>>>>>> At LinkedIn, we have seen that most kafka applications don't >>>>>>>>>>>>>>>>> have much of a >>>>>>>>>>>>>>>>> clue about configuring the timeouts and just end up calling >>>>>>>>>>>>>>>>> the Kafka team >>>>>>>>>>>>>>>>> when their application sees rebalances. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is >>>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll >>>>>>>>>>>>>>>>> timeout that is >>>>>>>>>>>>>>>>> larger than the session timeout. If we had a notion of >>>>>>>>>>>>>>>>> implicit >>>>>>>>>>>>>>>>> heartbeats then we could also automatically make this work >>>>>>>>>>>>>>>>> for consumers by >>>>>>>>>>>>>>>>> sending hearbeats at the appropriate interval even though the >>>>>>>>>>>>>>>>> customers >>>>>>>>>>>>>>>>> want to do a long poll. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we >>>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the >>>>>>>>>>>>>>>>> consumer. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Would love to hear how other people think about this >>>>>>>>>>>>>>>>> subject ? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>> Kartik >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede < >>>>>>>>>>>>>>>>> n...@confluent.io> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is >>>>>>>>>>>>>>>>>> that there are many ways the application's message >>>>>>>>>>>>>>>>>> processing could fail >>>>>>>>>>>>>>>>>> and we wouldn't be able to model all of those in the >>>>>>>>>>>>>>>>>> consumer's failure >>>>>>>>>>>>>>>>>> detection mechanism. So we should try to model as much of it >>>>>>>>>>>>>>>>>> as we can so >>>>>>>>>>>>>>>>>> the consumer's failure detection is meaningful. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect >>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag >>>>>>>>>>>>>>>>>>> increases then for >>>>>>>>>>>>>>>>>>> sure something is wrong. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the >>>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept >>>>>>>>>>>>>>>>>> and the problem >>>>>>>>>>>>>>>>>> we have is being able to detect failures at the level of >>>>>>>>>>>>>>>>>> individual >>>>>>>>>>>>>>>>>> consumer instances. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> As you pointed out, a consumer that poll() is a stronger >>>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The >>>>>>>>>>>>>>>>>> dilemma then is who >>>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but >>>>>>>>>>>>>>>>>> the application >>>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for >>>>>>>>>>>>>>>>>> their >>>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for >>>>>>>>>>>>>>>>>> the user to >>>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too >>>>>>>>>>>>>>>>>> often. The >>>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be >>>>>>>>>>>>>>>>>> *exactly* 99tile of processing latency but could be in >>>>>>>>>>>>>>>>>> the ballpark + an error delta. 
The error delta is the >>>>>>>>>>>>>>>>>> application owner's >>>>>>>>>>>>>>>>>> acceptable risk threshold during which they would be ok if >>>>>>>>>>>>>>>>>> the application >>>>>>>>>>>>>>>>>> remains part of the group despite being dead. It is >>>>>>>>>>>>>>>>>> ultimately a tradeoff >>>>>>>>>>>>>>>>>> between operational ease and more accurate failure detection. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range from >>>>>>>>>>>>>>>>>>> a few milliseconds all the way to a tens of seconds. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most >>>>>>>>>>>>>>>>>> most of the times, the normal GC falls in the few ms range >>>>>>>>>>>>>>>>>> and there are >>>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC >>>>>>>>>>>>>>>>>> falls in the >>>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be >>>>>>>>>>>>>>>>>> predicted, so has to be >>>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to observe >>>>>>>>>>>>>>>>>> what this >>>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the >>>>>>>>>>>>>>>>>> appropriate >>>>>>>>>>>>>>>>>> timeouts. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Since this is not something that can be automated, this >>>>>>>>>>>>>>>>>> is a config that the application owner has to set based on >>>>>>>>>>>>>>>>>> the expected >>>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads >>>>>>>>>>>>>>>>>> to ending up >>>>>>>>>>>>>>>>>> with bad consumption semantics where the application process >>>>>>>>>>>>>>>>>> continues to >>>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming since >>>>>>>>>>>>>>>>>> it has halted >>>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to >>>>>>>>>>>>>>>>>> express that in >>>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the >>>>>>>>>>>>>>>>>> application owner >>>>>>>>>>>>>>>>>> has to go through the process of measuring and then defining >>>>>>>>>>>>>>>>>> this "max". >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The reverse where they don't do this and the application >>>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing >>>>>>>>>>>>>>>>>> and frustrating >>>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth. And as >>>>>>>>>>>>>>>>>> long as the >>>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it >>>>>>>>>>>>>>>>>> should be easy >>>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or >>>>>>>>>>>>>>>>>> not and how >>>>>>>>>>>>>>>>>> that should be tuned. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source >>>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is >>>>>>>>>>>>>>>>>> unblocked since you >>>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> Neha >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam < >>>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier >>>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points >>>>>>>>>>>>>>>>>>> you discuss are all very valid. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. >>>>>>>>>>>>>>>>>>> However there are a smaller percentage of memory hungry >>>>>>>>>>>>>>>>>>> apps that get hit >>>>>>>>>>>>>>>>>>> by it. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are >>>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also >>>>>>>>>>>>>>>>>>> real. If the app >>>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is >>>>>>>>>>>>>>>>>>> healthy is surely >>>>>>>>>>>>>>>>>>> higher. But this again isn't an absolute measure that the >>>>>>>>>>>>>>>>>>> app is >>>>>>>>>>>>>>>>>>> processing correctly. >>>>>>>>>>>>>>>>>>> In other cases the app might have even died in which >>>>>>>>>>>>>>>>>>> case this discussion is moot. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect >>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag >>>>>>>>>>>>>>>>>>> increases then for >>>>>>>>>>>>>>>>>>> sure something is wrong. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to >>>>>>>>>>>>>>>>>>> tune their session timeout based on the 99tile of the time >>>>>>>>>>>>>>>>>>> they take to >>>>>>>>>>>>>>>>>>> process events after every poll. This turns out is a >>>>>>>>>>>>>>>>>>> nontrivial thing to >>>>>>>>>>>>>>>>>>> do for an application todo. To start with when an >>>>>>>>>>>>>>>>>>> application is new their >>>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on >>>>>>>>>>>>>>>>>>> synthetic data. >>>>>>>>>>>>>>>>>>> This often times doesn't represent what they will see in >>>>>>>>>>>>>>>>>>> production. Once >>>>>>>>>>>>>>>>>>> the app is in production their processing latencies will >>>>>>>>>>>>>>>>>>> potentially vary >>>>>>>>>>>>>>>>>>> over time. It is extremely unlikely that the application >>>>>>>>>>>>>>>>>>> owner does a >>>>>>>>>>>>>>>>>>> careful job of monitoring the 99tile of latencies over time >>>>>>>>>>>>>>>>>>> and readjust >>>>>>>>>>>>>>>>>>> the settings. Often times the latencies vary because of >>>>>>>>>>>>>>>>>>> variance is other >>>>>>>>>>>>>>>>>>> services that are called by the consumer as part of >>>>>>>>>>>>>>>>>>> processing the events. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events >>>>>>>>>>>>>>>>>>> and writes to Kafka. With quotas the write latencies to >>>>>>>>>>>>>>>>>>> kafka could range >>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to a tens of seconds. >>>>>>>>>>>>>>>>>>> As the scale of >>>>>>>>>>>>>>>>>>> processing for an app increasing the app or that 'user' >>>>>>>>>>>>>>>>>>> could now get >>>>>>>>>>>>>>>>>>> quotaed. Instead of slowing down gracefully unless the >>>>>>>>>>>>>>>>>>> application owner >>>>>>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a >>>>>>>>>>>>>>>>>>> potential outage >>>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> If we expose the pause() Api then It is possible for us >>>>>>>>>>>>>>>>>>> to take care of this in the linkedin wrapper. Whereby we >>>>>>>>>>>>>>>>>>> would keep >>>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue >>>>>>>>>>>>>>>>>>> the messages. >>>>>>>>>>>>>>>>>>> When the queue is full we would call pause(). 
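(A rough sketch of the wrapper being described: a dedicated thread owns the consumer and keeps poll() -- and therefore the heartbeat -- running, handing records to the application through a bounded queue and using pause()/resume() as back-pressure. The subscribe/pause/resume signatures below follow the shape the API eventually shipped with; those methods are exactly what was still being debated at the time of this thread.)

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollingWrapper implements Runnable {
        private static final int CAPACITY = 10_000;
        private static final int HIGH_WATERMARK = 8_000;   // pause fetching above this
        private static final int LOW_WATERMARK = 1_000;    // resume fetching below this

        private final KafkaConsumer<String, String> consumer;
        private final BlockingQueue<ConsumerRecord<String, String>> queue =
                new ArrayBlockingQueue<>(CAPACITY);
        private volatile boolean running = true;

        public PollingWrapper(Properties props, String topic) {
            this.consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {                                     // runs on its own dedicated thread
            boolean paused = false;
            try {
                while (running) {
                    if (!paused && queue.size() > HIGH_WATERMARK) {
                        consumer.pause(consumer.assignment());  // stop fetching, stay in the group
                        paused = true;
                    } else if (paused && queue.size() < LOW_WATERMARK) {
                        consumer.resume(consumer.assignment());
                        paused = false;
                    }
                    // poll() keeps heartbeats flowing even while fetching is paused, so a
                    // slow application thread no longer causes a rebalance.
                    for (ConsumerRecord<String, String> record : consumer.poll(200)) {
                        queue.put(record);  // watermarks leave headroom for one poll's batch
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();             // shutdown requested
            } finally {
                consumer.close();
            }
        }

        // Application threads pull records at their own pace.
        public ConsumerRecord<String, String> take() throws InterruptedException {
            return queue.take();
        }

        public void shutdown() {
            running = false;
        }
    }

The wrapper thread is the only one that ever touches the KafkaConsumer, which sidesteps the thread-safety constraints discussed elsewhere in the thread; the trade-off, as noted above, is that "alive" now means "the polling thread is running" rather than "the application is actually consuming."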
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I >>>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as >>>>>>>>>>>>>>>>>>> every major >>>>>>>>>>>>>>>>>>> customer will eventually be pained by it. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Kartik >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps < >>>>>>>>>>>>>>>>>>> j...@confluent.io> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hey guys, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges >>>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or >>>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group >>>>>>>>>>>>>>>>>>> management stuff >>>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is >>>>>>>>>>>>>>>>>>> already a big >>>>>>>>>>>>>>>>>>> effort. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of >>>>>>>>>>>>>>>>>>> documenting which apis block and which don't and when >>>>>>>>>>>>>>>>>>> people are surprised >>>>>>>>>>>>>>>>>>> because we haven't labeled something that will be >>>>>>>>>>>>>>>>>>> unintuitive. But the >>>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in >>>>>>>>>>>>>>>>>>> programming >>>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond >>>>>>>>>>>>>>>>>>> people if explained >>>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if >>>>>>>>>>>>>>>>>>> we don't say >>>>>>>>>>>>>>>>>>> which is which any scheme will be confusing). >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has >>>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people had >>>>>>>>>>>>>>>>>>> around >>>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking >>>>>>>>>>>>>>>>>>> iterator, >>>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, >>>>>>>>>>>>>>>>>>> etc have all >>>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that >>>>>>>>>>>>>>>>>>> client >>>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people >>>>>>>>>>>>>>>>>>> want to try to >>>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've >>>>>>>>>>>>>>>>>>> heard not to use >>>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to >>>>>>>>>>>>>>>>>>> lack of features). >>>>>>>>>>>>>>>>>>> It's important to have that context when people need to >>>>>>>>>>>>>>>>>>> switch and they say >>>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Let me give some context related to your points, based >>>>>>>>>>>>>>>>>>> on our previous discussions: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was >>>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the >>>>>>>>>>>>>>>>>>> lowest level >>>>>>>>>>>>>>>>>>> client. There are many, many possible higher level >>>>>>>>>>>>>>>>>>> processing abstractions. 
>>>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level >>>>>>>>>>>>>>>>>>> client was that >>>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with >>>>>>>>>>>>>>>>>>> things that are >>>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading >>>>>>>>>>>>>>>>>>> model. If you do >>>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level >>>>>>>>>>>>>>>>>>> consumer api for >>>>>>>>>>>>>>>>>>> people to get around whatever you have done. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The second reason was that the internal threading in the >>>>>>>>>>>>>>>>>>> client became quite complex. The answer with threading is >>>>>>>>>>>>>>>>>>> always that "it >>>>>>>>>>>>>>>>>>> won't be complex this time", but it always is. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to >>>>>>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the >>>>>>>>>>>>>>>>>>> application must >>>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This >>>>>>>>>>>>>>>>>>> allows the >>>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However >>>>>>>>>>>>>>>>>>> it's important to >>>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do >>>>>>>>>>>>>>>>>>> background polling a >>>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't shutdown. >>>>>>>>>>>>>>>>>>> This leads to >>>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because >>>>>>>>>>>>>>>>>>> they have >>>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming >>>>>>>>>>>>>>>>>>> partitions and >>>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. >>>>>>>>>>>>>>>>>>> If you allow >>>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and >>>>>>>>>>>>>>>>>>> knows they aren't >>>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if >>>>>>>>>>>>>>>>>>> you allows false >>>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone >>>>>>>>>>>>>>>>>>> notices that a >>>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point >>>>>>>>>>>>>>>>>>> the data is >>>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have >>>>>>>>>>>>>>>>>>> regular false >>>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this >>>>>>>>>>>>>>>>>>> in some depth >>>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the >>>>>>>>>>>>>>>>>>> liveness notion tied >>>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition >>>>>>>>>>>>>>>>>>> of liveness. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -Jay >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman < >>>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Confluent Team. 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity >>>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123 >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350 >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359 >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, >>>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The current behavior is not intuitive. For example, >>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll drives everything. The other methods >>>>>>>>>>>>>>>>>>>> like subscribe, >>>>>>>>>>>>>>>>>>>> unsubscribe, seek, commit(async) don’t do anything >>>>>>>>>>>>>>>>>>>> without a >>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll call. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The semantics of a commit() call should be >>>>>>>>>>>>>>>>>>>> consistent between sync and async operations. >>>>>>>>>>>>>>>>>>>> Currently, sync commit is a >>>>>>>>>>>>>>>>>>>> blocking call which actually sends out an >>>>>>>>>>>>>>>>>>>> OffsetCommitRequest and waits for >>>>>>>>>>>>>>>>>>>> the response upon the user’s KafkaConsumer.commit call. >>>>>>>>>>>>>>>>>>>> However, the async >>>>>>>>>>>>>>>>>>>> commit is a nonblocking call which just queues up the >>>>>>>>>>>>>>>>>>>> OffsetCommitRequest. >>>>>>>>>>>>>>>>>>>> The request itself is later sent out in the next poll. >>>>>>>>>>>>>>>>>>>> The teams we talked >>>>>>>>>>>>>>>>>>>> to found this misleading. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Heartbeats are dependent on user application >>>>>>>>>>>>>>>>>>>> behavior (i.e. user applications calling poll). This >>>>>>>>>>>>>>>>>>>> can be a big problem >>>>>>>>>>>>>>>>>>>> as we don’t control how different applications behave. >>>>>>>>>>>>>>>>>>>> For example, we >>>>>>>>>>>>>>>>>>>> might have an application which reads from Kafka and >>>>>>>>>>>>>>>>>>>> writes to Espresso. If >>>>>>>>>>>>>>>>>>>> Espresso is slow for whatever reason, then in >>>>>>>>>>>>>>>>>>>> rebalances could happen. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current >>>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the >>>>>>>>>>>>>>>>>>>> old simple >>>>>>>>>>>>>>>>>>>> consumer, i.e. in old consumer we ask users to deal with >>>>>>>>>>>>>>>>>>>> raw protocols and >>>>>>>>>>>>>>>>>>>> error handlings while in KafkaConsumer we do that for >>>>>>>>>>>>>>>>>>>> users. However, for >>>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of >>>>>>>>>>>>>>>>>>>> users), the >>>>>>>>>>>>>>>>>>>> experience is a noticeable regression. 
The old high level >>>>>>>>>>>>>>>>>>>> consumer >>>>>>>>>>>>>>>>>>>> interface is simple and easy to use for end user, while >>>>>>>>>>>>>>>>>>>> KafkaConsumer >>>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and >>>>>>>>>>>>>>>>>>>> is becoming >>>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted by the >>>>>>>>>>>>>>>>>>>> javadoc growing >>>>>>>>>>>>>>>>>>>> bigger and bigger. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should take >>>>>>>>>>>>>>>>>>>> a step back and look at the big picture. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's >>>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called >>>>>>>>>>>>>>>>>>>> by the user >>>>>>>>>>>>>>>>>>>> which pretty much drives everything: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - data fetches >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic >>>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - async offset commits >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the >>>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and >>>>>>>>>>>>>>>>>>>> the consequences >>>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - async commits won't send >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - callbacks won't fire >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with >>>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of >>>>>>>>>>>>>>>>>>>> KafkaConsumer: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - NIO >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume >>>>>>>>>>>>>>>>>>>> messages >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) >>>>>>>>>>>>>>>>>>>> or partitions(explicit group management) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - no surprises in the user experience >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't >>>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that there >>>>>>>>>>>>>>>>>>>> should be no >>>>>>>>>>>>>>>>>>>> requirement from the end user to consistently >>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll in order >>>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would be >>>>>>>>>>>>>>>>>>>> better to split >>>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on >>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks >>>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll. 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - join groups >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - commits >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> This would help reduce the amount of surprises to the >>>>>>>>>>>>>>>>>>>> end user. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to >>>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you at >>>>>>>>>>>>>>>>>>>> LinkedIn on *July >>>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it >>>>>>>>>>>>>>>>>>>> to open source. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> Neha >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Neha >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> -- Guozhang >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Thanks, >>>>>>>>>> Ewen >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Thanks, >>>>>>>> Neha >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Thanks, >>>>>> Neha >>>>>> >>>>> >>>>> >>>> >>> >> >