For those who are interested in the ticket:
https://issues.apache.org/jira/browse/KAFKA-2388

On Fri, Jul 31, 2015 at 1:14 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> I like the idea as well. That is much clearer.
> Also agree with Jay on the naming.
>
> Thanks, Jason. I'll update the Jira ticket.
>
> Jiangjie (Becket) Qin
>
> On Fri, Jul 31, 2015 at 12:19 PM, Jay Kreps <j...@confluent.io> wrote:
>
>> I like all these ideas.
>>
>> Our convention is to keep method names declarative, so it should probably
>> be
>>   subscribe(List<String> topics, Callback c)
>>   assign(List<TopicPartition> partitions)
>>
>> The javadoc would obviously have to clarify the relationship between a
>> subscribed topic and assigned partitions. Presumably unsubscribe/unassign
>> are unnecessary since this is just a matter of subscribing to the empty
>> list.
>>
>> -Jay
>>
>> On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>>> I was thinking a little bit this morning about the subscription API and
>>> I have a few ideas on how to address some of the concerns about
>>> intuitiveness and exception handling.
>>>
>>> 1. Split the current notion of topic/partition subscription into
>>> subscription of topics and assignment of partitions. These concepts are
>>> pretty fundamentally different and I think at least some of the confusion
>>> about when subscriptions() can be used is caused by the fact that we
>>> overload the term. If instead that method is renamed to assignment(), then
>>> we are communicating to users that it is possible to have a subscription
>>> without an active assignment, which is not obvious with the current API.
>>> The code in fact already separates these concepts internally, so this would
>>> just expose it to the user.
>>>
>>> 2. Merge rebalance callback into a subscription callback and add
>>> a way to handle errors. The consumer's current rebalance callback is
>>> basically invoked when a subscription "succeeds," so it seems a little
>>> weird to also provide a callback on subscription. Perhaps we can just take
>>> the rebalance callback out of configuration and have the user provide it on
>>> subscribe(). We can add a method to the callback to handle errors (e.g. for
>>> non-existing topics). Since the callback is provided at subscribe time, it
>>> should be clearer to the user that the assignment will not be ready
>>> immediately when subscribe returns. It's also arguably a little more
>>> natural to set this callback at subscription time rather than when the
>>> consumer is constructed.
>>>
>>> 3. Get rid of the additive subscribe methods and just use
>>> setSubscription which would clear the old subscription. After you start
>>> providing callbacks to subscribe, then the implementation starts to get
>>> tricky if each call to subscribe provides a separate callback. Instead, as
>>> Jay suggested, we could just provide a way to set the full list of
>>> subscriptions at once, and then there is only one callback to maintain.
>>>
>>> With these points, the API might look something like this:
>>>
>>> void setSubscription(List<String> topics, RebalanceCallback callback);
>>> void setAssignment(List<TopicPartition> partitions);
>>> List<String> subscription();
>>> List<TopicPartition> assignment();
>>>
>>> interface RebalanceCallback {
>>>   void onAssignment(List<TopicPartition> partitions);
>>>   void onRevocation(List<TopicPartition> partitions);
>>>
>>>   // handle non-existing topics, etc.
>>>   void onError(Exception e);
>>> }
>>>
>>> Any thoughts?
>>>
>>> -Jason
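A minimal in-memory sketch of how the API proposed above might behave, written only to illustrate the split between subscription and assignment. `SketchConsumer`, `simulateRebalance`, and the local `TopicPartition` stand-in are hypothetical names introduced here; none of this is the actual KafkaConsumer implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Stand-in for Kafka's TopicPartition so the sketch compiles without Kafka on the classpath.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Callback shape taken from the proposal above.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    void onError(Exception e);
}

// Hypothetical toy consumer: subscription and assignment are separate pieces of state.
class SketchConsumer {
    private List<String> subscription = Collections.emptyList();
    private List<TopicPartition> assignment = Collections.emptyList();
    private RebalanceCallback callback;

    // Replaces (rather than adds to) the previous subscription, so only one callback is tracked.
    void setSubscription(List<String> topics, RebalanceCallback callback) {
        this.subscription = new ArrayList<>(topics);
        this.callback = callback;
    }

    List<String> subscription() { return Collections.unmodifiableList(subscription); }

    List<TopicPartition> assignment() { return Collections.unmodifiableList(assignment); }

    // Hypothetical hook standing in for the coordinator finishing a rebalance.
    void simulateRebalance(List<TopicPartition> newAssignment) {
        if (callback != null) callback.onRevocation(new ArrayList<>(assignment));
        assignment = new ArrayList<>(newAssignment);
        if (callback != null) callback.onAssignment(new ArrayList<>(assignment));
    }
}

class SubscriptionSketchDemo {
    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        consumer.setSubscription(Arrays.asList("clicks"), new RebalanceCallback() {
            public void onAssignment(List<TopicPartition> p) { System.out.println("assigned " + p.size()); }
            public void onRevocation(List<TopicPartition> p) { System.out.println("revoked " + p.size()); }
            public void onError(Exception e) { System.out.println("error " + e.getMessage()); }
        });
        // A subscription exists, but no assignment yet.
        System.out.println("assignment empty: " + consumer.assignment().isEmpty());
        consumer.simulateRebalance(Collections.singletonList(new TopicPartition("clicks", 0)));
    }
}
```

The point of the sketch is that `subscription()` is non-empty as soon as `setSubscription` returns, while `assignment()` stays empty until a rebalance completes and the callback fires.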
>>>
>>>
>>>
>>> On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> Hey Becket,
>>>>
>>>> Yeah the high-level belief here is that it is possible to give
>>>> something as high level as the existing "high level" consumer, but this is
>>>> not likely to be the end-all be-all of high-level interfaces for processing
>>>> streams of messages. For example neither of these interfaces handles the
>>>> threading model for the processing, which obviously is a fairly low-level
>>>> implementation detail left to the user in your proposal, the current code,
>>>> as well as the existing Scala consumer.
>>>>
>>>> There will be many of these: the full-fledged stream processing
>>>> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more
>>>> traditional message queue like "processor" interface, not to mention the
>>>> stuff we're trying to do with KIP-28. For these frameworks it will be quite
>>>> weird to add a bunch of new threads since they will want to dictate the
>>>> threading model.
>>>>
>>>> What will be a major failure though is if this client isn't low-level
>>>> enough and we need to introduce another layer underneath. This would happen
>>>> if we dictate too much for it to be usable across various applications,
>>>> frameworks, or use cases. This is the concern with dictating threading and
>>>> processing models.
>>>>
>>>> So to summarize the goal is to subsume the existing APIs, which I think
>>>> we all agree this does, and be a foundation on which to build other
>>>> abstractions.
>>>>
>>>> WRT KIP-28, I think it is quite general and if we do that right it will
>>>> subsume a lot of the higher level processing and will give a full threaded
>>>> processing model to the user.
>>>>
>>>>
>>>> -Jay
>>>>
>>>>
>>>> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com>
>>>> wrote:
>>>>
>>>>> Thanks for the comments Jason and Jay.
>>>>>
>>>>> Jason, I had the same concern about the producer's callback before,
>>>>> but it seems to be fine from some callbacks I wrote - users can always pass
>>>>> an object into the constructor if necessary for synchronization.
>>>>>
>>>>> Jay, I agree that the current API might be fine for people who want
>>>>> to wrap it up. But I thought the new consumer was supposed to be a
>>>>> combination of the old high and low level consumers, which means it should
>>>>> be able to be used as is, just like the producer. If KafkaConsumer is
>>>>> designed to be wrapped up for use, then the question becomes whether Kafka
>>>>> will provide a decent wrapper or not. Neha mentioned that KIP-28 will
>>>>> address the users who only care about data. Would that be the wrapper
>>>>> provided by Kafka? I am not sure if that is sufficient though, because the
>>>>> processor is highly abstracted and might only meet the static data stream
>>>>> requirement as I listed in the grid. For users who need something from the
>>>>> other grids, are we going to have another wrapper? Or are we expecting all
>>>>> users to write their own wrapper for KafkaConsumer? Some other comments are
>>>>> in line.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>
>>>>>> Some comments on the proposal:
>>>>>>
>>>>>> I think we are conflating a number of things that should probably be
>>>>>> addressed individually because they are unrelated. My past experience is
>>>>>> that this always makes progress hard. The more we can pick apart these
>>>>>> items the better:
>>>>>>
>>>>>>    1. threading model
>>>>>>    2. blocking vs non-blocking semantics
>>>>>>    3. missing apis
>>>>>>    4. missing javadoc and other api surprises
>>>>>>    5. Throwing exceptions.
>>>>>>
>>>>>> The missing APIs are getting added independently. Some, like your
>>>>>> proposed offsetByTime, were things we agreed to hold off on for the first
>>>>>> release and do when we'd thought it through. If there are uses for it now
>>>>>> we can accelerate. I think each of these is really independent; we know
>>>>>> there are things that need to be added but lumping them all into one
>>>>>> discussion will be confusing.
>>>>>>
>>>>>> WRT throwing exceptions the policy is to throw exceptions that are
>>>>>> unrecoverable and handle and log other exceptions that are transient.
>>>>>> That policy makes sense if you go through the thought exercise of "what
>>>>>> will the user do if I throw this exception to them?" when they have no
>>>>>> other rational response but to retry (and when failing to anticipate and
>>>>>> retry on that exception will kill their program). You can argue whether
>>>>>> the topic not existing is transient or not; unfortunately the way we did
>>>>>> auto-creation makes it transient if you are in "auto create mode" and
>>>>>> non-transient otherwise (ick!). In any case this is an orthogonal
>>>>>> discussion to everything else. I think the policy is right and if we
>>>>>> don't conform to it in some way that is really an independent
>>>>>> bug/discussion.
>>>>>>
>>>>> Agreed, we can discuss them separately.
>>>>>
>>>>>>
>>>>>> I suggest we focus on threading and the current event-loop style of
>>>>>> api design since I think that is really the crux.
>>>>>>
>>>>>> The analogy between the producer threading model and the consumer
>>>>>> model actually doesn't work for me. The goal of the producer is actually
>>>>>> to take requests from many many user threads and shove them into a single
>>>>>> buffer for batching. So the threading model isn't the 1:1 threads you
>>>>>> describe, it is N:1. The goal of the consumer is to support
>>>>>> single-threaded processing. This is what drives the difference. Saying
>>>>>> that the producer has N:1 threads, therefore the consumer should have 1:1
>>>>>> threads instead of just 1 thread, doesn't make sense any more than an
>>>>>> analogy to the broker's threading model would--the problem we're solving
>>>>>> is totally different.
>>>>>>
>>>>> I think the ultimate goal for both producer and consumer is still to
>>>>> allow users to send/receive data in parallel. In the producer we picked the
>>>>> solution of one-producer-serving-multiple-threads, and in the consumer we
>>>>> picked multiple-single-threaded-consumers instead of
>>>>> single-consumer-serving-multiple-threads. And we believe people can always
>>>>> implement the latter with the former. I think this is a reasonable
>>>>> decision. However, there are also reasonable concerns over the
>>>>> multiple-single-threaded-consumers solution, which is that the single
>>>>> thread might have to be a dedicated polling thread in many cases, which
>>>>> pushes users towards the other solution - i.e. implementing a
>>>>> single-thread-consumer-serving-multiple-threads wrapper. From what we
>>>>> hear, it seems to be quite a common concern for most of the users we talked
>>>>> to. Plus the adoption bar of the consumer will be much higher because users
>>>>> will have to understand some of the details of the things they don't care
>>>>> about, as listed in the grid.
>>>>> The analogy between producer/consumer is intended to show that a
>>>>> separate polling thread will solve the concerns we have.
>>>>>
>>>>>>
>>>>>>
>>>>>> I think ultimately though what you need to think about is, does an
>>>>>> event loop style of API make sense? That is the source of all the issues
>>>>>> you describe. This style of API is incredibly prevalent from unix select
>>>>>> to GUIs to node.js. It's a great way to model multiple channels of
>>>>>> messages coming in. It is a fantastic style for event processing.
>>>>>> Programmers understand this style of api though I would agree it is
>>>>>> unusual compared to blocking apis. But it is a single threaded processing
>>>>>> model. The current approach is basically a pure event loop with some
>>>>>> convenience methods that are effectively "poll until X is complete".
>>>>>>
>>>>>> I think basically all the confusion you are describing comes from not
>>>>>> documenting/expecting an event loop. The "if you don't call poll nothing
>>>>>> happens" point is basically this. It's an event loop. You have to loop.
>>>>>> You can't not call poll. The docs don't cover this right now, perhaps. I
>>>>>> think if they do it's not unreasonable behavior.
>>>>>>
>>>>> I'm not sure if I understand the event loop correctly and honestly I
>>>>> did not think about it clearly before. My understanding is that an
>>>>> event-loop model means a single listener thread, but there can be multiple
>>>>> event generator threads. The downside is that the listener thread has to
>>>>> be fast and very careful about blocking. If we look at the consumer, in the
>>>>> current model the caller thread itself acts as both event generator and
>>>>> listener. As a generator, it generates different tasks by calling the
>>>>> convenience methods. As a listener, it listens to the messages from the
>>>>> broker and also the tasks generated by itself. So in our proposal, we are
>>>>> not changing the event-loop model here, just separating the event generator
>>>>> and event listener. It looks to me that the underlying execution thread
>>>>> follows the event-loop model; the special thing might be that it is not
>>>>> only listening to the messages from the broker, but also listening to the
>>>>> tasks from the user thread. This is essentially the thing a consumer has to
>>>>> do - interact with both server and user.
>>>>>
>>>>>>
>>>>>> If we want to move away from an event loop I'm not sure *any* aspect
>>>>>> of the current event loop style of api makes sense any more. I am not
>>>>>> totally married to event loops, but I do think what we have gives an
>>>>>> elegant way of implementing any higher level abstractions that would
>>>>>> fully implement the user's parallelism model. I don't want to go rethink
>>>>>> everything but I do think a half-way implementation that is event loop +
>>>>>> background threads is likely going to be icky.
>>>>>>
>>>>> We brought this up before, to change consumer.poll() to
>>>>> consumer.consume(), and did not do so simply because we wanted fewer
>>>>> changes to the API... I might be crazy but can we think of the proposed
>>>>> model as processing thread + event-loop, rather than event-loop +
>>>>> background thread?
>>>>>
>>>>>>
>>>>>> WRT making it configurable whether liveness means "actually
>>>>>> consuming" or "background thread running" I would suggest that that is
>>>>>> really the worst outcome. These types of "modes" that are functionally
>>>>>> totally different are just awful from a documentation, testing,
>>>>>> usability, etc pov. I would strongly prefer we pick either of these,
>>>>>> document it, and make it work well rather than trying to do both.
>>>>>>
>>>>> Previously I thought this was the major benefit we wanted from a
>>>>> single threaded model. Personally I don't have a strong preference on
>>>>> this, so I am OK with either way.
>>>>>
>>>>>>
>>>>>> -Jay
>>>>>>
>>>>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Works now. Thanks Becket!
>>>>>>>
>>>>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ah... My bad, forgot to change the URL link for pictures.
>>>>>>>> Thanks for the quick response, Neha. It should be fixed now, can
>>>>>>>> you try again?
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images
>>>>>>>>> that the wiki refers to, but none loaded for me. Just making sure: is
>>>>>>>>> it just me, or can everyone not see the pictures?
>>>>>>>>>
>>>>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I agree with Ewen that it will be tricky for a single threaded
>>>>>>>>>> model to implement the conventional semantics of async or Future. We
>>>>>>>>>> just drafted the following wiki which explains our thoughts at
>>>>>>>>>> LinkedIn on the new consumer API and threading model.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>>>>>>>>
>>>>>>>>>> We were trying to see:
>>>>>>>>>> 1. If we can use some kind of methodology to help us think about
>>>>>>>>>> what API we want to provide to users for different use cases.
>>>>>>>>>> 2. What are the pros and cons of the current single threaded model? Is
>>>>>>>>>> there a way that we can maintain the benefits while solving the issues
>>>>>>>>>> we are facing now with the single threaded model?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>>>>>>>>> e...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <
>>>>>>>>>>> wangg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along
>>>>>>>>>>>> with returning a future in the commit calls, i.e. something similar
>>>>>>>>>>>> to:
>>>>>>>>>>>>
>>>>>>>>>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>>>>>>>>>
>>>>>>>>>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>>>>>>>>>> ConsumerCommitCallback callback);
>>>>>>>>>>>>
>>>>>>>>>>>> At that time I was leaning slightly towards not including the
>>>>>>>>>>>> Future besides adding the callback, mainly because of the
>>>>>>>>>>>> implementation complexity I feel it could introduce along with the
>>>>>>>>>>>> retry settings, after looking through the code base. I would be happy
>>>>>>>>>>>> to change my mind if we could propose a prototype implementation that
>>>>>>>>>>>> is simple enough.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> One of the reasons that interface ended up being difficult (or
>>>>>>>>>>> maybe impossible) to make work reasonably is because the consumer
>>>>>>>>>>> was thread-safe at the time. That made it impossible to know what
>>>>>>>>>>> should be done when Future.get() is called -- should the
>>>>>>>>>>> implementation call poll() itself, or would the fact that the user is
>>>>>>>>>>> calling get() imply that there's a background thread running the
>>>>>>>>>>> poll() loop and we just need to wait for it?
>>>>>>>>>>>
>>>>>>>>>>> The consumer is no longer thread safe, but I think the same
>>>>>>>>>>> problem remains because the expectation with Futures is that they
>>>>>>>>>>> are thread safe. Which means that even if the consumer isn't thread
>>>>>>>>>>> safe, I would expect to be able to hand that Future off to some other
>>>>>>>>>>> thread, have the second thread call get(), and then continue driving
>>>>>>>>>>> the poll loop in my thread (which in turn would eventually resolve the
>>>>>>>>>>> Future).
>>>>>>>>>>>
>>>>>>>>>>> I quite dislike the sync/async enum. While both operations
>>>>>>>>>>> commit offsets, their semantics are so different that overloading a
>>>>>>>>>>> single method with both is messy. That said, I don't think we should
>>>>>>>>>>> consider this an inconsistency wrt the new producer API's use of
>>>>>>>>>>> Future because the two APIs have a much more fundamental difference
>>>>>>>>>>> that justifies it: they have completely different threading and
>>>>>>>>>>> execution models.
>>>>>>>>>>>
>>>>>>>>>>> I think a Future-based API only makes sense if you can guarantee
>>>>>>>>>>> the operations that Futures are waiting on will continue to make
>>>>>>>>>>> progress regardless of what the thread using the Future does. The
>>>>>>>>>>> producer API makes that work by processing asynchronous requests in a
>>>>>>>>>>> background thread. The new consumer does not, and so it becomes
>>>>>>>>>>> difficult/impossible to implement the Future correctly. (Or, you have
>>>>>>>>>>> to make assumptions which break other use cases; if you want to
>>>>>>>>>>> support the simple use case of just making a commit() synchronous by
>>>>>>>>>>> calling get(), the Future has to call poll() internally; but if you do
>>>>>>>>>>> that, then if any user ever wants to add synchronization to the
>>>>>>>>>>> consumer via some external mechanism, then the implementation of the
>>>>>>>>>>> Future's get() method will not be subject to that synchronization and
>>>>>>>>>>> things will break).
>>>>>>>>>>>
>>>>>>>>>>> -Ewen
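A rough sketch of the problem described above, assuming a toy single-threaded consumer in which queued work only advances inside poll(). `PollDrivenConsumer`, `commitAsync`, and `commitSync` are hypothetical names for illustration, not the real consumer API; the only library type used is `java.util.concurrent.CompletableFuture`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-threaded consumer: nothing progresses unless poll() is called.
class PollDrivenConsumer {
    private final Queue<CompletableFuture<Void>> pendingCommits = new ArrayDeque<>();

    // Queues a commit and returns a future; no work happens until poll() runs.
    CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pendingCommits.add(future);
        return future;
    }

    // One turn of the event loop: here it simply completes every queued commit.
    void poll() {
        CompletableFuture<Void> next;
        while ((next = pendingCommits.poll()) != null) {
            next.complete(null);
        }
    }

    // "Poll until X is complete": the synchronous variant has to drive the loop itself.
    void commitSync() {
        CompletableFuture<Void> future = commitAsync();
        while (!future.isDone()) {
            poll();
        }
    }
}

class PollDrivenDemo {
    public static void main(String[] args) {
        PollDrivenConsumer consumer = new PollDrivenConsumer();
        CompletableFuture<Void> commit = consumer.commitAsync();
        System.out.println("done before poll: " + commit.isDone()); // false
        consumer.poll();
        System.out.println("done after poll: " + commit.isDone());  // true
    }
}
```

In this sketch, calling `commit.get()` on the same thread before any `poll()` would never return, because no background thread exists to complete the future. That is the gap between this model and the producer's.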
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <
>>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Adi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> When we designed the initial version, the producer API was
>>>>>>>>>>>>> still changing. I thought about adding the Future and then just
>>>>>>>>>>>>> didn't get to it. I agree that we should look into adding it for
>>>>>>>>>>>>> consistency.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>>>>>>>>> aaurad...@linkedin.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Great discussion everyone!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One general comment on the sync/async APIs on the new
>>>>>>>>>>>>>> consumer. I think the producer tackles sync vs async APIs
>>>>>>>>>>>>>> well. For APIs that can either be sync or async, can we simply
>>>>>>>>>>>>>> return a future? That seems more elegant for the APIs that make
>>>>>>>>>>>>>> sense in both flavors. From the user's perspective, it is more
>>>>>>>>>>>>>> consistent with the new producer. One easy example is the commit
>>>>>>>>>>>>>> call with the CommitType enum: we can make that call always async
>>>>>>>>>>>>>> and users can block on the future if they want to make sure their
>>>>>>>>>>>>>> offsets are committed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Aditya
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <
>>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring
>>>>>>>>>>>>>>> up old high level consumers, the teams we spoke to were
>>>>>>>>>>>>>>> actually not the types of services that simply wanted an easy way
>>>>>>>>>>>>>>> to get ConsumerRecords. We spoke to infrastructure teams that I
>>>>>>>>>>>>>>> would consider to be closer to the "power-user" end of the
>>>>>>>>>>>>>>> spectrum and would want KafkaConsumer's level of granularity.
>>>>>>>>>>>>>>> Some would use auto group management. Some would use explicit
>>>>>>>>>>>>>>> group management. All of them would turn off auto offset
>>>>>>>>>>>>>>> commits. Yes, the Samza team had prior experience with the old
>>>>>>>>>>>>>>> SimpleConsumer, but this is the first Kafka consumer being used by
>>>>>>>>>>>>>>> the Databus team. So I don't really think the feedback received
>>>>>>>>>>>>>>> was about the simpler times or wanting additional higher-level
>>>>>>>>>>>>>>> clients.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Onur
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>>>>>>>>> ja...@confluent.io> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we
>>>>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since
>>>>>>>>>>>>>>>> this generally controls how long normal rebalances will take. I
>>>>>>>>>>>>>>>> think it's currently hard-coded to 3 heartbeats per session
>>>>>>>>>>>>>>>> timeout. It could also be nice to have an explicit LeaveGroup
>>>>>>>>>>>>>>>> request to implement clean shutdown of a consumer. Then the
>>>>>>>>>>>>>>>> coordinator doesn't have to wait for the timeout to reassign
>>>>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Jason
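To make the arithmetic above concrete, here is a small sketch. It takes the "3 heartbeats per session timeout" figure from the message as given, and additionally assumes the coordinator treats a member as failed once a full session timeout passes with no heartbeat. `SessionTimeoutSketch` and its methods are hypothetical helpers, not Kafka code.

```java
// Hypothetical helper illustrating the session-timeout arithmetic discussed above.
class SessionTimeoutSketch {
    // Figure quoted in the thread: 3 heartbeats per session timeout.
    static final int HEARTBEATS_PER_SESSION = 3;

    // Heartbeat interval derived from the session timeout.
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION;
    }

    // Assumed coordinator-side rule: expired once no heartbeat arrived for a full session timeout.
    static boolean isExpired(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs >= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 60_000L;
        System.out.println("heartbeat every " + heartbeatIntervalMs(sessionTimeoutMs) + " ms");
        System.out.println("expired at 45s: " + isExpired(0L, 45_000L, sessionTimeoutMs));
        System.out.println("expired at 60s: " + isExpired(0L, 60_000L, sessionTimeoutMs));
    }
}
```

Without an explicit LeaveGroup, a cleanly stopped consumer looks the same as a crashed one under this rule: the coordinator only learns about it when `isExpired` flips.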
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <
>>>>>>>>>>>>>>>> j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey Kartik,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the
>>>>>>>>>>>>>>>>> common case.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When we were doing the consumer design we discussed this
>>>>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that
>>>>>>>>>>>>>>>>> defaulting to a high timeout was actually better. This means it
>>>>>>>>>>>>>>>>> takes a little longer to detect a failure, but usually that is
>>>>>>>>>>>>>>>>> not a big problem and people who want faster failure detection
>>>>>>>>>>>>>>>>> can tune it down. This seemed better than having the failure
>>>>>>>>>>>>>>>>> detection not really cover the consumption and just be a
>>>>>>>>>>>>>>>>> background ping. The two reasons were (a) you still have the GC
>>>>>>>>>>>>>>>>> problem even for the background thread, (b) consumption is in
>>>>>>>>>>>>>>>>> some sense a better definition of an active healthy consumer and
>>>>>>>>>>>>>>>>> a lot of problems crop up when you have an inactive consumer
>>>>>>>>>>>>>>>>> with an active background thread (as today).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When we had the discussion I think what we realized was
>>>>>>>>>>>>>>>>> that most people who were worried about the timeout were
>>>>>>>>>>>>>>>>> imagining a very low default (say 500ms). But in fact just
>>>>>>>>>>>>>>>>> setting this to 60 seconds or higher as a default would be okay;
>>>>>>>>>>>>>>>>> this adds to the failure detection time but only apps that care
>>>>>>>>>>>>>>>>> about this need to tune. This should largely eliminate false
>>>>>>>>>>>>>>>>> positives since after all if you disappear for 60 seconds that
>>>>>>>>>>>>>>>>> actually starts to be more of a true positive, even if you come
>>>>>>>>>>>>>>>>> back... :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Adding the open source alias. This email started off as
>>>>>>>>>>>>>>>>>> a broader discussion around the new consumer. I was zooming
>>>>>>>>>>>>>>>>>> in on only the aspect of poll() being the only mechanism for
>>>>>>>>>>>>>>>>>> driving the heartbeats.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the
>>>>>>>>>>>>>>>>>> problem). Monitoring the lag is important as it is the
>>>>>>>>>>>>>>>>>> primary way to tell if the application is wedged. There might
>>>>>>>>>>>>>>>>>> be other metrics which can possibly capture the same essence.
>>>>>>>>>>>>>>>>>> Yes the lag is at the consumer group level, but you can tell
>>>>>>>>>>>>>>>>>> that one of the consumers is messed up if, for example, one of
>>>>>>>>>>>>>>>>>> the partitions in the application starts generating lag while
>>>>>>>>>>>>>>>>>> the others are fine.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is
>>>>>>>>>>>>>>>>>> that in the old consumer most customers don't have to worry
>>>>>>>>>>>>>>>>>> about unnecessary rebalances and most of the things that they
>>>>>>>>>>>>>>>>>> do in their app don't have an impact on the session timeout
>>>>>>>>>>>>>>>>>> (i.e. the only thing that causes rebalances is when the GC is
>>>>>>>>>>>>>>>>>> out of whack). For the handful of customers who are impacted
>>>>>>>>>>>>>>>>>> by GC related rebalances, I would imagine that all of them
>>>>>>>>>>>>>>>>>> would really want us to make the system more resilient. I
>>>>>>>>>>>>>>>>>> agree that the GC problem can't be solved easily in the Java
>>>>>>>>>>>>>>>>>> client, however it appears that now we would be expecting the
>>>>>>>>>>>>>>>>>> consuming applications to be even more careful with ongoing
>>>>>>>>>>>>>>>>>> tuning of the timeouts. At LinkedIn, we have seen that most
>>>>>>>>>>>>>>>>>> Kafka applications don't have much of a clue about configuring
>>>>>>>>>>>>>>>>>> the timeouts and just end up calling the Kafka team when their
>>>>>>>>>>>>>>>>>> application sees rebalances.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is
>>>>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll
>>>>>>>>>>>>>>>>>> timeout that is larger than the session timeout. If we had a
>>>>>>>>>>>>>>>>>> notion of implicit heartbeats then we could also automatically
>>>>>>>>>>>>>>>>>> make this work for consumers by sending heartbeats at the
>>>>>>>>>>>>>>>>>> appropriate interval even though the customers want to do a
>>>>>>>>>>>>>>>>>> long poll.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We could surely work around this at LinkedIn if we have
>>>>>>>>>>>>>>>>>> either the Pause() api or an explicit HeartBeat() api on the
>>>>>>>>>>>>>>>>>> consumer.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Would love to hear what other people think about this
>>>>>>>>>>>>>>>>>> subject.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Kartik
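One possible reading of the "implicit heartbeats" idea above, sketched as plain arithmetic: a long poll is cut into waits no longer than the heartbeat interval, with a heartbeat sent between slices. `LongPollSketch`, `sliceWaits`, and `pollTimeoutIsSafe` are hypothetical names; this is an illustration of the idea, not how any Kafka client is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of chunking a long poll so heartbeats can be sent in between.
class LongPollSketch {
    // Splits one poll timeout into waits no longer than the heartbeat interval.
    static List<Long> sliceWaits(long pollTimeoutMs, long heartbeatIntervalMs) {
        if (heartbeatIntervalMs <= 0) {
            throw new IllegalArgumentException("heartbeatIntervalMs must be positive");
        }
        List<Long> slices = new ArrayList<>();
        long remaining = pollTimeoutMs;
        while (remaining > 0) {
            long slice = Math.min(remaining, heartbeatIntervalMs);
            slices.add(slice);
            remaining -= slice;
        }
        return slices;
    }

    // Without implicit heartbeats, the poll timeout must not exceed the session timeout.
    static boolean pollTimeoutIsSafe(long pollTimeoutMs, long sessionTimeoutMs) {
        return pollTimeoutMs <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // A 25s poll with a 10s heartbeat interval becomes three shorter waits.
        System.out.println(sliceWaits(25_000L, 10_000L));
        System.out.println(pollTimeoutIsSafe(25_000L, 30_000L));
    }
}
```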
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is
>>>>>>>>>>>>>>>>>>> that there are many ways the application's message
>>>>>>>>>>>>>>>>>>> processing could fail and we wouldn't be able to model all of
>>>>>>>>>>>>>>>>>>> those in the consumer's failure detection mechanism. So we
>>>>>>>>>>>>>>>>>>> should try to model as much of it as we can so the consumer's
>>>>>>>>>>>>>>>>>>> failure detection is meaningful.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag
>>>>>>>>>>>>>>>>>>>> increases then for sure something is wrong.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the
>>>>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept
>>>>>>>>>>>>>>>>>>> and the problem we have is being able to detect failures at
>>>>>>>>>>>>>>>>>>> the level of individual consumer instances.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As you pointed out, a consumer that poll() is a stronger
>>>>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The 
>>>>>>>>>>>>>>>>>>> dilemma then is who
>>>>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but 
>>>>>>>>>>>>>>>>>>> the application
>>>>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for 
>>>>>>>>>>>>>>>>>>> their
>>>>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for 
>>>>>>>>>>>>>>>>>>> the user to
>>>>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too 
>>>>>>>>>>>>>>>>>>> often. The
>>>>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be
>>>>>>>>>>>>>>>>>>> *exactly* 99tile of processing latency but could be in
>>>>>>>>>>>>>>>>>>> the ballpark + an error delta. The error delta is the 
>>>>>>>>>>>>>>>>>>> application owner's
>>>>>>>>>>>>>>>>>>> acceptable risk threshold during which they would be ok if 
>>>>>>>>>>>>>>>>>>> the application
>>>>>>>>>>>>>>>>>>> remains part of the group despite being dead. It is 
>>>>>>>>>>>>>>>>>>> ultimately a tradeoff
>>>>>>>>>>>>>>>>>>> between operational ease and more accurate failure 
>>>>>>>>>>>>>>>>>>> detection.
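
As a rough illustration of the "ballpark plus error delta" idea above, here is a minimal sketch that derives a timeout from observed per-poll processing latencies. The class `TimeoutEstimator`, its method names, and the nearest-rank percentile are assumptions made for illustration; none of them are part of the Kafka client.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of the Kafka client API. */
public class TimeoutEstimator {

    /** Nearest-rank percentile (1..100) of the observed latencies, in milliseconds. */
    public static long percentile(long[] latenciesMs, int pct) {
        if (latenciesMs.length == 0 || pct < 1 || pct > 100) {
            throw new IllegalArgumentException("need observations and 1 <= pct <= 100");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        // Integer ceiling of pct/100 * n, which avoids floating-point rounding.
        int rank = (pct * sorted.length + 99) / 100;
        return sorted[rank - 1];
    }

    /** 99th percentile of processing latency plus the owner's accepted error delta. */
    public static long suggestedTimeoutMs(long[] latenciesMs, long errorDeltaMs) {
        return percentile(latenciesMs, 99) + errorDeltaMs;
    }
}
```

The error delta stays an explicit input because, as argued above, only the application owner can decide how long a dead member may remain in the group.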
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range
>>>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most
>>>>>>>>>>>>>>>>>>> of the time, the normal GC falls in the few ms range
>>>>>>>>>>>>>>>>>>> and there are
>>>>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC 
>>>>>>>>>>>>>>>>>>> falls in the
>>>>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be 
>>>>>>>>>>>>>>>>>>> predicted, so has to be
>>>>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to 
>>>>>>>>>>>>>>>>>>> observe what this
>>>>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the 
>>>>>>>>>>>>>>>>>>> appropriate
>>>>>>>>>>>>>>>>>>> timeouts.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Since this is not something that can be automated, this
>>>>>>>>>>>>>>>>>>> is a config that the application owner has to set based on 
>>>>>>>>>>>>>>>>>>> the expected
>>>>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads 
>>>>>>>>>>>>>>>>>>> to ending up
>>>>>>>>>>>>>>>>>>> with bad consumption semantics where the application 
>>>>>>>>>>>>>>>>>>> process continues to
>>>>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming 
>>>>>>>>>>>>>>>>>>> since it has halted
>>>>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to 
>>>>>>>>>>>>>>>>>>> express that in
>>>>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the 
>>>>>>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>>>>>>> has to go through the process of measuring and then 
>>>>>>>>>>>>>>>>>>> defining this "max".
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The reverse where they don't do this and the application
>>>>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing 
>>>>>>>>>>>>>>>>>>> and frustrating
>>>>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth it. And
>>>>>>>>>>>>>>>>>>> as long as the
>>>>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it 
>>>>>>>>>>>>>>>>>>> should be easy
>>>>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid 
>>>>>>>>>>>>>>>>>>> or not and how
>>>>>>>>>>>>>>>>>>> that should be tuned.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source
>>>>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is 
>>>>>>>>>>>>>>>>>>> unblocked since you
>>>>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the
>>>>>>>>>>>>>>>>>>>> points you discuss are all very valid.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue.
>>>>>>>>>>>>>>>>>>>> However, only a smaller percentage of memory-hungry
>>>>>>>>>>>>>>>>>>>> apps get hit
>>>>>>>>>>>>>>>>>>>> by it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are
>>>>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also 
>>>>>>>>>>>>>>>>>>>> real.  If the app
>>>>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is 
>>>>>>>>>>>>>>>>>>>> healthy is surely
>>>>>>>>>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the 
>>>>>>>>>>>>>>>>>>>> app is
>>>>>>>>>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>>>>>>>>>> In other cases the app might have even died in which
>>>>>>>>>>>>>>>>>>>> case this discussion is moot.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag 
>>>>>>>>>>>>>>>>>>>> increases then for
>>>>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to
>>>>>>>>>>>>>>>>>>>> tune their session timeout based on the 99tile of the time 
>>>>>>>>>>>>>>>>>>>> they take to
>>>>>>>>>>>>>>>>>>>> process events after every poll. This turns out to be a
>>>>>>>>>>>>>>>>>>>> nontrivial thing
>>>>>>>>>>>>>>>>>>>> for an application to do. To start with, when an
>>>>>>>>>>>>>>>>>>>> application is new, their
>>>>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on 
>>>>>>>>>>>>>>>>>>>> synthetic data.
>>>>>>>>>>>>>>>>>>>> This often times doesn't represent what they will see in 
>>>>>>>>>>>>>>>>>>>> production.  Once
>>>>>>>>>>>>>>>>>>>> the app is in production their processing latencies will 
>>>>>>>>>>>>>>>>>>>> potentially vary
>>>>>>>>>>>>>>>>>>>> over time.  It is extremely unlikely that the application 
>>>>>>>>>>>>>>>>>>>> owner does a
>>>>>>>>>>>>>>>>>>>> careful job of monitoring the 99tile of latencies over 
>>>>>>>>>>>>>>>>>>>> time and readjust
>>>>>>>>>>>>>>>>>>>> the settings.  Often times the latencies vary because of 
>>>>>>>>>>>>>>>>>>>> variance in other
>>>>>>>>>>>>>>>>>>>> services that are called by the consumer as part of 
>>>>>>>>>>>>>>>>>>>> processing the events.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events
>>>>>>>>>>>>>>>>>>>> and writes to Kafka.  With quotas the write latencies to 
>>>>>>>>>>>>>>>>>>>> kafka could range
>>>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>>>>>>>>>> As the scale of
>>>>>>>>>>>>>>>>>>>> processing for an app increases, the app or that 'user'
>>>>>>>>>>>>>>>>>>>> could now get
>>>>>>>>>>>>>>>>>>>> quotaed.  Instead of slowing down gracefully unless the 
>>>>>>>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a 
>>>>>>>>>>>>>>>>>>>> potential outage
>>>>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If we expose the pause() Api then it is possible for us
>>>>>>>>>>>>>>>>>>>> to take care of this in the LinkedIn wrapper.  Whereby we
>>>>>>>>>>>>>>>>>>>> would keep
>>>>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue 
>>>>>>>>>>>>>>>>>>>> the messages.
>>>>>>>>>>>>>>>>>>>> When the queue is full we would call pause().
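
The wrapper idea described above could be sketched roughly as follows. Everything here is an assumption for illustration: `BufferingPoller` and its `Source` interface are hypothetical stand-ins for a consumer that exposes poll/pause/resume, and "queue is full" is approximated by a high-water mark so that a whole batch returned by one poll never has to be dropped.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical wrapper sketch; not part of the Kafka client. */
public class BufferingPoller<T> {

    /** Assumed minimal consumer surface; the real consumer's signatures may differ. */
    public interface Source<T> {
        List<T> poll();   // assumed to return no records while paused
        void pause();
        void resume();
    }

    private final Source<T> source;
    private final int highWaterMark;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private boolean paused = false;

    public BufferingPoller(Source<T> source, int highWaterMark) {
        this.source = source;
        this.highWaterMark = highWaterMark;
    }

    /** One iteration of the dedicated polling thread's loop. */
    public synchronized void pollOnce() {
        if (paused && queue.size() < highWaterMark) {
            source.resume();
            paused = false;
        }
        // poll() is still called while paused, so the consumer keeps looking active.
        queue.addAll(source.poll());
        if (!paused && queue.size() >= highWaterMark) {
            source.pause();
            paused = true;
        }
    }

    /** Called by the processing thread; returns null when nothing is buffered. */
    public T next() { return queue.poll(); }

    public int buffered() { return queue.size(); }

    public synchronized boolean isPaused() { return paused; }
}
```

A dedicated thread would call `pollOnce()` periodically while the application thread drains records with `next()`, so slow processing fills the buffer and triggers a pause instead of delaying poll.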
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I
>>>>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as 
>>>>>>>>>>>>>>>>>>>> every major
>>>>>>>>>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <
>>>>>>>>>>>>>>>>>>>> j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges
>>>>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or
>>>>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group 
>>>>>>>>>>>>>>>>>>>> management stuff
>>>>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is 
>>>>>>>>>>>>>>>>>>>> already a big
>>>>>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of
>>>>>>>>>>>>>>>>>>>> documenting which apis block and which don't and when 
>>>>>>>>>>>>>>>>>>>> people are surprised
>>>>>>>>>>>>>>>>>>>> because we haven't labeled something that will be 
>>>>>>>>>>>>>>>>>>>> unintuitive. But the
>>>>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in 
>>>>>>>>>>>>>>>>>>>> programming
>>>>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond 
>>>>>>>>>>>>>>>>>>>> people if explained
>>>>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if 
>>>>>>>>>>>>>>>>>>>> we don't say
>>>>>>>>>>>>>>>>>>>> which is which any scheme will be confusing).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has
>>>>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people 
>>>>>>>>>>>>>>>>>>>> had around
>>>>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking 
>>>>>>>>>>>>>>>>>>>> iterator,
>>>>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, 
>>>>>>>>>>>>>>>>>>>> etc  have all
>>>>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that 
>>>>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people 
>>>>>>>>>>>>>>>>>>>> want to try to
>>>>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've 
>>>>>>>>>>>>>>>>>>>> heard not to use
>>>>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to 
>>>>>>>>>>>>>>>>>>>> lack of features).
>>>>>>>>>>>>>>>>>>>> It's important to have that context when people need to 
>>>>>>>>>>>>>>>>>>>> switch and they say
>>>>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Let me give some context related to your points, based
>>>>>>>>>>>>>>>>>>>> on our previous discussions:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was
>>>>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the 
>>>>>>>>>>>>>>>>>>>> lowest level
>>>>>>>>>>>>>>>>>>>> client. There are many, many possible higher level 
>>>>>>>>>>>>>>>>>>>> processing abstractions.
>>>>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level 
>>>>>>>>>>>>>>>>>>>> client was that
>>>>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with 
>>>>>>>>>>>>>>>>>>>> things that are
>>>>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading 
>>>>>>>>>>>>>>>>>>>> model. If you do
>>>>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level 
>>>>>>>>>>>>>>>>>>>> consumer api for
>>>>>>>>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The second reason was that the internal threading in
>>>>>>>>>>>>>>>>>>>> the client became quite complex. The answer with threading 
>>>>>>>>>>>>>>>>>>>> is always that
>>>>>>>>>>>>>>>>>>>> "it won't be complex this time", but it always is.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside
>>>>>>>>>>>>>>>>>>>> to coupling heartbeat with poll--the contract is that the 
>>>>>>>>>>>>>>>>>>>> application must
>>>>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. 
>>>>>>>>>>>>>>>>>>>> This allows the
>>>>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However 
>>>>>>>>>>>>>>>>>>>> it's important to
>>>>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do 
>>>>>>>>>>>>>>>>>>>> background polling a
>>>>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't 
>>>>>>>>>>>>>>>>>>>> shutdown. This leads to
>>>>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming 
>>>>>>>>>>>>>>>>>>>> because they have
>>>>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming 
>>>>>>>>>>>>>>>>>>>> partitions and
>>>>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far 
>>>>>>>>>>>>>>>>>>>> worse. If you allow
>>>>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and 
>>>>>>>>>>>>>>>>>>>> knows they aren't
>>>>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if 
>>>>>>>>>>>>>>>>>>>> you allow false
>>>>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone 
>>>>>>>>>>>>>>>>>>>> notices that a
>>>>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which 
>>>>>>>>>>>>>>>>>>>> point the data is
>>>>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have 
>>>>>>>>>>>>>>>>>>>> regular false
>>>>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed 
>>>>>>>>>>>>>>>>>>>> this in some depth
>>>>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the 
>>>>>>>>>>>>>>>>>>>> liveness notion tied
>>>>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition
>>>>>>>>>>>>>>>>>>>> of liveness.
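
The false-positive versus false-negative tradeoff described above can be made concrete with a toy model. This is only a sketch under simplifying assumptions: `LivenessModel` and its methods are hypothetical names, and the coordinator is reduced to a single session-timeout comparison.

```java
/** Toy model of session-timeout liveness; all names are illustrative only. */
public class LivenessModel {

    /** The coordinator keeps a member while its last heartbeat is within the session timeout. */
    public static boolean consideredAlive(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs <= sessionTimeoutMs;
    }

    /**
     * Time of the most recent heartbeat for a process that is still running
     * but whose processing loop stopped calling poll() at stalledAtMs.
     */
    public static long lastHeartbeatMs(boolean heartbeatTiedToPoll, long stalledAtMs, long nowMs) {
        // Tied to poll: heartbeats stop with the stall. Background thread: they continue.
        return heartbeatTiedToPoll ? stalledAtMs : nowMs;
    }
}
```

In this model a stalled consumer is eventually evicted when heartbeats are tied to poll and is never evicted when a background thread keeps heartbeating. The same comparison also evicts a healthy consumer whose processing simply takes longer than the session timeout, which is the false positive discussed above.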
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus,
>>>>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>    1. The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like
>>>>>>>>>>>>>>>>>>>>>    subscribe, unsubscribe, seek, commit(async) don’t do anything
>>>>>>>>>>>>>>>>>>>>>    without a KafkaConsumer.poll call.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>    2. The semantics of a commit() call should be consistent
>>>>>>>>>>>>>>>>>>>>>    between sync and async operations. Currently, sync commit is a
>>>>>>>>>>>>>>>>>>>>>    blocking call which actually sends out an OffsetCommitRequest
>>>>>>>>>>>>>>>>>>>>>    and waits for the response upon the user’s KafkaConsumer.commit
>>>>>>>>>>>>>>>>>>>>>    call. However, the async commit is a nonblocking call which just
>>>>>>>>>>>>>>>>>>>>>    queues up the OffsetCommitRequest. The request itself is later
>>>>>>>>>>>>>>>>>>>>>    sent out in the next poll. The teams we talked to found this
>>>>>>>>>>>>>>>>>>>>>    misleading.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>    3. Heartbeats are dependent on user application behavior (i.e.
>>>>>>>>>>>>>>>>>>>>>    user applications calling poll). This can be a big problem as
>>>>>>>>>>>>>>>>>>>>>    we don’t control how different applications behave. For
>>>>>>>>>>>>>>>>>>>>>    example, we might have an application which reads from Kafka
>>>>>>>>>>>>>>>>>>>>>    and writes to Espresso. If Espresso is slow for whatever
>>>>>>>>>>>>>>>>>>>>>    reason, then rebalances could happen.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current
>>>>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the 
>>>>>>>>>>>>>>>>>>>>> old simple
>>>>>>>>>>>>>>>>>>>>> consumer, i.e. in the old consumer we ask users to deal with
>>>>>>>>>>>>>>>>>>>>> raw protocols and
>>>>>>>>>>>>>>>>>>>>> error handling while in KafkaConsumer we do that for
>>>>>>>>>>>>>>>>>>>>> users. However, for
>>>>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of 
>>>>>>>>>>>>>>>>>>>>> users), the
>>>>>>>>>>>>>>>>>>>>> experience is a noticeable regression. The old high level 
>>>>>>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>>>>> interface is simple and easy to use for end users, while
>>>>>>>>>>>>>>>>>>>>> KafkaConsumer
>>>>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and 
>>>>>>>>>>>>>>>>>>>>> is becoming
>>>>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted by the 
>>>>>>>>>>>>>>>>>>>>> javadoc growing
>>>>>>>>>>>>>>>>>>>>> bigger and bigger.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should
>>>>>>>>>>>>>>>>>>>>> take a step back and look at the big picture.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll 
>>>>>>>>>>>>>>>>>>>>> called by the user
>>>>>>>>>>>>>>>>>>>>> which pretty much drives everything:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the
>>>>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and 
>>>>>>>>>>>>>>>>>>>>> the consequences
>>>>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with
>>>>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of
>>>>>>>>>>>>>>>>>>>>> KafkaConsumer:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume
>>>>>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - a way to subscribe to topics (auto group management)
>>>>>>>>>>>>>>>>>>>>> or partitions (explicit group management)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that 
>>>>>>>>>>>>>>>>>>>>> there should be no
>>>>>>>>>>>>>>>>>>>>> requirement from the end user to consistently call
>>>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll in order
>>>>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would 
>>>>>>>>>>>>>>>>>>>>> be better to split
>>>>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on 
>>>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks
>>>>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This would help reduce the amount of surprises to the
>>>>>>>>>>>>>>>>>>>>> end user.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to
>>>>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you 
>>>>>>>>>>>>>>>>>>>>> at LinkedIn on *July
>>>>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it
>>>>>>>>>>>>>>>>>>>>> to open source.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Ewen
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks,
>>>>>>>>> Neha
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
