Makes sense, Jay.

Mickael, in addition to how we can compute defaults for the other settings
from `buffer.memory`, it would be good to specify what values are allowed
and how we handle the different cases (e.g. what do we do if
`max.partition.fetch.bytes` is greater than `buffer.memory`? Is that simply
not allowed?).
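Purely as a strawman to make that concrete, the derivation/validation could
look something like the Java sketch below. This is hypothetical, not actual
consumer code, and the specific choices (defaulting `fetch.max.bytes` to
`buffer.memory`, keeping the 1MB partition default, failing rather than
clamping) are just assumptions to illustrate the kind of rule the KIP
should spell out:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigException;

    // Hypothetical helper, not actual consumer code: derive the fetch sizes
    // from buffer.memory when they are not set explicitly, and reject
    // explicit values that exceed buffer.memory (clamping would be the
    // other option).
    public class FetchLimits {
        static Map<String, Long> derive(long bufferMemory,
                                        Long fetchMaxBytes,
                                        Long maxPartitionFetchBytes) {
            long fetchMax = fetchMaxBytes != null
                    ? fetchMaxBytes
                    : bufferMemory;                    // queue as much as the pool allows
            long partitionMax = maxPartitionFetchBytes != null
                    ? maxPartitionFetchBytes
                    : Math.min(1024 * 1024, fetchMax); // keep the current 1MB default, capped
            if (fetchMax > bufferMemory || partitionMax > bufferMemory)
                throw new ConfigException(
                        "fetch.max.bytes and max.partition.fetch.bytes must not exceed buffer.memory");
            Map<String, Long> derived = new HashMap<>();
            derived.put("fetch.max.bytes", fetchMax);
            derived.put("max.partition.fetch.bytes", partitionMax);
            return derived;
        }
    }

Whichever behaviour we pick (reject, clamp, or derive), it would be good to
state it explicitly in the KIP.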

To summarise the gap between the ideal scenario (user specifies how much
memory the consumer can use) and what is being proposed:

1. We will decompress and deserialize the data for one or more partitions
in order to return records to the user, and we don't account for the
increased memory usage resulting from that. This is likely to be
significant on a per-record basis, but we try to do it for the minimal
number of records possible within the constraints of the system. Currently
the constraints are: we decompress and deserialize the data one partition
at a time (the default `max.partition.fetch.bytes` is 1MB, but this is a
soft limit in case there are oversized messages) until we have enough
records to satisfy `max.poll.records` (default 500) or there are no more
completed fetches. This seems OK for a lot of cases, but some tuning will
still be required in others (see the rough sketch after this list).

2. We don't account for bookkeeping data structures or intermediate
objects allocated during the general operation of the consumer. This is
probably something we have to live with, as the cost/benefit of fixing it
doesn't seem worth it.
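To put a rough number on point 1, here is a back-of-the-envelope sketch;
everything except the `max.poll.records` default is an assumption and only
illustrates why some tuning may still be needed:

    // Rough illustration only (relates to point 1): how much decompressed
    // and deserialized data a single poll() might hold that buffer.memory
    // does not account for. All sizes except max.poll.records' default are
    // assumptions.
    public class PollMemoryEstimate {
        public static void main(String[] args) {
            int maxPollRecords = 500;    // consumer default
            long avgRecordBytes = 1024;  // assumption: ~1KB per record on the wire
            double expansion = 4.0;      // assumption: decompression + Java object overhead

            long unaccounted = (long) (maxPollRecords * avgRecordBytes * expansion);
            System.out.printf("~%.1f MB per poll() outside buffer.memory%n",
                    unaccounted / (1024.0 * 1024.0));
        }
    }

With larger records or a higher expansion factor the unaccounted memory
grows proportionally, which is where the per-use-case tuning comes in.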

Ismael

On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Ismael,
>
> Yeah I think we are both saying the same thing---removing only works if you
> have a truly optimal strategy. Actually even dynamically computing a
> reasonable default isn't totally obvious (do you set fetch.max.bytes to
> equal buffer.memory to try to queue up as much data in the network buffers?
> Do you try to limit it to your socket.receive.buffer size so that you can
> read it in a single shot?).
>
> Regarding what is being measured, my interpretation was the same as yours.
> I was just adding to the previous point that the buffer.memory setting
> would not be a very close proxy for memory usage. Someone was pointing out
> that compression would make this true, and I was just adding that even
> without compression the object overhead would lead to a high expansion
> factor.
>
> -Jay
>
> On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > Hi Jay,
> >
> > About `max.partition.fetch.bytes`, yes it was an oversight not to lower
> > its priority as part of KIP-74 given the existence of `fetch.max.bytes`
> > and the fact that we can now make progress in the presence of oversized
> > messages independently of either of those settings.
> >
> > I agree that we should try to set those values automatically based on
> > `buffer.memory`, but I am not sure if we can have a truly optimal
> > strategy. So, I'd go with reducing the priority to "low" instead of
> > removing `fetch.max.bytes` and `max.partition.fetch.bytes` altogether
> > for now. If experience in the field tells us that the auto strategy is
> > good enough, we can consider removing them (yes, I know, it's unlikely
> > to happen as there won't be that much motivation then).
> >
> > Regarding the "conversion from packed bytes to java objects" comment,
> > that raises the question: what are we actually measuring here? From the
> > KIP, it's not too clear. My interpretation was that we were not
> > measuring the memory usage of the Java objects. In that case,
> > `buffer.memory` seems like a reasonable name although perhaps the user's
> > expectation is that we would measure the memory usage of the Java
> > objects?
> >
> > Ismael
> >
> > On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > I think the question is whether we have a truly optimal strategy for
> > > deriving the partition- and fetch-level configs from the global
> > > setting. If we do then we should just get rid of them. If not, then if
> > > we can at least derive usually good and never terrible settings from
> > > the global limit at initialization time maybe we can set them
> > > automatically unless the user overrides with an explicit config. Even
> > > the latter would let us mark it low priority, which at least takes it
> > > off the list of things you have to grok to use the consumer, which I
> > > suspect would be much appreciated by our poor users.
> > >
> > > Regardless it'd be nice to make sure we get an explanation of the
> > > relationships between the remaining memory configs in the KIP and in
> > > the docs.
> > >
> > > I agree that buffer.memory isn't bad.
> > >
> > > -Jay
> > >
> > >
> > > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Yeah, that's a good point. Perhaps in retrospect, it would have been
> > > > better to define `buffer.memory` first and let `fetch.max.bytes` be
> > > > based off of it. I like `buffer.memory` since it gives the consumer
> > > > nice symmetry with the producer and its generic naming gives us some
> > > > flexibility internally with how we use it. We could still do that I
> > > > guess, if we're willing to deprecate `fetch.max.bytes` (one release
> > > > after adding it!).
> > > >
> > > > As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is
> > > > still useful in Kafka Streams, but I agree it makes sense to lower
> > > > its priority in favor of `fetch.max.bytes`.
> > > >
> > > > -Jason
> > > >
> > > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > Jason, it's not just decompression but also the conversion from
> > > > > packed bytes to java objects, right? That can be even larger than
> > > > > the decompression blow up. I think this may be okay, the problem
> > > > > may just be that the naming is a bit misleading. In the producer
> > > > > you are literally allocating a buffer of that size, so the name
> > > > > buffer.memory makes sense. In this case it is something more like
> > > > > max.bytes.read.per.poll.call (terrible name, but maybe something
> > > > > like that?).
> > > > >
> > > > > Mickael, I'd second Jason's request for the default and expand on
> > > > > it. We currently have several consumer-related memory
> > > > > settings--max.partition.fetch.bytes and fetch.max.bytes--and I
> > > > > don't think it is clear today how to set these. For example we mark
> > > > > max.partition.fetch.bytes as high importance and fetch.max.bytes as
> > > > > medium, but it seems like it would be the other way around. Can we
> > > > > think this through from the point of view of a lazy user? I.e. I
> > > > > have 64MB of space to use for my consumer; in an ideal world I'd
> > > > > say, "hey consumer, here is 64MB, go use that as efficiently as
> > > > > possible" and not have to tune a bunch of individual things with
> > > > > complex relationships. Maybe one or both of the existing settings
> > > > > can either be eliminated or at the least marked as low priority,
> > > > > and we can infer a reasonable default from the new config you're
> > > > > introducing?
> > > > >
> > > > > -jay
> > > > >
> > > > > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > >
> > > > > > Hi Mickael,
> > > > > >
> > > > > > I think the approach looks good, just a few minor questions:
> > > > > >
> > > > > > 1. The KIP doesn't say what the default value of `buffer.memory`
> > > > > > will be. Looks like we use 50MB as the default for
> > > > > > `fetch.max.bytes`, so perhaps it makes sense to set the default
> > > > > > based on that. Might also be worth mentioning somewhere the
> > > > > > constraint between the two configs.
> > > > > > 2. To clarify, this limit only affects the compressed size of
> > > > > > the fetched data, right? The consumer may still exceed it in
> > > > > > order to store the decompressed record data. We delay
> > > > > > decompression until the records are returned to the user, but
> > > > > > because of max.poll.records, we may end up holding onto the
> > > > > > decompressed data from a single partition for a few iterations. I
> > > > > > think this is fine, but probably worth noting in the KIP.
> > > > > > 3. Is there any risk using the MemoryPool that, after we fill up
> > > > > > the memory with fetch data, we can starve the coordinator's
> > > > > > connection? Suppose, for example, that we send a bunch of
> > > > > > pre-fetches right before returning to the user. These fetches
> > > > > > might return before the next call to poll(), in which case we
> > > > > > might not have enough memory to receive heartbeats, which would
> > > > > > block us from sending additional heartbeats until the next call
> > > > > > to poll(). Not sure it's a big problem since heartbeats are tiny,
> > > > > > but might be worth thinking about.
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > >
> > > > > > > It's been a few days since the last comments. KIP-72 vote
> > > > > > > seems to have passed so if I don't get any new comments I'll
> > > > > > > start the vote on Monday.
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <radai.rosenbl...@gmail.com> wrote:
> > > > > > > > +1 - there is a need for an effective way to control kafka
> > > > > > > > memory consumption - both on the broker and on clients.
> > > > > > > > i think we could even reuse the exact same param name -
> > > > > > > > *queued.max.bytes* - as it would serve the exact same purpose.
> > > > > > > >
> > > > > > > > also (and again it's the same across the broker and clients)
> > > > > > > > this bound should also cover decompression, at some point.
> > > > > > > > the problem with that is that to the best of my knowledge the
> > > > > > > > current wire protocol does not declare the final,
> > > > > > > > uncompressed size of anything up front - all we know is the
> > > > > > > > size of the compressed buffer. this may require a format
> > > > > > > > change in the future to properly support?
> > > > > > > >
> > > > > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Thanks for all the replies.
> > > > > > > >>
> > > > > > > >> I've updated the KIP:
> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > > > > > > >> The main point is to selectively read from sockets instead
> > > > > > > >> of throttling FetchRequest sends. I also mentioned it will
> > > > > > > >> be reusing the MemoryPool implementation created in KIP-72
> > > > > > > >> instead of adding another memory tracking method.
> > > > > > > >>
> > > > > > > >> Please have another look. As always, comments are welcome !
> > > > > > > >>
> > > > > > > >> > On Thu, Nov 10, 2016 at 2:47 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > > > > > >> > selectively reading from sockets achieves memory control
> > > > > > > >> > (up to and not including talk of (de)compression)
> > > > > > > >> >
> > > > > > > >> > this is exactly what i (also, even mostly) did for kip-72
> > > > > > > >> > - which i hope in itself should be a reason to think about
> > > > > > > >> > both KIPs at the same time because the changes will be
> > > > > > > >> > similar (at least in intent) and might result in
> > > > > > > >> > duplicated effort.
> > > > > > > >> >
> > > > > > > >> > a pool API is a way to "scale" all the way from just
> > > > > > > >> > maintaining a variable holding the amount of available
> > > > > > > >> > memory (which is what my current kip-72 code does and what
> > > > > > > >> > this kip proposes IIUC) all the way up to actually
> > > > > > > >> > re-using buffers without any changes to the code using the
> > > > > > > >> > pool - just drop in a different pool impl.
> > > > > > > >> >
> > > > > > > >> > for "edge nodes" (producer/consumer) the performance gain
> in
> > > > > > actually
> > > > > > > >> > pooling large buffers may be arguable, but i suspect for
> > > brokers
> > > > > > > >> regularly
> > > > > > > >> > operating on 1MB-sized requests (which is the norm at
> > > linkedin)
> > > > > the
> > > > > > > >> > resulting memory fragmentation is an actual bottleneck (i
> > have
> > > > > > initial
> > > > > > > >> > micro-benchmark results to back this up but have not had
> the
> > > > time
> > > > > to
> > > > > > > do a
> > > > > > > >> > full profiling run).
> > > > > > > >> >
> > > > > > > >> > so basically I'm saying we may be doing (very) similar
> > > > > > > >> > things in mostly the same areas of code.
> > > > > > > >> >
> > > > > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > >> >
> > > > > > > >> >> Selectively reading from the socket should enable us to
> > > > > > > >> >> control the memory usage without impacting performance.
> > > > > > > >> >> I've had a look at that today and I can see how that
> > > > > > > >> >> would work.
> > > > > > > >> >> I'll update the KIP accordingly tomorrow.
