Works now. Thanks Becket!

On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> Ah... My bad, forgot to change the URL link for pictures.
> Thanks for the quick response, Neha. It should be fixed now, can you try
> again?
>
> Jiangjie (Becket) Qin
>
> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> wrote:
>
>> Thanks Becket. Quick comment - there seem to be a bunch of images that
>> the wiki refers to, but none loaded for me. Is it just me, or can no one
>> see the pictures?
>>
>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>>
>>> I agree with Ewen that it will be tricky to implement the conventional
>>> semantics of async or Future in a single-threaded model. We just
>>> drafted the following wiki, which explains our thoughts at LinkedIn on the
>>> new consumer API and threading model.
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>
>>> We were trying to see:
>>> 1. Whether we can use some kind of methodology to help us think about
>>> what API we want to provide to users for different use cases.
>>> 2. What the pros and cons of the current single-threaded model are, and
>>> whether there is a way to keep its benefits while solving the issues we
>>> are facing with it now.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>> e...@confluent.io> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think Ewen has proposed these APIs for using callbacks along with
>>>>> returning future in the commit calls, i.e. something similar to:
>>>>>
>>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>>
>>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>>>                            ConsumerCommitCallback callback);
>>>>>
>>>>> At that time I was leaning towards adding only the callback and not
>>>>> the Future, mainly because of the implementation complexity I felt it
>>>>> could introduce along with the retry settings after looking through the
>>>>> code base. I would be happy to change my mind if we could propose a
>>>>> prototype implementation that is simple enough.
>>>>>
>>>>>
>>>> One of the reasons that interface ended up being difficult (or maybe
>>>> impossible) to make work reasonably is because the consumer was thread-safe
>>>> at the time. That made it impossible to know what should be done when
>>>> Future.get() is called -- should the implementation call poll() itself, or
>>>> would the fact that the user is calling get() imply that there's a
>>>> background thread running the poll() loop and we just need to wait for it?
>>>>
>>>> The consumer is no longer thread safe, but I think the same problem
>>>> remains because the expectation with Futures is that they are thread safe.
>>>> Which means that even if the consumer isn't thread safe, I would expect to
>>>> be able to hand that Future off to some other thread, have the second
>>>> thread call get(), and then continue driving the poll loop in my thread
>>>> (which in turn would eventually resolve the Future).
>>>>
>>>> I quite dislike the sync/async enum. While both operations commit
>>>> offsets, their semantics are so different that overloading a single method
>>>> with both is messy. That said, I don't think we should consider this an
>>>> inconsistency wrt the new producer API's use of Future because the two APIs
>>>> have a much more fundamental difference that justifies it: they have
>>>> completely different threading and execution models.
>>>>
>>>> I think a Future-based API only makes sense if you can guarantee the
>>>> operations that Futures are waiting on will continue to make progress
>>>> regardless of what the thread using the Future does. The producer API makes
>>>> that work by processing asynchronous requests in a background thread. The
>>>> new consumer does not, and so it becomes difficult/impossible to implement
>>>> the Future correctly. (Or, you have to make assumptions which break other
>>>> use cases; if you want to support the simple use case of just making a
>>>> commit() synchronous by calling get(), the Future has to call poll()
>>>> internally; but if you do that, then if any user ever wants to add
>>>> synchronization to the consumer via some external mechanism, then the
>>>> implementation of the Future's get() method will not be subject to that
>>>> synchronization and things will break).
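
A minimal sketch of the problem described above, using a toy client rather than the real KafkaConsumer. ToyConsumer, commitAsync() and the demo helpers are hypothetical names invented for this illustration. The only assumption carried over from the thread is that all work happens inside poll(). Under that assumption a Future returned by a commit can only complete if somebody keeps calling poll(), so a bare get() would wait indefinitely.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy client: all work is driven by poll(), as in the
// single-threaded model discussed in this thread. Not the real KafkaConsumer.
public class ToyConsumer {
    private final Queue<CompletableFuture<Void>> pendingCommits = new ArrayDeque<>();

    // Queues a commit and returns immediately; nothing is "sent" yet.
    public Future<Void> commitAsync() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        pendingCommits.add(result);
        return result;
    }

    // Drives all queued work; commit futures complete only in here.
    public void poll() {
        CompletableFuture<Void> next;
        while ((next = pendingCommits.poll()) != null) {
            next.complete(null);
        }
    }

    // Returns true if get() timed out because nobody called poll().
    public static boolean getTimesOutWithoutPoll() {
        ToyConsumer consumer = new ToyConsumer();
        Future<Void> commit = consumer.commitAsync();
        try {
            commit.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the future is done once the owner calls poll().
    public static boolean completesAfterPoll() {
        ToyConsumer consumer = new ToyConsumer();
        Future<Void> commit = consumer.commitAsync();
        consumer.poll();
        return commit.isDone();
    }

    public static void main(String[] args) {
        System.out.println("times out without poll: " + getTimesOutWithoutPoll());
        System.out.println("completes after poll:   " + completesAfterPoll());
    }
}
```

Making get() call poll() internally would fix the first case, but as noted above it would also bypass any external synchronization a user has put around the consumer.
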
>>>>
>>>> -Ewen
>>>>
>>>>
>>>>> Guozhang
>>>>>
>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Hey Adi,
>>>>>>
>>>>>> When we designed the initial version, the producer API was still
>>>>>> changing. I thought about adding the Future and then just didn't get to
>>>>>> it. I agree that we should look into adding it for consistency.
>>>>>>
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>> aaurad...@linkedin.com> wrote:
>>>>>>
>>>>>>> Great discussion everyone!
>>>>>>>
>>>>>>> One general comment on the sync/async APIs on the new consumer. I
>>>>>>> think the producer tackles sync vs async APIs well. For APIs that can
>>>>>>> be either sync or async, can we simply return a future? That seems more
>>>>>>> elegant for the APIs that make sense in both flavors. From the user's
>>>>>>> perspective, it is more consistent with the new producer. One easy
>>>>>>> example is the commit call with the CommitType enum: we can make that
>>>>>>> call always async, and users can block on the future if they want to
>>>>>>> make sure their offsets are committed.
>>>>>>>
>>>>>>> Aditya
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>
>>>>>>>> To expand a tiny bit on my initial post: while I did bring up old
>>>>>>>> high level consumers, the teams we spoke to were actually not the
>>>>>>>> types of services that simply wanted an easy way to get
>>>>>>>> ConsumerRecords. We spoke to infrastructure teams that I would
>>>>>>>> consider to be closer to the "power-user" end of the spectrum and
>>>>>>>> would want KafkaConsumer's level of granularity. Some would use auto
>>>>>>>> group management. Some would use explicit group management. All of
>>>>>>>> them would turn off auto offset commits. Yes, the Samza team had
>>>>>>>> prior experience with the old SimpleConsumer, but this is the first
>>>>>>>> kafka consumer being used by the Databus team. So I don't really
>>>>>>>> think the feedback received was about the simpler times or wanting
>>>>>>>> additional higher-level clients.
>>>>>>>>
>>>>>>>> - Onur
>>>>>>>>
>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>> ja...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> I think if we recommend a longer session timeout, then we should
>>>>>>>>> expose the heartbeat frequency in configuration since this generally
>>>>>>>>> controls how long normal rebalances will take. I think it's currently
>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be nice
>>>>>>>>> to have an explicit LeaveGroup request to implement clean shutdown of
>>>>>>>>> a consumer. Then the coordinator doesn't have to wait for the timeout
>>>>>>>>> to reassign partitions.
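
To make the arithmetic concrete, here is a small sketch of how the heartbeat interval follows from the session timeout when the ratio is fixed at 3 heartbeats per session timeout, as described above. The class and method names are hypothetical, and the timeout values are arbitrary examples rather than actual defaults.

```java
// Hypothetical helper: derives the heartbeat interval from the session timeout.
public class HeartbeatInterval {
    // The fixed ratio mentioned in the message above.
    public static final int HEARTBEATS_PER_SESSION = 3;

    // Interval between heartbeats, in milliseconds, for a given session timeout.
    public static long intervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION;
    }

    public static void main(String[] args) {
        // A longer session timeout stretches the heartbeat interval with it,
        // which is why exposing the ratio as configuration could matter.
        System.out.println(intervalMs(30_000));
        System.out.println(intervalMs(60_000));
    }
}
```
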
>>>>>>>>>
>>>>>>>>> -Jason
>>>>>>>>>
>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Kartik,
>>>>>>>>>>
>>>>>>>>>> Totally agree we don't want people tuning timeouts in the common
>>>>>>>>>> case.
>>>>>>>>>>
>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>
>>>>>>>>>> When we were doing the consumer design we discussed this tradeoff
>>>>>>>>>> and I think the conclusion we came to was that defaulting to a high
>>>>>>>>>> timeout was actually better. This means it takes a little longer to
>>>>>>>>>> detect a failure, but usually that is not a big problem and people
>>>>>>>>>> who want faster failure detection can tune it down. This seemed
>>>>>>>>>> better than having the failure detection not really cover the
>>>>>>>>>> consumption and just be a background ping. The two reasons were (a)
>>>>>>>>>> you still have the GC problem even for the background thread, (b)
>>>>>>>>>> consumption is in some sense a better definition of an active
>>>>>>>>>> healthy consumer and a lot of problems crop up when you have an
>>>>>>>>>> inactive consumer with an active background thread (as today).
>>>>>>>>>>
>>>>>>>>>> When we had the discussion I think what we realized was that most
>>>>>>>>>> people who were worried about the timeout were imagining a very low
>>>>>>>>>> default (say, 500ms). But in fact just setting this to 60 seconds or
>>>>>>>>>> higher as a default would be okay. This adds to the failure
>>>>>>>>>> detection time, but only apps that care about this need to tune it.
>>>>>>>>>> This should largely eliminate false positives since after all if you
>>>>>>>>>> disappear for 60 seconds that actually starts to be more of a true
>>>>>>>>>> positive, even if you come back... :-)
>>>>>>>>>>
>>>>>>>>>> -Jay
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Adding the open source alias. This email started off as a
>>>>>>>>>>> broader discussion around the new consumer. I was zooming in on
>>>>>>>>>>> only the aspect of poll() being the only mechanism for driving the
>>>>>>>>>>> heartbeats.
>>>>>>>>>>>
>>>>>>>>>>> Yes, the lag is the effect of the problem (not the problem).
>>>>>>>>>>> Monitoring the lag is important as it is the primary way to tell
>>>>>>>>>>> if the application is wedged. There might be other metrics which
>>>>>>>>>>> can capture the same essence. Yes, the lag is at the consumer
>>>>>>>>>>> group level, but you can tell that one of the consumers is messed
>>>>>>>>>>> up if, for example, one of the partitions in the application
>>>>>>>>>>> starts generating lag while the others are fine.
>>>>>>>>>>>
>>>>>>>>>>> Monitoring aside, I think the main point of concern is that in
>>>>>>>>>>> the old consumer most customers don't have to worry about
>>>>>>>>>>> unnecessary rebalances, and most of the things that they do in
>>>>>>>>>>> their app don't have an impact on the session timeout (i.e. the
>>>>>>>>>>> only thing that causes rebalances is when the GC is out of whack).
>>>>>>>>>>> For the handful of customers who are impacted by GC-related
>>>>>>>>>>> rebalances, I would imagine that all of them would really want us
>>>>>>>>>>> to make the system more resilient. I agree that the GC problem
>>>>>>>>>>> can't be solved easily in the java client; however, it appears
>>>>>>>>>>> that now we would be expecting the consuming applications to be
>>>>>>>>>>> even more careful with ongoing tuning of the timeouts. At
>>>>>>>>>>> LinkedIn, we have seen that most kafka applications don't have
>>>>>>>>>>> much of a clue about configuring the timeouts and just end up
>>>>>>>>>>> calling the Kafka team when their application sees rebalances.
>>>>>>>>>>>
>>>>>>>>>>> The other side effect of poll driving the heartbeats is that we
>>>>>>>>>>> have to make sure that people don't set a poll timeout that is
>>>>>>>>>>> larger than the session timeout. If we had a notion of implicit
>>>>>>>>>>> heartbeats then we could also automatically make this work for
>>>>>>>>>>> consumers by sending heartbeats at the appropriate interval even
>>>>>>>>>>> though the customers want to do a long poll.
>>>>>>>>>>>
>>>>>>>>>>> We could surely work around this at LinkedIn if we have either
>>>>>>>>>>> the Pause() api or an explicit HeartBeat() api on the consumer.
>>>>>>>>>>>
>>>>>>>>>>> Would love to hear how other people think about this subject.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Kartik
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that
>>>>>>>>>>>> there are many ways the application's message processing could
>>>>>>>>>>>> fail and we wouldn't be able to model all of those in the
>>>>>>>>>>>> consumer's failure detection mechanism. So we should try to model
>>>>>>>>>>>> as much of it as we can so the consumer's failure detection is
>>>>>>>>>>>> meaningful.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> Point being that the only absolute way to really detect that
>>>>>>>>>>>>> an app is healthy is to monitor lag. If the lag increases then
>>>>>>>>>>>>> for sure something is wrong.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The lag is merely the effect of the problem, not the problem
>>>>>>>>>>>> itself. Lag is also a consumer group level concept and the
>>>>>>>>>>>> problem we have is being able to detect failures at the level of
>>>>>>>>>>>> individual consumer instances.
>>>>>>>>>>>>
>>>>>>>>>>>> As you pointed out, a consumer calling poll() is a stronger
>>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma
>>>>>>>>>>>> then is who defines what a healthy poll() frequency is. No one
>>>>>>>>>>>> else but the application owner can define what a "normal"
>>>>>>>>>>>> processing latency is for their application. Now the question is
>>>>>>>>>>>> what's the easiest way for the user to define this without having
>>>>>>>>>>>> to tune and fine-tune this too often. The heartbeat interval
>>>>>>>>>>>> certainly does not have to be *exactly* the 99th percentile of
>>>>>>>>>>>> processing latency but could be in the ballpark + an error delta.
>>>>>>>>>>>> The error delta is the application owner's acceptable risk
>>>>>>>>>>>> threshold during which they would be ok if the application
>>>>>>>>>>>> remains part of the group despite being dead. It is ultimately a
>>>>>>>>>>>> tradeoff between operational ease and more accurate failure
>>>>>>>>>>>> detection.
>>>>>>>>>>>>
>>>>>>>>>>>>> With quotas the write latencies to kafka could range from a few
>>>>>>>>>>>>> milliseconds all the way to tens of seconds.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is actually no different from the GC problem. Most of the
>>>>>>>>>>>> time, the normal GC falls in the few ms range and there are many
>>>>>>>>>>>> applications even at LinkedIn for which the max GC falls in the
>>>>>>>>>>>> multiple seconds range. Note that it also can't be predicted, so
>>>>>>>>>>>> has to be an observed value. One way or the other, you have to
>>>>>>>>>>>> observe what this acceptable "max" is for your application and
>>>>>>>>>>>> then set the appropriate timeouts.
>>>>>>>>>>>>
>>>>>>>>>>>> Since this is not something that can be automated, this is a
>>>>>>>>>>>> config that the application owner has to set based on the
>>>>>>>>>>>> expected behavior of their application. Not wanting to do that
>>>>>>>>>>>> leads to ending up with bad consumption semantics where the
>>>>>>>>>>>> application process continues to be part of a group owning
>>>>>>>>>>>> partitions but not consuming since it has halted due to a
>>>>>>>>>>>> problem. The fact that the design requires them to express that
>>>>>>>>>>>> in poll() frequency or not doesn't change the fact that the
>>>>>>>>>>>> application owner has to go through the process of measuring and
>>>>>>>>>>>> then defining this "max".
>>>>>>>>>>>>
>>>>>>>>>>>> The reverse, where they don't do this and the application
>>>>>>>>>>>> remains in the group despite being dead, is super confusing and
>>>>>>>>>>>> frustrating too. So the due diligence up front is actually worth
>>>>>>>>>>>> it. And as long as the poll() latency and processing latency can
>>>>>>>>>>>> be monitored, it should be easy to tell the reason for a
>>>>>>>>>>>> rebalance, whether that is valid or not and how that should be
>>>>>>>>>>>> tuned.
>>>>>>>>>>>>
>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that
>>>>>>>>>>>> will hide this complexity and I agree that LI is unblocked since
>>>>>>>>>>>> you can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Neha
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points you
>>>>>>>>>>>>> discuss are all very valid.
>>>>>>>>>>>>>
>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. However,
>>>>>>>>>>>>> only a smaller percentage of memory-hungry apps get hit by it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are healthy,
>>>>>>>>>>>>> the app might not be behaving correctly is also real. If the app
>>>>>>>>>>>>> is calling poll() then the probability that the app is healthy
>>>>>>>>>>>>> is surely higher. But this again isn't an absolute measure that
>>>>>>>>>>>>> the app is processing correctly.
>>>>>>>>>>>>> In other cases the app might have even died in which case this
>>>>>>>>>>>>> discussion is moot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Point being that the only absolute way to really detect that
>>>>>>>>>>>>> an app is healthy is to monitor lag. If the lag increases then
>>>>>>>>>>>>> for sure something is wrong.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The proposal seems to be that the application needs to tune
>>>>>>>>>>>>> its session timeout based on the 99th percentile of the time it
>>>>>>>>>>>>> takes to process events after every poll. This turns out to be a
>>>>>>>>>>>>> nontrivial thing for an application to do. To start with, when
>>>>>>>>>>>>> an application is new its data is going to be based on tests
>>>>>>>>>>>>> that were done on synthetic data. This oftentimes doesn't
>>>>>>>>>>>>> represent what it will see in production. Once the app is in
>>>>>>>>>>>>> production its processing latencies will potentially vary over
>>>>>>>>>>>>> time. It is extremely unlikely that the application owner does a
>>>>>>>>>>>>> careful job of monitoring the 99th percentile of latencies over
>>>>>>>>>>>>> time and readjusting the settings. Oftentimes the latencies vary
>>>>>>>>>>>>> because of variance in other services that are called by the
>>>>>>>>>>>>> consumer as part of processing the events.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Case in point would be a simple app which reads events and
>>>>>>>>>>>>> writes to Kafka. With quotas the write latencies to kafka could
>>>>>>>>>>>>> range from a few milliseconds all the way to tens of seconds. As
>>>>>>>>>>>>> the scale of processing for an app increases, the app or that
>>>>>>>>>>>>> 'user' could now get throttled by quotas. Unless the application
>>>>>>>>>>>>> owner has carefully tuned the timeout, instead of slowing down
>>>>>>>>>>>>> gracefully we are now looking at a potential outage where the
>>>>>>>>>>>>> app could get hit by constant rebalances.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we expose the pause() API then it is possible for us to
>>>>>>>>>>>>> take care of this in the LinkedIn wrapper: we would keep calling
>>>>>>>>>>>>> poll on a separate thread periodically and enqueue the messages.
>>>>>>>>>>>>> When the queue is full we would call pause().
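
A minimal sketch of the wrapper idea described above, under stated assumptions. PausableSource is a hypothetical stand-in for a consumer that exposes pause() and resume(), and FakeSource is an in-memory test double; neither is a real Kafka interface. A background thread would call pumpOnce() periodically, while the application drains records with take() at its own pace.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

public class BufferingWrapper {
    // Hypothetical consumer abstraction; a paused source is assumed to still
    // be polled (keeping it alive) while returning no records.
    public interface PausableSource {
        List<String> poll();
        void pause();
        void resume();
    }

    // In-memory source for the demo: two records per poll unless paused.
    public static final class FakeSource implements PausableSource {
        private boolean paused = false;
        private int next = 0;
        private int polls = 0;
        public List<String> poll() {
            polls++;
            if (paused) return Collections.emptyList();
            return Arrays.asList("r" + next++, "r" + next++);
        }
        public void pause() { paused = true; }
        public void resume() { paused = false; }
        public int pollCount() { return polls; }
    }

    private final PausableSource source;
    private final int capacity;
    private final Queue<String> buffer = new ArrayDeque<>();
    private boolean paused = false;

    public BufferingWrapper(PausableSource source, int capacity) {
        this.source = source;
        this.capacity = capacity;
    }

    // One iteration of the background poll loop.
    public synchronized void pumpOnce() {
        if (paused && buffer.size() < capacity) {
            source.resume();
            paused = false;
        }
        // Always poll, even while paused, so the consumer keeps being driven.
        buffer.addAll(source.poll());
        // Soft bound: the buffer may overshoot capacity by at most one batch.
        if (!paused && buffer.size() >= capacity) {
            source.pause();
            paused = true;
        }
    }

    // Called by the application thread; returns null if nothing is buffered.
    public synchronized String take() { return buffer.poll(); }
    public synchronized boolean isPaused() { return paused; }
    public synchronized int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        BufferingWrapper wrapper = new BufferingWrapper(new FakeSource(), 4);
        wrapper.pumpOnce();
        wrapper.pumpOnce();
        System.out.println("paused=" + wrapper.isPaused() + " buffered=" + wrapper.buffered());
    }
}
```

The design choice here is that slow processing shows up as a full buffer and a paused source rather than as missed polls, which is the decoupling the message above is asking for.
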
>>>>>>>>>>>>>
>>>>>>>>>>>>> In essence we can work around it at LinkedIn, however I think
>>>>>>>>>>>>> it is vastly better if we address this in the API as every major
>>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges and
>>>>>>>>>>>>> now is definitely the time to clean them up.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or undergo
>>>>>>>>>>>>> a big api redesign at this point beyond the group management
>>>>>>>>>>>>> stuff we've discussed in the context of Samza/copycat which is
>>>>>>>>>>>>> already a big effort.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Overall I agree that we have done a poor job of documenting
>>>>>>>>>>>>> which apis block and which don't, and people are surprised when
>>>>>>>>>>>>> we haven't labeled something that turns out to be unintuitive.
>>>>>>>>>>>>> But the overall style of poll/select-based apis is quite common
>>>>>>>>>>>>> in programming going back to unix select so I don't think it's
>>>>>>>>>>>>> beyond people if explained well (after all we need to mix sync
>>>>>>>>>>>>> and async apis and if we don't say which is which any scheme
>>>>>>>>>>>>> will be confusing).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For what it's worth the experience with this api has actually
>>>>>>>>>>>>> been about 1000x better than the issues people had around
>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking
>>>>>>>>>>>>> iterator, impossible internal queue sizing, baroque threading
>>>>>>>>>>>>> model, etc. have all caused endless amounts of anger. Not to
>>>>>>>>>>>>> mention that that client effectively disqualifies about 50% of
>>>>>>>>>>>>> the use cases people want to try to use it for (plus I regularly
>>>>>>>>>>>>> hear people tell me they've heard not to use it at all for
>>>>>>>>>>>>> various reasons ranging from data loss to lack of features).
>>>>>>>>>>>>> It's important to have that context when people need to switch
>>>>>>>>>>>>> and they say "oh the old way was so simple and the new way
>>>>>>>>>>>>> complex!" :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me give some context related to your points, based on our
>>>>>>>>>>>>> previous discussions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The motivation for avoiding additional threading was two-fold.
>>>>>>>>>>>>> First this client is really intended to be the lowest level
>>>>>>>>>>>>> client. There are many, many possible higher level processing
>>>>>>>>>>>>> abstractions. One thing we found to be a big problem with the
>>>>>>>>>>>>> high-level client was that it coupled things everyone must
>>>>>>>>>>>>> have--failover, etc--with things that are different in each use
>>>>>>>>>>>>> case like the appropriate threading model. If you do this you
>>>>>>>>>>>>> need to also maintain a thread free low-level consumer api for
>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The second reason was that the internal threading in the
>>>>>>>>>>>>> client became quite complex. The answer with threading is always
>>>>>>>>>>>>> that "it won't be complex this time", but it always is.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the
>>>>>>>>>>>>> application must regularly consume to be considered an active
>>>>>>>>>>>>> consumer. This allows the possibility of false positive failure
>>>>>>>>>>>>> detections. However it's important to understand the downside of
>>>>>>>>>>>>> the alternative. If you do background polling a consumer is
>>>>>>>>>>>>> considered active as long as it isn't shut down. This leads to
>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because they
>>>>>>>>>>>>> have leaked or otherwise stopped but are still claiming
>>>>>>>>>>>>> partitions and heart-beating. This failure mode is actually far
>>>>>>>>>>>>> far worse. If you allow false positives the user sees the
>>>>>>>>>>>>> frequent rebalances and knows they aren't consuming frequently
>>>>>>>>>>>>> enough to be considered active but if you allow false negatives
>>>>>>>>>>>>> you end up having weeks go by before someone notices that a
>>>>>>>>>>>>> partition has been unconsumed the whole time at which point the
>>>>>>>>>>>>> data is gone. Plus of course even if you do this you still have
>>>>>>>>>>>>> regular false positives anyway from GC pauses (as now). We
>>>>>>>>>>>>> discussed this in some depth at the time and decided that it is
>>>>>>>>>>>>> better to have the liveness notion tied to *actual* consumption
>>>>>>>>>>>>> which is the actual definition of liveness.
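
The contract described above can be sketched as a tiny liveness check: a consumer counts as alive only if the application has polled within the session timeout. PollLiveness is a hypothetical class for illustration. In the actual protocol the coordinator makes this decision from heartbeat requests, so this collapses both sides into one object and takes timestamps as explicit arguments to stay deterministic.

```java
// Hypothetical sketch: liveness tied to the last time the application polled.
public class PollLiveness {
    private final long sessionTimeoutMs;
    private long lastPollMs;

    public PollLiveness(long sessionTimeoutMs, long startMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastPollMs = startMs;
    }

    // Record that the application called poll() at the given time.
    public void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // Alive only if a poll happened within the session timeout.
    public boolean isAlive(long nowMs) {
        return nowMs - lastPollMs <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        PollLiveness liveness = new PollLiveness(30_000, 0);
        liveness.onPoll(25_000);
        // A stalled application stops polling and is eventually declared dead,
        // even though its process (and any background thread) may still be running.
        System.out.println(liveness.isAlive(50_000));
        System.out.println(liveness.isAlive(55_001));
    }
}
```

With a background heartbeat thread, onPoll() would effectively be called on a timer regardless of whether the application is consuming, which is the false-negative failure mode described above.
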
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza,
>>>>>>>>>>>>>> and some other teams and we got similar feedback.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like
>>>>>>>>>>>>>>    subscribe, unsubscribe, seek, commit(async) don’t do anything
>>>>>>>>>>>>>>    without a KafkaConsumer.poll call.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    2. The semantics of a commit() call should be consistent
>>>>>>>>>>>>>>    between sync and async operations. Currently, sync commit is a
>>>>>>>>>>>>>>    blocking call which actually sends out an OffsetCommitRequest
>>>>>>>>>>>>>>    and waits for the response upon the user’s
>>>>>>>>>>>>>>    KafkaConsumer.commit call. However, the async commit is a
>>>>>>>>>>>>>>    nonblocking call which just queues up the OffsetCommitRequest.
>>>>>>>>>>>>>>    The request itself is later sent out in the next poll. The
>>>>>>>>>>>>>>    teams we talked to found this misleading.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    3. Heartbeats are dependent on user application behavior
>>>>>>>>>>>>>>    (i.e. user applications calling poll). This can be a big
>>>>>>>>>>>>>>    problem as we don’t control how different applications behave.
>>>>>>>>>>>>>>    For example, we might have an application which reads from
>>>>>>>>>>>>>>    Kafka and writes to Espresso. If Espresso is slow for whatever
>>>>>>>>>>>>>>    reason, then rebalances could happen.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Generally speaking, we feel that the current KafkaConsumer
>>>>>>>>>>>>>> API design is more of a wrapper around the old simple consumer,
>>>>>>>>>>>>>> i.e. in the old consumer we ask users to deal with raw protocols
>>>>>>>>>>>>>> and error handling, while in KafkaConsumer we do that for users.
>>>>>>>>>>>>>> However, for old high level consumer users (which are the
>>>>>>>>>>>>>> majority of users), the experience is a noticeable regression.
>>>>>>>>>>>>>> The old high level consumer interface is simple and easy to use
>>>>>>>>>>>>>> for the end user, while KafkaConsumer requires users to be aware
>>>>>>>>>>>>>> of many underlying details and is becoming prohibitive for users
>>>>>>>>>>>>>> to adopt. This is hinted at by the javadoc growing bigger and
>>>>>>>>>>>>>> bigger.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We think it's getting to the point where we should take a
>>>>>>>>>>>>>> step back and look at the big picture.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called by
>>>>>>>>>>>>>> the user which pretty much drives everything:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Given that the selector's poll is being driven by the end
>>>>>>>>>>>>>> user, this ends up making us educate users on NIO and the
>>>>>>>>>>>>>> consequences of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> More generally speaking, there are many surprises with the
>>>>>>>>>>>>>> current KafkaConsumer implementation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or
>>>>>>>>>>>>>> partitions(explicit group management)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>> hitting. We think the most important example is that there
>>>>>>>>>>>>>> should be no requirement for the end user to consistently call
>>>>>>>>>>>>>> KafkaConsumer.poll in order for all of the above tasks to happen.
>>>>>>>>>>>>>> We think it would be better to split those tasks into tasks that
>>>>>>>>>>>>>> should not rely on KafkaConsumer.poll and tasks that should rely
>>>>>>>>>>>>>> on KafkaConsumer.poll.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This would help reduce the number of surprises to the end
>>>>>>>>>>>>>> user.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you
>>>>>>>>>>>>>> guys early next week. We’d like to meet up with you at LinkedIn
>>>>>>>>>>>>>> on *July 31, 2015* so we can talk about it before proposing it
>>>>>>>>>>>>>> to open source.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Neha
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Ewen
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks,
>> Neha
>>
>
>


-- 
Thanks,
Neha
