For those who are interested in the ticket: https://issues.apache.org/jira/browse/KAFKA-2388
On Fri, Jul 31, 2015 at 1:14 PM, Jiangjie Qin <j...@linkedin.com> wrote: > I like the idea as well. That is much clearer. > Also agree with Jay on the naming. > > Thanks, Jason. I'll update the Jira ticket. > > Jiangjie (Becket) Qin > > On Fri, Jul 31, 2015 at 12:19 PM, Jay Kreps <j...@confluent.io> wrote: > >> I like all these ideas. >> >> Our convention is to keep method names declarative so it should probably >> be >> subscribe(List<String> topics, Callback c) >> assign(List<TopicPartition> partitions) >> >> The javadoc would obviously have to clarify the relationship between a >> subscribed topic and assigned partitions. Presumably unsubscribe/unassign >> are unnecessary since this is just a matter of subscribing to the empty >> list. >> >> -Jay >> >> On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <ja...@confluent.io> >> wrote: >> >>> I was thinking a little bit this morning about the subscription API and >>> I have a few ideas on how to address some of the concerns about >>> intuitiveness and exception handling. >>> >>> 1. Split the current notion of topic/partition subscription into >>> subscription of topics and assignment of partitions. These concepts are >>> pretty fundamentally different and I think at least some of the confusion >>> about when subscriptions() can be used is caused by the fact that we >>> overload the term. If instead that method is renamed to assignment(), then >>> we are communicating to users that it is possible to have a subscription >>> without an active assignment, which is not obvious with the current API. >>> The code in fact already separates these concepts internally, so this would >>> just expose it to the user. >>> >>> 2. Merge the rebalance callback into a subscription callback and add >>> a way to handle errors. The consumer's current rebalance callback is >>> basically invoked when a subscription "succeeds," so it seems a little >>> weird to also provide a callback on subscription. Perhaps we can just take >>> the rebalance callback out of configuration and have the user provide it on >>> subscribe(). We can add a method to the callback to handle errors (e.g. for >>> non-existing topics). Since the callback is provided at subscribe time, it >>> should be clearer to the user that the assignment will not be ready >>> immediately when subscribe returns. It's also arguably a little more >>> natural to set this callback at subscription time rather than when the >>> consumer is constructed. >>> >>> 3. Get rid of the additive subscribe methods and just use >>> setSubscription which would clear the old subscription. After you start >>> providing callbacks to subscribe, then the implementation starts to get >>> tricky if each call to subscribe provides a separate callback. Instead, as >>> Jay suggested, we could just provide a way to set the full list of >>> subscriptions at once, and then there is only one callback to maintain. >>> >>> With these points, the API might look something like this: >>> >>> void setSubscription(List<String> topics, RebalanceCallback callback); >>> void setAssignment(List<TopicPartition> partitions); >>> List<String> subscription(); >>> List<TopicPartition> assignment(); >>> >>> interface RebalanceCallback { >>> void onAssignment(List<TopicPartition> partitions); >>> void onRevocation(List<TopicPartition> partitions); >>> >>> // handle non-existing topics, etc. >>> void onError(Exception e); >>> } >>> >>> Any thoughts?
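A minimal usage sketch of the API Jason proposes above may help make the subscription/assignment split concrete. The RebalanceCallback interface is copied from the proposal; ProposedConsumer is only a stand-in so the calls type-check (no released Kafka client exposes this exact surface), and the topic names are made up.

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// Callback interface copied from the proposal above.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    // handle non-existing topics, etc.
    void onError(Exception e);
}

// Stand-in for the proposed consumer surface; NOT a class in the Kafka client,
// it exists only so the usage below compiles.
interface ProposedConsumer {
    void setSubscription(List<String> topics, RebalanceCallback callback);
    void setAssignment(List<TopicPartition> partitions);
    List<String> subscription();
    List<TopicPartition> assignment();
}

public class SubscriptionSketch {

    // Topic subscription with automatic group management: the assignment arrives later,
    // through the callback, not when setSubscription() returns.
    static void subscribeWithCallback(ProposedConsumer consumer) {
        consumer.setSubscription(Arrays.asList("orders", "payments"), new RebalanceCallback() {
            @Override
            public void onAssignment(List<TopicPartition> partitions) {
                System.out.println("assigned: " + partitions);
            }

            @Override
            public void onRevocation(List<TopicPartition> partitions) {
                // last chance to commit offsets for partitions about to move elsewhere
                System.out.println("revoked: " + partitions);
            }

            @Override
            public void onError(Exception e) {
                System.err.println("subscription failed: " + e);
            }
        });
    }

    // Explicit partition assignment: no group management and no callback involved.
    static void assignExplicitly(ProposedConsumer consumer) {
        consumer.setAssignment(Arrays.asList(new TopicPartition("orders", 0),
                                             new TopicPartition("orders", 1)));
    }
}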
>>> >>> -Jason >>> >>> >>> >>> On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote: >>> >>>> Hey Becket, >>>> >>>> Yeah the high-level belief here is that it is possible to give >>>> something as high level as the existing "high level" consumer, but this is >>>> not likely to be the end-all be-all of high-level interfaces for processing >>>> streams of messages. For example neither of these interfaces handles the >>>> threading model for the processing, which obviously is a fairly low-level >>>> implementation detail left to the user in you proposal, the current code, >>>> as well as the existing scala consumer. >>>> >>>> There will be many of these: the full-fledged stream processing >>>> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more >>>> traditional message queue like "processor" interface, not to mention the >>>> stuff we're trying to do with KIP-28. For these frameworks it will be quite >>>> weird to add a bunch of new threads since they will want to dictate the >>>> threading model. >>>> >>>> What will be a major failure though is if this client isn't low-level >>>> enough and we need to introduce another layer underneath. This would happen >>>> either because we dictate too much to make it usable for various >>>> applications, frameworks, or use cases. This is the concern with dictating >>>> threading and processing models. >>>> >>>> So to summarize the goal is to subsume the existing APIs, which I think >>>> we all agree this does, and be a foundation on which to build other >>>> abstractions. >>>> >>>> WRT KIP-28, I think it is quite general and if we do that right it will >>>> subsume a lot of the higher level processing and will give a full threaded >>>> processing model to the user. >>>> >>>> >>>> -Jay >>>> >>>> >>>> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com> >>>> wrote: >>>> >>>>> Thanks for the comments Jason and Jay. >>>>> >>>>> Jason, I had the same concern for producer's callback as well before, >>>>> but it seems to be fine from some callbacks I wrote - user can always pass >>>>> in object in the constructor if necessary for synchronization. >>>>> >>>>> Jay, I agree that the current API might be fine for people who wants >>>>> to wrap it up. But I thought the new consumer was supposed to be a >>>>> combination of old high and low level consumer, which means it should be >>>>> able to be used as is, just like producer. If KafkaConsumer is designed to >>>>> be wrapped up for use, then the question becomes whether Kafka will >>>>> provide >>>>> a decent wrapper or not? Neha mentioned that KIP-28 will address the users >>>>> who only care about data. Would that be the wrapper provided by Kafka? I >>>>> am >>>>> not sure if that is sufficient though because the processor is highly >>>>> abstracted, and might only meet the static data stream requirement as I >>>>> listed in the grid. For users who need something from the other grids, are >>>>> we going to have another wrapper? Or are we expecting all the user to >>>>> write >>>>> their own wrapper for KafkaConsumer? Some other comments are in line. >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote: >>>>> >>>>>> Some comments on the proposal: >>>>>> >>>>>> I think we are conflating a number of things that should probably be >>>>>> addressed individually because they are unrelated. My past experience is >>>>>> that this always makes progress hard. 
The more we can pick apart these >>>>>> items the better: >>>>>> >>>>>> 1. threading model >>>>>> 2. blocking vs non-blocking semantics >>>>>> 3. missing apis >>>>>> 4. missing javadoc and other api surprises >>>>>> 5. Throwing exceptions. >>>>>> >>>>>> The missing APIs are getting added independently. Some like your >>>>>> proposed offsetByTime where things we agreed to hold off on for the first >>>>>> release and do when we'd thought it through. If there are uses for it now >>>>>> we can accelerate. I think each of these is really independent, we know >>>>>> there are things that need to be added but lumping them all into one >>>>>> discussion will be confusing. >>>>>> >>>>>> WRT throwing exceptions the policy is to throw exceptions that are >>>>>> unrecoverable and handle and log other exceptions that are transient. >>>>>> That >>>>>> policy makes sense if you go through the thought exercise of "what will >>>>>> the >>>>>> user do if i throw this exception to them" if they have no other rational >>>>>> response but to retry (and if failing to anticipate and retry with that >>>>>> exception will kill their program) . You can argue whether the topic not >>>>>> existing is transient or not, unfortunately the way we did auto-creation >>>>>> makes it transient if you are in "auto create mode" and non-transient >>>>>> otherwise (ick!). In any case this is an orthogonal discussion to >>>>>> everything else. I think the policy is right and if we don't conform to >>>>>> it >>>>>> in some way that is really an independent bug/discussion. >>>>>> >>>>> Agreed we can discuss about them separately. >>>>> >>>>>> >>>>>> I suggest we focus on threading and the current event-loop style of >>>>>> api design since I think that is really the crux. >>>>>> >>>>>> The analogy between the producer threading model and the consumer >>>>>> model actually doesn't work for me. The goal of the producer is actually >>>>>> to >>>>>> take requests from many many user threads and shove them into a single >>>>>> buffer for batching. So the threading model isn't the 1:1 threads you >>>>>> describe it is N:1.The goal of the consumer is to support single-threaded >>>>>> processing. This is what drives the difference. Saying that the producer >>>>>> has N:1 threads therefore for the consumer should have 1:1 threads >>>>>> instead >>>>>> of just 1 thread doesn't make sense any more then an analogy to the >>>>>> brokers >>>>>> threading model would--the problem we're solving is totally different. >>>>>> >>>>> I think the ultimate goal for producer and consumer are still allowing >>>>> user to send/receive data in parallel. In producer we picked the solution >>>>> of one-producer-serving-multiple-threads, and in consumer we picked >>>>> multiple-single-threaded-consumers instead of >>>>> single-consumer-serving-multiple threads. And we believe people can always >>>>> implement the latter with the former. I think this is a reasonable >>>>> decision. However, there are also reasonable concerns over the >>>>> multiple-single-threaded-consumers solution which is that the >>>>> single-thread >>>>> might have to be a dedicate polling thread in many cases which pushes user >>>>> towards the other solution - i.e. implementing a >>>>> single-thread-consumer-serving-multiple-threads wrapper. From what we >>>>> hear, >>>>> it seems to be a quite common concern for most of the users we talked to. 
>>>>> Plus the adoption bar of the consumer will be much higher because user >>>>> will >>>>> have to understand some of the details of the things they don't care as >>>>> listed in the grid. >>>>> The analogy between producer/consumer is intended to show that a >>>>> separate polling thread will solve the concerns we have. >>>>> >>>>>> >>>>>> >>>>> I think ultimately though what you need to think about is, does an >>>>>> event loop style of API make sense? That is the source of all the issues >>>>>> you describe. This style of API is incredibly prevalent from unix select >>>>>> to >>>>>> GUIs to node.js. It's a great way to model multiple channels of messages >>>>>> coming in. It is a fantastic style for event processing. Programmers >>>>>> understand this style of api though I would agree it is unusual compared >>>>>> to >>>>>> blocking apis. But it is is a single threaded processing model. The >>>>>> current >>>>>> approach is basically a pure event loop with some convenience methods >>>>>> that >>>>>> are effectively "poll until X is complete". >>>>>> >>>>>> I think basically all the confusion you are describing comes from not >>>>>> documenting/expecting an event loop. The "if you don't call poll nothing >>>>>> happens" point is basically this. It's an event loop. You have to loop. >>>>>> You >>>>>> can't not call poll. The docs don't cover this right now, perhaps. I >>>>>> think >>>>>> if they do it's not unreasonable behavior. >>>>>> >>>>> I'm not sure if I understand the event-loop correctly and honestly I >>>>> did not think about it clearly before. My understanding is that an >>>>> even-loop model means a single listener thread, but there can be multiple >>>>> event generator threads. The downside is that the listener thread has to >>>>> be >>>>> fast and very careful about blocking. If we look at the consumer, the >>>>> current model is the caller thread itself act as both event generator and >>>>> listener. As a generator, it generates different task by calling the >>>>> convenience methods. As a listener, it listens to the messages on broker >>>>> and also the tasks generated by itself. So in our proposal, we are not >>>>> changing the event-loop model here just separated the event generator and >>>>> event listener. It looks to me that the underlying execution thread >>>>> follows >>>>> the event-loop model, the special thing might be it is not only listening >>>>> to the messages from broker, but also listening to the tasks from the user >>>>> thread. This is essentially the thing a consumer has to do - interact with >>>>> both server and user. >>>>> >>>>>> >>>>>> If we want to move away from an event loop I'm not sure *any* aspect >>>>>> of the current event loop style of api makes sense any more. I am not >>>>>> totally married to event loops, but i do think what we have gives an >>>>>> elegant way of implementing any higher level abstractions that would >>>>>> fully >>>>>> implement the user's parallelism model. I don't want to go rethink >>>>>> everything but I do think a half-way implementation that is event loop + >>>>>> background threads is likely going to be icky. >>>>>> >>>>> We brought this up before to change the consumer.poll() to >>>>> consumer.consume(). And did not do so simply because we wanted to less >>>>> change in API... I might be crazy but can we think of the proposed model >>>>> as >>>>> processing thread + event-loop instead, rather than event-loop + >>>>> background >>>>> thread? 
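For reference, the "single-thread-consumer-serving-multiple-threads wrapper" discussed above could look roughly like the sketch below, assuming the 0.9-style KafkaConsumer API (subscribe(List<String>), poll(long)). The class name and queue size are made up. It also shows the hazard this thread keeps returning to: if the processing threads fall behind, put() blocks, poll() stops being called, and the consumer eventually falls out of the group.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// A dedicated thread owns the (non-thread-safe) consumer, drives poll(), and hands records
// to processing threads through a bounded queue; processors never touch the consumer itself.
public class PollingThreadWrapper implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final BlockingQueue<ConsumerRecord<String, String>> queue =
            new ArrayBlockingQueue<>(10_000); // size picked arbitrarily for the sketch
    private final AtomicBoolean running = new AtomicBoolean(true);

    public PollingThreadWrapper(Properties config, String topic) {
        this.consumer = new KafkaConsumer<>(config);
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(100); // fetch + heartbeats + callbacks
                for (ConsumerRecord<String, String> record : records) {
                    // The hazard under discussion: if processors fall behind, put() blocks,
                    // poll() stops being called, and the group coordinator marks us dead.
                    queue.put(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (WakeupException e) {
            // shutdown requested
        } finally {
            consumer.close();
        }
    }

    // Called from processing threads.
    public ConsumerRecord<String, String> take() throws InterruptedException {
        return queue.take();
    }

    public void shutdown() {
        running.set(false);
        consumer.wakeup(); // the one consumer call that is safe from another thread; aborts a blocked poll()
    }
}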
>>>>> >>>>>> >>>>>> WRT making it configurable whether liveness means "actually >>>>>> consuming" or "background thread running" I would suggest that that is >>>>>> really the worst outcome. These type of "modes" that are functionally >>>>>> totally different are just awful from a documentation, testing, >>>>>> usability, >>>>>> etc pov. I would strongly prefer we pick either of these, document it, >>>>>> and >>>>>> make it work well rather than trying to do both. >>>>>> >>>>> Previously I thought this was the major benefit we wanted from a >>>>> single threaded model, personally I don't have a strong preference on >>>>> this. >>>>> So I am OK with either way. >>>>> >>>>>> >>>>>> -Jay >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io> >>>>>> wrote: >>>>>> >>>>>>> Works now. Thanks Becket! >>>>>>> >>>>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Ah... My bad, forgot to change the URL link for pictures. >>>>>>>> Thanks for the quick response, Neha. It should be fixed now, can >>>>>>>> you try again? >>>>>>>> >>>>>>>> Jiangjie (Becket) Qin >>>>>>>> >>>>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images >>>>>>>>> that the wiki refers to, but none loaded for me. Just making sure if >>>>>>>>> its >>>>>>>>> just me or can everyone not see the pictures? >>>>>>>>> >>>>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> I agree with Ewen that a single threaded model will be tricky to >>>>>>>>>> implement the same conventional semantic of async or Future. We just >>>>>>>>>> drafted the following wiki which explains our thoughts in LinkedIn >>>>>>>>>> on the >>>>>>>>>> new consumer API and threading model. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal >>>>>>>>>> >>>>>>>>>> We were trying to see: >>>>>>>>>> 1. If we can use some kind of methodology to help us think about >>>>>>>>>> what API we want to provide to user for different use cases. >>>>>>>>>> 2. What is the pros and cons of current single threaded model. Is >>>>>>>>>> there a way that we can maintain the benefits while solve the issues >>>>>>>>>> we are >>>>>>>>>> facing now with single threaded model. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> >>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>> >>>>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava < >>>>>>>>>> e...@confluent.io> wrote: >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang < >>>>>>>>>>> wangg...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along >>>>>>>>>>>> with returning future in the commit calls, i.e. something similar >>>>>>>>>>>> to: >>>>>>>>>>>> >>>>>>>>>>>> public Future<void> commit(ConsumerCommitCallback callback); >>>>>>>>>>>> >>>>>>>>>>>> public Future<void> commit(Map<TopicPartition, Long> offsets, >>>>>>>>>>>> ConsumerCommitCallback callback); >>>>>>>>>>>> >>>>>>>>>>>> At that time I was slightly intending not to include the Future >>>>>>>>>>>> besides adding the callback mainly because of the implementation >>>>>>>>>>>> complexity >>>>>>>>>>>> I feel it could introduce along with the retry settings after >>>>>>>>>>>> looking >>>>>>>>>>>> through the code base. 
I would happy to change my mind if we could >>>>>>>>>>>> propose >>>>>>>>>>>> a prototype implementation that is simple enough. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> One of the reasons that interface ended up being difficult (or >>>>>>>>>>> maybe impossible) to make work reasonably is because the consumer >>>>>>>>>>> was >>>>>>>>>>> thread-safe at the time. That made it impossible to know what >>>>>>>>>>> should be >>>>>>>>>>> done when Future.get() is called -- should the implementation call >>>>>>>>>>> poll() >>>>>>>>>>> itself, or would the fact that the user is calling get() imply that >>>>>>>>>>> there's >>>>>>>>>>> a background thread running the poll() loop and we just need to >>>>>>>>>>> wait for it? >>>>>>>>>>> >>>>>>>>>>> The consumer is no longer thread safe, but I think the same >>>>>>>>>>> problem remains because the expectation with Futures is that they >>>>>>>>>>> are >>>>>>>>>>> thread safe. Which means that even if the consumer isn't thread >>>>>>>>>>> safe, I >>>>>>>>>>> would expect to be able to hand that Future off to some other >>>>>>>>>>> thread, have >>>>>>>>>>> the second thread call get(), and then continue driving the poll >>>>>>>>>>> loop in my >>>>>>>>>>> thread (which in turn would eventually resolve the Future). >>>>>>>>>>> >>>>>>>>>>> I quite dislike the sync/async enum. While both operations >>>>>>>>>>> commit offsets, their semantics are so different that overloading a >>>>>>>>>>> single >>>>>>>>>>> method with both is messy. That said, I don't think we should >>>>>>>>>>> consider this >>>>>>>>>>> an inconsistency wrt the new producer API's use of Future because >>>>>>>>>>> the two >>>>>>>>>>> APIs have a much more fundamental difference that justifies it: >>>>>>>>>>> they have >>>>>>>>>>> completely different threading and execution models. >>>>>>>>>>> >>>>>>>>>>> I think a Future-based API only makes sense if you can guarantee >>>>>>>>>>> the operations that Futures are waiting on will continue to make >>>>>>>>>>> progress >>>>>>>>>>> regardless of what the thread using the Future does. The producer >>>>>>>>>>> API makes >>>>>>>>>>> that work by processing asynchronous requests in a background >>>>>>>>>>> thread. The >>>>>>>>>>> new consumer does not, and so it becomes difficult/impossible to >>>>>>>>>>> implement >>>>>>>>>>> the Future correctly. (Or, you have to make assumptions which break >>>>>>>>>>> other >>>>>>>>>>> use cases; if you want to support the simple use case of just >>>>>>>>>>> making a >>>>>>>>>>> commit() synchronous by calling get(), the Future has to call poll() >>>>>>>>>>> internally; but if you do that, then if any user ever wants to add >>>>>>>>>>> synchronization to the consumer via some external mechanism, then >>>>>>>>>>> the >>>>>>>>>>> implementation of the Future's get() method will not be subject to >>>>>>>>>>> that >>>>>>>>>>> synchronization and things will break). >>>>>>>>>>> >>>>>>>>>>> -Ewen >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede < >>>>>>>>>>>> n...@confluent.io> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hey Adi, >>>>>>>>>>>>> >>>>>>>>>>>>> When we designed the initial version, the producer API was >>>>>>>>>>>>> still changing. I thought about adding the Future and then just >>>>>>>>>>>>> didn't get >>>>>>>>>>>>> to it. I agree that we should look into adding it for consistency. 
>>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Neha >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar < >>>>>>>>>>>>> aaurad...@linkedin.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Great discussion everyone! >>>>>>>>>>>>>> >>>>>>>>>>>>>> One general comment on the sync/async API's on the new >>>>>>>>>>>>>> consumer. I think the producer tackles sync vs async API's >>>>>>>>>>>>>> well. For API's that can either be sync or async, can we simply >>>>>>>>>>>>>> return a >>>>>>>>>>>>>> future? That seems more elegant for the API's that make sense >>>>>>>>>>>>>> either in >>>>>>>>>>>>>> both flavors. From the users perspective, it is more consistent >>>>>>>>>>>>>> with the >>>>>>>>>>>>>> new producer. One easy example is the commit call with the >>>>>>>>>>>>>> CommitType >>>>>>>>>>>>>> enum.. we can make that call always async and users can block on >>>>>>>>>>>>>> the future >>>>>>>>>>>>>> if they want to make sure their offsets are committed. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Aditya >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman < >>>>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the great responses, everyone! >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring >>>>>>>>>>>>>>> up old high level consumers, the teams we spoke to were >>>>>>>>>>>>>>> actually not the >>>>>>>>>>>>>>> types of services that simply wanted an easy way to get >>>>>>>>>>>>>>> ConsumerRecords. We >>>>>>>>>>>>>>> spoke to infrastructure teams that I would consider to be >>>>>>>>>>>>>>> closer to the >>>>>>>>>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's >>>>>>>>>>>>>>> level of >>>>>>>>>>>>>>> granularity. Some would use auto group management. Some would >>>>>>>>>>>>>>> use explicit >>>>>>>>>>>>>>> group management. All of them would turn off auto offset >>>>>>>>>>>>>>> commits. Yes, the >>>>>>>>>>>>>>> Samza team had prior experience with the old SimpleConsumer, >>>>>>>>>>>>>>> but this is >>>>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I >>>>>>>>>>>>>>> don't really >>>>>>>>>>>>>>> think the feedback received was about the simpler times or >>>>>>>>>>>>>>> wanting >>>>>>>>>>>>>>> additional higher-level clients. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> - Onur >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson < >>>>>>>>>>>>>>> ja...@confluent.io> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we >>>>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since >>>>>>>>>>>>>>>> this generally >>>>>>>>>>>>>>>> controls how long normal rebalances will take. I think it's >>>>>>>>>>>>>>>> currently >>>>>>>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also >>>>>>>>>>>>>>>> be nice to >>>>>>>>>>>>>>>> have an explicit LeaveGroup request to implement clean >>>>>>>>>>>>>>>> shutdown of a >>>>>>>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the >>>>>>>>>>>>>>>> timeout to >>>>>>>>>>>>>>>> reassign partitions. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Jason >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps < >>>>>>>>>>>>>>>> j...@confluent.io> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hey Kartik, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the >>>>>>>>>>>>>>>>> common case. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> However there are two ways to avoid this: >>>>>>>>>>>>>>>>> 1. Default the timeout high >>>>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> When we were doing the consumer design we discussed this >>>>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that >>>>>>>>>>>>>>>>> defaulting to a >>>>>>>>>>>>>>>>> high timeout was actually better. This means it takes a >>>>>>>>>>>>>>>>> little longer to >>>>>>>>>>>>>>>>> detect a failure, but usually that is not a big problem and >>>>>>>>>>>>>>>>> people who want >>>>>>>>>>>>>>>>> faster failure detection can tune it down. This seemed better >>>>>>>>>>>>>>>>> than having >>>>>>>>>>>>>>>>> the failure detection not really cover the consumption and >>>>>>>>>>>>>>>>> just be a >>>>>>>>>>>>>>>>> background ping. The two reasons where (a) you still have the >>>>>>>>>>>>>>>>> GC problem >>>>>>>>>>>>>>>>> even for the background thread, (b) consumption is in some >>>>>>>>>>>>>>>>> sense a better >>>>>>>>>>>>>>>>> definition of an active healthy consumer and a lot of >>>>>>>>>>>>>>>>> problems crop up when >>>>>>>>>>>>>>>>> you have an inactive consumer with an active background >>>>>>>>>>>>>>>>> thread (as today). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> When we had the discussion I think what we realized was >>>>>>>>>>>>>>>>> that most people who were worried about the timeout where >>>>>>>>>>>>>>>>> imagining a very >>>>>>>>>>>>>>>>> low default (500ms) say. But in fact just setting this to 60 >>>>>>>>>>>>>>>>> seconds or >>>>>>>>>>>>>>>>> higher as a default would be okay, this adds to the failure >>>>>>>>>>>>>>>>> detection time >>>>>>>>>>>>>>>>> but only apps that care about this need to tune. This should >>>>>>>>>>>>>>>>> largely >>>>>>>>>>>>>>>>> eliminate false positives since after all if you disappear >>>>>>>>>>>>>>>>> for 60 seconds >>>>>>>>>>>>>>>>> that actually starts to be more of a true positive, even if >>>>>>>>>>>>>>>>> you come >>>>>>>>>>>>>>>>> back... :-) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -Jay >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam < >>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> adding the open source alias. This email started off as >>>>>>>>>>>>>>>>>> a broader discussion around the new consumer. I was zooming >>>>>>>>>>>>>>>>>> into only the >>>>>>>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the >>>>>>>>>>>>>>>>>> heartbeats. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the >>>>>>>>>>>>>>>>>> problem). Monitoring the lag is important as it is the >>>>>>>>>>>>>>>>>> primary way to tell >>>>>>>>>>>>>>>>>> if the application is wedged. There might be other metrics >>>>>>>>>>>>>>>>>> which can >>>>>>>>>>>>>>>>>> possibly capture the same essence. Yes the lag is at the >>>>>>>>>>>>>>>>>> consumer group >>>>>>>>>>>>>>>>>> level, but you can tell that one of the consumers is messed >>>>>>>>>>>>>>>>>> up if one of >>>>>>>>>>>>>>>>>> the partitions in the application start generating lag and >>>>>>>>>>>>>>>>>> others are good >>>>>>>>>>>>>>>>>> for e.g. 
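As a purely illustrative reading of Jay's "default the timeout high" suggestion, a consumer configuration in the spirit of this thread might look like the following. The keys are the ones the released new consumer uses; at the time of this discussion the heartbeat interval was still hard-coded at roughly three heartbeats per session timeout, so heartbeat.interval.ms is shown only to make that ratio explicit, and the broker-side group.max.session.timeout.ms must permit the chosen value.

import java.util.Properties;

// Consumer settings that tolerate long GC or processing pauses, so that only
// latency-sensitive applications need to tune the timeout down.
public class ConsumerTimeoutConfig {
    public static Properties highTimeoutDefaults() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
        props.put("group.id", "my-app");                   // placeholder group
        props.put("session.timeout.ms", "60000");          // tolerate ~60s without a poll()-driven heartbeat
        props.put("heartbeat.interval.ms", "20000");       // keeps the 3-per-session-timeout ratio
        props.put("enable.auto.commit", "false");          // the teams quoted in this thread disable auto commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}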
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is >>>>>>>>>>>>>>>>>> that in the old consumer most customers don't have to worry >>>>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>> unnecessary rebalances and most of the things that they do >>>>>>>>>>>>>>>>>> in their app >>>>>>>>>>>>>>>>>> doesn't have an impact on the session timeout.. (i.e. the >>>>>>>>>>>>>>>>>> only thing that >>>>>>>>>>>>>>>>>> causes rebalances is when the GC is out of whack). For >>>>>>>>>>>>>>>>>> the handful of >>>>>>>>>>>>>>>>>> customers who are impacted by GC related rebalances, i would >>>>>>>>>>>>>>>>>> imagine that >>>>>>>>>>>>>>>>>> all of them would really want us to make the system more >>>>>>>>>>>>>>>>>> resilient. I >>>>>>>>>>>>>>>>>> agree that the GC problem can't be solved easily in the java >>>>>>>>>>>>>>>>>> client, >>>>>>>>>>>>>>>>>> however it appears that now we would be expecting the >>>>>>>>>>>>>>>>>> consuming >>>>>>>>>>>>>>>>>> applications to be even more careful with ongoing tuning of >>>>>>>>>>>>>>>>>> the timeouts. >>>>>>>>>>>>>>>>>> At LinkedIn, we have seen that most kafka applications don't >>>>>>>>>>>>>>>>>> have much of a >>>>>>>>>>>>>>>>>> clue about configuring the timeouts and just end up calling >>>>>>>>>>>>>>>>>> the Kafka team >>>>>>>>>>>>>>>>>> when their application sees rebalances. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is >>>>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll >>>>>>>>>>>>>>>>>> timeout that is >>>>>>>>>>>>>>>>>> larger than the session timeout. If we had a notion of >>>>>>>>>>>>>>>>>> implicit >>>>>>>>>>>>>>>>>> heartbeats then we could also automatically make this work >>>>>>>>>>>>>>>>>> for consumers by >>>>>>>>>>>>>>>>>> sending hearbeats at the appropriate interval even though >>>>>>>>>>>>>>>>>> the customers >>>>>>>>>>>>>>>>>> want to do a long poll. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we >>>>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the >>>>>>>>>>>>>>>>>> consumer. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Would love to hear how other people think about this >>>>>>>>>>>>>>>>>> subject ? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>>> Kartik >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede < >>>>>>>>>>>>>>>>>> n...@confluent.io> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is >>>>>>>>>>>>>>>>>>> that there are many ways the application's message >>>>>>>>>>>>>>>>>>> processing could fail >>>>>>>>>>>>>>>>>>> and we wouldn't be able to model all of those in the >>>>>>>>>>>>>>>>>>> consumer's failure >>>>>>>>>>>>>>>>>>> detection mechanism. So we should try to model as much of >>>>>>>>>>>>>>>>>>> it as we can so >>>>>>>>>>>>>>>>>>> the consumer's failure detection is meaningful. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect >>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag >>>>>>>>>>>>>>>>>>>> increases then for >>>>>>>>>>>>>>>>>>>> sure something is wrong. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the >>>>>>>>>>>>>>>>>>> problem itself. 
Lag is also a consumer group level concept >>>>>>>>>>>>>>>>>>> and the problem >>>>>>>>>>>>>>>>>>> we have is being able to detect failures at the level of >>>>>>>>>>>>>>>>>>> individual >>>>>>>>>>>>>>>>>>> consumer instances. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> As you pointed out, a consumer that poll() is a stronger >>>>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The >>>>>>>>>>>>>>>>>>> dilemma then is who >>>>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but >>>>>>>>>>>>>>>>>>> the application >>>>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for >>>>>>>>>>>>>>>>>>> their >>>>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for >>>>>>>>>>>>>>>>>>> the user to >>>>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too >>>>>>>>>>>>>>>>>>> often. The >>>>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be >>>>>>>>>>>>>>>>>>> *exactly* 99tile of processing latency but could be in >>>>>>>>>>>>>>>>>>> the ballpark + an error delta. The error delta is the >>>>>>>>>>>>>>>>>>> application owner's >>>>>>>>>>>>>>>>>>> acceptable risk threshold during which they would be ok if >>>>>>>>>>>>>>>>>>> the application >>>>>>>>>>>>>>>>>>> remains part of the group despite being dead. It is >>>>>>>>>>>>>>>>>>> ultimately a tradeoff >>>>>>>>>>>>>>>>>>> between operational ease and more accurate failure >>>>>>>>>>>>>>>>>>> detection. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range >>>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to a tens of seconds. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most >>>>>>>>>>>>>>>>>>> most of the times, the normal GC falls in the few ms range >>>>>>>>>>>>>>>>>>> and there are >>>>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC >>>>>>>>>>>>>>>>>>> falls in the >>>>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be >>>>>>>>>>>>>>>>>>> predicted, so has to be >>>>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to >>>>>>>>>>>>>>>>>>> observe what this >>>>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the >>>>>>>>>>>>>>>>>>> appropriate >>>>>>>>>>>>>>>>>>> timeouts. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Since this is not something that can be automated, this >>>>>>>>>>>>>>>>>>> is a config that the application owner has to set based on >>>>>>>>>>>>>>>>>>> the expected >>>>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads >>>>>>>>>>>>>>>>>>> to ending up >>>>>>>>>>>>>>>>>>> with bad consumption semantics where the application >>>>>>>>>>>>>>>>>>> process continues to >>>>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming >>>>>>>>>>>>>>>>>>> since it has halted >>>>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to >>>>>>>>>>>>>>>>>>> express that in >>>>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the >>>>>>>>>>>>>>>>>>> application owner >>>>>>>>>>>>>>>>>>> has to go through the process of measuring and then >>>>>>>>>>>>>>>>>>> defining this "max". >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The reverse where they don't do this and the application >>>>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing >>>>>>>>>>>>>>>>>>> and frustrating >>>>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth. 
And >>>>>>>>>>>>>>>>>>> as long as the >>>>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it >>>>>>>>>>>>>>>>>>> should be easy >>>>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid >>>>>>>>>>>>>>>>>>> or not and how >>>>>>>>>>>>>>>>>>> that should be tuned. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source >>>>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is >>>>>>>>>>>>>>>>>>> unblocked since you >>>>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>> Neha >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam < >>>>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier >>>>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the >>>>>>>>>>>>>>>>>>>> points you discuss are all very valid. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. >>>>>>>>>>>>>>>>>>>> However there are a smaller percentage of memory hungry >>>>>>>>>>>>>>>>>>>> apps that get hit >>>>>>>>>>>>>>>>>>>> by it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are >>>>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also >>>>>>>>>>>>>>>>>>>> real. If the app >>>>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is >>>>>>>>>>>>>>>>>>>> healthy is surely >>>>>>>>>>>>>>>>>>>> higher. But this again isn't an absolute measure that the >>>>>>>>>>>>>>>>>>>> app is >>>>>>>>>>>>>>>>>>>> processing correctly. >>>>>>>>>>>>>>>>>>>> In other cases the app might have even died in which >>>>>>>>>>>>>>>>>>>> case this discussion is moot. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect >>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag >>>>>>>>>>>>>>>>>>>> increases then for >>>>>>>>>>>>>>>>>>>> sure something is wrong. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to >>>>>>>>>>>>>>>>>>>> tune their session timeout based on the 99tile of the time >>>>>>>>>>>>>>>>>>>> they take to >>>>>>>>>>>>>>>>>>>> process events after every poll. This turns out is a >>>>>>>>>>>>>>>>>>>> nontrivial thing to >>>>>>>>>>>>>>>>>>>> do for an application todo. To start with when an >>>>>>>>>>>>>>>>>>>> application is new their >>>>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on >>>>>>>>>>>>>>>>>>>> synthetic data. >>>>>>>>>>>>>>>>>>>> This often times doesn't represent what they will see in >>>>>>>>>>>>>>>>>>>> production. Once >>>>>>>>>>>>>>>>>>>> the app is in production their processing latencies will >>>>>>>>>>>>>>>>>>>> potentially vary >>>>>>>>>>>>>>>>>>>> over time. It is extremely unlikely that the application >>>>>>>>>>>>>>>>>>>> owner does a >>>>>>>>>>>>>>>>>>>> careful job of monitoring the 99tile of latencies over >>>>>>>>>>>>>>>>>>>> time and readjust >>>>>>>>>>>>>>>>>>>> the settings. Often times the latencies vary because of >>>>>>>>>>>>>>>>>>>> variance is other >>>>>>>>>>>>>>>>>>>> services that are called by the consumer as part of >>>>>>>>>>>>>>>>>>>> processing the events. 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events >>>>>>>>>>>>>>>>>>>> and writes to Kafka. With quotas the write latencies to >>>>>>>>>>>>>>>>>>>> kafka could range >>>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to a tens of seconds. >>>>>>>>>>>>>>>>>>>> As the scale of >>>>>>>>>>>>>>>>>>>> processing for an app increasing the app or that 'user' >>>>>>>>>>>>>>>>>>>> could now get >>>>>>>>>>>>>>>>>>>> quotaed. Instead of slowing down gracefully unless the >>>>>>>>>>>>>>>>>>>> application owner >>>>>>>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a >>>>>>>>>>>>>>>>>>>> potential outage >>>>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> If we expose the pause() Api then It is possible for us >>>>>>>>>>>>>>>>>>>> to take care of this in the linkedin wrapper. Whereby we >>>>>>>>>>>>>>>>>>>> would keep >>>>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue >>>>>>>>>>>>>>>>>>>> the messages. >>>>>>>>>>>>>>>>>>>> When the queue is full we would call pause(). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I >>>>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as >>>>>>>>>>>>>>>>>>>> every major >>>>>>>>>>>>>>>>>>>> customer will eventually be pained by it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Kartik >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps < >>>>>>>>>>>>>>>>>>>> j...@confluent.io> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hey guys, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges >>>>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or >>>>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group >>>>>>>>>>>>>>>>>>>> management stuff >>>>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is >>>>>>>>>>>>>>>>>>>> already a big >>>>>>>>>>>>>>>>>>>> effort. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of >>>>>>>>>>>>>>>>>>>> documenting which apis block and which don't and when >>>>>>>>>>>>>>>>>>>> people are surprised >>>>>>>>>>>>>>>>>>>> because we haven't labeled something that will be >>>>>>>>>>>>>>>>>>>> unintuitive. But the >>>>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in >>>>>>>>>>>>>>>>>>>> programming >>>>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond >>>>>>>>>>>>>>>>>>>> people if explained >>>>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if >>>>>>>>>>>>>>>>>>>> we don't say >>>>>>>>>>>>>>>>>>>> which is which any scheme will be confusing). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has >>>>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people >>>>>>>>>>>>>>>>>>>> had around >>>>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking >>>>>>>>>>>>>>>>>>>> iterator, >>>>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, >>>>>>>>>>>>>>>>>>>> etc have all >>>>>>>>>>>>>>>>>>>> caused endless amounts of anger. 
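The wrapper Kartik describes above, which keeps calling poll on a separate thread, enqueues the messages, and calls pause() when the queue is full, would look roughly like the loop below. pause()/resume() are the calls being requested here (they did later land on KafkaConsumer; the Collection-taking signatures of newer clients are assumed), and the watermark parameters and 100 ms poll timeout are illustrative.

import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Poll loop with flow control: keep calling poll() (and therefore heart-beating) even when the
// hand-off queue is full, by pausing fetches instead of blocking the polling thread.
final class FlowControlledPoller {

    static void pollLoop(KafkaConsumer<String, String> consumer,
                         BlockingQueue<ConsumerRecord<String, String>> queue,
                         int highWaterMark, int lowWaterMark) throws InterruptedException {
        boolean paused = false;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                queue.put(record); // rarely blocks if the queue capacity sits well above highWaterMark
            }
            if (!paused && queue.size() >= highWaterMark) {
                consumer.pause(consumer.assignment());  // stop fetching but stay in the group
                paused = true;
            } else if (paused && queue.size() <= lowWaterMark) {
                consumer.resume(consumer.assignment());
                paused = false;
            }
        }
    }
}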
Not to mention that that >>>>>>>>>>>>>>>>>>>> client >>>>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people >>>>>>>>>>>>>>>>>>>> want to try to >>>>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've >>>>>>>>>>>>>>>>>>>> heard not to use >>>>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to >>>>>>>>>>>>>>>>>>>> lack of features). >>>>>>>>>>>>>>>>>>>> It's important to have that context when people need to >>>>>>>>>>>>>>>>>>>> switch and they say >>>>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Let me give some context related to your points, based >>>>>>>>>>>>>>>>>>>> on our previous discussions: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was >>>>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the >>>>>>>>>>>>>>>>>>>> lowest level >>>>>>>>>>>>>>>>>>>> client. There are many, many possible higher level >>>>>>>>>>>>>>>>>>>> processing abstractions. >>>>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level >>>>>>>>>>>>>>>>>>>> client was that >>>>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with >>>>>>>>>>>>>>>>>>>> things that are >>>>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading >>>>>>>>>>>>>>>>>>>> model. If you do >>>>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level >>>>>>>>>>>>>>>>>>>> consumer api for >>>>>>>>>>>>>>>>>>>> people to get around whatever you have done. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The second reason was that the internal threading in >>>>>>>>>>>>>>>>>>>> the client became quite complex. The answer with threading >>>>>>>>>>>>>>>>>>>> is always that >>>>>>>>>>>>>>>>>>>> "it won't be complex this time", but it always is. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside >>>>>>>>>>>>>>>>>>>> to coupling heartbeat with poll--the contract is that the >>>>>>>>>>>>>>>>>>>> application must >>>>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. >>>>>>>>>>>>>>>>>>>> This allows the >>>>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However >>>>>>>>>>>>>>>>>>>> it's important to >>>>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do >>>>>>>>>>>>>>>>>>>> background polling a >>>>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't >>>>>>>>>>>>>>>>>>>> shutdown. This leads to >>>>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming >>>>>>>>>>>>>>>>>>>> because they have >>>>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming >>>>>>>>>>>>>>>>>>>> partitions and >>>>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far >>>>>>>>>>>>>>>>>>>> worse. If you allow >>>>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and >>>>>>>>>>>>>>>>>>>> knows they aren't >>>>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if >>>>>>>>>>>>>>>>>>>> you allows false >>>>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone >>>>>>>>>>>>>>>>>>>> notices that a >>>>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which >>>>>>>>>>>>>>>>>>>> point the data is >>>>>>>>>>>>>>>>>>>> gone. 
Plus of course even if you do this you still have >>>>>>>>>>>>>>>>>>>> regular false >>>>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed >>>>>>>>>>>>>>>>>>>> this in some depth >>>>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the >>>>>>>>>>>>>>>>>>>> liveness notion tied >>>>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition >>>>>>>>>>>>>>>>>>>> of liveness. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Jay >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman < >>>>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi Confluent Team. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity >>>>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123 >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350 >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359 >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, >>>>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> The current behavior is not intuitive. For >>>>>>>>>>>>>>>>>>>>> example, KafkaConsumer.poll drives everything. The >>>>>>>>>>>>>>>>>>>>> other methods like >>>>>>>>>>>>>>>>>>>>> subscribe, unsubscribe, seek, commit(async) don’t do >>>>>>>>>>>>>>>>>>>>> anything without a >>>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll call. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> The semantics of a commit() call should be >>>>>>>>>>>>>>>>>>>>> consistent between sync and async operations. >>>>>>>>>>>>>>>>>>>>> Currently, sync commit is a >>>>>>>>>>>>>>>>>>>>> blocking call which actually sends out an >>>>>>>>>>>>>>>>>>>>> OffsetCommitRequest and waits for >>>>>>>>>>>>>>>>>>>>> the response upon the user’s KafkaConsumer.commit >>>>>>>>>>>>>>>>>>>>> call. However, the async >>>>>>>>>>>>>>>>>>>>> commit is a nonblocking call which just queues up the >>>>>>>>>>>>>>>>>>>>> OffsetCommitRequest. >>>>>>>>>>>>>>>>>>>>> The request itself is later sent out in the next poll. >>>>>>>>>>>>>>>>>>>>> The teams we talked >>>>>>>>>>>>>>>>>>>>> to found this misleading. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Heartbeats are dependent on user application >>>>>>>>>>>>>>>>>>>>> behavior (i.e. user applications calling poll). This >>>>>>>>>>>>>>>>>>>>> can be a big problem >>>>>>>>>>>>>>>>>>>>> as we don’t control how different applications behave. >>>>>>>>>>>>>>>>>>>>> For example, we >>>>>>>>>>>>>>>>>>>>> might have an application which reads from Kafka and >>>>>>>>>>>>>>>>>>>>> writes to Espresso. 
If >>>>>>>>>>>>>>>>>>>>> Espresso is slow for whatever reason, then in >>>>>>>>>>>>>>>>>>>>> rebalances could happen. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current >>>>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the >>>>>>>>>>>>>>>>>>>>> old simple >>>>>>>>>>>>>>>>>>>>> consumer, i.e. in old consumer we ask users to deal with >>>>>>>>>>>>>>>>>>>>> raw protocols and >>>>>>>>>>>>>>>>>>>>> error handlings while in KafkaConsumer we do that for >>>>>>>>>>>>>>>>>>>>> users. However, for >>>>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of >>>>>>>>>>>>>>>>>>>>> users), the >>>>>>>>>>>>>>>>>>>>> experience is a noticeable regression. The old high level >>>>>>>>>>>>>>>>>>>>> consumer >>>>>>>>>>>>>>>>>>>>> interface is simple and easy to use for end user, while >>>>>>>>>>>>>>>>>>>>> KafkaConsumer >>>>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and >>>>>>>>>>>>>>>>>>>>> is becoming >>>>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted by the >>>>>>>>>>>>>>>>>>>>> javadoc growing >>>>>>>>>>>>>>>>>>>>> bigger and bigger. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should >>>>>>>>>>>>>>>>>>>>> take a step back and look at the big picture. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's >>>>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll >>>>>>>>>>>>>>>>>>>>> called by the user >>>>>>>>>>>>>>>>>>>>> which pretty much drives everything: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - data fetches >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic >>>>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - async offset commits >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the >>>>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and >>>>>>>>>>>>>>>>>>>>> the consequences >>>>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - async commits won't send >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - callbacks won't fire >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with >>>>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of >>>>>>>>>>>>>>>>>>>>> KafkaConsumer: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - NIO >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume >>>>>>>>>>>>>>>>>>>>> messages >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) >>>>>>>>>>>>>>>>>>>>> or partitions(explicit group management) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - no surprises in the user experience >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't >>>>>>>>>>>>>>>>>>>>> hitting. 
We think the most important example is that >>>>>>>>>>>>>>>>>>>>> there should be no >>>>>>>>>>>>>>>>>>>>> requirement from the end user to consistently >>>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll in order >>>>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would >>>>>>>>>>>>>>>>>>>>> be better to split >>>>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on >>>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks >>>>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - heartbeats >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - join groups >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - commits >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - executing callbacks >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> This would help reduce the amount of surprises to the >>>>>>>>>>>>>>>>>>>>> end user. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to >>>>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you >>>>>>>>>>>>>>>>>>>>> at LinkedIn on *July >>>>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it >>>>>>>>>>>>>>>>>>>>> to open source. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>> Neha >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Neha >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> -- Guozhang >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Thanks, >>>>>>>>>>> Ewen >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Thanks, >>>>>>>>> Neha >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Thanks, >>>>>>> Neha >>>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >