Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in the "Ensuring Fair Consumption" section, there are 3 paragraphs using 2 different examples (2 partitions with max.poll.records=100, and 3 partitions with max.poll.records=30). I think you could simplify this by using a single example across all 3 paragraphs.
Ismael

On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <ja...@confluent.io> wrote:

> I've updated the KIP with some implementation details. I also added more
> discussion on the heartbeat() alternative. The short answer for why we
> rejected this API is that it doesn't seem to work well with offset commits.
> This would tend to make correct usage complicated and difficult to explain.
> Additionally, we don't see any clear advantages over having a way to set
> the max records. For example, using max.records=1 would be equivalent to
> invoking heartbeat() on each iteration of the message processing loop.
>
> Going back to the discussion on whether we should use a configuration value
> or overload poll(), I'm leaning toward the configuration option, mainly for
> compatibility and to keep the KafkaConsumer API from getting any more
> complex. Also, as others have mentioned, it seems reasonable to want to
> tune this setting in the same place that the session timeout and heartbeat
> interval are configured. I still feel a little uncomfortable with the need
> to do a lot of configuration tuning to get the consumer working for a
> particular environment, but hopefully the defaults are conservative enough
> that most users won't need to. However, if it remains a problem, then we
> could still look into better options for managing the size of batches,
> including overloading poll() with a max records argument or possibly
> implementing a batch scaling algorithm internally.
>
> -Jason
>
> On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hi Cliff,
> >
> > I think we're all agreed that the current contract of poll() should be
> > kept. The consumer wouldn't wait for max messages to become available in
> > this proposal; it would only ensure that it never returns more than max
> > messages.
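[Editor's note: for reference, a sketch of how the proposed setting might sit alongside the existing 0.9 consumer timeout configs that Jason mentions tuning together. The `max.poll.records` name follows the KIP; the values are purely illustrative, not recommendations.]

```properties
# Proposed in KIP-41: upper bound on the number of records a single poll() returns
max.poll.records=500
# Existing 0.9 consumer configs the new setting would be tuned alongside
session.timeout.ms=30000
heartbeat.interval.ms=3000
```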
> >
> > -Jason
> >
> > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crh...@signal.co> wrote:
> >
> >> Instead of a heartbeat, I'd prefer poll() to return whatever messages the
> >> client has. Either a) I don't care if I get less than my max message
> >> limit, or b) I do care and will set a larger timeout. Case B is less
> >> common than A and is fairly easy to handle in the application's code.
> >>
> >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <g...@confluent.io> wrote:
> >>
> >> > 1. Agree that TCP-window-style scaling will be cool. I'll try to think
> >> > of a good excuse to use it ;)
> >> >
> >> > 2. I'm very concerned about the challenges of getting the timeouts,
> >> > heartbeats and max messages right.
> >> >
> >> > Another option could be to expose a "heartbeat" API to consumers. If my
> >> > app is still busy processing data but is still alive, it could initiate
> >> > a heartbeat to signal it's alive without having to handle additional
> >> > messages.
> >> >
> >> > I don't know if this improves more than it complicates though :(
> >> >
> >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >
> >> > > Hey Gwen,
> >> > >
> >> > > I was thinking along the lines of TCP window scaling in order to
> >> > > dynamically find a good consumption rate. Basically you'd start off
> >> > > consuming say 100 records and you'd let it increase until the
> >> > > consumption took longer than half the session timeout (for example).
> >> > > You /might/ be able to achieve the same thing using pause/resume, but
> >> > > it would be a lot trickier since you have to do it at the granularity
> >> > > of partitions. But yeah, database write performance doesn't always
> >> > > scale in a predictable enough way to accommodate this, so I'm not
> >> > > sure how useful it would be in practice.
> >> > > It might also be more difficult to implement since it wouldn't be as
> >> > > clear when to initiate the next fetch. With a static setting, the
> >> > > consumer knows exactly how many records will be returned on the next
> >> > > call to poll() and can send fetches accordingly.
> >> > >
> >> > > On the other hand, I do feel a little wary of the need to tune the
> >> > > session timeout and max messages, since these settings might depend
> >> > > on the environment that the consumer is deployed in. It wouldn't be a
> >> > > big deal if the impact were relatively minor, but getting them wrong
> >> > > can cause a lot of rebalance churn which could keep the consumer from
> >> > > making any progress. It's not a particularly graceful failure.
> >> > >
> >> > > -Jason
> >> > >
> >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <g...@confluent.io> wrote:
> >> > >
> >> > > > I can't speak to all use-cases, but for the database one, I think
> >> > > > pause/resume will be necessary in any case, and therefore dynamic
> >> > > > batch sizes are not needed.
> >> > > >
> >> > > > Databases are really unpredictable regarding response times - load
> >> > > > and locking can affect this. I'm not sure there's a good way to know
> >> > > > you are going into rebalance hell before it is too late. So if I
> >> > > > were writing code that updates an RDBMS based on Kafka, I'd pick a
> >> > > > reasonable batch size (say 5000 records), and basically pause,
> >> > > > batch-insert all records, commit and resume.
> >> > > >
> >> > > > Does that make sense?
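[Editor's note: a minimal sketch of the pause / batch-insert / commit / resume pattern Gwen describes. The consumer object here is a hypothetical stand-in exposing poll/pause/resume/commit/assignment, not the actual Kafka client API; in the real 0.9 consumer you would also need to keep calling poll() while paused so heartbeats continue during a long database write.]

```python
def drain_batch(consumer, batch_size):
    """Accumulate roughly batch_size records across poll() calls."""
    batch = []
    while len(batch) < batch_size:
        records = consumer.poll(timeout_ms=100)
        if not records:
            break
        batch.extend(records)
    return batch

def consume_and_insert(consumer, insert_batch, batch_size=5000):
    """Pause fetching, write one batch to the database, commit, resume."""
    while True:
        batch = drain_batch(consumer, batch_size)
        if not batch:
            return
        parts = consumer.assignment()
        consumer.pause(parts)      # stop accumulating more records
        insert_batch(batch)        # the (possibly slow) RDBMS write
        consumer.commit()          # commit offsets only after a successful insert
        consumer.resume(parts)     # start fetching again
```

The key ordering is that the offset commit happens only after the insert succeeds, so a crash mid-insert replays the batch rather than losing it.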
> >> > > >
> >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > > >
> >> > > > > Gwen and Ismael,
> >> > > > >
> >> > > > > I agree the configuration option is probably the way to go, but I
> >> > > > > was wondering whether there would be cases where it made sense to
> >> > > > > let the consumer dynamically set max messages to adjust for
> >> > > > > downstream slowness. For example, if the consumer is writing
> >> > > > > consumed records to another database, and that database is
> >> > > > > experiencing heavier than expected load, then the consumer could
> >> > > > > halve its current max messages in order to adapt without risking
> >> > > > > rebalance hell. It could then increase max messages as the load on
> >> > > > > the database decreases. It's basically an easier way to handle
> >> > > > > flow control than we provide with pause/resume.
> >> > > > >
> >> > > > > -Jason
> >> > > > >
> >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <g...@confluent.io> wrote:
> >> > > > >
> >> > > > > > The wiki you pointed to is no longer maintained and fell out of
> >> > > > > > sync with the code and protocol.
> >> > > > > >
> >> > > > > > You may want to refer to:
> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >> > > > > >
> >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> >> > > > > >
> >> > > > > > > Hi guys,
> >> > > > > > >
> >> > > > > > > I realized I never thanked y'all for your input - thanks!
> >> > > > > > > Jason: I apologize for assuming your stance on the issue!
> >> > > > > > > Feels like we all agreed on the solution.
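[Editor's note: the halve-on-load / grow-on-recovery behaviour Jason describes above is essentially AIMD (additive increase, multiplicative decrease), the same family of algorithm as TCP congestion control. A toy sketch of one adjustment step; the function name, thresholds, and the "half the session timeout" trigger are illustrative, not part of any proposed API.]

```python
def next_max_messages(current, process_time_ms, session_timeout_ms,
                      floor=1, ceiling=100000):
    """One AIMD adjustment step for the per-poll record limit."""
    budget = session_timeout_ms / 2.0  # leave half the timeout as safety margin
    if process_time_ms > budget:
        # Downstream is slow: halve to back away from rebalance territory.
        return max(floor, current // 2)
    # Downstream is keeping up: probe upward gradually.
    return min(ceiling, current + max(1, current // 10))
```

Calling this after each processing cycle would shrink the batch quickly when the database falls behind and grow it slowly as load subsides, without touching pause/resume.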
> >> > > > > > > +1
> >> > > > > > >
> >> > > > > > > Follow-up: Jason made a point about defining prefetch and
> >> > > > > > > fairness behaviour in the KIP. I am now working on putting
> >> > > > > > > that down in writing. To be able to do this I think I need to
> >> > > > > > > understand the current prefetch behaviour in the new consumer
> >> > > > > > > API (0.9) a bit better. Some specific questions:
> >> > > > > > >
> >> > > > > > > - How does a specific consumer balance incoming messages from
> >> > > > > > > multiple partitions? Is the consumer simply issuing Multi-Fetch
> >> > > > > > > requests[1] for the assigned partitions of the relevant
> >> > > > > > > topics? Or is the consumer fetching from one partition at a
> >> > > > > > > time and balancing between them internally? That is, is the
> >> > > > > > > responsibility of partition balancing (and fairness) on the
> >> > > > > > > broker side or the consumer side?
> >> > > > > > > - Is the above documented somewhere?
> >> > > > > > >
> >> > > > > > > [1]
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> >> > > > > > > , see "Multi-Fetch".
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Jens
> >> > > > > > >
> >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >> > > > > > >
> >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <g...@confluent.io> wrote:
> >> > > > > > > >
> >> > > > > > > > > Given the background, it sounds like you'll generally want
> >> > > > > > > > > each call to poll() to return the same number of events
> >> > > > > > > > > (which is the number you planned on having enough memory /
> >> > > > > > > > > time for). It also sounds like tuning the number of events
> >> > > > > > > > > will be closely tied to tuning the session timeout. That
> >> > > > > > > > > is - if I choose to lower the session timeout for some
> >> > > > > > > > > reason, I will have to modify the number of records
> >> > > > > > > > > returned too.
> >> > > > > > > > >
> >> > > > > > > > > If those assumptions are correct, I think a configuration
> >> > > > > > > > > makes more sense:
> >> > > > > > > > > 1. We are unlikely to want this parameter to change during
> >> > > > > > > > > the lifetime of the consumer.
> >> > > > > > > > > 2. The correct value is tied to another configuration
> >> > > > > > > > > parameter, so they will be controlled together.
> >> > > > > > > >
> >> > > > > > > > I was thinking the same thing.
> >> > > > > > > >
> >> > > > > > > > Ismael
> >> > > > > > >
> >> > > > > > > --
> >> > > > > > > Jens Rantil
> >> > > > > > > Backend engineer, Tink AB
> >>
> >> --
> >> Cliff Rhyne
> >> Software Engineering Lead, Signal