Coordinator starvation: For an implementation based on KIP-72, there will be coordinator starvation without KAFKA-4137, since you would stop reading from sockets when the memory pool is full (the fact that coordinator messages are small doesn't help). I imagine you could work around this by treating coordinator connections as special connections, but that spills over into the common network code. The separate NetworkClient for the coordinator proposed in KAFKA-4137 would be much better.
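To make the failure mode concrete, here is a minimal, purely illustrative Java sketch (none of these class or method names exist in Kafka) of why a single shared memory budget starves the coordinator once fetch data has exhausted it: the heartbeat response is tiny, but it is gated by the same pool as the 1MB fetch responses.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical stand-in for a consumer-wide memory pool tracked as a byte counter.
    public class StarvationSketch {
        private long availableBytes;

        StarvationSketch(long budget) { this.availableBytes = budget; }

        // Reserve space for an incoming response; refuse if the pool cannot cover it.
        boolean tryReserve(long size) {
            if (size > availableBytes) return false;
            availableBytes -= size;
            return true;
        }

        public static void main(String[] args) {
            StarvationSketch pool = new StarvationSketch(1_048_576); // 1MB budget

            // Pending responses keyed by connection, in the order they become readable.
            Map<String, Long> pending = new LinkedHashMap<>();
            pending.put("broker-1 fetch response", 1_048_576L);   // fills the pool
            pending.put("coordinator heartbeat response", 64L);   // tiny, but still gated

            for (Map.Entry<String, Long> e : pending.entrySet()) {
                String outcome = pool.tryReserve(e.getValue())
                        ? "read from socket"
                        : "left unread (pool exhausted)";
                System.out.println(e.getKey() + ": " + outcome);
            }
            // The heartbeat is left unread: being small doesn't help once the shared
            // pool is full, hence the suggestion of a separate NetworkClient (and pool)
            // for the coordinator in KAFKA-4137.
        }
    }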
On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> Thanks for all the feedback.
>
> I've updated the KIP with all the details. Below are a few of the main points:
>
> - Overall memory usage of the consumer: I made it clear the memory pool is only used to store the raw bytes from the network and that the decompressed/deserialized messages are not stored in it but as extra memory on the heap. In addition, the consumer also keeps track of other things (in-flight requests, subscriptions, etc.) that account for extra memory as well. So this is not a hard memory bound, but it should still allow users to roughly size how much memory the consumer can use.
>
> - Relation with the existing settings: There are already 2 settings that deal with memory usage of the consumer. I suggest we lower the priority of `max.partition.fetch.bytes` (I wonder if we should attempt to deprecate it or increase its default value so it's a constraint less likely to be hit) and mark the new setting `buffer.memory` as High. I'm a bit unsure what the best default value for `buffer.memory` is; I suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate feedback. It should always be at least equal to `fetch.max.bytes`.
>
> - Configuration name `buffer.memory`: I think it's the name that makes the most sense. It's aligned with the producer and, as mentioned, generic enough to allow future changes if needed.
>
> - Coordinator starvation: Yes, this is a potential issue. I'd expect these requests to be small enough to not be affected too much. If it does turn out to be a problem, KAFKA-4137 suggests a possible fix.
>
> On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > Makes sense Jay.
> >
> > Mickael, in addition to how we can compute defaults of the other settings from `buffer.memory`, it would be good to specify what is allowed and how we handle the different cases (e.g. what do we do if `max.partition.fetch.bytes` is greater than `buffer.memory`, is that simply not allowed?).
> >
> > To summarise the gap between the ideal scenario (user specifies how much memory the consumer can use) and what is being proposed:
> >
> > 1. We will decompress and deserialize the data for one or more partitions in order to return them to the user, and we don't account for the increased memory usage resulting from that. This is likely to be significant on a per-record basis, but we try to do it for the minimal number of records possible within the constraints of the system. Currently the constraints are: we decompress and deserialize the data one partition at a time (the default `max.partition.fetch.bytes` is 1MB, but this is a soft limit in case there are oversized messages) until we have enough records to satisfy `max.poll.records` (default 500) or there are no more completed fetches. It seems like this may be OK for a lot of cases, but some tuning will still be required in others.
> >
> > 2. We don't account for bookkeeping data structures or intermediate objects allocated during the general operation of the consumer. Probably something we have to live with, as the cost/benefit of fixing this doesn't seem worth it.
> >
> > Ismael
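As a concrete illustration of the relationships being discussed, a consumer configuration under the proposal might look like the sketch below. `buffer.memory` is the setting proposed by this KIP (its name and default were still under discussion at this point), the other settings already exist, and the values simply follow the constraints mentioned above (buffer.memory >= fetch.max.bytes >= max.partition.fetch.bytes):

    import java.util.Properties;

    public class Kip81ConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();

            // Proposed by KIP-81: budget for raw fetched bytes held by the consumer.
            // 100MB was the default suggested in the KIP (2 x fetch.max.bytes).
            props.put("buffer.memory", String.valueOf(100 * 1024 * 1024));

            // Existing settings; buffer.memory should be at least fetch.max.bytes,
            // which in turn should be at least max.partition.fetch.bytes.
            props.put("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
            props.put("max.partition.fetch.bytes", String.valueOf(1024 * 1024));
            props.put("max.poll.records", "500");

            // Decompressed/deserialized records are not counted against buffer.memory,
            // so actual heap usage can be noticeably higher than this budget.
            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }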
> > On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io> wrote:
> >> Hey Ismael,
> >>
> >> Yeah, I think we are both saying the same thing: removing only works if you have a truly optimal strategy. Actually, even dynamically computing a reasonable default isn't totally obvious (do you set fetch.max.bytes equal to buffer.memory to try to queue up as much data as possible in the network buffers? Do you try to limit it to your socket.receive.buffer size so that you can read it in a single shot?).
> >>
> >> Regarding what is being measured, my interpretation was the same as yours. I was just adding to the previous point that the buffer.memory setting would not be a very close proxy for memory usage. Someone was pointing out that compression would make this true, and I was just adding that even without compression the object overhead would lead to a high expansion factor.
> >>
> >> -Jay
> >>
> >> On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> >> > Hi Jay,
> >> >
> >> > About `max.partition.fetch.bytes`, yes, it was an oversight not to lower its priority as part of KIP-74 given the existence of `fetch.max.bytes` and the fact that we can now make progress in the presence of oversized messages independently of either of those settings.
> >> >
> >> > I agree that we should try to set those values automatically based on `buffer.memory`, but I am not sure we can have a truly optimal strategy. So I'd go with reducing the priority to "low" instead of removing `fetch.max.bytes` and `max.partition.fetch.bytes` altogether for now. If experience in the field tells us that the auto strategy is good enough, we can consider removing them (yes, I know, it's unlikely to happen as there won't be that much motivation then).
> >> >
> >> > Regarding the "conversion from packed bytes to java objects" comment, that raises the question: what are we actually measuring here? From the KIP, it's not too clear. My interpretation was that we were not measuring the memory usage of the Java objects. In that case, `buffer.memory` seems like a reasonable name, although perhaps the user's expectation is that we would measure the memory usage of the Java objects?
> >> >
> >> > Ismael
> >> >
> >> > On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io> wrote:
> >> > > I think the question is whether we have a truly optimal strategy for deriving the partition- and fetch-level configs from the global setting. If we do, then we should just get rid of them. If not, then if we can at least derive usually-good and never-terrible settings from the global limit at initialization time, maybe we can set them automatically unless the user overrides with an explicit config. Even the latter would let us mark them as low priority, which at least takes them off the list of things you have to grok to use the consumer, which I suspect would be much appreciated by our poor users.
> >> > >
> >> > > Regardless, it'd be nice to make sure we get an explanation of the relationships between the remaining memory configs in the KIP and in the docs.
> >> > >
> >> > > I agree that buffer.memory isn't bad.
> >> > >
> >> > > -Jay
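To make the "derive the other settings at initialization time" idea concrete, here is one deliberately naive derivation. The divisors are invented purely for illustration; neither the KIP nor this thread settles on an actual strategy, and the questions above (queue as much as possible vs. fit the socket receive buffer) are exactly why finding an optimal one is hard:

    public class DerivedFetchSettings {
        // One naive way to derive fetch-level limits from a single global budget.
        // The ratios are arbitrary illustration values, not anything the KIP defines.
        static long deriveFetchMaxBytes(long bufferMemory) {
            // Leave headroom so more than one completed fetch can sit in the pool.
            return Math.max(1, bufferMemory / 2);
        }

        static long derivePartitionFetchBytes(long fetchMaxBytes, int expectedPartitions) {
            // Spread the per-fetch budget across the partitions we expect to own.
            return Math.max(1, fetchMaxBytes / Math.max(1, expectedPartitions));
        }

        public static void main(String[] args) {
            long bufferMemory = 64L * 1024 * 1024;   // "hey consumer, here is 64MB"
            long fetchMax = deriveFetchMaxBytes(bufferMemory);
            long perPartition = derivePartitionFetchBytes(fetchMax, 32);

            System.out.println("fetch.max.bytes           = " + fetchMax);
            System.out.println("max.partition.fetch.bytes = " + perPartition);
        }
    }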
> >> > > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > > > Yeah, that's a good point. Perhaps in retrospect, it would have been better to define `buffer.memory` first and let `fetch.max.bytes` be based off of it. I like `buffer.memory` since it gives the consumer nice symmetry with the producer, and its generic naming gives us some flexibility internally with how we use it. We could still do that, I guess, if we're willing to deprecate `fetch.max.bytes` (one release after adding it!).
> >> > > >
> >> > > > As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still useful in Kafka Streams, but I agree it makes sense to lower its priority in favor of `fetch.max.bytes`.
> >> > > >
> >> > > > -Jason
> >> > > >
> >> > > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:
> >> > > > > Jason, it's not just decompression but also the conversion from packed bytes to java objects, right? That can be even larger than the decompression blow-up. I think this may be okay; the problem may just be that the naming is a bit misleading. In the producer you are literally allocating a buffer of that size, so the name buffer.memory makes sense. In this case it is something more like max.bytes.read.per.poll.call (terrible name, but maybe something like that?).
> >> > > > >
> >> > > > > Mickael, I'd second Jason's request for the default and expand on it. We currently have several consumer-related memory settings: max.partition.fetch.bytes and fetch.max.bytes. I don't think it is clear today how to set these. For example, we mark max.partition.fetch.bytes as high importance and fetch.max.bytes as medium, but it seems like it should be the other way around. Can we think this through from the point of view of a lazy user? I.e. I have 64MB of space to use for my consumer; in an ideal world I'd say, "hey consumer, here is 64MB, go use that as efficiently as possible" and not have to tune a bunch of individual things with complex relationships. Maybe one or both of the existing settings can either be eliminated or at least marked as low priority, and we can infer a reasonable default from the new config you're introducing?
> >> > > > >
> >> > > > > -jay
> >> > > > >
> >> > > > > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > > > > > Hi Mickael,
> >> > > > > >
> >> > > > > > I think the approach looks good, just a few minor questions:
> >> > > > > >
> >> > > > > > 1. The KIP doesn't say what the default value of `buffer.memory` will be. Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it makes sense to set the default based on that. Might also be worth mentioning somewhere the constraint between the two configs.
> >> > > > > >
> >> > > > > > 2. To clarify, this limit only affects the raw size of the fetched data (before decompression), right?
> >> > > > > > The consumer may still exceed it in order to store the decompressed record data. We delay decompression until the records are returned to the user, but because of max.poll.records, we may end up holding onto the decompressed data from a single partition for a few iterations. I think this is fine, but probably worth noting in the KIP.
> >> > > > > >
> >> > > > > > 3. Is there any risk using the MemoryPool that, after we fill up the memory with fetch data, we can starve the coordinator's connection? Suppose, for example, that we send a bunch of pre-fetches right before returning to the user. These fetches might return before the next call to poll(), in which case we might not have enough memory to receive heartbeats, which would block us from sending additional heartbeats until the next call to poll(). Not sure it's a big problem since heartbeats are tiny, but might be worth thinking about.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Jason
> >> > > > > >
> >> > > > > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> > > > > > > It's been a few days since the last comments. The KIP-72 vote seems to have passed, so if I don't get any new comments I'll start the vote on Monday.
> >> > > > > > > Thanks
> >> > > > > > >
> >> > > > > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com> wrote:
> >> > > > > > > > +1 - there is a need for an effective way to control kafka memory consumption - both on the broker and on clients. i think we could even reuse the exact same param name - *queued.max.bytes* - as it would serve the exact same purpose.
> >> > > > > > > >
> >> > > > > > > > also (and again it's the same across the broker and clients) this bound should also cover decompression, at some point. the problem with that is that, to the best of my knowledge, the current wire protocol does not declare the final, uncompressed size of anything up front - all we know is the size of the compressed buffer. this may require a format change in the future to properly support?
> >> > > > > > > >
> >> > > > > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> > > > > > > >> Thanks for all the replies.
> >> > > > > > > >>
> >> > > > > > > >> I've updated the KIP:
> >> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> > > > > > > >> The main point is to selectively read from sockets instead of throttling FetchRequest sends. I also mentioned it will be reusing the MemoryPool implementation created in KIP-72 instead of adding another memory tracking method.
> >> > > > > > > >>
> >> > > > > > > >> Please have another look. As always, comments are welcome!
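A rough sketch of what "selectively reading from sockets" against a memory pool might look like is below. The types are hypothetical stand-ins, not the actual KIP-72 MemoryPool API; the one real protocol detail relied on is that every Kafka response is preceded by a 4-byte size field, so the consumer knows how large a response is before reading its body:

    import java.nio.ByteBuffer;
    import java.util.Optional;

    public class SelectiveReadSketch {
        // Hypothetical minimal pool: hands out buffers while the budget lasts.
        static class SimplePool {
            private long available;
            SimplePool(long bytes) { this.available = bytes; }
            synchronized Optional<ByteBuffer> tryAllocate(int size) {
                if (size > available) return Optional.empty();   // budget exhausted
                available -= size;
                return Optional.of(ByteBuffer.allocate(size));
            }
            synchronized void release(ByteBuffer buffer) { available += buffer.capacity(); }
        }

        public static void main(String[] args) {
            SimplePool pool = new SimplePool(64 * 1024);

            // Sizes of responses waiting on different connections, known from the
            // 4-byte size prefix that precedes each response body on the wire.
            int[] pendingResponseSizes = {48 * 1024, 32 * 1024, 8 * 1024};

            for (int size : pendingResponseSizes) {
                Optional<ByteBuffer> buffer = pool.tryAllocate(size);
                if (buffer.isPresent()) {
                    System.out.println("read " + size + " bytes from the socket");
                    // ... parse the response, then pool.release(buffer.get()) once done ...
                } else {
                    // Leave the bytes in the socket buffer; TCP flow control pushes back
                    // on the broker until memory is released and we read again.
                    System.out.println("defer reading " + size + " bytes until memory is released");
                }
            }
        }
    }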
> >> > > > > > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com> wrote:
> >> > > > > > > >> > selectively reading from sockets achieves memory control (up to and not including talk of (de)compression).
> >> > > > > > > >> >
> >> > > > > > > >> > this is exactly what i (also, even mostly) did for kip-72 - which i hope in itself should be a reason to think about both KIPs at the same time, because the changes will be similar (at least in intent) and might result in duplicated effort.
> >> > > > > > > >> >
> >> > > > > > > >> > a pool API is a way to "scale" all the way from just maintaining a variable holding the amount of available memory (which is what my current kip-72 code does and what this kip proposes IIUC) all the way up to actually re-using buffers, without any changes to the code using the pool - just drop in a different pool impl.
> >> > > > > > > >> >
> >> > > > > > > >> > for "edge nodes" (producer/consumer) the performance gain in actually pooling large buffers may be arguable, but i suspect for brokers regularly operating on 1MB-sized requests (which is the norm at linkedin) the resulting memory fragmentation is an actual bottleneck (i have initial micro-benchmark results to back this up but have not had the time to do a full profiling run).
> >> > > > > > > >> >
> >> > > > > > > >> > so basically I'm saying we may be doing (very) similar things in mostly the same areas of code.
> >> > > > > > > >> >
> >> > > > > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> >> > > > > > > >> >> Selectively reading from the socket should enable us to control the memory usage without impacting performance. I've had a look at that today and I can see how that would work.
> >> > > > > > > >> >> I'll update the KIP accordingly tomorrow.

--
Regards,
Rajini