I like all these ideas.

Our convention is to keep method names declarative so it should probably be
  subscribe(List<String> topics, Callback c)
  assign(List<TopicPartition> partitions)

The javadoc would obviously have to clarify the relationship between a
subscribed topic and assigned partitions. Presumably unsubscribe/unassign
are unnecessary since this is just a matter of subscribing to the empty
list.
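
To make the naming concrete, here is a rough sketch of how the two calls
might behave. Everything in it is an assumption for illustration:
ToyConsumer, the two-method Callback, simulateRebalance(), and the use of
plain strings in place of TopicPartition are made up, and this is not the
real consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical callback; the real interface was still under discussion.
interface Callback {
    void onAssignment(List<String> partitions);
    void onRevocation(List<String> partitions);
}

// Toy model of the two concepts; this is NOT the real KafkaConsumer.
class ToyConsumer {
    private List<String> subscription = Collections.emptyList();
    private List<String> assignment = Collections.emptyList();
    private Callback callback;

    // Replaces the previous subscription; an empty list means "unsubscribe".
    void subscribe(List<String> topics, Callback c) {
        subscription = new ArrayList<>(topics);
        callback = c;
    }

    // Replaces the previous manual assignment; an empty list means "unassign".
    void assign(List<String> partitions) {
        assignment = new ArrayList<>(partitions);
    }

    List<String> subscription() { return subscription; }

    List<String> assignment() { return assignment; }

    // Stand-in for a group rebalance finishing some time after subscribe().
    void simulateRebalance(List<String> partitions) {
        if (callback != null) {
            callback.onRevocation(assignment);
        }
        assignment = new ArrayList<>(partitions);
        if (callback != null) {
            callback.onAssignment(assignment);
        }
    }
}

class SubscribeSketch {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        Callback log = new Callback() {
            public void onAssignment(List<String> p) { System.out.println("assigned " + p); }
            public void onRevocation(List<String> p) { System.out.println("revoked " + p); }
        };
        consumer.subscribe(Arrays.asList("clicks", "views"), log);
        // Subscribed, but nothing is assigned until the rebalance completes.
        System.out.println(consumer.subscription() + " / " + consumer.assignment());
        consumer.simulateRebalance(Arrays.asList("clicks-0", "views-0"));
        // Subscribing to the empty list plays the role of unsubscribe().
        consumer.subscribe(Collections.<String>emptyList(), log);
        System.out.println(consumer.subscription());
    }
}
```

The only point of the sketch is that subscription() can be non-empty while
assignment() is still empty, and that an empty list covers the unsubscribe
case.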

-Jay

On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> I was thinking a little bit this morning about the subscription API and I
> have a few ideas on how to address some of the concerns about intuitiveness
> and exception handling.
>
> 1. Split the current notion of topic/partition subscription into
> subscription of topics and assignment of partitions. These concepts are
> pretty fundamentally different and I think at least some of the confusion
> about when subscriptions() can be used is caused by the fact that we
> overload the term. If instead that method is renamed to assignment(), then
> we are communicating to users that it is possible to have a subscription
> without an active assignment, which is not obvious with the current API.
> The code in fact already separates these concepts internally, so this would
> just expose it to the user.
>
> 2. Merge rebalance callback into a subscription callback and add a way to
> handle errors. The consumer's current rebalance callback is
> basically invoked when a subscription "succeeds," so it seems a little
> weird to also provide a callback on subscription. Perhaps we can just take
> the rebalance callback out of configuration and have the user provide it on
> subscribe(). We can add a method to the callback to handle errors (e.g. for
> non-existing topics). Since the callback is provided at subscribe time, it
> should be clearer to the user that the assignment will not be ready
> immediately when subscribe returns. It's also arguably a little more
> natural to set this callback at subscription time rather than when the
> consumer is constructed.
>
> 3. Get rid of the additive subscribe methods and just use setSubscription
> which would clear the old subscription. After you start providing callbacks
> to subscribe, then the implementation starts to get tricky if each call to
> subscribe provides a separate callback. Instead, as Jay suggested, we could
> just provide a way to set the full list of subscriptions at once, and then
> there is only one callback to maintain.
>
> With these points, the API might look something like this:
>
> void setSubscription(List<String> topics, RebalanceCallback callback);
> void setAssignment(List<TopicPartition> partitions);
> List<String> subscription();
> List<TopicPartition> assignment();
>
> interface RebalanceCallback {
>   void onAssignment(List<TopicPartition> partitions);
>   void onRevocation(List<TopicPartition> partitions);
>
>   // handle non-existing topics, etc.
>   void onError(Exception e);
> }
>
> Any thoughts?
>
> -Jason
>
>
>
> On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote:
>
>> Hey Becket,
>>
>> Yeah the high-level belief here is that it is possible to give something
>> as high level as the existing "high level" consumer, but this is not likely
>> to be the end-all be-all of high-level interfaces for processing streams of
>> messages. For example neither of these interfaces handles the threading
>> model for the processing, which obviously is a fairly low-level
>> implementation detail left to the user in your proposal, the current code,
>> as well as the existing scala consumer.
>>
>> There will be many of these: the full-fledged stream processing
>> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more
>> traditional message queue like "processor" interface, not to mention the
>> stuff we're trying to do with KIP-28. For these frameworks it will be quite
>> weird to add a bunch of new threads since they will want to dictate the
>> threading model.
>>
>> What will be a major failure though is if this client isn't low-level
>> enough and we need to introduce another layer underneath. This would happen
>> if we dictate too much for the client to be usable across various
>> applications, frameworks, or use cases. This is the concern with dictating
>> threading and processing models.
>>
>> So to summarize the goal is to subsume the existing APIs, which I think
>> we all agree this does, and be a foundation on which to build other
>> abstractions.
>>
>> WRT KIP-28, I think it is quite general and if we do that right it will
>> subsume a lot of the higher level processing and will give a full threaded
>> processing model to the user.
>>
>>
>> -Jay
>>
>>
>> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>>
>>> Thanks for the comments Jason and Jay.
>>>
>>> Jason, I had the same concern for producer's callback as well before,
>>> but it seems to be fine from some callbacks I wrote - users can always pass
>>> an object into the constructor if necessary for synchronization.
>>>
>>> Jay, I agree that the current API might be fine for people who want to
>>> wrap it up. But I thought the new consumer was supposed to be a combination
>>> of old high and low level consumer, which means it should be able to be
>>> used as is, just like producer. If KafkaConsumer is designed to be wrapped
>>> up for use, then the question becomes whether Kafka will provide a decent
>>> wrapper or not? Neha mentioned that KIP-28 will address the users who only
>>> care about data. Would that be the wrapper provided by Kafka? I am not sure
>>> if that is sufficient though because the processor is highly abstracted,
>>> and might only meet the static data stream requirement as I listed in the
>>> grid. For users who need something from the other grids, are we going to
>>> have another wrapper? Or are we expecting all users to write their own
>>> wrapper for KafkaConsumer? Some other comments are in line.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> Some comments on the proposal:
>>>>
>>>> I think we are conflating a number of things that should probably be
>>>> addressed individually because they are unrelated. My past experience is
>>>> that this always makes progress hard. The more we can pick apart these
>>>> items the better:
>>>>
>>>>    1. threading model
>>>>    2. blocking vs non-blocking semantics
>>>>    3. missing apis
>>>>    4. missing javadoc and other api surprises
>>>>    5. Throwing exceptions.
>>>>
>>>> The missing APIs are getting added independently. Some like your
>>>> proposed offsetByTime were things we agreed to hold off on for the first
>>>> release and do when we'd thought it through. If there are uses for it now
>>>> we can accelerate. I think each of these is really independent, we know
>>>> there are things that need to be added but lumping them all into one
>>>> discussion will be confusing.
>>>>
>>>> WRT throwing exceptions the policy is to throw exceptions that are
>>>> unrecoverable and handle and log other exceptions that are transient. That
>>>> policy makes sense if you go through the thought exercise of "what will the
>>>> user do if I throw this exception to them?": if they have no rational
>>>> response other than to retry (and if failing to anticipate and retry on
>>>> that exception will kill their program), the client should handle it
>>>> itself. You can argue whether the topic not
>>>> existing is transient or not, unfortunately the way we did auto-creation
>>>> makes it transient if you are in "auto create mode" and non-transient
>>>> otherwise (ick!). In any case this is an orthogonal discussion to
>>>> everything else. I think the policy is right and if we don't conform to it
>>>> in some way that is really an independent bug/discussion.
>>>>
>>> Agreed, we can discuss them separately.
>>>
>>>>
>>>> I suggest we focus on threading and the current event-loop style of api
>>>> design since I think that is really the crux.
>>>>
>>>> The analogy between the producer threading model and the consumer model
>>>> actually doesn't work for me. The goal of the producer is actually to take
>>>> requests from many many user threads and shove them into a single buffer
>>>> for batching. So the threading model isn't the 1:1 threads you describe; it
>>>> is N:1. The goal of the consumer is to support single-threaded processing.
>>>> This is what drives the difference. Saying that the producer has N:1
>>>> threads, therefore the consumer should have 1:1 threads instead of just
>>>> 1 thread, doesn't make sense any more than an analogy to the broker's
>>>> threading model would--the problem we're solving is totally different.
>>>>
>>> I think the ultimate goal for both producer and consumer is still to allow
>>> users to send/receive data in parallel. In producer we picked the solution
>>> of one-producer-serving-multiple-threads, and in consumer we picked
>>> multiple-single-threaded-consumers instead of
>>> single-consumer-serving-multiple-threads. And we believe people can always
>>> implement the latter with the former. I think this is a reasonable
>>> decision. However, there are also reasonable concerns over the
>>> multiple-single-threaded-consumers solution, namely that the single thread
>>> might have to be a dedicated polling thread in many cases, which pushes
>>> users towards the other solution - i.e. implementing a
>>> single-thread-consumer-serving-multiple-threads wrapper. From what we hear,
>>> it seems to be quite a common concern for most of the users we talked to.
>>> Plus the adoption bar of the consumer will be much higher because users will
>>> have to understand some of the details of the things they don't care about,
>>> as listed in the grid.
>>> The analogy between producer/consumer is intended to show that a
>>> separate polling thread will solve the concerns we have.
>>>
>>>>
>>>>
>>>> I think ultimately though what you need to think about is, does an event
>>>> loop style of API make sense? That is the source of all the issues you
>>>> describe. This style of API is incredibly prevalent from unix select to
>>>> GUIs to node.js. It's a great way to model multiple channels of messages
>>>> coming in. It is a fantastic style for event processing. Programmers
>>>> understand this style of api though I would agree it is unusual compared to
>>>> blocking apis. But it is a single threaded processing model. The current
>>>> approach is basically a pure event loop with some convenience methods that
>>>> are effectively "poll until X is complete".
>>>>
>>>> I think basically all the confusion you are describing comes from not
>>>> documenting/expecting an event loop. The "if you don't call poll nothing
>>>> happens" point is basically this. It's an event loop. You have to loop. You
>>>> can't not call poll. The docs don't cover this right now, perhaps. I think
>>>> if they do it's not unreasonable behavior.
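
A toy model of that behaviour, to make "you can't not call poll" concrete.
ToyEventLoop and its methods are hypothetical and only mimic the shape of
the API; none of this is the consumer's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy single-threaded event loop. It models only the "nothing happens
// unless you call poll()" behaviour, not Kafka's networking.
class ToyEventLoop {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final Queue<String> incoming = new ArrayDeque<>();
    private boolean commitDone = false;

    // Pretend a broker delivered a record.
    void deliver(String record) {
        incoming.add(record);
    }

    // Queues a commit; it is only carried out when the loop runs.
    void commitAsync() {
        commitDone = false;
        pending.add(() -> commitDone = true);
    }

    // Runs all work that was queued since the last turn of the loop.
    private void runPending() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    // One turn of the loop: run queued work, then hand back received records.
    List<String> poll() {
        runPending();
        List<String> records = new ArrayList<>(incoming);
        incoming.clear();
        return records;
    }

    // Convenience method in the "poll until X is complete" style.
    void commitSync() {
        commitAsync();
        while (!commitDone) {
            runPending();
        }
    }

    boolean isCommitDone() {
        return commitDone;
    }
}

class EventLoopSketch {
    public static void main(String[] args) {
        ToyEventLoop loop = new ToyEventLoop();
        loop.deliver("record-1");
        loop.commitAsync();
        System.out.println("before poll: " + loop.isCommitDone()); // false
        System.out.println("polled: " + loop.poll());
        System.out.println("after poll: " + loop.isCommitDone());  // true
    }
}
```

The commit requested by commitAsync() sits in the queue until the caller's
own thread comes back around to poll(), which is the behaviour the docs
would need to spell out.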
>>>>
>>> I'm not sure if I understand the event-loop correctly and honestly I did
>>> not think about it clearly before. My understanding is that an event-loop
>>> model means a single listener thread, but there can be multiple event
>>> generator threads. The downside is that the listener thread has to be fast
>>> and very careful about blocking. If we look at the consumer, the current
>>> model is that the caller thread itself acts as both event generator and
>>> listener. As a generator, it generates different tasks by calling the
>>> convenience methods. As a listener, it listens to the messages on broker
>>> and also the tasks generated by itself. So in our proposal, we are not
>>> changing the event-loop model here, just separating the event generator and
>>> event listener. It looks to me that the underlying execution thread follows the
>>> event-loop model, the special thing might be it is not only listening to
>>> the messages from broker, but also listening to the tasks from the user
>>> thread. This is essentially the thing a consumer has to do - interact with
>>> both server and user.
>>>
>>>>
>>>> If we want to move away from an event loop I'm not sure *any* aspect
>>>> of the current event loop style of api makes sense any more. I am not
>>>> totally married to event loops, but I do think what we have gives an
>>>> elegant way of implementing any higher level abstractions that would fully
>>>> implement the user's parallelism model. I don't want to go rethink
>>>> everything but I do think a half-way implementation that is event loop +
>>>> background threads is likely going to be icky.
>>>>
>>> We brought this up before to change the consumer.poll() to
>>> consumer.consume(). And did not do so simply because we wanted fewer
>>> changes in the API... I might be crazy but can we think of the proposed
>>> model as processing thread + event-loop, rather than event-loop +
>>> background thread?
>>>
>>>>
>>>> WRT making it configurable whether liveness means "actually consuming"
>>>> or "background thread running" I would suggest that that is really the
>>>> worst outcome. These types of "modes" that are functionally totally
>>>> different are just awful from a documentation, testing, usability, etc pov.
>>>> I would strongly prefer we pick either of these, document it, and make it
>>>> work well rather than trying to do both.
>>>>
>>> Previously I thought this was the major benefit we wanted from a single
>>> threaded model. Personally I don't have a strong preference on this, so I
>>> am OK with either way.
>>>
>>>>
>>>> -Jay
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io>
>>>> wrote:
>>>>
>>>>> Works now. Thanks Becket!
>>>>>
>>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com>
>>>>> wrote:
>>>>>
>>>>>> Ah... My bad, forgot to change the URL link for pictures.
>>>>>> Thanks for the quick response, Neha. It should be fixed now, can you
>>>>>> try again?
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images
>>>>>>> that the wiki refers to, but none loaded for me. Just making sure: is it
>>>>>>> just me, or can no one see the pictures?
>>>>>>>
>>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I agree with Ewen that with a single threaded model it will be tricky
>>>>>>>> to implement the conventional semantics of async or Future. We just
>>>>>>>> drafted the following wiki which explains our thoughts at LinkedIn on
>>>>>>>> the new consumer API and threading model.
>>>>>>>>
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>>>>>>
>>>>>>>> We were trying to see:
>>>>>>>> 1. If we can use some kind of methodology to help us think about
>>>>>>>> what API we want to provide to users for different use cases.
>>>>>>>> 2. What the pros and cons of the current single threaded model are. Is
>>>>>>>> there a way that we can maintain the benefits while solving the issues
>>>>>>>> we are facing now with the single threaded model?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>>>>>>> e...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang
>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along
>>>>>>>>>> with returning future in the commit calls, i.e. something similar to:
>>>>>>>>>>
>>>>>>>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>>>>>>>
>>>>>>>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>>>>>>>> ConsumerCommitCallback callback);
>>>>>>>>>>
>>>>>>>>>> At that time I was leaning slightly towards not including the Future
>>>>>>>>>> in addition to the callback, mainly because of the implementation
>>>>>>>>>> complexity I feel it could introduce along with the retry settings
>>>>>>>>>> after looking through the code base. I would be happy to change my
>>>>>>>>>> mind if we could propose a prototype implementation that is simple
>>>>>>>>>> enough.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> One of the reasons that interface ended up being difficult (or
>>>>>>>>> maybe impossible) to make work reasonably is because the consumer was
>>>>>>>>> thread-safe at the time. That made it impossible to know what should
>>>>>>>>> be done when Future.get() is called -- should the implementation call
>>>>>>>>> poll() itself, or would the fact that the user is calling get() imply
>>>>>>>>> that there's a background thread running the poll() loop and we just
>>>>>>>>> need to wait for it?
>>>>>>>>>
>>>>>>>>> The consumer is no longer thread safe, but I think the same
>>>>>>>>> problem remains because the expectation with Futures is that they are
>>>>>>>>> thread safe. Which means that even if the consumer isn't thread safe,
>>>>>>>>> I would expect to be able to hand that Future off to some other
>>>>>>>>> thread, have the second thread call get(), and then continue driving
>>>>>>>>> the poll loop in my thread (which in turn would eventually resolve the
>>>>>>>>> Future).
>>>>>>>>>
>>>>>>>>> I quite dislike the sync/async enum. While both operations commit
>>>>>>>>> offsets, their semantics are so different that overloading a single
>>>>>>>>> method with both is messy. That said, I don't think we should consider
>>>>>>>>> this an inconsistency wrt the new producer API's use of Future because
>>>>>>>>> the two APIs have a much more fundamental difference that justifies
>>>>>>>>> it: they have completely different threading and execution models.
>>>>>>>>>
>>>>>>>>> I think a Future-based API only makes sense if you can guarantee
>>>>>>>>> the operations that Futures are waiting on will continue to make
>>>>>>>>> progress regardless of what the thread using the Future does. The
>>>>>>>>> producer API makes that work by processing asynchronous requests in a
>>>>>>>>> background thread. The new consumer does not, and so it becomes
>>>>>>>>> difficult/impossible to implement the Future correctly. (Or, you have
>>>>>>>>> to make assumptions which break other use cases; if you want to
>>>>>>>>> support the simple use case of just making a commit() synchronous by
>>>>>>>>> calling get(), the Future has to call poll() internally; but if you do
>>>>>>>>> that, then if any user ever wants to add synchronization to the
>>>>>>>>> consumer via some external mechanism, then the implementation of the
>>>>>>>>> Future's get() method will not be subject to that synchronization and
>>>>>>>>> things will break).
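
A small sketch of the problem described above, assuming a hypothetical
Future-returning commit(). PollDrivenConsumer is a made-up stand-in, not the
real consumer: its commits complete only when some thread calls poll(), so a
caller blocked in get() makes no progress by itself.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy consumer whose commits complete only inside poll(); not Kafka's code.
class PollDrivenConsumer {
    private final Queue<CompletableFuture<Void>> inFlight = new ArrayDeque<>();

    // Hypothetical Future-returning commit, as floated in this thread.
    CompletableFuture<Void> commit() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        inFlight.add(result);
        return result;
    }

    // Only a thread that calls poll() ever completes outstanding commits.
    void poll() {
        CompletableFuture<Void> result;
        while ((result = inFlight.poll()) != null) {
            result.complete(null);
        }
    }
}

class FutureSketch {
    public static void main(String[] args) throws Exception {
        PollDrivenConsumer consumer = new PollDrivenConsumer();
        CompletableFuture<Void> result = consumer.commit();
        try {
            // Nobody is polling, so waiting on the Future cannot succeed.
            result.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("get() timed out: no thread is driving poll()");
        }
        consumer.poll();
        result.get(); // returns immediately now that poll() has run
        System.out.println("done=" + result.isDone());
    }
}
```

With a background thread (as in the producer) the first get() would simply
succeed; without one, either the caller keeps polling or get() has to poll
internally, which is the trade-off discussed above.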
>>>>>>>>>
>>>>>>>>> -Ewen
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede
>>>>>>>>>> <n...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Adi,
>>>>>>>>>>>
>>>>>>>>>>> When we designed the initial version, the producer API was still
>>>>>>>>>>> changing. I thought about adding the Future and then just didn't get
>>>>>>>>>>> to it.
>>>>>>>>>>> I agree that we should look into adding it for consistency.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Neha
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>>>>>>> aaurad...@linkedin.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Great discussion everyone!
>>>>>>>>>>>>
>>>>>>>>>>>> One general comment on the sync/async APIs on the new consumer.
>>>>>>>>>>>> I think the producer tackles sync vs async APIs well. For
>>>>>>>>>>>> APIs that can either be sync or async, can we simply return a
>>>>>>>>>>>> future? That seems more elegant for the APIs that make sense in
>>>>>>>>>>>> both flavors. From the user's perspective, it is more consistent
>>>>>>>>>>>> with the new producer. One easy example is the commit call with the
>>>>>>>>>>>> CommitType enum: we can make that call always async and users can
>>>>>>>>>>>> block on the future if they want to make sure their offsets are
>>>>>>>>>>>> committed.
>>>>>>>>>>>>
>>>>>>>>>>>> Aditya
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <
>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>>>>>>
>>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up
>>>>>>>>>>>>> old high level consumers, the teams we spoke to were actually not
>>>>>>>>>>>>> the types of services that simply wanted an easy way to get
>>>>>>>>>>>>> ConsumerRecords. We spoke to infrastructure teams that I would
>>>>>>>>>>>>> consider to be closer to the "power-user" end of the spectrum and
>>>>>>>>>>>>> would want KafkaConsumer's level of granularity. Some would use
>>>>>>>>>>>>> auto group management. Some would use explicit group management.
>>>>>>>>>>>>> All of them would turn off auto offset commits. Yes, the Samza
>>>>>>>>>>>>> team had prior experience with the old SimpleConsumer, but this is
>>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I
>>>>>>>>>>>>> don't really think the feedback received was about the simpler
>>>>>>>>>>>>> times or wanting additional higher-level clients.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Onur
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>>>>>>> ja...@confluent.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we
>>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since this
>>>>>>>>>>>>>> generally controls how long normal rebalances will take. I think
>>>>>>>>>>>>>> it's currently hard-coded to 3 heartbeats per session timeout. It
>>>>>>>>>>>>>> could also be nice to have an explicit LeaveGroup request to
>>>>>>>>>>>>>> implement clean shutdown of a consumer. Then the coordinator
>>>>>>>>>>>>>> doesn't have to wait for the timeout to reassign partitions.
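
As a worked example of that ratio (a sketch only: the constant reflects the
"3 heartbeats per session timeout" recollection above, and HeartbeatMath is a
made-up name, not a class in the client):

```java
// Illustrative arithmetic only; names and the fixed ratio are assumptions.
class HeartbeatMath {
    // Assumed ratio taken from the message above, not from the client's code.
    static final int HEARTBEATS_PER_SESSION_TIMEOUT = 3;

    // Interval between heartbeats implied by a given session timeout.
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION_TIMEOUT;
    }

    public static void main(String[] args) {
        // A 60-second session timeout implies a heartbeat every 20 seconds.
        System.out.println(heartbeatIntervalMs(60_000)); // 20000
        // A 500 ms session timeout implies one roughly every 166 ms.
        System.out.println(heartbeatIntervalMs(500));    // 166
    }
}
```

So raising the session timeout without exposing the ratio also stretches the
heartbeat interval, which is why the two settings are tied together here.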
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Jason
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey Kartik,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the
>>>>>>>>>>>>>>> common case.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When we were doing the consumer design we discussed this
>>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that
>>>>>>>>>>>>>>> defaulting to a high timeout was actually better. This means it
>>>>>>>>>>>>>>> takes a little longer to detect a failure, but usually that is
>>>>>>>>>>>>>>> not a big problem and people who want faster failure detection
>>>>>>>>>>>>>>> can tune it down. This seemed better than having the failure
>>>>>>>>>>>>>>> detection not really cover the consumption and just be a
>>>>>>>>>>>>>>> background ping. The two reasons were (a) you still have the GC
>>>>>>>>>>>>>>> problem even for the background thread, (b) consumption is in
>>>>>>>>>>>>>>> some sense a better definition of an active healthy consumer and
>>>>>>>>>>>>>>> a lot of problems crop up when you have an inactive consumer with
>>>>>>>>>>>>>>> an active background thread (as today).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When we had the discussion I think what we realized was that
>>>>>>>>>>>>>>> most people who were worried about the timeout were imagining a
>>>>>>>>>>>>>>> very low default (say, 500ms). But in fact just setting this to
>>>>>>>>>>>>>>> 60 seconds or higher as a default would be okay; this adds to the
>>>>>>>>>>>>>>> failure detection time but only apps that care about this need
>>>>>>>>>>>>>>> to tune. This should largely eliminate false positives since
>>>>>>>>>>>>>>> after all if you disappear for 60 seconds that actually starts
>>>>>>>>>>>>>>> to be more of a true positive, even if you come back... :-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Adding the open source alias. This email started off as a
>>>>>>>>>>>>>>>> broader discussion around the new consumer. I was zooming in
>>>>>>>>>>>>>>>> on only the aspect of poll() being the only mechanism for
>>>>>>>>>>>>>>>> driving the heartbeats.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, the lag is the effect of the problem (not the
>>>>>>>>>>>>>>>> problem). Monitoring the lag is important as it is the primary
>>>>>>>>>>>>>>>> way to tell if the application is wedged. There might be other
>>>>>>>>>>>>>>>> metrics which can possibly capture the same essence. Yes, the
>>>>>>>>>>>>>>>> lag is at the consumer group level, but you can tell that one
>>>>>>>>>>>>>>>> of the consumers is messed up if, for example, one of the
>>>>>>>>>>>>>>>> partitions in the application starts generating lag and the
>>>>>>>>>>>>>>>> others are good.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is that
>>>>>>>>>>>>>>>> in the old consumer most customers don't have to worry about
>>>>>>>>>>>>>>>> unnecessary rebalances and most of the things that they do in
>>>>>>>>>>>>>>>> their app don't have an impact on the session timeout (i.e.
>>>>>>>>>>>>>>>> the only thing that causes rebalances is when the GC is out of
>>>>>>>>>>>>>>>> whack). For the handful of customers who are impacted by GC
>>>>>>>>>>>>>>>> related rebalances, I would imagine that all of them would
>>>>>>>>>>>>>>>> really want us to make the system more resilient. I agree that
>>>>>>>>>>>>>>>> the GC problem can't be solved easily in the java client,
>>>>>>>>>>>>>>>> however it appears that now we would be expecting the consuming
>>>>>>>>>>>>>>>> applications to be even more careful with ongoing tuning of the
>>>>>>>>>>>>>>>> timeouts. At LinkedIn, we have seen that most kafka
>>>>>>>>>>>>>>>> applications don't have much of a clue about configuring the
>>>>>>>>>>>>>>>> timeouts and just end up calling the Kafka team when their
>>>>>>>>>>>>>>>> application sees rebalances.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is
>>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll timeout
>>>>>>>>>>>>>>>> that is larger than the session timeout. If we had a notion of
>>>>>>>>>>>>>>>> implicit heartbeats then we could also automatically make this
>>>>>>>>>>>>>>>> work for consumers by sending heartbeats at the appropriate
>>>>>>>>>>>>>>>> interval even though the customers want to do a long poll.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we
>>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the
>>>>>>>>>>>>>>>> consumer.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Would love to hear how other people think about this
>>>>>>>>>>>>>>>> subject.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that
>>>>>>>>>>>>>>>>> there are many ways the application's message processing could
>>>>>>>>>>>>>>>>> fail and we wouldn't be able to model all of those in the
>>>>>>>>>>>>>>>>> consumer's failure detection mechanism. So we should try to
>>>>>>>>>>>>>>>>> model as much of it as we can so the consumer's failure
>>>>>>>>>>>>>>>>> detection is meaningful.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag increases
>>>>>>>>>>>>>>>>>> then for sure something is wrong.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the
>>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept and
>>>>>>>>>>>>>>>>> the problem we have is being able to detect failures at the
>>>>>>>>>>>>>>>>> level of individual consumer instances.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger
>>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma
>>>>>>>>>>>>>>>>> then is who defines what a healthy poll() frequency is. No one
>>>>>>>>>>>>>>>>> else but the application owner can define what a "normal"
>>>>>>>>>>>>>>>>> processing latency is for their application. Now the question
>>>>>>>>>>>>>>>>> is what's the easiest way for the user to define this without
>>>>>>>>>>>>>>>>> having to tune and fine tune this too often. The heartbeat
>>>>>>>>>>>>>>>>> interval certainly does not have to be *exactly* the 99th
>>>>>>>>>>>>>>>>> percentile of processing latency but could be in the ballpark +
>>>>>>>>>>>>>>>>> an error delta. The error delta is the application owner's
>>>>>>>>>>>>>>>>> acceptable risk threshold during which they would be ok if the
>>>>>>>>>>>>>>>>> application remains part of the group despite being dead. It is
>>>>>>>>>>>>>>>>> ultimately a tradeoff between operational ease and more
>>>>>>>>>>>>>>>>> accurate failure detection.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range from
>>>>>>>>>>>>>>>>>> a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most of
>>>>>>>>>>>>>>>>> the time, the normal GC falls in the few ms range and there are
>>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC falls in
>>>>>>>>>>>>>>>>> the multiple seconds range. Note that it also can't be
>>>>>>>>>>>>>>>>> predicted, so has to be an observed value. One way or the other,
>>>>>>>>>>>>>>>>> you have to observe what this acceptable "max" is for your
>>>>>>>>>>>>>>>>> application and then set the appropriate timeouts.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Since this is not something that can be automated, this is
>>>>>>>>>>>>>>>>> a config that the application owner has to set based on the
>>>>>>>>>>>>>>>>> expected behavior of their application. Not wanting to do that
>>>>>>>>>>>>>>>>> leads to ending up with bad consumption semantics where the
>>>>>>>>>>>>>>>>> application process continues to be part of a group owning
>>>>>>>>>>>>>>>>> partitions but not consuming since it has halted due to a
>>>>>>>>>>>>>>>>> problem. The fact that the design requires them to express that
>>>>>>>>>>>>>>>>> in poll() frequency or not doesn't change the fact that the
>>>>>>>>>>>>>>>>> application owner has to go through the process of measuring
>>>>>>>>>>>>>>>>> and then defining this "max".
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The reverse where they don't do this and the application
>>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing 
>>>>>>>>>>>>>>>>> and frustrating
>>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth it. And as
>>>>>>>>>>>>>>>>> long as the
>>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it 
>>>>>>>>>>>>>>>>> should be easy
>>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or 
>>>>>>>>>>>>>>>>> not and how
>>>>>>>>>>>>>>>>> that should be tuned.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source
>>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is 
>>>>>>>>>>>>>>>>> unblocked since you
>>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points
>>>>>>>>>>>>>>>>>> you discuss are all very valid.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue.
>>>>>>>>>>>>>>>>>> However, only a smaller percentage of memory-hungry apps
>>>>>>>>>>>>>>>>>> get hit by it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are
>>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also 
>>>>>>>>>>>>>>>>>> real.  If the app
>>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is 
>>>>>>>>>>>>>>>>>> healthy is surely
>>>>>>>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the 
>>>>>>>>>>>>>>>>>> app is
>>>>>>>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>>>>>>>> In other cases the app might have even died in which case
>>>>>>>>>>>>>>>>>> this discussion is moot.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag 
>>>>>>>>>>>>>>>>>> increases then for
>>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to
>>>>>>>>>>>>>>>>>> tune their session timeout based on the 99tile of the time 
>>>>>>>>>>>>>>>>>> they take to
>>>>>>>>>>>>>>>>>> process events after every poll.  This turns out to be a
>>>>>>>>>>>>>>>>>> nontrivial thing
>>>>>>>>>>>>>>>>>> for an application to do. To start with, when an
>>>>>>>>>>>>>>>>>> application is new, their
>>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on 
>>>>>>>>>>>>>>>>>> synthetic data.
>>>>>>>>>>>>>>>>>> This often times doesn't represent what they will see in 
>>>>>>>>>>>>>>>>>> production.  Once
>>>>>>>>>>>>>>>>>> the app is in production their processing latencies will 
>>>>>>>>>>>>>>>>>> potentially vary
>>>>>>>>>>>>>>>>>> over time.  It is extremely unlikely that the application 
>>>>>>>>>>>>>>>>>> owner does a
>>>>>>>>>>>>>>>>>> careful job of monitoring the 99tile of latencies over time
>>>>>>>>>>>>>>>>>> and readjusts
>>>>>>>>>>>>>>>>>> the settings.  Often times the latencies vary because of
>>>>>>>>>>>>>>>>>> variance in other
>>>>>>>>>>>>>>>>>> services that are called by the consumer as part of 
>>>>>>>>>>>>>>>>>> processing the events.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events
>>>>>>>>>>>>>>>>>> and writes to Kafka.  With quotas the write latencies to 
>>>>>>>>>>>>>>>>>> kafka could range
>>>>>>>>>>>>>>>>>> from a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>>>>>>>> As the scale of
>>>>>>>>>>>>>>>>>> processing for an app increases, the app or that 'user'
>>>>>>>>>>>>>>>>>> could now get
>>>>>>>>>>>>>>>>>> quotaed.  Instead of slowing down gracefully, unless the
>>>>>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a 
>>>>>>>>>>>>>>>>>> potential outage
>>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If we expose the pause() API then it is possible for us
>>>>>>>>>>>>>>>>>> to take care of this in the LinkedIn wrapper, whereby we
>>>>>>>>>>>>>>>>>> would keep
>>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue 
>>>>>>>>>>>>>>>>>> the messages.
>>>>>>>>>>>>>>>>>> When the queue is full we would call pause().
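
A minimal sketch of the wrapper idea described above: keep polling on a separate thread, enqueue what comes back, and pause when the queue is full. `PausableSource`, `BufferingPoller` and `CountingSource` are hypothetical names, and the interface is a simplified stand-in, not the real KafkaConsumer signatures. It assumes a paused source returns no records from poll() while poll() still counts as a liveness signal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the consumer. NOT the real KafkaConsumer signatures.
interface PausableSource<T> {
    List<T> poll();   // assumed to return nothing while paused
    void pause();
    void resume();
}

// A background thread would call pollOnce() periodically; the processing
// thread calls next(). Only the polling thread touches overflow and paused.
final class BufferingPoller<T> {
    private final PausableSource<T> source;
    private final BlockingQueue<T> buffer;                 // handed to the processing thread
    private final Queue<T> overflow = new ArrayDeque<>();  // fetched but not yet buffered
    private boolean paused = false;

    BufferingPoller(PausableSource<T> source, int capacity) {
        this.source = source;
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    void pollOnce() {
        // Drain earlier overflow first so record order is preserved.
        while (!overflow.isEmpty() && buffer.offer(overflow.peek())) {
            overflow.remove();
        }
        if (paused && overflow.isEmpty() && buffer.remainingCapacity() > 0) {
            source.resume();
            paused = false;
        }
        // Always poll, even while paused, so liveness keeps being signalled.
        for (T record : source.poll()) {
            if (!overflow.isEmpty() || !buffer.offer(record)) {
                overflow.add(record);
            }
        }
        if (!paused && (!overflow.isEmpty() || buffer.remainingCapacity() == 0)) {
            source.pause();
            paused = true;
        }
    }

    /** Returns the next buffered record, or null when nothing is buffered. */
    T next() {
        return buffer.poll();
    }
}

// Fake source for the demo: consecutive integers in fixed-size batches.
final class CountingSource implements PausableSource<Integer> {
    private final int batchSize;
    private int nextValue = 0;
    boolean paused = false;
    int pollCalls = 0;

    CountingSource(int batchSize) { this.batchSize = batchSize; }

    public List<Integer> poll() {
        pollCalls++;
        List<Integer> batch = new ArrayList<>();
        if (!paused) {
            for (int i = 0; i < batchSize; i++) batch.add(nextValue++);
        }
        return batch;
    }
    public void pause() { paused = true; }
    public void resume() { paused = false; }
}

final class PollerDemo {
    public static void main(String[] args) {
        CountingSource source = new CountingSource(3);
        BufferingPoller<Integer> poller = new BufferingPoller<>(source, 4);
        poller.pollOnce();
        poller.pollOnce(); // buffer fills here, so the source gets paused
        System.out.println("paused=" + source.paused + " first=" + poller.next());
    }
}
```

Slow processing then shows up as a full buffer and a paused source rather than as missed polls, which is the graceful slowdown the wrapper is after. The cost is exactly the extra thread and internal queue discussed elsewhere in this thread.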
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I
>>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as 
>>>>>>>>>>>>>>>>>> every major
>>>>>>>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges
>>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or
>>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group 
>>>>>>>>>>>>>>>>>> management stuff
>>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is 
>>>>>>>>>>>>>>>>>> already a big
>>>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of
>>>>>>>>>>>>>>>>>> documenting which apis block and which don't and when people 
>>>>>>>>>>>>>>>>>> are surprised
>>>>>>>>>>>>>>>>>> because we haven't labeled something that will be 
>>>>>>>>>>>>>>>>>> unintuitive. But the
>>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in 
>>>>>>>>>>>>>>>>>> programming
>>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond 
>>>>>>>>>>>>>>>>>> people if explained
>>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if we 
>>>>>>>>>>>>>>>>>> don't say
>>>>>>>>>>>>>>>>>> which is which any scheme will be confusing).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has
>>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people had 
>>>>>>>>>>>>>>>>>> around
>>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking 
>>>>>>>>>>>>>>>>>> iterator,
>>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, 
>>>>>>>>>>>>>>>>>> etc  have all
>>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that 
>>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people 
>>>>>>>>>>>>>>>>>> want to try to
>>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've 
>>>>>>>>>>>>>>>>>> heard not to use
>>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to lack 
>>>>>>>>>>>>>>>>>> of features).
>>>>>>>>>>>>>>>>>> It's important to have that context when people need to 
>>>>>>>>>>>>>>>>>> switch and they say
>>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Let me give some context related to your points, based on
>>>>>>>>>>>>>>>>>> our previous discussions:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was
>>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the 
>>>>>>>>>>>>>>>>>> lowest level
>>>>>>>>>>>>>>>>>> client. There are many, many possible higher level 
>>>>>>>>>>>>>>>>>> processing abstractions.
>>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level 
>>>>>>>>>>>>>>>>>> client was that
>>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with 
>>>>>>>>>>>>>>>>>> things that are
>>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading 
>>>>>>>>>>>>>>>>>> model. If you do
>>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level 
>>>>>>>>>>>>>>>>>> consumer api for
>>>>>>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The second reason was that the internal threading in the
>>>>>>>>>>>>>>>>>> client became quite complex. The answer with threading is 
>>>>>>>>>>>>>>>>>> always that "it
>>>>>>>>>>>>>>>>>> won't be complex this time", but it always is.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the 
>>>>>>>>>>>>>>>>>> application must
>>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This 
>>>>>>>>>>>>>>>>>> allows the
>>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However 
>>>>>>>>>>>>>>>>>> it's important to
>>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do 
>>>>>>>>>>>>>>>>>> background polling a
>>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't shutdown. 
>>>>>>>>>>>>>>>>>> This leads to
>>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because 
>>>>>>>>>>>>>>>>>> they have
>>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming 
>>>>>>>>>>>>>>>>>> partitions and
>>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. 
>>>>>>>>>>>>>>>>>> If you allow
>>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and 
>>>>>>>>>>>>>>>>>> knows they aren't
>>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if
>>>>>>>>>>>>>>>>>> you allow false
>>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone 
>>>>>>>>>>>>>>>>>> notices that a
>>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point 
>>>>>>>>>>>>>>>>>> the data is
>>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have 
>>>>>>>>>>>>>>>>>> regular false
>>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this 
>>>>>>>>>>>>>>>>>> in some depth
>>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the 
>>>>>>>>>>>>>>>>>> liveness notion tied
>>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition
>>>>>>>>>>>>>>>>>> of liveness.
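
A toy model of the two failure modes contrasted above, under the assumption that the coordinator only compares the time since the last heartbeat with a session timeout. The names are hypothetical and this is not Kafka's coordinator code.

```java
// Toy model only; hypothetical names, not Kafka's coordinator implementation.
final class LivenessModel {
    enum HeartbeatMode { ON_POLL, BACKGROUND_THREAD }

    /**
     * Would the coordinator still consider the member alive at nowMs?
     * lastPollMs is when the application last called poll(); the process
     * itself is assumed to be running the whole time.
     */
    static boolean consideredAlive(HeartbeatMode mode, long lastPollMs,
                                   long nowMs, long sessionTimeoutMs) {
        // With a background thread, heartbeats keep flowing while the process
        // is up, so the last heartbeat is effectively "now".
        long lastHeartbeatMs = (mode == HeartbeatMode.ON_POLL) ? lastPollMs : nowMs;
        return nowMs - lastHeartbeatMs <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long timeoutMs = 30_000;
        long oneHourMs = 3_600_000;
        // Processing loop hung at t=0 but the process never exited.
        System.out.println(consideredAlive(HeartbeatMode.ON_POLL, 0, oneHourMs, timeoutMs));
        System.out.println(consideredAlive(HeartbeatMode.BACKGROUND_THREAD, 0, oneHourMs, timeoutMs));
        // Healthy app that spent 40 s on one batch (the false-positive case).
        System.out.println(consideredAlive(HeartbeatMode.ON_POLL, 0, 40_000, timeoutMs));
    }
}
```

In this model the hung consumer is evicted only when heartbeats are tied to poll, while the slow-but-healthy consumer is evicted only in that same mode. That is the false-positive versus false-negative tradeoff in miniature.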
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus,
>>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    1.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods 
>>>>>>>>>>>>>>>>>>> like subscribe,
>>>>>>>>>>>>>>>>>>>    unsubscribe, seek, commit(async) don’t do anything 
>>>>>>>>>>>>>>>>>>> without a
>>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll call.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    2.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    The semantics of a commit() call should be
>>>>>>>>>>>>>>>>>>>    consistent between sync and async operations. Currently, 
>>>>>>>>>>>>>>>>>>> sync commit is a
>>>>>>>>>>>>>>>>>>>    blocking call which actually sends out an 
>>>>>>>>>>>>>>>>>>> OffsetCommitRequest and waits for
>>>>>>>>>>>>>>>>>>>    the response upon the user’s KafkaConsumer.commit call. 
>>>>>>>>>>>>>>>>>>> However, the async
>>>>>>>>>>>>>>>>>>>    commit is a nonblocking call which just queues up the 
>>>>>>>>>>>>>>>>>>> OffsetCommitRequest.
>>>>>>>>>>>>>>>>>>>    The request itself is later sent out in the next poll. 
>>>>>>>>>>>>>>>>>>> The teams we talked
>>>>>>>>>>>>>>>>>>>    to found this misleading.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    Heartbeats are dependent on user application
>>>>>>>>>>>>>>>>>>>    behavior (i.e. user applications calling poll). This can 
>>>>>>>>>>>>>>>>>>> be a big problem
>>>>>>>>>>>>>>>>>>>    as we don’t control how different applications behave. 
>>>>>>>>>>>>>>>>>>> For example, we
>>>>>>>>>>>>>>>>>>>    might have an application which reads from Kafka and 
>>>>>>>>>>>>>>>>>>> writes to Espresso. If
>>>>>>>>>>>>>>>>>>>    Espresso is slow for whatever reason, then rebalances
>>>>>>>>>>>>>>>>>>> could happen.
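
A toy model of the commit asymmetry described in the second item of the list above, assuming the behavior is exactly as stated there: a sync commit sends immediately, while an async commit only queues the request until the next poll. `CommitModel` and its methods are hypothetical and do not mirror KafkaConsumer internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only; hypothetical names, not KafkaConsumer code.
final class CommitModel {
    private final Queue<Long> queued = new ArrayDeque<>();
    final List<Long> sentToBroker = new ArrayList<>();

    /** Sync commit: the request goes out as part of this call. */
    void commitSync(long offset) {
        sentToBroker.add(offset);
    }

    /** Async commit: the request is only queued here. */
    void commitAsync(long offset) {
        queued.add(offset);
    }

    /** The next poll is what actually sends queued async commits. */
    void poll() {
        while (!queued.isEmpty()) {
            sentToBroker.add(queued.remove());
        }
    }

    public static void main(String[] args) {
        CommitModel consumer = new CommitModel();
        consumer.commitAsync(42L);
        System.out.println("before poll: " + consumer.sentToBroker);
        consumer.poll();
        System.out.println("after poll:  " + consumer.sentToBroker);
    }
}
```

An application that calls the async variant and then never polls again would, in this model, never have its offset sent.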
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current
>>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the 
>>>>>>>>>>>>>>>>>>> old simple
>>>>>>>>>>>>>>>>>>> consumer, i.e. in the old consumer we ask users to deal with
>>>>>>>>>>>>>>>>>>> raw protocols and
>>>>>>>>>>>>>>>>>>> error handling while in KafkaConsumer we do that for
>>>>>>>>>>>>>>>>>>> users. However, for
>>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of 
>>>>>>>>>>>>>>>>>>> users), the
>>>>>>>>>>>>>>>>>>> experience is a noticeable regression. The old high level 
>>>>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>>> interface is simple and easy to use for the end user, while
>>>>>>>>>>>>>>>>>>> KafkaConsumer
>>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and
>>>>>>>>>>>>>>>>>>> is becoming
>>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted at by the
>>>>>>>>>>>>>>>>>>> javadoc growing
>>>>>>>>>>>>>>>>>>> bigger and bigger.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should take
>>>>>>>>>>>>>>>>>>> a step back and look at the big picture.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called 
>>>>>>>>>>>>>>>>>>> by the user
>>>>>>>>>>>>>>>>>>> which pretty much drives everything:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the
>>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and 
>>>>>>>>>>>>>>>>>>> the consequences
>>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with
>>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume
>>>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or
>>>>>>>>>>>>>>>>>>> partitions(explicit group management)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that there 
>>>>>>>>>>>>>>>>>>> should be no
>>>>>>>>>>>>>>>>>>> requirement from the end user to consistently call
>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll in order
>>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would be 
>>>>>>>>>>>>>>>>>>> better to split
>>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on 
>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks
>>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This would help reduce the number of surprises to the
>>>>>>>>>>>>>>>>>>> end user.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to
>>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you at 
>>>>>>>>>>>>>>>>>>> LinkedIn on *July
>>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it
>>>>>>>>>>>>>>>>>>> to open source.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Neha
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks,
>>>>>>>>> Ewen
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Neha
>>>>>
>>>>
>>>>
>>>
>>
>
