Hey Aarti,

Sorry for the late response. Your help with memory management would be great! Since this does seem to be one of the major outstanding issues for the consumer, I was going to start looking at it in detail. I found that Jay previously filed KAFKA-2063, which basically proposes to add a max.fetch.bytes setting that restricts the total size of fetch responses (across all partitions). That'll require another KIP since it involves a protocol change in addition to a configuration change. Feel free to take that issue if you don't want to wait for me.
As far as this KIP is concerned, from the discussion this morning, everyone seems to agree that max.poll.records is a minimally intrusive way to address this problem for users who are hitting it. As we get more feedback, we can consider whether deeper changes are needed.

-Jason

On Fri, Jan 8, 2016 at 4:11 PM, Aarti Gupta <aartigup...@gmail.com> wrote:
> Hi Jason,
>
> +1 on the idea of adding max.poll.bytes as an optional configuration (default set to -1, meaning the setting does not come into play).
> The pre-fetching optimization (prefetch again only those partitions with no retained data) seems slightly better (same as what we have in production today) at preventing a massive build-up of prefetched messages in memory in the interim before KAFKA-2045's introduction.
> Maybe some perf testing with variable message sizes and JVM profiling of both variants of the algorithm might help tell us whether it actually matters; I can help work on these perf results with you as we get the JIRA rolled out.
>
> thanks
> aarti
>
> On Fri, Jan 8, 2016 at 11:50 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > Thanks Jens for all of your work as well! Unless there are any more concerns, perhaps we can open the vote early next week.
> >
> > As a quick summary for newcomers to this thread, the problem we're trying to solve in this KIP is how to give users more predictable control over the message processing loop. Because the new consumer is single-threaded, the poll() API must be called frequently enough to ensure that the consumer can send heartbeats before its session timeout expires. Typically we recommend setting the session timeout large enough to make expiration unlikely, but that can be difficult advice to follow in practice when either the number of partitions is unknown or increases over time.
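For readers newer to the consumer configs, the coupling between the session timeout and the per-poll workload can be made concrete with a small sketch. The property values below are illustrative only, and max.poll.records is the name proposed in this KIP rather than an existing setting:

```java
import java.util.Properties;

public class PollConfigSketch {
    // Build a consumer configuration where the per-poll workload is bounded
    // so that poll() is reliably called again within the session timeout.
    public static Properties proposedConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "kip-41-example");
        // Session timeout: the window within which poll() must be called
        // again so the consumer's heartbeats keep it in the group.
        props.put("session.timeout.ms", "30000");
        // Proposed in this KIP: an upper bound on records returned per
        // poll(), independent of how many partitions are assigned.
        props.put("max.poll.records", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = proposedConfig();
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
    }
}
```

The two settings are tuned together: the larger the session timeout, the more records a single poll() can safely return.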
> > In some cases, such as in Jens' initial bug report, the processing time does not even depend directly on the size of the total data to be processed.
> >
> > To address this problem, we have proposed to offer a new configuration option "max.poll.records" which sets an upper bound on the number of records returned in a single call to poll(). The point is to give users a way to limit message processing time so that the session timeout can be set without risking unexpected rebalances. This change is backward compatible with the current API and users only need to change their configuration to take advantage of it. As a bonus, it provides an easy mechanism to implement commit policies which ensure commits at least as often as every N records.
> >
> > As a final subject for consideration, it may make sense to also add a configuration "max.poll.bytes," which places an upper bound on the total size of the data returned in a call to poll(). This would solve the problem more generally since some use cases may actually have processing time which is more dependent on the total size of the data than the number of records. Others might require a mix of the two.
> >
> > -Jason
> >
> > On Fri, Jan 8, 2016 at 9:42 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > Hi Aarti,
> > >
> > > Thanks for the feedback. I think the concern about memory overhead is valid. As Guozhang mentioned, the problem already exists in the current consumer, so this probably deserves consideration outside of this KIP. That said, it's a good question whether our prefetching strategy makes it more difficult to control the memory overhead. The approach we've proposed for prefetching is basically the following: fetch all partitions whenever the number of retained messages is less than max.poll.records.
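That fetch-all-when-low policy, along with the stricter variant discussed in this thread (refetch only partitions with no retained data), can be sketched as a pure decision function. Partitions are modeled as plain strings and retained-record counts as integers; this is a simplification for illustration, not the actual fetcher implementation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrefetchSketch {
    // Variant A: fetch every assigned partition whenever the total number of
    // retained (fetched but not yet returned) records is below maxPollRecords.
    public static List<String> fetchAllWhenLow(Map<String, Integer> retained, int maxPollRecords) {
        int total = retained.values().stream().mapToInt(Integer::intValue).sum();
        List<String> toFetch = new ArrayList<>();
        if (total < maxPollRecords) {
            toFetch.addAll(retained.keySet());
        }
        return toFetch;
    }

    // Variant B: under the same trigger, refetch only partitions with no
    // retained data, capping worst-case buffering at roughly
    // (#partitions * max.partition.fetch.bytes).
    public static List<String> fetchOnlyEmpty(Map<String, Integer> retained, int maxPollRecords) {
        int total = retained.values().stream().mapToInt(Integer::intValue).sum();
        List<String> toFetch = new ArrayList<>();
        if (total < maxPollRecords) {
            for (Map.Entry<String, Integer> e : retained.entrySet()) {
                if (e.getValue() == 0) {
                    toFetch.add(e.getKey());
                }
            }
        }
        return toFetch;
    }

    public static void main(String[] args) {
        Map<String, Integer> retained = new LinkedHashMap<>();
        retained.put("t-0", 0);
        retained.put("t-1", 120);
        // 120 retained records < maxPollRecords of 500, so variant A fetches
        // both partitions while variant B fetches only the empty one.
        System.out.println(fetchAllWhenLow(retained, 500)); // [t-0, t-1]
        System.out.println(fetchOnlyEmpty(retained, 500));  // [t-0]
    }
}
```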
> > > In the worst case, this increases the maximum memory used by the consumer by the size of those retained messages. As you've pointed out, messages could be very large. We could reduce this requirement with a slight change: instead of fetching all partitions, we could fetch only those with no retained data. That would reduce the worst-case overhead to #partitions * max.partition.fetch.bytes, which matches the existing memory overhead. Would that address your concern?
> > >
> > > A couple of other points worth mentioning are that users have the option not to use max.poll.records, in which case the behavior will be the same as in the current consumer. Additionally, the implementation can be changed over time without affecting users, so we can adjust it in particular when we address memory concerns in KAFKA-2045.
> > >
> > > On a side note, I'm wondering if it would be useful to extend this KIP to include a max.poll.bytes? For some use cases, it may make more sense to control the processing time by the size of data instead of the number of records. Not that I'm anxious to draw this out, but if we'll need this setting eventually, we may as well do it now. Thoughts?
> > >
> > > -Jason
> > >
> > > On Fri, Jan 8, 2016 at 1:03 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> > > > Hi,
> > > >
> > > > I just publicly wanted to thank Jason for the work he's done with the KIP and say that I've been in touch with him privately back and forth to work out some of its details. Thanks!
> > > >
> > > > Since it feels like I initiated this KIP a bit, I also want to say that I'm happy with it and that its proposal solves the initial issue I reported in https://issues.apache.org/jira/browse/KAFKA-2986. That said, I'm open for a [VOTE] on my behalf. I propose Jason decides when voting starts.
> > > > Cheers and keep up the good work,
> > > > Jens
> > > >
> > > > On Tue, Jan 5, 2016 at 8:32 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.
> > > > >
> > > > > Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to. However, if it remains a problem, then we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly by implementing a batch scaling algorithm internally.
> > > > > -Jason
> > > > >
> > > > > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > Hi Cliff,
> > > > > >
> > > > > > I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crh...@signal.co> wrote:
> > > > > > > Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get fewer than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.
> > > > > > >
> > > > > > > On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <g...@confluent.io> wrote:
> > > > > > > > 1. Agree that TCP window style scaling will be cool. I'll try to think of a good excuse to use it ;)
> > > > > > > >
> > > > > > > > 2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.
> > > > > > > >
> > > > > > > > Another option could be to expose a "heartbeat" API to consumers. If my app is busy processing data but is still alive, it could initiate a heartbeat to signal it's alive without having to handle additional messages.
> > > > > > > > I don't know if this improves more than it complicates, though :(
> > > > > > > >
> > > > > > > > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > Hey Gwen,
> > > > > > > > >
> > > > > > > > > I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming say 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example). You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.
> > > > > > > > >
> > > > > > > > > On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, though, since these settings might depend on the environment that the consumer is deployed in.
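The window-scaling idea can be sketched as a toy controller. The half-session-timeout threshold and the 50% growth factor are arbitrary illustration choices, not anything proposed for the consumer:

```java
public class AdaptiveBatchSketch {
    private final long sessionTimeoutMs;
    private int maxRecords;

    public AdaptiveBatchSketch(long sessionTimeoutMs, int initialMaxRecords) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxRecords = initialMaxRecords;
    }

    // After each processing loop iteration: grow the batch while processing
    // stays comfortably inside the session timeout, and back off hard once
    // an iteration takes longer than half of it.
    public int adjust(long lastBatchProcessingMs) {
        if (lastBatchProcessingMs > sessionTimeoutMs / 2) {
            maxRecords = Math.max(1, maxRecords / 2); // halve to avoid a rebalance
        } else {
            maxRecords = maxRecords + maxRecords / 2; // grow by 50%
        }
        return maxRecords;
    }

    public static void main(String[] args) {
        AdaptiveBatchSketch sketch = new AdaptiveBatchSketch(30_000, 100);
        System.out.println(sketch.adjust(2_000));  // fast batch: 100 -> 150
        System.out.println(sketch.adjust(20_000)); // slow batch: 150 -> 75
    }
}
```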
> > > > > > > > > It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn which could keep the consumer from making any progress. It's not a particularly graceful failure.
> > > > > > > > >
> > > > > > > > > -Jason
> > > > > > > > >
> > > > > > > > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <g...@confluent.io> wrote:
> > > > > > > > > > I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.
> > > > > > > > > >
> > > > > > > > > > Databases are really unpredictable regarding response times - load and locking can affect this. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say 5000 records), and basically pause, batch-insert all records, commit and resume.
> > > > > > > > > >
> > > > > > > > > > Does that make sense?
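The pause / batch-insert / commit / resume loop just described can be sketched against a stub consumer interface. Note that the real KafkaConsumer's pause() and resume() take partition arguments, which the stub elides, and the RDBMS insert is stood in by a plain list:

```java
import java.util.ArrayList;
import java.util.List;

public class PauseBatchSketch {
    // Stub standing in for the real KafkaConsumer, to make the control flow
    // concrete without the Kafka client library.
    interface BatchConsumer<T> {
        List<T> poll(long timeoutMs);
        void pause();          // stop fetching (poll() would only heartbeat)
        void resume();
        void commitSync();
    }

    static final int BATCH_SIZE = 5000; // the example batch size from the thread

    // Drain records until a full batch is buffered, pause, do the slow batch
    // insert, commit only after it succeeded, then resume fetching.
    public static <T> int processOneBatch(BatchConsumer<T> consumer, List<T> sink) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < BATCH_SIZE) {
            List<T> records = consumer.poll(100);
            if (records.isEmpty()) break;
            batch.addAll(records);
        }
        consumer.pause();
        sink.addAll(batch);     // stand-in for the batch insert into the RDBMS
        consumer.commitSync();  // commit after the insert, not before
        consumer.resume();
        return batch.size();
    }

    // Fake consumer that serves up to 1000 records per poll from a fixed supply.
    static class FakeConsumer implements BatchConsumer<Integer> {
        int remaining;
        FakeConsumer(int remaining) { this.remaining = remaining; }
        public List<Integer> poll(long timeoutMs) {
            int n = Math.min(1000, remaining);
            remaining -= n;
            List<Integer> out = new ArrayList<>();
            for (int i = 0; i < n; i++) out.add(i);
            return out;
        }
        public void pause() {}
        public void resume() {}
        public void commitSync() {}
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        System.out.println(processOneBatch(new FakeConsumer(7000), sink)); // 5000
    }
}
```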
> > > > > > > > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > Gwen and Ismael,
> > > > > > > > > > >
> > > > > > > > > > > I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier than expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than we provide with pause/resume.
> > > > > > > > > > >
> > > > > > > > > > > -Jason
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <g...@confluent.io> wrote:
> > > > > > > > > > > > The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol.
> > > > > > > > > > > > You may want to refer to:
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> > > > > > > > > > > > > Hi guys,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I realized I never thanked y'all for your input - thanks!
> > > > > > > > > > > > > Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1
> > > > > > > > > > > > >
> > > > > > > > > > > > > Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing. To be able to do this I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:
> > > > > > > > > > > > >
> > > > > > > > > > > > > - How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests[1] for the consumed assigned partitions of the relevant topics?
> > > > > > > > > > > > > Or is the consumer fetching from one partition at a time and balancing between them internally? That is, is the responsibility of partition balancing (and fairness) on the broker side or consumer side?
> > > > > > > > > > > > > - Is the above documented somewhere?
> > > > > > > > > > > > >
> > > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka, see "Multi-Fetch".
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Jens
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <g...@confluent.io> wrote:
> > > > > > > > > > > > > > > Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout.
> > > > > > > > > > > > > > > That is - if I choose to lower the session timeout for some reason, I will have to modify the number of records returned too.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > If those assumptions are correct, I think a configuration makes more sense.
> > > > > > > > > > > > > > > 1. We are unlikely to want this parameter to be changed during the lifetime of the consumer.
> > > > > > > > > > > > > > > 2. The correct value is tied to another configuration parameter, so they will be controlled together.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I was thinking the same thing.
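The tuning relationship between the session timeout and the number of records per poll can be reduced to back-of-the-envelope arithmetic. The headroom fraction and per-record cost below are illustrative assumptions, not numbers from the KIP:

```java
public class SafePollSizeSketch {
    // A rough bound: if each record takes at most perRecordMs to process,
    // keep the whole batch under the given fraction of the session timeout
    // so the consumer reaches poll() again with heartbeat headroom to spare.
    public static int safeMaxPollRecords(long sessionTimeoutMs, double perRecordMs,
                                         double headroomFraction) {
        double budgetMs = sessionTimeoutMs * headroomFraction;
        return Math.max(1, (int) (budgetMs / perRecordMs));
    }

    public static void main(String[] args) {
        // 30s session timeout, ~2ms per record, spend at most half the timeout:
        System.out.println(safeMaxPollRecords(30_000, 2.0, 0.5)); // 7500
    }
}
```

Lowering the session timeout shrinks the budget, which is exactly why the two settings end up being tuned together.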
> > > > > > > > > > > > > > Ismael
> > > > > > > > > > > > >
> > > > > > > > > > > > > --
> > > > > > > > > > > > > Jens Rantil
> > > > > > > > > > > > > Backend engineer
> > > > > > > > > > > > > Tink AB
> > > > > > > > > > > > >
> > > > > > > > > > > > > Email: jens.ran...@tink.se
> > > > > > > > > > > > > Phone: +46 708 84 18 32
> > > > > > > > > > > > > Web: www.tink.se
> > > > > > > > > > > > >
> > > > > > > > > > > > > Facebook <https://www.facebook.com/#!/tink.se> Linkedin <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary> Twitter <https://twitter.com/tink>
> > > > > > >
> > > > > > > --
> > > > > > > Cliff Rhyne
> > > > > > > Software Engineering Lead
> > > > > > > e: crh...@signal.co
> > > > > > > signal.co