Hey Adi,

When we designed the initial version, the producer API was still changing.
I thought about adding the Future and then just didn't get to it. I agree
that we should look into adding it for consistency.

Thanks,
Neha

On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <aaurad...@linkedin.com>
wrote:

> Great discussion everyone!
>
> One general comment on the sync/async APIs on the new consumer. I think
> the producer tackles sync vs. async APIs well. For APIs that can be either
> sync or async, can we simply return a future? That seems more elegant for
> the APIs that make sense in both flavors. From the user's perspective, it
> is more consistent with the new producer. One easy example is the commit
> call with the CommitType enum: we can make that call always async and
> users can block on the future if they want to make sure their offsets are
> committed.
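>
> As a rough sketch (the future-returning commit signature here is
> hypothetical, mirroring the producer's send(); Future is
> java.util.concurrent.Future):
>
>     // Hypothetical: commit() always runs async and returns a Future,
>     // just like producer.send().
>     Future<Void> result = consumer.commit(offsets);
>     // Callers that want sync semantics simply block on the future:
>     result.get();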
>
> Aditya
>
>
> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <okara...@linkedin.com>
> wrote:
>
>> Thanks for the great responses, everyone!
>>
>> To expand a tiny bit on my initial post: while I did bring up old high
>> level consumers, the teams we spoke to were actually not the types of
>> services that simply wanted an easy way to get ConsumerRecords. We spoke to
>> infrastructure teams that I would consider to be closer to the "power-user"
>> end of the spectrum and would want KafkaConsumer's level of granularity.
>> Some would use auto group management. Some would use explicit group
>> management. All of them would turn off auto offset commits. Yes, the Samza
>> team had prior experience with the old SimpleConsumer, but this is the
>> first Kafka consumer being used by the Databus team. So I don't really
>> think the feedback we received was about nostalgia for simpler times or
>> about wanting additional higher-level clients.
>>
>> - Onur
>>
>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>>> I think if we recommend a longer session timeout, then we should expose
>>> the heartbeat frequency in configuration since this generally controls how
>>> long normal rebalances will take. I think it's currently hard-coded to 3
>>> heartbeats per session timeout. It could also be nice to have an explicit
>>> LeaveGroup request to implement clean shutdown of a consumer. Then the
>>> coordinator doesn't have to wait for the timeout to reassign partitions.
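>>>
>>> For example (the config key is just a strawman; today the interval is
>>> fixed internally at sessionTimeout / 3):
>>>
>>>     Properties props = new Properties();
>>>     props.put("session.timeout.ms", "30000");
>>>     // strawman: heartbeat every 3s instead of the implied 10s,
>>>     // so rebalances complete faster
>>>     props.put("heartbeat.interval.ms", "3000");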
>>>
>>> -Jason
>>>
>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> Hey Kartik,
>>>>
>>>> Totally agree we don't want people tuning timeouts in the common case.
>>>>
>>>> However there are two ways to avoid this:
>>>> 1. Default the timeout high
>>>> 2. Put the heartbeat in a separate thread
>>>>
>>>> When we were doing the consumer design we discussed this tradeoff and I
>>>> think the conclusion we came to was that defaulting to a high timeout was
>>>> actually better. This means it takes a little longer to detect a failure,
>>>> but usually that is not a big problem and people who want faster failure
>>>> detection can tune it down. This seemed better than having the failure
>>>> detection not really cover the consumption and just be a background ping.
>>>> The two reasons were (a) you still have the GC problem even for the
>>>> background thread, (b) consumption is in some sense a better definition of
>>>> an active healthy consumer and a lot of problems crop up when you have an
>>>> inactive consumer with an active background thread (as today).
>>>>
>>>> When we had the discussion I think what we realized was that most
>>>> people who were worried about the timeout were imagining a very low
>>>> default, say 500ms. But in fact just setting this to 60 seconds or higher
>>>> as a default would be okay; it adds to the failure detection time, but
>>>> only apps that care about this need to tune it. This should largely eliminate
>>>> false positives since after all if you disappear for 60 seconds that
>>>> actually starts to be more of a true positive, even if you come back... :-)
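>>>>
>>>> In config terms, something like the following as the shipped default,
>>>> with latency-sensitive apps tuning it down (a sketch, not a decided
>>>> value):
>>>>
>>>>     // default high to avoid false-positive rebalances
>>>>     props.put("session.timeout.ms", "60000");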
>>>>
>>>> -Jay
>>>>
>>>>
>>>>
>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>> kparamasi...@linkedin.com> wrote:
>>>>
>>>>> Adding the open source alias. This email started off as a broader
>>>>> discussion around the new consumer. I was zooming in on only the aspect of
>>>>> poll() being the only mechanism for driving the heartbeats.
>>>>>
>>>>> Yes, the lag is the effect of the problem (not the problem itself).
>>>>> Monitoring the lag is important as it is the primary way to tell if the
>>>>> application is wedged. There might be other metrics which can possibly
>>>>> capture the same essence. Yes, the lag is at the consumer group level, but
>>>>> you can tell that one of the consumers is messed up if, for example, one
>>>>> of the partitions in the application starts generating lag while the
>>>>> others are fine.
>>>>>
>>>>> Monitoring aside, I think the main point of concern is that in the old
>>>>> consumer most customers don't have to worry about unnecessary rebalances,
>>>>> and most of the things that they do in their app don't have an impact on
>>>>> the session timeout (i.e. the only thing that causes rebalances is when
>>>>> the GC is out of whack). For the handful of customers who are impacted
>>>>> by GC-related rebalances, I would imagine that all of them would really
>>>>> want us to make the system more resilient. I agree that the GC problem
>>>>> can't be solved easily in the Java client; however, it appears that now we
>>>>> would be expecting the consuming applications to be even more careful with
>>>>> ongoing tuning of the timeouts. At LinkedIn, we have seen that most Kafka
>>>>> applications don't have much of a clue about configuring the timeouts and
>>>>> just end up calling the Kafka team when their application sees rebalances.
>>>>>
>>>>> The other side effect of poll driving the heartbeats is that we have
>>>>> to make sure that people don't set a poll timeout that is larger than the
>>>>> session timeout. If we had a notion of implicit heartbeats then we could
>>>>> also automatically make this work for consumers by sending heartbeats at
>>>>> the appropriate interval even though the customers want to do a long poll.
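>>>>>
>>>>> For example, today the application itself has to keep each poll slice
>>>>> safely below the session timeout (a sketch; process() is a placeholder):
>>>>>
>>>>>     long sessionTimeoutMs = 30000;
>>>>>     while (running) {
>>>>>         // each poll must return well before the session timeout
>>>>>         // expires, or the coordinator marks this consumer dead
>>>>>         ConsumerRecords<byte[], byte[]> records =
>>>>>             consumer.poll(sessionTimeoutMs / 3);
>>>>>         process(records);
>>>>>     }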
>>>>>
>>>>> We could surely work around this in LinkedIn if either we have the
>>>>> pause() API or an explicit heartbeat() API on the consumer.
>>>>>
>>>>> Would love to hear how other people think about this subject.
>>>>>
>>>>> Thanks
>>>>> Kartik
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Agree with the dilemma you are pointing out, which is that there are
>>>>>> many ways the application's message processing could fail and we wouldn't
>>>>>> be able to model all of those in the consumer's failure detection
>>>>>> mechanism. So we should try to model as much of it as we can so the
>>>>>> consumer's failure detection is meaningful.
>>>>>>
>>>>>>
>>>>>>> Point being that the only absolute way to really detect that an app
>>>>>>> is healthy is to monitor lag. If the lag increases then for sure
>>>>>>> something is wrong.
>>>>>>
>>>>>>
>>>>>> The lag is merely the effect of the problem, not the problem itself.
>>>>>> Lag is also a consumer-group-level concept, and the problem we have is
>>>>>> being able to detect failures at the level of individual consumer
>>>>>> instances.
>>>>>>
>>>>>> As you pointed out, a consumer that calls poll() is a stronger indicator
>>>>>> of whether the consumer is alive or not. The dilemma then is who defines
>>>>>> what a healthy poll() frequency is. No one else but the application owner
>>>>>> can define what a "normal" processing latency is for their application.
>>>>>> Now the question is what's the easiest way for the user to define this
>>>>>> without having to tune and fine-tune it too often. The heartbeat interval
>>>>>> certainly does not have to be *exactly* the 99th percentile of processing
>>>>>> latency but could be in the ballpark plus an error delta. The error delta
>>>>>> is the application owner's acceptable risk threshold during which they
>>>>>> would be ok if the application remains part of the group despite being
>>>>>> dead. It is ultimately a tradeoff between operational ease and more
>>>>>> accurate failure detection.
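>>>>>>
>>>>>> As a back-of-the-envelope sketch: if the observed 99th percentile
>>>>>> processing latency per poll() is 20 seconds and the owner accepts a
>>>>>> 10-second error delta, the setting would be something like:
>>>>>>
>>>>>>     long p99ProcessingMs = 20000; // observed, not predicted
>>>>>>     long errorDeltaMs = 10000;    // acceptable risk threshold
>>>>>>     props.put("session.timeout.ms",
>>>>>>               String.valueOf(p99ProcessingMs + errorDeltaMs));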
>>>>>>
>>>>>> With quotas the write latencies to Kafka could range from a few
>>>>>>> milliseconds all the way to tens of seconds.
>>>>>>
>>>>>>
>>>>>> This is actually no different from the GC problem. Most of the time,
>>>>>> the normal GC falls in the few-ms range, and there are many
>>>>>> applications even at LinkedIn for which the max GC falls in the multiple
>>>>>> seconds range. Note that it also can't be predicted, so has to be an
>>>>>> observed value. One way or the other, you have to observe what this
>>>>>> acceptable "max" is for your application and then set the appropriate
>>>>>> timeouts.
>>>>>>
>>>>>> Since this is not something that can be automated, this is a config
>>>>>> that the application owner has to set based on the expected behavior of
>>>>>> their application. Not wanting to do that leads to bad consumption
>>>>>> semantics where the application process continues to be part of a group,
>>>>>> owning partitions but not consuming, since it has halted due to a
>>>>>> problem. Whether or not the design requires them to express that in
>>>>>> poll() frequency doesn't change the fact that the application owner has
>>>>>> to go through the process of measuring and then defining this "max".
>>>>>>
>>>>>> The reverse, where they don't do this and the application remains in
>>>>>> the group despite being dead, is super confusing and frustrating too. So
>>>>>> the due diligence up front is actually worth it. And as long as the
>>>>>> poll() latency and processing latency can be monitored, it should be easy
>>>>>> to tell the reason for a rebalance, whether it is valid or not, and how
>>>>>> it should be tuned.
>>>>>>
>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that will
>>>>>> hide this complexity, and I agree that LI is unblocked since you can do
>>>>>> this in TrackerConsumer in the meantime.
>>>>>>
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>
>>>>>>> For commit(), I think it should hopefully be an easier discussion,
>>>>>>> so maybe we can follow up when we meet up next.
>>>>>>>
>>>>>>> As far as the heartbeat is concerned, I think the points you discuss
>>>>>>> are all very valid.
>>>>>>>
>>>>>>> GC pauses impacting the heartbeats is a real issue. However, only a
>>>>>>> small percentage of memory-hungry apps get hit by it.
>>>>>>>
>>>>>>> The broader issue, whereby even if the heartbeats are healthy the
>>>>>>> app might not be behaving correctly, is also real. If the app is calling
>>>>>>> poll() then the probability that the app is healthy is surely higher.
>>>>>>> But this again isn't an absolute measure that the app is processing
>>>>>>> correctly. In other cases the app might have even died, in which case
>>>>>>> this discussion is moot.
>>>>>>>
>>>>>>> Point being that the only absolute way to really detect that an app
>>>>>>> is healthy is to monitor lag. If the lag increases then for sure
>>>>>>> something is wrong.
>>>>>>>
>>>>>>> The proposal seems to be that the application needs to tune its
>>>>>>> session timeout based on the 99th percentile of the time it takes to
>>>>>>> process events after every poll. This turns out to be a nontrivial thing
>>>>>>> for an application to do. To start with, when an application is new, its
>>>>>>> numbers are going to be based on tests run on synthetic data. This often
>>>>>>> doesn't represent what it will see in production. Once the app is in
>>>>>>> production, its processing latencies will potentially vary over time. It
>>>>>>> is extremely unlikely that the application owner does a careful job of
>>>>>>> monitoring the 99th percentile of latencies over time and readjusting the
>>>>>>> settings. Often the latencies vary because of variance in other services
>>>>>>> that are called by the consumer as part of processing the events.
>>>>>>>
>>>>>>> A case in point would be a simple app which reads events and writes to
>>>>>>> Kafka. With quotas, the write latencies to Kafka could range from a few
>>>>>>> milliseconds all the way to tens of seconds. As the scale of processing
>>>>>>> for an app increases, the app or that 'user' could now get quotaed.
>>>>>>> Unless the application owner has carefully tuned the timeout, instead of
>>>>>>> slowing down gracefully we are now looking at a potential outage where
>>>>>>> the app could get hit by constant rebalances.
>>>>>>>
>>>>>>> If we expose the pause() API then it is possible for us to take care
>>>>>>> of this in the LinkedIn wrapper, whereby we would keep calling poll on a
>>>>>>> separate thread periodically and enqueue the messages. When the queue is
>>>>>>> full we would call pause().
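>>>>>>>
>>>>>>> A minimal sketch of that wrapper, assuming pause()/resume() calls that
>>>>>>> don't yet exist on KafkaConsumer (names and signatures illustrative
>>>>>>> only):
>>>>>>>
>>>>>>>     import java.util.concurrent.ArrayBlockingQueue;
>>>>>>>     import java.util.concurrent.BlockingQueue;
>>>>>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>>>>>>>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>>>>>
>>>>>>>     public class PollingWrapper implements Runnable {
>>>>>>>         private static final int HEADROOM = 1000; // > max records per poll
>>>>>>>         private final KafkaConsumer<byte[], byte[]> consumer;
>>>>>>>         private final BlockingQueue<ConsumerRecord<byte[], byte[]>> queue =
>>>>>>>             new ArrayBlockingQueue<>(10000);
>>>>>>>
>>>>>>>         public PollingWrapper(KafkaConsumer<byte[], byte[]> consumer) {
>>>>>>>             this.consumer = consumer;
>>>>>>>         }
>>>>>>>
>>>>>>>         // Runs on a dedicated thread so heartbeats keep flowing no
>>>>>>>         // matter how slowly the application drains the queue.
>>>>>>>         @Override
>>>>>>>         public void run() {
>>>>>>>             while (!Thread.currentThread().isInterrupted()) {
>>>>>>>                 ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>>>>>>>                 for (ConsumerRecord<byte[], byte[]> record : records)
>>>>>>>                     queue.offer(record); // HEADROOM keeps this from failing
>>>>>>>                 if (queue.remainingCapacity() < HEADROOM)
>>>>>>>                     consumer.pause();   // hypothetical: stop fetching,
>>>>>>>                                         // but keep polling/heartbeating
>>>>>>>                 else
>>>>>>>                     consumer.resume();  // hypothetical: fetch again
>>>>>>>             }
>>>>>>>         }
>>>>>>>
>>>>>>>         // The application thread consumes at its own pace:
>>>>>>>         public ConsumerRecord<byte[], byte[]> take() throws InterruptedException {
>>>>>>>             return queue.take();
>>>>>>>         }
>>>>>>>     }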
>>>>>>>
>>>>>>> In essence we can work around it at LinkedIn; however, I think it is
>>>>>>> vastly better if we address this in the API, as every major customer will
>>>>>>> eventually be pained by it.
>>>>>>>
>>>>>>> Kartik
>>>>>>>
>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>>>>
>>>>>>> Hey guys,
>>>>>>>
>>>>>>> Happy to discuss. I agree there may be some rough edges and now is
>>>>>>> definitely the time to clean them up.
>>>>>>>
>>>>>>> I'm pretty reluctant to change the threading model or undergo a big
>>>>>>> API redesign at this point beyond the group management stuff we've
>>>>>>> discussed in the context of Samza/copycat, which is already a big effort.
>>>>>>>
>>>>>>> Overall I agree that we have done a poor job of documenting which
>>>>>>> APIs block and which don't, and when people are surprised it's because we
>>>>>>> haven't labeled something that will be unintuitive. But the overall style
>>>>>>> of poll/select-based APIs is quite common in programming, going back to
>>>>>>> Unix select, so I don't think it's beyond people if explained well (after
>>>>>>> all, we need to mix sync and async APIs, and if we don't say which is
>>>>>>> which, any scheme will be confusing).
>>>>>>>
>>>>>>> For what it's worth, the experience with this API has actually been
>>>>>>> about 1000x better than the issues people had around intuitiveness with
>>>>>>> the high-level API. The crazy blocking iterator, impossible internal
>>>>>>> queue sizing, baroque threading model, etc. have all caused endless
>>>>>>> amounts of anger. Not to mention that that client effectively
>>>>>>> disqualifies about 50% of the use cases people want to try to use it for
>>>>>>> (plus I regularly hear people tell me they've heard not to use it at all
>>>>>>> for various reasons ranging from data loss to lack of features). It's
>>>>>>> important to have that context when people need to switch and they say
>>>>>>> "oh, the old way was so simple and the new way complex!" :-)
>>>>>>>
>>>>>>> Let me give some context related to your points, based on our
>>>>>>> previous discussions:
>>>>>>>
>>>>>>> For commit, let's discuss; that is easy either way.
>>>>>>>
>>>>>>> The motivation for avoiding additional threading was two-fold. First,
>>>>>>> this client is really intended to be the lowest-level client. There are
>>>>>>> many, many possible higher-level processing abstractions. One thing we
>>>>>>> found to be a big problem with the high-level client was that it coupled
>>>>>>> things everyone must have--failover, etc.--with things that are different
>>>>>>> in each use case, like the appropriate threading model. If you do this
>>>>>>> you need to also maintain a thread-free low-level consumer API for people
>>>>>>> to get around whatever you have done.
>>>>>>>
>>>>>>> The second reason was that the internal threading in the client
>>>>>>> became quite complex. The answer with threading is always that "it won't
>>>>>>> be complex this time", but it always is.
>>>>>>>
>>>>>>> For the heartbeat, you correctly describe the downside of coupling
>>>>>>> heartbeat with poll--the contract is that the application must regularly
>>>>>>> consume to be considered an active consumer. This allows the possibility
>>>>>>> of false positive failure detections. However, it's important to
>>>>>>> understand the downside of the alternative. If you do background polling,
>>>>>>> a consumer is considered active as long as it isn't shut down. This leads
>>>>>>> to all kinds of active consumers that aren't consuming because they have
>>>>>>> leaked or otherwise stopped but are still claiming partitions and
>>>>>>> heart-beating. This failure mode is actually far, far worse. If you allow
>>>>>>> false positives, the user sees the frequent rebalances and knows they
>>>>>>> aren't consuming frequently enough to be considered active; but if you
>>>>>>> allow false negatives, you end up having weeks go by before someone
>>>>>>> notices that a partition has been unconsumed the whole time, at which
>>>>>>> point the data is gone. Plus, of course, even if you do this you still
>>>>>>> have regular false positives anyway from GC pauses (as now). We discussed
>>>>>>> this in some depth at the time and decided that it is better to have the
>>>>>>> liveness notion tied to *actual* consumption, which is the actual
>>>>>>> definition of liveness.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi Confluent Team.
>>>>>>>>
>>>>>>>> There has recently been a lot of open source activity regarding the
>>>>>>>> new KafkaConsumer:
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>
>>>>>>>>
>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>
>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza, and
>>>>>>>> some other teams and we got similar feedback.
>>>>>>>>
>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>
>>>>>>>>    1. The current behavior is not intuitive. For example,
>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like
>>>>>>>>    subscribe, unsubscribe, seek, and commit(async) don't do anything
>>>>>>>>    without a KafkaConsumer.poll call.
>>>>>>>>
>>>>>>>>    2. The semantics of a commit() call should be consistent between
>>>>>>>>    sync and async operations. Currently, sync commit is a blocking call
>>>>>>>>    which actually sends out an OffsetCommitRequest and waits for the
>>>>>>>>    response upon the user's KafkaConsumer.commit call. However, the
>>>>>>>>    async commit is a nonblocking call which just queues up the
>>>>>>>>    OffsetCommitRequest. The request itself is only sent out in the next
>>>>>>>>    poll. The teams we talked to found this misleading (see the sketch
>>>>>>>>    after this list).
>>>>>>>>
>>>>>>>>    3. Heartbeats are dependent on user application behavior (i.e.
>>>>>>>>    user applications calling poll). This can be a big problem as we
>>>>>>>>    don't control how different applications behave. For example, we
>>>>>>>>    might have an application which reads from Kafka and writes to
>>>>>>>>    Espresso. If Espresso is slow for whatever reason, then rebalances
>>>>>>>>    could happen.
>>>>>>>>
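>>>>>>>> To make point 2 concrete, here is roughly how it looks today (a
>>>>>>>> sketch using the CommitType enum described above):
>>>>>>>>
>>>>>>>>     // blocks: sends the OffsetCommitRequest and waits for the response
>>>>>>>>     consumer.commit(CommitType.SYNC);
>>>>>>>>     // does NOT block, but also does NOT send: the request is only
>>>>>>>>     // queued up and goes on the wire during the next poll()
>>>>>>>>     consumer.commit(CommitType.ASYNC);
>>>>>>>>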
>>>>>>>> Generally speaking, we feel that the current KafkaConsumer API
>>>>>>>> design is more of a wrapper around the old simple consumer, i.e. in the
>>>>>>>> old consumer we ask users to deal with raw protocols and error handling,
>>>>>>>> while in KafkaConsumer we do that for users. However, for old high-level
>>>>>>>> consumer users (who are the majority of users), the experience is a
>>>>>>>> noticeable regression. The old high-level consumer interface is simple
>>>>>>>> and easy to use for the end user, while KafkaConsumer requires users to
>>>>>>>> be aware of many underlying details and is becoming prohibitive for
>>>>>>>> users to adopt. This is hinted at by the javadoc growing bigger and
>>>>>>>> bigger.
>>>>>>>>
>>>>>>>> We think it's getting to the point where we should take a step back
>>>>>>>> and look at the big picture.
>>>>>>>>
>>>>>>>> The current state of KafkaConsumer is that it's single-threaded.
>>>>>>>> There's one big KafkaConsumer.poll called by the user which pretty much
>>>>>>>> drives everything:
>>>>>>>>
>>>>>>>> - data fetches
>>>>>>>>
>>>>>>>> - heartbeats
>>>>>>>>
>>>>>>>> - join groups (new consumer joining a group, topic subscription
>>>>>>>> changes, reacting to group rebalance)
>>>>>>>>
>>>>>>>> - async offset commits
>>>>>>>>
>>>>>>>> - executing callbacks
>>>>>>>>
>>>>>>>> Given that the selector's poll is being driven by the end user,
>>>>>>>> this ends up making us educate users on NIO and the consequences of not
>>>>>>>> calling KafkaConsumer.poll frequently enough:
>>>>>>>>
>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>
>>>>>>>> - async commits won't send
>>>>>>>>
>>>>>>>> - callbacks won't fire
>>>>>>>>
>>>>>>>> More generally speaking, there are many surprises with the current
>>>>>>>> KafkaConsumer implementation.
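>>>>>>>>
>>>>>>>> Concretely, the prescribed usage today boils down to a single loop
>>>>>>>> (a sketch; process() is a placeholder):
>>>>>>>>
>>>>>>>>     while (true) {
>>>>>>>>         // this one call fetches data AND heartbeats AND joins groups
>>>>>>>>         // AND sends queued commits AND fires callbacks
>>>>>>>>         ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>>>>>>>>         process(records); // if this runs long, all of the above stalls
>>>>>>>>     }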
>>>>>>>>
>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>
>>>>>>>> - NIO
>>>>>>>>
>>>>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>>>>>
>>>>>>>> - a way to subscribe to topics (auto group management) or
>>>>>>>> partitions (explicit group management)
>>>>>>>>
>>>>>>>> - no surprises in the user experience
>>>>>>>>
>>>>>>>> The last point is the big one that we think we aren't hitting. We
>>>>>>>> think the most important example is that there should be no requirement
>>>>>>>> for the end user to consistently call KafkaConsumer.poll in order for
>>>>>>>> all of the above tasks to happen. We think it would be better to split
>>>>>>>> those tasks into tasks that should not rely on KafkaConsumer.poll and
>>>>>>>> tasks that should.
>>>>>>>>
>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>
>>>>>>>> - heartbeats
>>>>>>>>
>>>>>>>> - join groups
>>>>>>>>
>>>>>>>> - commits
>>>>>>>>
>>>>>>>> - executing callbacks
>>>>>>>>
>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>
>>>>>>>> This would help reduce the number of surprises for the end user.
>>>>>>>>
>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you guys
>>>>>>>> early next week. We’d like to meet up with you at LinkedIn on *July 31,
>>>>>>>> 2015* so we can talk about it before proposing it to open source.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Thanks,
Neha
