I like the fair-consumption approach you chose - "pull as many records as possible from each partition in a similar round-robin fashion". It is very intuitive and close enough to fair.
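As I read the KIP, the behaviour could be simulated roughly like this (a toy model, not the consumer's actual code; the buffer names and the rotate-between-polls policy are my reading of the proposal, and the 3-partitions/30-records numbers are the example from the thread):

```python
from collections import deque

def poll(buffers, order, max_poll_records):
    """Toy model of the fair-consumption proposal: pull as many records
    as possible from each partition, visiting partitions in round-robin
    order, until max_poll_records is reached; then rotate the order so a
    different partition goes first on the next poll."""
    out = []
    for tp in list(order):
        buf = buffers[tp]
        while buf and len(out) < max_poll_records:
            out.append((tp, buf.popleft()))
        if len(out) == max_poll_records:
            break
    order.rotate(-1)  # next poll starts from the next partition
    return out

# Example from the thread: 3 partitions, max.poll.records = 30
buffers = {p: deque(range(100)) for p in ("p0", "p1", "p2")}
order = deque(buffers)
first = poll(buffers, order, 30)
second = poll(buffers, order, 30)
```

Each poll drains one (deep) partition here, but the rotation means successive polls serve every partition in turn, which is what makes it "close enough to fair".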
Overall, I'm +1 on the KIP. But you'll need a formal vote :)

On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson <ja...@confluent.io> wrote:

Thanks for the suggestion, Ismael. I updated the KIP.

-Jason

On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ism...@juma.me.uk> wrote:

Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2 examples (2 partitions with 100 max.poll.records and 3 partitions with 30 max.poll.records). I think you could simplify this by using one of the examples in the 3 paragraphs.

Ismael

On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <ja...@confluent.io> wrote:

I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.

Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option, mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured.
I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to. However, if it remains a problem, then we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly implementing a batch scaling algorithm internally.

-Jason

On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Cliff,

I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.

-Jason

On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crh...@signal.co> wrote:

Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get fewer than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.

On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <g...@confluent.io> wrote:

1. Agree that TCP window style scaling would be cool. I'll try to think of a good excuse to use it ;)

2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.

Another option could be to expose a "heartbeat" API to consumers.
If my app is still busy processing data but is still alive, it could initiate a heartbeat to signal that it's alive without having to handle additional messages.

I don't know if this improves more than it complicates, though :(

On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Gwen,

I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming, say, 100 records, and you'd let it increase until the consumption took longer than half the session timeout (for example). You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.

On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, since these settings might depend on the environment that the consumer is deployed in.
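The window-scaling idea could be sketched like this (a sketch only: the doubling/halving policy, the floor/ceiling bounds, and all the numbers below are my own made-up illustration, not anything proposed in the KIP; the half-the-session-timeout threshold is the example from this thread):

```python
def next_max_records(current, batch_duration_ms, session_timeout_ms,
                     floor=1, ceiling=10000):
    """Window-style scaling of the per-poll record cap: keep growing
    while a batch is processed well inside the session timeout, and
    back off once processing takes longer than half the timeout."""
    if batch_duration_ms > session_timeout_ms / 2:
        return max(floor, current // 2)  # overshoot: back off
    return min(ceiling, current * 2)     # headroom left: probe upward

# Start at 100 records (the example starting point from the thread)
# against a 30 s session timeout, pretending each record takes 1 ms.
max_records = 100
for _ in range(5):
    batch_duration_ms = max_records * 1.0
    max_records = next_max_records(max_records, batch_duration_ms, 30000)
```

As the thread notes, this only works if downstream write times scale predictably; a sudden spike in per-record cost can still blow past the timeout before the window backs off.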
It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn, which could keep the consumer from making any progress. It's not a particularly graceful failure.

-Jason

On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <g...@confluent.io> wrote:

I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.

Databases are really unpredictable regarding response times; load and locking can affect them. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say, 5000 records), and basically pause, batch-insert all records, commit and resume.

Does that make sense?

On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <ja...@confluent.io> wrote:

Gwen and Ismael,

I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness.
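The pause/batch-insert/commit/resume pattern might look roughly like this (a sketch against stand-in stub objects, since the real 0.9 consumer needs a broker; `insert_batch` and both stub classes are hypothetical, and in the real KafkaConsumer pause()/resume() take partitions and you would still need to keep calling poll() while paused so heartbeats get sent):

```python
def drain_to_db(consumer, db, batch_size=5000):
    """Accumulate up to batch_size records, pause fetching, write the
    whole batch to the database, commit offsets only after the write
    succeeds, then resume fetching."""
    batch = []
    while len(batch) < batch_size:
        records = consumer.poll(timeout_ms=100)
        if not records:
            break
        batch.extend(records)
    if batch:
        consumer.pause(consumer.assignment())  # stop fetching during the slow write
        db.insert_batch(batch)                 # slow, possibly locking, RDBMS write
        consumer.commit()                      # progress recorded only after the write
        consumer.resume(consumer.assignment())
    return len(batch)


class StubConsumer:
    """Stand-in for the consumer so the sketch runs without a broker."""
    def __init__(self, records):
        self._records = list(records)
        self.paused = False
        self.committed = False
    def assignment(self):
        return {("topic", 0)}
    def poll(self, timeout_ms):
        chunk, self._records = self._records[:500], self._records[500:]
        return chunk
    def pause(self, partitions):
        self.paused = True
    def resume(self, partitions):
        self.paused = False
    def commit(self):
        self.committed = True


class StubDb:
    def __init__(self):
        self.rows = []
    def insert_batch(self, rows):
        self.rows.extend(rows)
```

Committing only after the insert gives at-least-once delivery: a crash mid-write means the batch is re-fetched and re-inserted on restart.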
For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier than expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than we provide with pause/resume.

-Jason

On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <g...@confluent.io> wrote:

The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol.

You may want to refer to:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.ran...@tink.se> wrote:

Hi guys,

I realized I never thanked y'all for your input - thanks! Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1

Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing.
To be able to do this I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:

- How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests[1] for its assigned partitions of the relevant topics? Or is the consumer fetching from one partition at a time and balancing between them internally? That is, is the responsibility of partition balancing (and fairness) on the broker side or the consumer side?
- Is the above documented somewhere?

[1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka, see "Multi-Fetch".
Thanks,
Jens

On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ism...@juma.me.uk> wrote:

On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <g...@confluent.io> wrote:

Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout. That is, if I choose to lower the session timeout for some reason, I will have to modify the number of records returned too.

If those assumptions are correct, I think a configuration makes more sense:
1. We are unlikely to want this parameter to change during the lifetime of the consumer.
2. The correct value is tied to another configuration parameter, so they will be controlled together.

I was thinking the same thing.
Ismael

--
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

--
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co