Yeah, that's a good point. Perhaps in retrospect, it would have been better to define `buffer.memory` first and let `fetch.max.bytes` be based off of it. I like `buffer.memory` since it gives the consumer nice symmetry with the producer and its generic naming gives us some flexibility internally with how we use it. We could still do that I guess, if we're willing to deprecate `fetch.max.bytes` (one release after adding it!).
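To make the relationship concrete, here is a rough sketch of how the settings would line up on a consumer (a sketch only: `buffer.memory` is the config proposed in KIP-81, not something that exists today, and the sizes are purely illustrative):

    import java.util.Properties;

    public class ConsumerMemoryConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            // Proposed in KIP-81 (hypothetical key): total memory the consumer may
            // use to hold fetched data.
            props.put("buffer.memory", 64 * 1024 * 1024);
            // Existing (KIP-74): maximum bytes returned in a single fetch response;
            // would need to be <= buffer.memory.
            props.put("fetch.max.bytes", 50 * 1024 * 1024);
            // Existing: maximum bytes returned per partition per fetch;
            // expected to be <= fetch.max.bytes.
            props.put("max.partition.fetch.bytes", 1024 * 1024);
            System.out.println(props);
        }
    }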
As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still useful in Kafka Streams, but I agree it makes sense to lower its priority in favor of `fetch.max.bytes`.

-Jason

On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:

> Jason, it's not just decompression but also the conversion from packed bytes to java objects, right? That can be even larger than the decompression blow-up. I think this may be okay, the problem may just be that the naming is a bit misleading. In the producer you are literally allocating a buffer of that size, so the name buffer.memory makes sense. In this case it is something more like max.bytes.read.per.poll.call (terrible name, but maybe something like that?).
>
> Mickael, I'd second Jason's request for the default and expand on it. We currently have several consumer-related memory settings--max.partition.fetch.bytes, fetch.max.bytes--and I don't think it is clear today how to set these. For example we mark max.partition.fetch.bytes as high importance and fetch.max.bytes as medium, but it seems like it would be the other way around. Can we think this through from the point of view of a lazy user? I.e. I have 64MB of space to use for my consumer; in an ideal world I'd say, "hey consumer, here is 64MB, go use that as efficiently as possible" and not have to tune a bunch of individual things with complex relationships. Maybe one or both of the existing settings can either be eliminated or at least marked as low priority, and we can infer a reasonable default from the new config you're introducing?
>
> -jay
>
> On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hi Mickael,
> >
> > I think the approach looks good, just a few minor questions:
> >
> > 1. The KIP doesn't say what the default value of `buffer.memory` will be. Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it makes sense to set the default based on that. Might also be worth mentioning somewhere the constraint between the two configs.
> > 2. To clarify, this limit only affects the size of the fetched data, right? The consumer may still exceed it in order to store the decompressed record data. We delay decompression until the records are returned to the user, but because of max.poll.records, we may end up holding onto the decompressed data from a single partition for a few iterations. I think this is fine, but probably worth noting in the KIP.
> > 3. Is there any risk using the MemoryPool that, after we fill up the memory with fetch data, we can starve the coordinator's connection? Suppose, for example, that we send a bunch of pre-fetches right before returning to the user. These fetches might return before the next call to poll(), in which case we might not have enough memory to receive heartbeats, which would block us from sending additional heartbeats until the next call to poll(). Not sure it's a big problem since heartbeats are tiny, but might be worth thinking about.
> >
> > Thanks,
> > Jason
> >
> > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >
> > > It's been a few days since the last comments. KIP-72 vote seems to have passed, so if I don't get any new comments I'll start the vote on Monday.
> > >
> > > Thanks
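To illustrate the coordinator-starvation concern raised in point 3 of Jason's quoted message above, here is a toy sketch (not the KIP-72 MemoryPool and not actual client code; all names are invented) of one way a pool could keep headroom so heartbeat responses can still be received when pre-fetched data has filled the rest of the budget:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    // Toy illustration only: a memory budget that reserves a small slice which
    // fetch responses may not touch, so a coordinator response (e.g. a heartbeat)
    // can still be read even when pre-fetched data has used up the rest.
    public class ReservingMemoryPool {
        private final long limit;              // total budget, e.g. the proposed buffer.memory
        private final long coordinatorReserve; // headroom kept for coordinator traffic
        private final AtomicLong used = new AtomicLong(0);

        public ReservingMemoryPool(long limit, long coordinatorReserve) {
            this.limit = limit;
            this.coordinatorReserve = coordinatorReserve;
        }

        // Fetch data may only use the pool up to (limit - coordinatorReserve).
        public ByteBuffer tryAllocateForFetch(int size) {
            return tryAllocate(size, limit - coordinatorReserve);
        }

        // Coordinator responses may use the full limit.
        public ByteBuffer tryAllocateForCoordinator(int size) {
            return tryAllocate(size, limit);
        }

        private ByteBuffer tryAllocate(int size, long cap) {
            while (true) {
                long current = used.get();
                if (current + size > cap)
                    return null; // no memory: caller leaves the channel muted and retries later
                if (used.compareAndSet(current, current + size))
                    return ByteBuffer.allocate(size);
            }
        }

        public void release(ByteBuffer buffer) {
            used.addAndGet(-buffer.capacity());
        }
    }

Whether anything like this is needed is exactly the open question above; since heartbeat responses are tiny, simply not counting coordinator traffic against the pool may well be enough.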
> > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com> wrote:
> > >
> > > > +1 - there is a need for an effective way to control kafka memory consumption - both on the broker and on clients. i think we could even reuse the exact same param name - *queued.max.bytes* - as it would serve the exact same purpose.
> > > >
> > > > also (and again it's the same across the broker and clients) this bound should also cover decompression, at some point. the problem with that is that, to the best of my knowledge, the current wire protocol does not declare the final, uncompressed size of anything up front - all we know is the size of the compressed buffer. this may require a format change in the future to properly support?
> > > >
> > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > >
> > > >> Thanks for all the replies.
> > > >>
> > > >> I've updated the KIP:
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > >> The main point is to selectively read from sockets instead of throttling FetchRequest sends. I also mentioned it will be reusing the MemoryPool implementation created in KIP-72 instead of adding another memory tracking method.
> > > >>
> > > >> Please have another look. As always, comments are welcome!
> > > >>
> > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > >> > selectively reading from sockets achieves memory control (up to and not including talk of (de)compression).
> > > >> >
> > > >> > this is exactly what i (also, even mostly) did for kip-72 - which i hope in itself should be a reason to think about both KIPs at the same time, because the changes will be similar (at least in intent) and might result in duplicated effort.
> > > >> >
> > > >> > a pool API is a way to "scale" all the way from just maintaining a variable holding the amount of available memory (which is what my current kip-72 code does and what this kip proposes IIUC) all the way up to actually re-using buffers without any changes to the code using the pool - just drop in a different pool impl.
> > > >> >
> > > >> > for "edge nodes" (producer/consumer) the performance gain in actually pooling large buffers may be arguable, but i suspect for brokers regularly operating on 1MB-sized requests (which is the norm at linkedin) the resulting memory fragmentation is an actual bottleneck (i have initial micro-benchmark results to back this up but have not had the time to do a full profiling run).
> > > >> >
> > > >> > so basically I'm saying we may be doing (very) similar things in mostly the same areas of code.
> > > >> >
> > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > >> >
> > > >> >> Selectively reading from the socket should enable us to control the memory usage without impacting performance. I've had a look at that today and I can see how that would work.
> > > >> >> I'll update the KIP accordingly tomorrow.
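Since the KIP's central mechanism is to selectively read from sockets, here is a minimal sketch of that idea as discussed above (illustrative only: the real change would live in the consumer's network layer, and these class and method names are invented). When the pool cannot back the next response, the channel is muted so the bytes stay in the kernel socket buffer and TCP flow control pushes back on the broker; the channel is unmuted once memory is released:

    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy sketch of "selectively reading from sockets" - not the actual
    // Selector/NetworkClient code.
    public class SelectiveReader {

        // Minimal pool abstraction for the sketch: returns null when no memory is available.
        public interface Pool {
            ByteBuffer tryAllocate(int size);
            void release(ByteBuffer buffer);
        }

        private final Pool pool;
        private final Deque<SelectionKey> muted = new ArrayDeque<>();

        public SelectiveReader(Pool pool) {
            this.pool = pool;
        }

        // Called when a response of (roughly) the given size is ready on the channel.
        public ByteBuffer maybeRead(SelectionKey key, int responseSize) {
            ByteBuffer buffer = pool.tryAllocate(responseSize);
            if (buffer == null) {
                // No memory available: stop reading from this channel rather than
                // throttling the FetchRequests we send; the data stays in the socket
                // buffer and TCP flow control holds back the broker.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                muted.add(key);
                return null;
            }
            // ... read the response into 'buffer'; it goes back to the pool once the
            // records have been returned to the user ...
            return buffer;
        }

        // Called whenever buffers are released back to the pool.
        public void onMemoryAvailable() {
            SelectionKey key;
            while ((key = muted.poll()) != null)
                key.interestOps(key.interestOps() | SelectionKey.OP_READ); // resume reading
        }
    }

This mirrors what KIP-72 does on the broker side, which is why reusing the same MemoryPool implementation, as Mickael mentions above, keeps the two changes aligned.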