The images load fine for me.

On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> wrote:

> Thanks Becket. Quick comment - there seem to be a bunch of images that the
> wiki refers to, but none loaded for me. Just checking whether it's just me
> or whether nobody can see the pictures?
>
> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
>> I agree with Ewen that it will be tricky for a single-threaded model to
>> implement the conventional semantics of async or Future. We just drafted
>> the following wiki, which explains our thoughts at LinkedIn on the new
>> consumer API and threading model.
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>
>> We were trying to see:
>> 1. Whether we can use some kind of methodology to help us think about what
>> API we want to provide to users for different use cases.
>> 2. What the pros and cons of the current single-threaded model are, and
>> whether there is a way to keep the benefits while solving the issues we
>> are facing now with the single-threaded model.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>> e...@confluent.io> wrote:
>>
>>>
>>>
>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> I think Ewen has proposed these APIs for using callbacks along with
>>>> returning a Future in the commit calls, i.e. something similar to:
>>>>
>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>
>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>> ConsumerCommitCallback callback);
>>>>
>>>> At that time I was leaning toward not including the Future besides
>>>> adding the callback, mainly because of the implementation complexity I
>>>> feel it could introduce along with the retry settings, after looking
>>>> through the code base. I would be happy to change my mind if we could
>>>> propose a prototype implementation that is simple enough.
>>>>
>>>>
>>> One of the reasons that interface ended up being difficult (or maybe
>>> impossible) to make work reasonably is because the consumer was thread-safe
>>> at the time. That made it impossible to know what should be done when
>>> Future.get() is called -- should the implementation call poll() itself, or
>>> would the fact that the user is calling get() imply that there's a
>>> background thread running the poll() loop and we just need to wait for it?
>>>
>>> The consumer is no longer thread safe, but I think the same problem
>>> remains because the expectation with Futures is that they are thread safe.
>>> Which means that even if the consumer isn't thread safe, I would expect to
>>> be able to hand that Future off to some other thread, have the second
>>> thread call get(), and then continue driving the poll loop in my thread
>>> (which in turn would eventually resolve the Future).
>>>
>>> I quite dislike the sync/async enum. While both operations commit
>>> offsets, their semantics are so different that overloading a single method
>>> with both is messy. That said, I don't think we should consider this an
>>> inconsistency wrt the new producer API's use of Future because the two APIs
>>> have a much more fundamental difference that justifies it: they have
>>> completely different threading and execution models.
>>>
>>> I think a Future-based API only makes sense if you can guarantee the
>>> operations that Futures are waiting on will continue to make progress
>>> regardless of what the thread using the Future does. The producer API makes
>>> that work by processing asynchronous requests in a background thread. The
>>> new consumer does not, and so it becomes difficult/impossible to implement
>>> the Future correctly. (Or, you have to make assumptions which break other
>>> use cases; if you want to support the simple use case of just making a
>>> commit() synchronous by calling get(), the Future has to call poll()
>>> internally; but if you do that, then if any user ever wants to add
>>> synchronization to the consumer via some external mechanism, then the
>>> implementation of the Future's get() method will not be subject to that
>>> synchronization and things will break).
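A minimal sketch of the problem described above, assuming a toy single-threaded client. ToyConsumer, commitAsync() and this poll() are hypothetical names used only for illustration; they are not the KafkaConsumer API. The Future returned by the commit only completes when some thread drives poll(), so a caller that blocks on get() without polling makes no progress.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PollDrivenCommit {
    // Hypothetical single-threaded client: work is only performed inside poll().
    static class ToyConsumer {
        private final Queue<CompletableFuture<Void>> pending = new ArrayDeque<>();

        // Queues the commit and returns immediately; nothing is sent yet.
        CompletableFuture<Void> commitAsync() {
            CompletableFuture<Void> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }

        // Completes every queued commit; stands in for the I/O done by poll().
        void poll() {
            CompletableFuture<Void> future;
            while ((future = pending.poll()) != null) {
                future.complete(null);
            }
        }
    }

    // Returns true if get() timed out, i.e. the Future made no progress
    // because nobody called poll() in the meantime.
    static boolean timesOutWithoutPoll(ToyConsumer consumer) throws Exception {
        CompletableFuture<Void> future = consumer.commitAsync();
        try {
            future.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        ToyConsumer consumer = new ToyConsumer();
        System.out.println("timed out without poll: " + timesOutWithoutPoll(consumer));

        CompletableFuture<Void> future = consumer.commitAsync();
        consumer.poll();
        System.out.println("done after poll: " + future.isDone());
    }
}
```

In this sketch the only ways to make get() return are to call poll() from inside get() or to run poll() on another thread, which are the two options weighed above.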
>>>
>>> -Ewen
>>>
>>>
>>>> Guozhang
>>>>
>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hey Adi,
>>>>>
>>>>> When we designed the initial version, the producer API was still
>>>>> changing. I thought about adding the Future and then just didn't get to
>>>>> it. I agree that we should look into adding it for consistency.
>>>>>
>>>>> Thanks,
>>>>> Neha
>>>>>
>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>> aaurad...@linkedin.com> wrote:
>>>>>
>>>>>> Great discussion everyone!
>>>>>>
>>>>>> One general comment on the sync/async APIs on the new consumer. I
>>>>>> think the producer tackles sync vs async APIs well. For APIs that can
>>>>>> be either sync or async, can we simply return a future? That seems more
>>>>>> elegant for the APIs that make sense in both flavors. From the user's
>>>>>> perspective, it is more consistent with the new producer. One easy
>>>>>> example is the commit call with the CommitType enum: we can make that
>>>>>> call always async, and users can block on the future if they want to
>>>>>> make sure their offsets are committed.
>>>>>>
>>>>>> Aditya
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the great responses, everyone!
>>>>>>>
>>>>>>> To expand a tiny bit on my initial post: while I did bring up old
>>>>>>> high level consumers, the teams we spoke to were actually not the types
>>>>>>> of services that simply wanted an easy way to get ConsumerRecords. We
>>>>>>> spoke to infrastructure teams that I would consider to be closer to the
>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's level of
>>>>>>> granularity. Some would use auto group management. Some would use
>>>>>>> explicit group management. All of them would turn off auto offset
>>>>>>> commits. Yes, the Samza team had prior experience with the old
>>>>>>> SimpleConsumer, but this is the first Kafka consumer being used by the
>>>>>>> Databus team. So I don't really think the feedback received was about
>>>>>>> the simpler times or wanting additional higher-level clients.
>>>>>>>
>>>>>>> - Onur
>>>>>>>
>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <ja...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think if we recommend a longer session timeout, then we should
>>>>>>>> expose the heartbeat frequency in configuration since this generally
>>>>>>>> controls how long normal rebalances will take. I think it's currently
>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be nice
>>>>>>>> to have an explicit LeaveGroup request to implement clean shutdown of
>>>>>>>> a consumer. Then the coordinator doesn't have to wait for the timeout
>>>>>>>> to reassign partitions.
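To make the arithmetic concrete, here is a small sketch that assumes the ratio mentioned above (3 heartbeats per session timeout). The class and method names are hypothetical and are not taken from the Kafka code base.

```java
public class HeartbeatMath {
    // Assumption from the thread: a fixed number of heartbeats per session timeout.
    static final int HEARTBEATS_PER_SESSION = 3;

    // Interval between heartbeats implied by a given session timeout.
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION;
    }

    public static void main(String[] args) {
        for (long timeoutMs : new long[] {500L, 30_000L, 60_000L}) {
            System.out.println(timeoutMs + " ms session timeout -> heartbeat every "
                    + heartbeatIntervalMs(timeoutMs) + " ms");
        }
    }
}
```

With a fixed ratio, raising the session timeout also stretches the heartbeat interval, which is why exposing the frequency as its own setting is suggested above.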
>>>>>>>>
>>>>>>>> -Jason
>>>>>>>>
>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Kartik,
>>>>>>>>>
>>>>>>>>> Totally agree we don't want people tuning timeouts in the common
>>>>>>>>> case.
>>>>>>>>>
>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>> 1. Default the timeout high
>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>
>>>>>>>>> When we were doing the consumer design we discussed this tradeoff
>>>>>>>>> and I think the conclusion we came to was that defaulting to a high
>>>>>>>>> timeout was actually better. This means it takes a little longer to
>>>>>>>>> detect a failure, but usually that is not a big problem and people who
>>>>>>>>> want faster failure detection can tune it down. This seemed better
>>>>>>>>> than having the failure detection not really cover the consumption and
>>>>>>>>> just be a background ping. The two reasons were (a) you still have the
>>>>>>>>> GC problem even for the background thread, (b) consumption is in some
>>>>>>>>> sense a better definition of an active healthy consumer and a lot of
>>>>>>>>> problems crop up when you have an inactive consumer with an active
>>>>>>>>> background thread (as today).
>>>>>>>>>
>>>>>>>>> When we had the discussion I think what we realized was that most
>>>>>>>>> people who were worried about the timeout were imagining a very low
>>>>>>>>> default (say 500ms). But in fact just setting this to 60 seconds or
>>>>>>>>> higher as a default would be okay. This adds to the failure detection
>>>>>>>>> time, but only apps that care about this need to tune it. This should
>>>>>>>>> largely eliminate false positives since after all if you disappear for
>>>>>>>>> 60 seconds that actually starts to be more of a true positive, even if
>>>>>>>>> you come back... :-)
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>
>>>>>>>>>> Adding the open source alias. This email started off as a broader
>>>>>>>>>> discussion around the new consumer. I was zooming in on only the
>>>>>>>>>> aspect of poll() being the only mechanism for driving the heartbeats.
>>>>>>>>>>
>>>>>>>>>> Yes, the lag is the effect of the problem (not the problem).
>>>>>>>>>> Monitoring the lag is important as it is the primary way to tell if
>>>>>>>>>> the application is wedged. There might be other metrics which can
>>>>>>>>>> possibly capture the same essence. Yes, the lag is at the consumer
>>>>>>>>>> group level, but you can tell that one of the consumers is messed up
>>>>>>>>>> if, for example, one of the partitions in the application starts
>>>>>>>>>> generating lag and the others are good.
>>>>>>>>>>
>>>>>>>>>> Monitoring aside, I think the main point of concern is that in the
>>>>>>>>>> old consumer most customers don't have to worry about unnecessary
>>>>>>>>>> rebalances, and most of the things that they do in their app don't
>>>>>>>>>> have an impact on the session timeout (i.e. the only thing that
>>>>>>>>>> causes rebalances is when the GC is out of whack). For the handful of
>>>>>>>>>> customers who are impacted by GC-related rebalances, I would imagine
>>>>>>>>>> that all of them would really want us to make the system more
>>>>>>>>>> resilient. I agree that the GC problem can't be solved easily in the
>>>>>>>>>> Java client; however, it appears that now we would be expecting the
>>>>>>>>>> consuming applications to be even more careful with ongoing tuning of
>>>>>>>>>> the timeouts. At LinkedIn, we have seen that most Kafka applications
>>>>>>>>>> don't have much of a clue about configuring the timeouts and just end
>>>>>>>>>> up calling the Kafka team when their application sees rebalances.
>>>>>>>>>>
>>>>>>>>>> The other side effect of poll driving the heartbeats is that we
>>>>>>>>>> have to make sure that people don't set a poll timeout that is larger
>>>>>>>>>> than the session timeout. If we had a notion of implicit heartbeats
>>>>>>>>>> then we could also automatically make this work for consumers by
>>>>>>>>>> sending heartbeats at the appropriate interval even though the
>>>>>>>>>> customers want to do a long poll.
>>>>>>>>>>
>>>>>>>>>> We could surely work around this in LinkedIn if either we have
>>>>>>>>>> the Pause() api or an explicit HeartBeat() api on the consumer.
>>>>>>>>>>
>>>>>>>>>> Would love to hear what other people think about this subject.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Kartik
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that there
>>>>>>>>>>> are many ways the application's message processing could fail and we
>>>>>>>>>>> wouldn't be able to model all of those in the consumer's failure 
>>>>>>>>>>> detection
>>>>>>>>>>> mechanism. So we should try to model as much of it as we can so the
>>>>>>>>>>> consumer's failure detection is meaningful.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Point being that the only absolute way to really detect that an
>>>>>>>>>>>> app is healthy is to monitor lag. If the lag increases then for 
>>>>>>>>>>>> sure
>>>>>>>>>>>> something is wrong.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The lag is merely the effect of the problem, not the problem
>>>>>>>>>>> itself. Lag is also a consumer group level concept and the problem 
>>>>>>>>>>> we have
>>>>>>>>>>> is being able to detect failures at the level of individual consumer
>>>>>>>>>>> instances.
>>>>>>>>>>>
>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger
>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma then
>>>>>>>>>>> is who defines what a healthy poll() frequency is. No one else but
>>>>>>>>>>> the application owner can define what a "normal" processing latency
>>>>>>>>>>> is for their application. Now the question is what's the easiest way
>>>>>>>>>>> for the user to define this without having to tune and fine tune
>>>>>>>>>>> this too often. The heartbeat interval certainly does not have to be
>>>>>>>>>>> *exactly* the 99th percentile of processing latency but could be in
>>>>>>>>>>> the ballpark + an error delta. The error delta is the application
>>>>>>>>>>> owner's acceptable risk threshold during which they would be ok if
>>>>>>>>>>> the application remains part of the group despite being dead. It is
>>>>>>>>>>> ultimately a tradeoff between operational ease and more accurate
>>>>>>>>>>> failure detection.
>>>>>>>>>>>
>>>>>>>>>>>> With quotas the write latencies to Kafka could range from a few
>>>>>>>>>>>> milliseconds all the way to tens of seconds.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is actually no different from the GC problem. Most of the
>>>>>>>>>>> time, the normal GC falls in the few ms range and there are many
>>>>>>>>>>> applications even at LinkedIn for which the max GC falls in the
>>>>>>>>>>> multiple seconds range. Note that it also can't be predicted, so it
>>>>>>>>>>> has to be an observed value. One way or the other, you have to
>>>>>>>>>>> observe what this acceptable "max" is for your application and then
>>>>>>>>>>> set the appropriate timeouts.
>>>>>>>>>>>
>>>>>>>>>>> Since this is not something that can be automated, this is a
>>>>>>>>>>> config that the application owner has to set based on the expected 
>>>>>>>>>>> behavior
>>>>>>>>>>> of their application. Not wanting to do that leads to ending up 
>>>>>>>>>>> with bad
>>>>>>>>>>> consumption semantics where the application process continues to be 
>>>>>>>>>>> part of
>>>>>>>>>>> a group owning partitions but not consuming since it has halted due 
>>>>>>>>>>> to a
>>>>>>>>>>> problem. The fact that the design requires them to express that in 
>>>>>>>>>>> poll()
>>>>>>>>>>> frequency or not doesn't change the fact that the application owner 
>>>>>>>>>>> has to
>>>>>>>>>>> go through the process of measuring and then defining this "max".
>>>>>>>>>>>
>>>>>>>>>>> The reverse, where they don't do this and the application remains
>>>>>>>>>>> in the group despite being dead, is super confusing and frustrating
>>>>>>>>>>> too. So the due diligence up front is actually worth it. And as long
>>>>>>>>>>> as the poll() latency and processing latency can be monitored, it
>>>>>>>>>>> should be easy to tell the reason for a rebalance, whether that is
>>>>>>>>>>> valid or not and how that should be tuned.
>>>>>>>>>>>
>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that
>>>>>>>>>>> will hide this complexity and I agree that LI is unblocked since 
>>>>>>>>>>> you can do
>>>>>>>>>>> this in TrackerConsumer in the meantime.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Neha
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>
>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points you
>>>>>>>>>>>> discuss are all very valid.
>>>>>>>>>>>>
>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. However,
>>>>>>>>>>>> only a smaller percentage of memory-hungry apps get hit by it.
>>>>>>>>>>>>
>>>>>>>>>>>> The broader issue whereby even if the heartbeats are healthy,
>>>>>>>>>>>> the app might not be behaving correctly is also real.  If the app 
>>>>>>>>>>>> is
>>>>>>>>>>>> calling poll() then the probability that the app is healthy is 
>>>>>>>>>>>> surely
>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the app is
>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>> In other cases the app might have even died in which case this
>>>>>>>>>>>> discussion is moot.
>>>>>>>>>>>>
>>>>>>>>>>>> Point being that the only absolute way to really detect that an
>>>>>>>>>>>> app is healthy is to monitor lag. If the lag increases then for 
>>>>>>>>>>>> sure
>>>>>>>>>>>> something is wrong.
>>>>>>>>>>>>
>>>>>>>>>>>> The proposal seems to be that the application needs to tune
>>>>>>>>>>>> their session timeout based on the 99th percentile of the time they
>>>>>>>>>>>> take to process events after every poll. This turns out to be a
>>>>>>>>>>>> nontrivial thing for an application to do. To start with, when an
>>>>>>>>>>>> application is new, their data is going to be based on tests that
>>>>>>>>>>>> they have done on synthetic data. This often doesn't represent what
>>>>>>>>>>>> they will see in production. Once the app is in production, their
>>>>>>>>>>>> processing latencies will potentially vary over time. It is
>>>>>>>>>>>> extremely unlikely that the application owner does a careful job of
>>>>>>>>>>>> monitoring the 99th percentile of latencies over time and
>>>>>>>>>>>> readjusting the settings. Often the latencies vary because of
>>>>>>>>>>>> variance in other services that are called by the consumer as part
>>>>>>>>>>>> of processing the events.
>>>>>>>>>>>>
>>>>>>>>>>>> Case in point would be a simple app which reads events and
>>>>>>>>>>>> writes to Kafka. With quotas the write latencies to Kafka could
>>>>>>>>>>>> range from a few milliseconds all the way to tens of seconds. As
>>>>>>>>>>>> the scale of processing for an app increases, the app or that
>>>>>>>>>>>> 'user' could now get quotaed. Instead of slowing down gracefully,
>>>>>>>>>>>> unless the application owner has carefully tuned the timeout, we
>>>>>>>>>>>> are now looking at a potential outage where the app could get hit
>>>>>>>>>>>> by constant rebalances.
>>>>>>>>>>>>
>>>>>>>>>>>> If we expose the pause() API then it is possible for us to take
>>>>>>>>>>>> care of this in the LinkedIn wrapper, whereby we would keep calling
>>>>>>>>>>>> poll on a separate thread periodically and enqueue the messages.
>>>>>>>>>>>> When the queue is full we would call pause().
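The wrapper idea above could be sketched roughly as follows. This is only an illustration under stated assumptions: Source is a hypothetical stand-in for the consumer (not the real KafkaConsumer interface), its poll() is assumed to return nothing while paused, and pumpOnce() is one iteration of the loop that the separate polling thread would run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferingWrapper {
    // Hypothetical stand-in for the consumer; NOT the real KafkaConsumer interface.
    interface Source {
        List<String> poll();  // assumed to return nothing while paused
        void pause();
        void resume();
    }

    // Fake source for the demo: hands out three records per poll unless paused.
    static class CountingSource implements Source {
        boolean paused;
        int next;
        int polls;

        public List<String> poll() {
            polls++;
            List<String> out = new ArrayList<>();
            if (!paused) {
                for (int i = 0; i < 3; i++) out.add("record-" + next++);
            }
            return out;
        }
        public void pause() { paused = true; }
        public void resume() { paused = false; }
    }

    final BlockingQueue<String> queue;                      // read by the application
    private final Deque<String> held = new ArrayDeque<>();  // records that did not fit
    private final Source source;
    private boolean paused;

    BufferingWrapper(Source source, int capacity) {
        this.source = source;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // One iteration of the polling thread's loop.
    void pumpOnce() {
        // Move records held back by an earlier iteration into the queue first.
        while (!held.isEmpty() && queue.offer(held.peekFirst())) held.removeFirst();
        if (held.isEmpty() && paused) {
            source.resume();
            paused = false;
        }
        // Always poll, so the consumer stays live even while fetching is paused.
        for (String record : source.poll()) {
            if (!held.isEmpty() || !queue.offer(record)) held.addLast(record);
        }
        if (!held.isEmpty() && !paused) {
            source.pause();
            paused = true;
        }
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource();
        BufferingWrapper wrapper = new BufferingWrapper(source, 4);
        for (int i = 0; i < 3; i++) wrapper.pumpOnce();
        System.out.println("queued=" + wrapper.queue.size()
                + " paused=" + source.paused + " polls=" + source.polls);
    }
}
```

The point of the sketch is that poll() keeps being called at a steady rate regardless of how slowly the application drains the queue, so a slow downstream service shows up as a full queue rather than as a missed heartbeat.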
>>>>>>>>>>>>
>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I think
>>>>>>>>>>>> it is vastly better if we address this in the API as every major
>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>
>>>>>>>>>>>> Kartik
>>>>>>>>>>>>
>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>
>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges and now
>>>>>>>>>>>> is definitely the time to clean them up.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm pretty reluctant to change the threading model or undergo a
>>>>>>>>>>>> big api redesign at this point beyond the group management stuff 
>>>>>>>>>>>> we've
>>>>>>>>>>>> discussed in the context of Samza/copycat which is already a big 
>>>>>>>>>>>> effort.
>>>>>>>>>>>>
>>>>>>>>>>>> Overall I agree that we have done a poor job of documenting
>>>>>>>>>>>> which APIs block and which don't, and when people are surprised
>>>>>>>>>>>> because we haven't labeled something, that will be unintuitive. But
>>>>>>>>>>>> the overall style of poll/select-based APIs is quite common in
>>>>>>>>>>>> programming going back to unix select, so I don't think it's beyond
>>>>>>>>>>>> people if explained well (after all we need to mix sync and async
>>>>>>>>>>>> APIs and if we don't say which is which any scheme will be
>>>>>>>>>>>> confusing).
>>>>>>>>>>>>
>>>>>>>>>>>> For what it's worth the experience with this api has actually
>>>>>>>>>>>> been about 1000x better than the issues people had around
>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking iterator,
>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, etc. have
>>>>>>>>>>>> all caused endless amounts of anger. Not to mention that that client
>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people want to
>>>>>>>>>>>> try to use it for (plus I regularly hear people tell me they've
>>>>>>>>>>>> heard not to use it at all for various reasons ranging from data
>>>>>>>>>>>> loss to lack of features). It's important to have that context when
>>>>>>>>>>>> people need to switch and they say "oh the old way was so simple and
>>>>>>>>>>>> the new way complex!" :-)
>>>>>>>>>>>>
>>>>>>>>>>>> Let me give some context related to your points, based on our
>>>>>>>>>>>> previous discussions:
>>>>>>>>>>>>
>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>
>>>>>>>>>>>> The motivation for avoiding additional threading was two-fold.
>>>>>>>>>>>> First this client is really intended to be the lowest level client.
>>>>>>>>>>>> There are many, many possible higher level processing abstractions.
>>>>>>>>>>>> One thing we found to be a big problem with the high-level client
>>>>>>>>>>>> was that it coupled things everyone must have--failover, etc--with
>>>>>>>>>>>> things that are different in each use case like the appropriate
>>>>>>>>>>>> threading model. If you do this you need to also maintain a
>>>>>>>>>>>> thread-free low-level consumer api for people to get around whatever
>>>>>>>>>>>> you have done.
>>>>>>>>>>>>
>>>>>>>>>>>> The second reason was that the internal threading in the client
>>>>>>>>>>>> became quite complex. The answer with threading is always that "it
>>>>>>>>>>>> won't be complex this time", but it always is.
>>>>>>>>>>>>
>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the application
>>>>>>>>>>>> must regularly consume to be considered an active consumer. This
>>>>>>>>>>>> allows the possibility of false positive failure detections. However
>>>>>>>>>>>> it's important to understand the downside of the alternative. If you
>>>>>>>>>>>> do background polling a consumer is considered active as long as it
>>>>>>>>>>>> isn't shut down. This leads to all kinds of active consumers that
>>>>>>>>>>>> aren't consuming because they have leaked or otherwise stopped but
>>>>>>>>>>>> are still claiming partitions and heart-beating. This failure mode
>>>>>>>>>>>> is actually far far worse. If you allow false positives the user
>>>>>>>>>>>> sees the frequent rebalances and knows they aren't consuming
>>>>>>>>>>>> frequently enough to be considered active, but if you allow false
>>>>>>>>>>>> negatives you end up having weeks go by before someone notices that
>>>>>>>>>>>> a partition has been unconsumed the whole time, at which point the
>>>>>>>>>>>> data is gone. Plus of course even if you do this you still have
>>>>>>>>>>>> regular false positives anyway from GC pauses (as now). We discussed
>>>>>>>>>>>> this in some depth at the time and decided that it is better to have
>>>>>>>>>>>> the liveness notion tied to *actual* consumption which is the actual
>>>>>>>>>>>> definition of liveness.
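A toy simulation of the two failure-detection contracts compared above. It assumes simulated time in milliseconds, heartbeats at one third of the session timeout, and an application that stops consuming at a given moment. Member and aliveAtEnd are hypothetical names for illustration, not coordinator code.

```java
public class LivenessDemo {
    // Toy coordinator-side view of one group member; times are simulated ms.
    static class Member {
        private long lastHeartbeatMs;

        Member(long nowMs) { lastHeartbeatMs = nowMs; }

        void heartbeat(long nowMs) { lastHeartbeatMs = nowMs; }

        boolean isAlive(long nowMs, long sessionTimeoutMs) {
            return nowMs - lastHeartbeatMs <= sessionTimeoutMs;
        }
    }

    // Simulates an application that stops consuming at stallAtMs and reports
    // whether the coordinator still considers it alive at endMs.
    static boolean aliveAtEnd(boolean backgroundHeartbeat, long stallAtMs,
                              long endMs, long sessionTimeoutMs) {
        long intervalMs = sessionTimeoutMs / 3;
        Member member = new Member(0);
        for (long now = intervalMs; now <= endMs; now += intervalMs) {
            boolean appIsPolling = now < stallAtMs;
            // A background thread heartbeats no matter what the app is doing;
            // a poll-driven heartbeat only happens while the app keeps polling.
            if (backgroundHeartbeat || appIsPolling) member.heartbeat(now);
        }
        return member.isAlive(endMs, sessionTimeoutMs);
    }

    public static void main(String[] args) {
        long stallAtMs = 60_000, endMs = 600_000, sessionTimeoutMs = 30_000;
        System.out.println("background heartbeat, stalled app alive: "
                + aliveAtEnd(true, stallAtMs, endMs, sessionTimeoutMs));
        System.out.println("poll-driven heartbeat, stalled app alive: "
                + aliveAtEnd(false, stallAtMs, endMs, sessionTimeoutMs));
    }
}
```

In this simulation the stalled application with a background heartbeat still counts as alive ten minutes later (the false negative), while the poll-driven one is expired once the session timeout passes.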
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> -Jay
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>
>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza,
>>>>>>>>>>>>> and some other teams and we got similar feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. The current behavior is not intuitive. For example,
>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like
>>>>>>>>>>>>>    subscribe, unsubscribe, seek, commit(async) don’t do anything
>>>>>>>>>>>>>    without a KafkaConsumer.poll call.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    2. The semantics of a commit() call should be consistent
>>>>>>>>>>>>>    between sync and async operations. Currently, sync commit is a
>>>>>>>>>>>>>    blocking call which actually sends out an OffsetCommitRequest and
>>>>>>>>>>>>>    waits for the response upon the user’s KafkaConsumer.commit call.
>>>>>>>>>>>>>    However, the async commit is a nonblocking call which just queues
>>>>>>>>>>>>>    up the OffsetCommitRequest. The request itself is later sent out
>>>>>>>>>>>>>    in the next poll. The teams we talked to found this misleading.
>>>>>>>>>>>>>
>>>>>>>>>>>>>    3. Heartbeats are dependent on user application behavior
>>>>>>>>>>>>>    (i.e. user applications calling poll). This can be a big problem
>>>>>>>>>>>>>    as we don’t control how different applications behave. For
>>>>>>>>>>>>>    example, we might have an application which reads from Kafka and
>>>>>>>>>>>>>    writes to Espresso. If Espresso is slow for whatever reason, then
>>>>>>>>>>>>>    rebalances could happen.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Generally speaking, we feel that the current KafkaConsumer API
>>>>>>>>>>>>> design is more of a wrapper around the old simple consumer, i.e.
>>>>>>>>>>>>> in the old consumer we ask users to deal with raw protocols and
>>>>>>>>>>>>> error handling while in KafkaConsumer we do that for users. However,
>>>>>>>>>>>>> for old high level consumer users (which are the majority of users),
>>>>>>>>>>>>> the experience is a noticeable regression. The old high level
>>>>>>>>>>>>> consumer interface is simple and easy to use for the end user, while
>>>>>>>>>>>>> KafkaConsumer requires users to be aware of many underlying details
>>>>>>>>>>>>> and is becoming prohibitive for users to adopt. This is hinted at by
>>>>>>>>>>>>> the javadoc growing bigger and bigger.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We think it's getting to the point where we should take a step
>>>>>>>>>>>>> back and look at the big picture.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called by the
>>>>>>>>>>>>> user which pretty much drives everything:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>
>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>
>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>
>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>
>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given that the selector's poll is being driven by the end
>>>>>>>>>>>>> user, this ends up making us educate users on NIO and the
>>>>>>>>>>>>> consequences of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>
>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>
>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>
>>>>>>>>>>>>> More generally speaking, there are many surprises with the
>>>>>>>>>>>>> current KafkaConsumer implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>
>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>>>>>>>>>>
>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or
>>>>>>>>>>>>> partitions(explicit group management)
>>>>>>>>>>>>>
>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>
>>>>>>>>>>>>> The last point is the big one that we think we aren't hitting.
>>>>>>>>>>>>> We think the most important example is that there should be no
>>>>>>>>>>>>> requirement for the end user to consistently call KafkaConsumer.poll
>>>>>>>>>>>>> in order for all of the above tasks to happen. We think it would be
>>>>>>>>>>>>> better to split those tasks into tasks that should not rely on
>>>>>>>>>>>>> KafkaConsumer.poll and tasks that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>
>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>
>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>
>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>
>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>
>>>>>>>>>>>>> This would help reduce the number of surprises for the end user.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you
>>>>>>>>>>>>> guys early next week. We’d like to meet up with you at LinkedIn on
>>>>>>>>>>>>> *July 31, 2015* so we can talk about it before proposing it to open
>>>>>>>>>>>>> source.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Neha
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Neha
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Ewen
>>>
>>
>>
>
>
> --
> Thanks,
> Neha
>



-- 
-- Guozhang
