@Jason, (apologies, got your name wrong the first time round)

On Thu, Jan 7, 2016 at 5:15 PM, Aarti Gupta <aartigup...@gmail.com> wrote:
Hi Json,

I am concerned about how many records can be prefetched into consumer memory. Currently we control the maximum number of bytes per topic and partition by setting fetch.message.max.bytes.

In the worst case, the amount of prefetched data is:

    #partitions * fetch.message.max.bytes

However, partitions can be added dynamically, which would mean that a single process (for example a single JVM with multiple consumers) that consumes messages from a large number of partitions may not be able to keep all the prefetched messages in memory.

Additionally, if the relative size of messages is highly variable, it would be hard to correlate the max fetch size in bytes with the number of records returned on a poll. We previously observed (in a production setup) that if the size of a message is greater than fetch.message.max.bytes, the consumer gets stuck. This encouraged us to increase fetch.message.max.bytes to a significantly large value, which would worsen the memory consumption fear described above when the number of partitions is also large.

While there may not be a single magic formula to predict the correct combination of fetch.message.max.bytes and max.poll.records, maybe we can make the prefetch algorithm a mathematical function of fetch.message.max.bytes and the number of partitions?

Thoughts?

Thanks,
aarti

additional unimportant note: the link to the JIRA in the KIP is broken
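To put numbers on the bound Aarti describes (figures purely illustrative): with fetch.message.max.bytes = 2 MB and 500 assigned partitions, the worst case is 500 * 2 MB = 1 GB of prefetched data, a bound that grows as partitions are added no matter how few records each poll() returns.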
On Thu, Jan 7, 2016 at 2:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Jason. I think it is a good feature to add, +1.

As suggested in KIP-32, we had better keep the end state of the KIP wiki with finalized implementation details rather than leaving a list of options. I agree that for both fairness and pre-fetching the simpler approach would be sufficient most of the time. So could we move the other approach to "rejected"?

Guozhang

On Wed, Jan 6, 2016 at 6:14 PM, Gwen Shapira <g...@confluent.io> wrote:

I like the fair-consumption approach you chose - "pull as many records as possible from each partition in a similar round-robin fashion" - it is very intuitive and close enough to fair.

Overall, I'm +1 on the KIP. But you'll need a formal vote :)

On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson <ja...@confluent.io> wrote:

Thanks for the suggestion, Ismael. I updated the KIP.

-Jason

On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ism...@juma.me.uk> wrote:

Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2 examples (2 partitions with 100 max.poll.records and 3 partitions with 30 max.poll.records). I think you could simplify this by using one of the examples in all 3 paragraphs.

Ismael

On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <ja...@confluent.io> wrote:

I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Additionally, we don't see any clear advantages over having a way to set the max records. For example, using max.records=1 would be equivalent to invoking heartbeat() on each iteration of the message processing loop.

Going back to the discussion on whether we should use a configuration value or overload poll(), I'm leaning toward the configuration option, mainly for compatibility and to keep the KafkaConsumer API from getting any more complex. Also, as others have mentioned, it seems reasonable to want to tune this setting in the same place that the session timeout and heartbeat interval are configured. I still feel a little uncomfortable with the need to do a lot of configuration tuning to get the consumer working for a particular environment, but hopefully the defaults are conservative enough that most users won't need to. However, if it remains a problem, we could still look into better options for managing the size of batches, including overloading poll() with a max records argument or possibly implementing a batch scaling algorithm internally.

-Jason
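A minimal sketch of the loop Jason describes, assuming the max.poll.records config name proposed by this KIP (topic, group, and the println stand-in are illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class MaxPollRecordsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // With max.poll.records=1, every record is followed by another poll(),
            // which also drives the heartbeat -- hence the equivalence with calling
            // heartbeat() on each iteration of the processing loop.
            props.put("max.poll.records", "1");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.println(record.value()); // stand-in for real processing
                    consumer.commitSync();
                }
            }
        }
    }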
On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Cliff,

I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages.

-Jason

On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crh...@signal.co> wrote:

Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either a) I don't care if I get less than my max message limit, or b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code.

On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <g...@confluent.io> wrote:

1. Agree that TCP-window-style scaling would be cool. I'll try to think of a good excuse to use it ;)

2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right.

Another option could be to expose a "heartbeat" API to consumers. If my app is still processing data and is alive, it could initiate a heartbeat to signal it's alive without having to handle additional messages.

I don't know if this improves more than it complicates though :(

On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Gwen,

I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming say 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example). You /might/ be able to achieve the same thing using pause/resume, but it would be a lot trickier since you have to do it at the granularity of partitions. But yeah, database write performance doesn't always scale in a predictable enough way to accommodate this, so I'm not sure how useful it would be in practice. It might also be more difficult to implement, since it wouldn't be as clear when to initiate the next fetch. With a static setting, the consumer knows exactly how many records will be returned on the next call to poll() and can send fetches accordingly.

On the other hand, I do feel a little wary of the need to tune the session timeout and max messages, since these settings might depend on the environment that the consumer is deployed in. It wouldn't be a big deal if the impact was relatively minor, but getting them wrong can cause a lot of rebalance churn which could keep the consumer from making any progress. It's not a particularly graceful failure.

-Jason

On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <g...@confluent.io> wrote:

I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed.

Databases are really unpredictable regarding response times - load and locking can affect this. I'm not sure there's a good way to know you are going into rebalance hell before it is too late. So if I were writing code that updates an RDBMS based on Kafka, I'd pick a reasonable batch size (say 5000 records), and basically pause, batch-insert all records, commit and resume.

Does that make sense?
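A rough sketch of the pattern Gwen outlines, inside the usual poll loop with imports omitted (insertBatch and chunksOf are hypothetical helpers; pause()/resume() are shown with the Collection signatures of later clients - the 0.9 API took varargs):

    ConsumerRecords<String, String> records = consumer.poll(1000);
    if (!records.isEmpty()) {
        // Stop fetching while this batch is drained into the database.
        consumer.pause(consumer.assignment());
        for (List<ConsumerRecord<String, String>> chunk : chunksOf(records, 5000)) {
            insertBatch(chunk); // hypothetical JDBC batch insert
            consumer.poll(0);   // keeps heartbeats flowing; returns nothing while paused
        }
        consumer.commitSync();  // commit only once the whole batch is in the DB
        consumer.resume(consumer.assignment());
    }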
On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <ja...@confluent.io> wrote:

Gwen and Ismael,

I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another database, and that database is experiencing heavier than expected load, then the consumer could halve its current max messages in order to adapt without risking rebalance hell. It could then increase max messages as the load on the database decreases. It's basically an easier way to handle flow control than we provide with pause/resume.

-Jason
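For contrast, the dynamic adjustment Jason floats would need something like the sketch below. setMaxPollRecords() is hypothetical - no such method exists, and the thread settles on a static config - it is shown only to illustrate the halve-under-load, grow-on-recovery idea (running, sessionTimeoutMs and writeToDatabase are also placeholders):

    int maxRecords = 1000;
    while (running) {
        consumer.setMaxPollRecords(maxRecords);            // HYPOTHETICAL API
        ConsumerRecords<String, String> records = consumer.poll(1000);
        long start = System.currentTimeMillis();
        writeToDatabase(records);                          // hypothetical slow sink
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > sessionTimeoutMs / 2)
            maxRecords = Math.max(1, maxRecords / 2);      // back off before rebalance hell
        else
            maxRecords = Math.min(5000, maxRecords + 100); // recover gradually
        consumer.commitSync();
    }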
On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <g...@confluent.io> wrote:

The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol.

You may want to refer to:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <jens.ran...@tink.se> wrote:

Hi guys,

I realized I never thanked y'all for your input - thanks! Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1

Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down in writing. To be able to do this I think I need to understand the current prefetch behaviour in the new consumer API (0.9) a bit better. Some specific questions:

- How does a specific consumer balance incoming messages from multiple partitions? Is the consumer simply issuing Multi-Fetch requests[1] for the assigned partitions of the relevant topics? Or is the consumer fetching from one partition at a time and balancing between them internally? That is, is the responsibility of partition balancing (and fairness) on the broker side or the consumer side?
- Is the above documented somewhere?

[1] https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka, see "Multi-Fetch".

Thanks,
Jens

On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <ism...@juma.me.uk> wrote:

On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <g...@confluent.io> wrote:

> Given the background, it sounds like you'll generally want each call to
> poll() to return the same number of events (which is the number you
> planned on having enough memory / time for). It also sounds like tuning
> the number of events will be closely tied to tuning the session timeout.
> That is - if I choose to lower the session timeout for some reason, I
> will have to modify the number of records returned too.
>
> If those assumptions are correct, I think a configuration makes more sense:
> 1. We are unlikely to want this parameter to change during the lifetime
> of the consumer.
> 2. The correct value is tied to another configuration parameter, so they
> will be controlled together.

I was thinking the same thing.

Ismael
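The configuration-style outcome the thread converges on, with max.poll.records tuned alongside the session timeout as Gwen describes (values illustrative):

    Properties props = new Properties();
    // How long processing may stall before the coordinator assumes the consumer died.
    props.put("session.timeout.ms", "30000");
    // Bound the records handed back per poll() so processing fits inside that window.
    props.put("max.poll.records", "500");
    // Per-partition byte bound; total prefetch still scales with the number
    // of assigned partitions, per Aarti's concern at the top of the thread.
    props.put("max.partition.fetch.bytes", "1048576");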