Yeah, that's a good point. Perhaps in retrospect, it would have been better
to define `buffer.memory` first and let `fetch.max.bytes` be based off of
it. I like `buffer.memory` since it gives the consumer nice symmetry with
the producer and its generic naming gives us some flexibility internally
with how we use it. We could still do that I guess, if we're willing to
deprecate `fetch.max.bytes` (one release after adding it!).

As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still
useful in Kafka Streams, but I agree it makes sense to lower its priority
in favor of `fetch.max.bytes`.
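
A rough sketch of what "define `buffer.memory` first" could look like for the
64MB case Jay describes. Everything here is illustrative:
`ConsumerMemoryDefaults` and `fromBudget` are hypothetical names, a
consumer-side `buffer.memory` is only the KIP-81 proposal, and capping at the
current defaults (50MB for `fetch.max.bytes`, 1MB for
`max.partition.fetch.bytes`) is one possible policy, not something the KIP
specifies.

```java
import java.util.Properties;

// Hypothetical helper: derives the existing fetch settings from one memory budget.
final class ConsumerMemoryDefaults {
    // Current consumer defaults: 50MB per fetch response, 1MB per partition.
    static final long DEFAULT_FETCH_MAX_BYTES = 50L * 1024 * 1024;
    static final long DEFAULT_MAX_PARTITION_FETCH_BYTES = 1024L * 1024;

    static Properties fromBudget(long bufferMemoryBytes) {
        Properties props = new Properties();
        // Assumption: "buffer.memory" is the proposed consumer config, not a released one.
        props.setProperty("buffer.memory", Long.toString(bufferMemoryBytes));

        // A single fetch response should never be allowed to exceed the whole budget.
        long fetchMax = Math.min(DEFAULT_FETCH_MAX_BYTES, bufferMemoryBytes);
        props.setProperty("fetch.max.bytes", Long.toString(fetchMax));

        // The per-partition limit is in turn capped by the per-fetch limit.
        long partitionMax = Math.min(DEFAULT_MAX_PARTITION_FETCH_BYTES, fetchMax);
        props.setProperty("max.partition.fetch.bytes", Long.toString(partitionMax));
        return props;
    }

    public static void main(String[] args) {
        // The "lazy user" case: one number in, the rest inferred.
        System.out.println(fromBudget(64L * 1024 * 1024));
    }
}
```

With this policy a user who only sets the budget never ends up with a
`fetch.max.bytes` larger than the memory they asked the consumer to stay within.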

-Jason

On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:

> Jason, it's not just decompression but also the conversion from packed
> bytes to java objects, right? That can be even larger than the
> decompression blow up. I think this may be okay, the problem may just be
> that the naming is a bit misleading. In the producer you are literally
> allocating a buffer of that size, so the name buffer.memory makes sense. In
> this case it is something more like max.bytes.read.per.poll.call (terrible
> name, but maybe something like that?).
>
> Mickael, I'd second Jason's request for the default and expand on it. We
> currently have several consumer-related memory
> settings--max.partition.fetch.bytes, fetch.max.bytes. I don't think it is
> clear today how to set these. For example we mark max.partition.fetch.bytes
> as high importance and fetch.max.bytes as medium, but it seems like it
> would be the other way around. Can we think this through from the point of
> view of a lazy user? I.e. I have 64MB of space to use for my consumer, in
> an ideal world I'd say, "hey consumer here is 64MB go use that as
> efficiently as possible" and not have to tune a bunch of individual things
> with complex relationships. Maybe one or both of the existing settings can
> either be eliminated or at the least marked as low priority and we can
> infer a reasonable default from the new config you're introducing?
>
> -jay
>
> On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Mickael,
> >
> > I think the approach looks good, just a few minor questions:
> >
> > 1. The KIP doesn't say what the default value of `buffer.memory` will be.
> > Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it
> > makes sense to set the default based on that. Might also be worth
> > mentioning somewhere the constraint between the two configs.
> > 2. To clarify, this limit only affects the uncompressed size of the fetched
> > data, right? The consumer may still exceed it in order to store the
> > decompressed record data. We delay decompression until the records are
> > returned to the user, but because of max.poll.records, we may end up
> > holding onto the decompressed data from a single partition for a few
> > iterations. I think this is fine, but probably worth noting in the KIP.
> > 3. Is there any risk using the MemoryPool that, after we fill up the memory
> > with fetch data, we can starve the coordinator's connection? Suppose, for
> > example, that we send a bunch of pre-fetches right before returning to the
> > user. These fetches might return before the next call to poll(), in which
> > case we might not have enough memory to receive heartbeats, which would
> > block us from sending additional heartbeats until the next call to poll().
> > Not sure it's a big problem since heartbeats are tiny, but might be worth
> > thinking about.
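
The starvation scenario in point 3 can be reproduced with a toy byte budget.
This is only a sketch under assumptions: a `java.util.concurrent.Semaphore`
stands in for the MemoryPool (one permit per byte), `StarvationScenario` and
`tryReceive` are hypothetical names, and the response sizes are made up.

```java
import java.util.concurrent.Semaphore;

final class StarvationScenario {
    // Admits a response only if its full size fits in the remaining budget.
    // Never blocks: returns false when the budget is exhausted.
    static boolean tryReceive(Semaphore budget, int responseBytes) {
        return budget.tryAcquire(responseBytes);
    }

    public static void main(String[] args) {
        Semaphore budget = new Semaphore(1024); // 1024 bytes shared by all connections

        // Two pre-fetch responses arrive before the next poll() and use up the budget.
        System.out.println("prefetch 1 admitted: " + tryReceive(budget, 512));
        System.out.println("prefetch 2 admitted: " + tryReceive(budget, 512));

        // The (tiny) heartbeat response now has nowhere to go.
        System.out.println("heartbeat admitted:  " + tryReceive(budget, 32));

        // Only after the user consumes a fetch and its memory is returned...
        budget.release(512);
        // ...can the heartbeat response be read.
        System.out.println("heartbeat admitted:  " + tryReceive(budget, 32));
    }
}
```

The heartbeat is rejected even though it needs only 32 bytes, because fetch
data and coordinator traffic draw from the same budget.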
> >
> > Thanks,
> > Jason
> >
> >
> > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com>
> > wrote:
> >
> > > It's been a few days since the last comments. KIP-72 vote seems to
> > > have passed so if I don't get any new comments I'll start the vote on
> > > Monday.
> > > Thanks
> > >
> > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com>
> > > wrote:
> > > > +1 - there is a need for an effective way to control kafka memory
> > > > consumption - both on the broker and on clients.
> > > > i think we could even reuse the exact same param name -
> > > > *queued.max.bytes* - as it would serve the exact same purpose.
> > > >
> > > > also (and again it's the same across the broker and clients) this bound
> > > > should also cover decompression, at some point.
> > > > the problem with that is that to the best of my knowledge the current
> > > > wire protocol does not declare the final, uncompressed size of anything
> > > > up front - all we know is the size of the compressed buffer. this may
> > > > require a format change in the future to properly support?
> > > >
> > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison
> > > > <mickael.mai...@gmail.com> wrote:
> > > >
> > > >> Thanks for all the replies.
> > > >>
> > > >> I've updated the KIP:
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > >> The main point is to selectively read from sockets instead of
> > > >> throttling FetchRequests sends. I also mentioned it will be reusing
> > > >> the MemoryPool implementation created in KIP-72 instead of adding
> > > >> another memory tracking method.
> > > >>
> > > >> Please have another look. As always, comments are welcome!
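
Reading selectively from sockets, as opposed to throttling FetchRequest sends,
could look roughly like the sketch below. It is not the actual consumer
networking code: `SelectiveReader` and `PendingResponse` are hypothetical
names, and a plain byte counter stands in for the KIP-72 MemoryPool.

```java
import java.util.ArrayList;
import java.util.List;

final class SelectiveReader {
    // Hypothetical stand-in for a socket with a size-prefixed response waiting on it.
    static final class PendingResponse {
        final String node;
        final int sizeBytes;

        PendingResponse(String node, int sizeBytes) {
            this.node = node;
            this.sizeBytes = sizeBytes;
        }
    }

    private long availableBytes;

    SelectiveReader(long budgetBytes) {
        this.availableBytes = budgetBytes;
    }

    // One pass over the ready sockets: read only the responses that fit.
    // Requests are never throttled; responses that do not fit are simply
    // left unread on their socket until memory is released.
    List<PendingResponse> pollOnce(List<PendingResponse> ready) {
        List<PendingResponse> read = new ArrayList<>();
        for (PendingResponse response : ready) {
            if (response.sizeBytes <= availableBytes) {
                availableBytes -= response.sizeBytes;
                read.add(response);
            }
        }
        return read;
    }

    // Called once the records from a response have been handed to the user.
    void release(PendingResponse response) {
        availableBytes += response.sizeBytes;
    }

    long availableBytes() {
        return availableBytes;
    }
}
```

A skipped response is not lost: it is picked up by a later `pollOnce` pass once
enough memory has been released.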
> > > >>
> > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com>
> > > >> wrote:
> > > >> > selectively reading from sockets achieves memory control (up to and
> > > >> > not including talk of (de)compression)
> > > >> >
> > > >> > this is exactly what i (also, even mostly) did for kip-72 - which i
> > > >> > hope in itself should be a reason to think about both KIPs at the
> > > >> > same time because the changes will be similar (at least in intent)
> > > >> > and might result in duplicated effort.
> > > >> >
> > > >> > a pool API is a way to "scale" all the way from just maintaining a
> > > >> > variable holding amount of available memory (which is what my current
> > > >> > kip-72 code does and what this kip proposes IIUC) all the way up to
> > > >> > actually re-using buffers without any changes to the code using the
> > > >> > pool - just drop in a different pool impl.
> > > >> >
> > > >> > for "edge nodes" (producer/consumer) the performance gain in actually
> > > >> > pooling large buffers may be arguable, but i suspect for brokers
> > > >> > regularly operating on 1MB-sized requests (which is the norm at
> > > >> > linkedin) the resulting memory fragmentation is an actual bottleneck
> > > >> > (i have initial micro-benchmark results to back this up but have not
> > > >> > had the time to do a full profiling run).
> > > >> >
> > > >> > so basically I'm saying we may be doing (very) similar things in
> > > >> > mostly the same areas of code.
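
The "drop in a different pool impl" point can be illustrated with one interface
and two implementations. This is a minimal sketch, not the KIP-72 code:
`BufferPool`, `CountingPool` and `RecyclingPool` are hypothetical names, and
the fixed-size recycling strategy is just one simple way to reuse buffers.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

interface BufferPool {
    // Returns a buffer of the requested size, or null if the pool cannot serve it.
    ByteBuffer tryAllocate(int sizeBytes);

    // Gives a buffer obtained from tryAllocate back to the pool.
    void release(ByteBuffer buffer);

    long availableMemory();
}

// Simplest form: only tracks a counter; every buffer is freshly allocated.
final class CountingPool implements BufferPool {
    private long available;

    CountingPool(long capacityBytes) {
        this.available = capacityBytes;
    }

    public synchronized ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes > available) return null;
        available -= sizeBytes;
        return ByteBuffer.allocate(sizeBytes);
    }

    public synchronized void release(ByteBuffer buffer) {
        available += buffer.capacity();
    }

    public synchronized long availableMemory() {
        return available;
    }
}

// Same interface, but buffers are allocated once up front and reused.
final class RecyclingPool implements BufferPool {
    private final int bufferSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    RecyclingPool(int bufferSize, int bufferCount) {
        this.bufferSize = bufferSize;
        for (int i = 0; i < bufferCount; i++) {
            free.push(ByteBuffer.allocate(bufferSize));
        }
    }

    public synchronized ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes > bufferSize || free.isEmpty()) return null;
        ByteBuffer buffer = free.pop();
        buffer.clear();          // reset position and limit from the previous use
        buffer.limit(sizeBytes); // expose only the requested number of bytes
        return buffer;
    }

    public synchronized void release(ByteBuffer buffer) {
        free.push(buffer);
    }

    public synchronized long availableMemory() {
        return (long) free.size() * bufferSize;
    }
}
```

Code written against `BufferPool` only has to handle a `null` from
`tryAllocate`, so switching from counting to real buffer reuse needs no changes
on the caller's side.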
> > > >> >
> > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison
> > > >> > <mickael.mai...@gmail.com> wrote:
> > > >> >
> > > >> >> Selectively reading from the socket should make it possible to
> > > >> >> control the memory usage without impacting performance. I've had a
> > > >> >> look at that today and I can see how that would work.
> > > >> >> I'll update the KIP accordingly tomorrow.
> > > >> >>
> > > >>
> > >
> >
>
