If there are no more concerns, I'll open the vote for this KIP later today.
Thanks,
Jason

On Tue, Jan 12, 2016 at 5:07 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Aarti,

Sorry for the late response. Your help with memory management would be great! Since this does seem to be one of the major outstanding issues for the consumer, I was going to start looking at it in detail. I found that Jay previously filed KAFKA-2063, which basically proposes to add a max.fetch.bytes setting that restricts the total size of fetch responses (across all partitions). That'll require another KIP since it requires a protocol change in addition to a configuration change. Feel free to take that issue if you don't want to wait for me.

As far as this KIP is concerned, from the discussion this morning, everyone seems to agree that max.poll.records is a minimally intrusive way to address this problem for users who are hitting it. As we get more feedback, we can consider whether deeper changes are needed.

-Jason

On Fri, Jan 8, 2016 at 4:11 PM, Aarti Gupta <aartigup...@gmail.com> wrote:

Hi Jason,

+1 on the idea of adding max.poll.bytes as an optional configuration (a default of -1 would mean that the setting does not come into play). The pre-fetching optimization (pre-fetch again only those partitions with no retained data) seems slightly better (same as what we have in production today) at preventing a massive build-up of pre-fetched messages in memory, in the interim before KAFKA-2045's introduction. Maybe some perf testing with variable message sizes and JVM profiling of both variants of the algorithm might help tell us if it actually matters. I can help work on these perf results with you as we get the JIRA rolled out.

thanks
aarti

On Fri, Jan 8, 2016 at 11:50 AM, Jason Gustafson <ja...@confluent.io> wrote:

Thanks Jens for all of your work as well! Unless there are any more concerns, perhaps we can open the vote early next week.

As a quick summary for newcomers to this thread, the problem we're trying to solve in this KIP is how to give users more predictable control over the message processing loop. Because the new consumer is single-threaded, the poll() API must be called frequently enough to ensure that the consumer can send heartbeats before its session timeout expires. Typically we recommend setting the session timeout large enough to make expiration unlikely, but that can be difficult advice to follow in practice when the number of partitions is either unknown or increases over time. In some cases, such as in Jens' initial bug report, the processing time does not even depend directly on the size of the total data to be processed.

To address this problem, we have proposed to offer a new configuration option "max.poll.records" which sets an upper bound on the number of records returned in a single call to poll(). The point is to give users a way to limit message processing time so that the session timeout can be set without risking unexpected rebalances. This change is backward compatible with the current API, and users only need to change their configuration to take advantage of it. As a bonus, it provides an easy mechanism to implement commit policies which ensure commits at least as often as every N records.

As a final subject for consideration, it may make sense to also add a configuration "max.poll.bytes," which places an upper bound on the total size of the data returned in a call to poll(). This would solve the problem more generally, since some use cases may actually have processing time which depends more on the total size of the data than on the number of records. Others might require a mix of the two.
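[To make the summary above concrete, here is a minimal sketch of what a consumer configuration using the proposed setting might look like. The broker address, group id, and the value 500 are illustrative placeholders, and max.poll.bytes is shown commented out since it is only under discussion:]

```java
import java.util.Properties;

// Hypothetical consumer configuration using the setting proposed in this KIP.
public class ConsumerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group
        props.put("session.timeout.ms", "30000");
        // Proposed in this KIP: cap the number of records per poll() call.
        props.put("max.poll.records", "500");
        // Under discussion only; may never be added:
        // props.put("max.poll.bytes", "1048576");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.records")); // prints 500
    }
}
```

[Since the change is backward compatible, omitting max.poll.records entirely would keep today's behavior.]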
-Jason

On Fri, Jan 8, 2016 at 9:42 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Aarti,

Thanks for the feedback. I think the concern about memory overhead is valid. As Guozhang mentioned, the problem already exists in the current consumer, so this probably deserves consideration outside of this KIP. That said, it's a good question whether our prefetching strategy makes it more difficult to control the memory overhead. The approach we've proposed for prefetching is basically the following: fetch all partitions whenever the number of retained messages is less than max.poll.records. In the worst case, this increases the maximum memory used by the consumer by the size of those retained messages. As you've pointed out, messages could be very large. We could reduce this requirement with a slight change: instead of fetching all partitions, we could fetch only those with no retained data. That would reduce the worst-case overhead to the number of partitions times max.partition.fetch.bytes, which matches the existing memory overhead. Would that address your concern?

A couple of other points are worth mentioning: users have the option not to use max.poll.records, in which case the behavior will be the same as in the current consumer. Additionally, the implementation can be changed over time without affecting users, so we can adjust it in particular when we address memory concerns in KAFKA-2045.

On a side note, I'm wondering if it would be useful to extend this KIP to include a max.poll.bytes? For some use cases, it may make more sense to control the processing time by the size of the data instead of the number of records. Not that I'm anxious to draw this out, but if we'll need this setting eventually, we may as well do it now. Thoughts?

-Jason

On Fri, Jan 8, 2016 at 1:03 AM, Jens Rantil <jens.ran...@tink.se> wrote:

Hi,

I just publicly wanted to thank Jason for the work he's done with the KIP and say that I've been in touch with him privately back and forth to work out some of its details. Thanks!

Since it feels like I initiated this KIP a bit, I also want to say that I'm happy with it and that its proposal solves the initial issue I reported in https://issues.apache.org/jira/browse/KAFKA-2986. That said, I'm open for a [VOTE] on my behalf. I propose Jason decides when voting starts.

Cheers and keep up the good work,
Jens

On Tue, Jan 5, 2016 at 8:32 PM, Jason Gustafson <ja...@confluent.io> wrote:

I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.

Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option, mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to touch them. However, if it remains a problem, then we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly implementing a batch scaling algorithm internally.

-Jason

On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Cliff,

I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.

-Jason

On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crh...@signal.co> wrote:

Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get less than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.

On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <g...@confluent.io> wrote:

1. Agree that TCP window style scaling will be cool. I'll try to think of a good excuse to use it ;)

2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.

Another option could be to expose a "heartbeat" API to consumers. If my app is still processing data but is still alive, it could initiate a heartbeat to signal it's alive without having to handle additional messages.

I don't know if this improves more than it complicates though :(

On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Gwen,

I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming say 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example). You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.

On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, since these settings might depend on the environment that the consumer is deployed in. It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn which could keep the consumer from making any progress. It's not a particularly graceful failure.

-Jason

On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <g...@confluent.io> wrote:

I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.

Databases are really unpredictable regarding response times - load and locking can affect this. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say 5000 records), and basically pause, batch-insert all records, commit and resume.
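[A minimal, runnable sketch of the accumulate-then-flush loop described above, with the pause/insert/commit/resume calls reduced to comments and the database write stubbed out. The batch size of 5000 is from the example; everything else is assumed:]

```java
import java.util.ArrayList;
import java.util.List;

// Pure-logic sketch of the pattern: accumulate a fixed-size batch, then
// (in a real consumer) pause(), batch-insert into the DB, commitSync(),
// and resume(). Those calls are stubbed out here as comments.
public class BatchCommitSketch {
    static final int BATCH_SIZE = 5000; // batch size from the example above
    private final List<String> batch = new ArrayList<>();
    int flushes = 0;

    // Returns true when the caller should pause / insert / commit / resume.
    boolean add(String record) {
        batch.add(record);
        if (batch.size() >= BATCH_SIZE) {
            // pause() assigned partitions, insert `batch`, commitSync(), resume()
            batch.clear();
            flushes++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BatchCommitSketch s = new BatchCommitSketch();
        for (int i = 0; i < 12_000; i++) s.add("record-" + i);
        System.out.println(s.flushes); // prints 2: two full batches flushed
    }
}
```

[Because the consumer is paused while the batch is written, heartbeats keep flowing via poll() regardless of how slow the database is.]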
Does that make sense?

On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <ja...@confluent.io> wrote:

Gwen and Ismael,

I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier than expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than we provide with pause/resume.

-Jason

On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <g...@confluent.io> wrote:

The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol.
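[Jason's halve-on-slowness idea above is essentially TCP-style congestion control applied to max messages: back off multiplicatively when the downstream sink is slow, recover additively when it is healthy. A runnable sketch of that policy, with illustrative bounds and step size (none of these numbers are from the KIP):]

```java
// Sketch of dynamically adjusting the per-poll record cap based on how
// the last processing cycle went. MIN, MAX, and STEP are assumptions.
public class DynamicMaxMessages {
    static final int MIN = 1, MAX = 10_000, STEP = 100;

    // downstreamSlow would be derived from, e.g., the last DB write time
    // exceeding half the session timeout.
    static int next(int current, boolean downstreamSlow) {
        if (downstreamSlow) return Math.max(MIN, current / 2); // back off fast
        return Math.min(MAX, current + STEP);                  // recover slowly
    }

    public static void main(String[] args) {
        int max = 1000;
        max = next(max, true);   // slow cycle -> halved to 500
        max = next(max, false);  // healthy cycle -> grows to 600
        System.out.println(max); // prints 600
    }
}
```

[As Jason notes, the trade-off versus a static max.poll.records is that the consumer can no longer predict exactly how many records the next poll() must supply, which complicates deciding when to send the next fetch.]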
You may want to refer to:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.ran...@tink.se> wrote:

Hi guys,

I realized I never thanked yall for your input - thanks!
Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1

Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing. To be able to do this, I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:

- How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests[1] for the assigned partitions of the relevant topics? Or is the consumer fetching from one partition at a time and balancing between them internally? That is, is the responsibility of partition balancing (and fairness) on the broker side or the consumer side?
- Is the above documented somewhere?

[1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka, see "Multi-Fetch".

Thanks,
Jens

On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ism...@juma.me.uk> wrote:

On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <g...@confluent.io> wrote:

Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout. That is - if I choose to lower the session timeout for some reason, I will have to modify the number of records returned too.

If those assumptions are correct, I think a configuration makes more sense:
1. We are unlikely to want this parameter to change during the lifetime of the consumer.
2. The correct value is tied to another configuration parameter, so they will be controlled together.

I was thinking the same thing.
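[The coupling Gwen describes between the session timeout and the record limit can be made concrete with a back-of-the-envelope estimate: if each record takes a roughly known time to process, the record cap should let one poll() iteration finish well inside the session timeout. The safety factor of 2 below is an assumption for illustration, not a Kafka rule:]

```java
// Rough sizing sketch: pick max.poll.records so that one processing
// cycle fits in (session timeout / safety factor). Lowering the session
// timeout forces the record cap down proportionally.
public class MaxPollRecordsEstimate {
    static long estimate(long sessionTimeoutMs, double perRecordMs) {
        // Factor of 2 leaves headroom for GC pauses, commits, etc. (assumed).
        return Math.max(1, (long) (sessionTimeoutMs / (2 * perRecordMs)));
    }

    public static void main(String[] args) {
        // 30 s session timeout, ~5 ms per record -> at most 3000 records/poll
        System.out.println(estimate(30_000, 5.0)); // prints 3000
    }
}
```

[Halving the session timeout halves the estimate, which is exactly why it makes sense to configure the two settings in the same place.]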
Ismael

--
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
Twitter <https://twitter.com/tink>

--
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co
>> > >> > >> >> > >> > > >> > >> > > >> > >> > >> > >> >> > >> >> > >> >> > >> -- >> > >> Jens Rantil >> > >> Backend engineer >> > >> Tink AB >> > >> >> > >> Email: jens.ran...@tink.se >> > >> Phone: +46 708 84 18 32 >> > >> Web: www.tink.se >> > >> >> > >> Facebook <https://www.facebook.com/#!/tink.se> Linkedin >> > >> < >> > >> >> > >> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary >> > >> > >> > >> Twitter <https://twitter.com/tink> >> > >> >> > > >> > > >> > >> > >