Rajini,

Why do you think we don't want to do the same for brokers ?
It feels like brokers would be affected the same way and could end up
delaying group/hearbeat requests.

Also given queued.max.requests it seems unlikely that small requests
(<<1Kb) being allocated outside of the memory pool would cause OOM
exceptions


On Wed, Dec 14, 2016 at 12:29 PM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
> Edo,
>
> I wouldn't introduce a new config entry, especially since you don't need it
> after KAFKA-4137. As a temporary measure that would work for consumers. But
> you probably don't want to do the same for brokers - will be worth checking
> with Radai since the implementation will be based on KIP-72. To do this
> only for consumers, you will need some conditions in the common network
> code while allocating and releasing buffers. A bit messy, but doable.
>
>
>
> On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar <eco...@uk.ibm.com> wrote:
>
>> Thanks Rajini,
>> Before Kafka-4137, we could avoid coordinator starvation without making a
>> special case for a special connection,
>> but rather simply, in applying the buffer.memory check only to 'large'
>> responses
>> (e.g.  size > 1k, possibly introducing a new config entry) in
>>
>> NetworkReceive.readFromReadableChannel(ReadableByteChannel)
>>
>> Essentially this would limit reading fetch responses but allow for other
>> responses to be processed.
>>
>> This is a sample of sizes for responses I collected :
>>
>> ***** size=108 APIKEY=3 METADATA
>> *****  size=28 APIKEY=10 GROUP_COORDINATOR
>> *****  size=193 APIKEY=11 JOIN_GROUP
>> *****  size=39 APIKEY=14 SYNC_GROUP
>> *****  size=39 APIKEY=9 OFFSET_FETCH
>> *****  size=45 APIKEY=2 LIST_OFFSETS
>> *****  size=88926 APIKEY=1 FETCH
>> *****  size=45 APIKEY=1 FETCH
>> *****  size=6 APIKEY=12 HEARTBEAT
>> *****  size=45 APIKEY=1 FETCH
>> *****  size=45 APIKEY=1 FETCH
>> *****  size=45 APIKEY=1 FETCH
>> *****  size=6 APIKEY=12 HEARTBEAT
>> *****  size=45 APIKEY=1 FETCH
>> *****  size=45 APIKEY=1 FETCH
>>
>> What do you think?
>> --------------------------------------------------
>> Edoardo Comar
>> IBM MessageHub
>> eco...@uk.ibm.com
>> IBM UK Ltd, Hursley Park, SO21 2JN
>>
>> IBM United Kingdom Limited Registered in England and Wales with number
>> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
>> 3AU
>>
>>
>>
>> From:   Rajini Sivaram <rajinisiva...@googlemail.com>
>> To:     dev@kafka.apache.org
>> Date:   13/12/2016 17:27
>> Subject:        Re: [DISCUSS] KIP-81: Max in-flight fetches
>>
>>
>>
>> Coordinator starvation: For an implementation based on KIP-72, there will
>> be coordinator starvation without KAFKA-4137 since you would stop reading
>> from sockets when the memory pool is full (the fact that coordinator
>> messages are small doesn't help). I imagine you can work around this by
>> treating coordinator connections as special connections but that spills
>> over to common network code. Separate NetworkClient for coordinator
>> proposed in KAFKA-4137 would be much better.
>>
>> On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison <mickael.mai...@gmail.com>
>> wrote:
>>
>> > Thanks for all the feedback.
>> >
>> > I've updated the KIP with all the details.
>> > Below are a few of the main points:
>> >
>> > - Overall memory usage of the consumer:
>> > I made it clear the memory pool is only used to store the raw bytes
>> > from the network and that the decompressed/deserialized messages are
>> > not stored in it but as extra memory on the heap. In addition, the
>> > consumer also keeps track of other things (in flight requests,
>> > subscriptions, etc..) that account for extra memory as well. So this
>> > is not a hard bound memory constraint but should still allow to
>> > roughly size how much memory can be used.
>> >
>> > - Relation with the existing settings:
>> > There are already 2 settings that deal with memory usage of the
>> > consumer. I suggest we lower the priority of
>> > `max.partition.fetch.bytes` (I wonder if we should attempt to
>> > deprecate it or increase its default value so it's a contraint less
>> > likely to be hit) and have the new setting `buffer.memory` as High.
>> > I'm a bit unsure what's the best default value for `buffer.memory`, I
>> > suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate
>> > feedback. It should always at least be equal to `max.fetch.bytes`.
>> >
>> > - Configuration name `buffer.memory`:
>> > I think it's the name that makes the most sense. It's aligned with the
>> > producer and as mentioned generic enough to allow future changes if
>> > needed.
>> >
>> > - Coordination starvation:
>> > Yes this is a potential issue. I'd expect these requests to be small
>> > enough to not be affected too much. If that's the case KAFKA-4137
>> > suggests a possible fix.
>> >
>> >
>> >
>> > On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>> > > Makes sense Jay.
>> > >
>> > > Mickael, in addition to how we can compute defaults of the other
>> settings
>> > > from `buffer.memory`, it would be good to specify what is allowed and
>> how
>> > > we handle the different cases (e.g. what do we do if
>> > > `max.partition.fetch.bytes`
>> > > is greater than `buffer.memory`, is that simply not allowed?).
>> > >
>> > > To summarise the gap between the ideal scenario (user specifies how
>> much
>> > > memory the consumer can use) and what is being proposed:
>> > >
>> > > 1. We will decompress and deserialize the data for one or more
>> partitions
>> > > in order to return them to the user and we don't account for the
>> > increased
>> > > memory usage resulting from that. This is likely to be significant on
>> a
>> > per
>> > > record basis, but we try to do it for the minimal number of records
>> > > possible within the constraints of the system. Currently the
>> constraints
>> > > are: we decompress and deserialize the data for a partition at a time
>> > > (default `max.partition.fetch.bytes` is 1MB, but this is a soft limit
>> in
>> > > case there are oversized messages) until we have enough records to
>> > > satisfy `max.poll.records`
>> > > (default 500) or there are no more completed fetches. It seems like
>> this
>> > > may be OK for a lot of cases, but some tuning will still be required
>> in
>> > > others.
>> > >
>> > > 2. We don't account for bookkeeping data structures or intermediate
>> > objects
>> > > allocated during the general operation of the consumer. Probably
>> > something
>> > > we have to live with as the cost/benefit of fixing this doesn't seem
>> > worth
>> > > it.
>> > >
>> > > Ismael
>> > >
>> > > On Tue, Dec 13, 2016 at 8:34 AM, Jay Kreps <j...@confluent.io> wrote:
>> > >
>> > >> Hey Ismael,
>> > >>
>> > >> Yeah I think we are both saying the same thing---removing only works
>> if
>> > you
>> > >> have a truly optimal strategy. Actually even dynamically computing a
>> > >> reasonable default isn't totally obvious (do you set fetch.max.bytes
>> to
>> > >> equal buffer.memory to try to queue up as much data in the network
>> > buffers?
>> > >> Do you try to limit it to your socket.receive.buffer size so that you
>> > can
>> > >> read it in a single shot?).
>> > >>
>> > >> Regarding what is being measured, my interpretation was the same as
>> > yours.
>> > >> I was just adding to the previous point that buffer.memory setting
>> would
>> > >> not be a very close proxy for memory usage. Someone was pointing out
>> > that
>> > >> compression would make this true, and I was just adding that even
>> > without
>> > >> compression the object overhead would lead to a high expansion
>> factor.
>> > >>
>> > >> -Jay
>> > >>
>> > >> On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk>
>> > wrote:
>> > >>
>> > >> > Hi Jay,
>> > >> >
>> > >> > About `max.partition.fetch.bytes`, yes it was an oversight not to
>> > lower
>> > >> its
>> > >> > priority as part of KIP-74 given the existence of `fetch.max.bytes`
>> > and
>> > >> the
>> > >> > fact that we can now make progress in the presence of oversized
>> > messages
>> > >> > independently of either of those settings.
>> > >> >
>> > >> > I agree that we should try to set those values automatically based
>> on
>> > >> > `buffer.memory`, but I am not sure if we can have a truly optimal
>> > >> strategy.
>> > >> > So, I'd go with reducing the priority to "low" instead of removing
>> > >> > `fetch.max.bytes` and `max.partition.fetch.bytes` altogether for
>> now.
>> > If
>> > >> > experience in the field tells us that the auto strategy is good
>> > enough,
>> > >> we
>> > >> > can consider removing them (yes, I know, it's unlikely to happen as
>> > there
>> > >> > won't be that much motivation then).
>> > >> >
>> > >> > Regarding the "conversion from packed bytes to java objects"
>> comment,
>> > >> that
>> > >> > raises the question: what are we actually measuring here? From the
>> > KIP,
>> > >> > it's not too clear. My interpretation was that we were not
>> measuring
>> > the
>> > >> > memory usage of the Java objects. In that case, `buffer.memory`
>> seems
>> > >> like
>> > >> > a reasonable name although perhaps the user's expectation is that
>> we
>> > >> would
>> > >> > measure the memory usage of the Java objects?
>> > >> >
>> > >> > Ismael
>> > >> >
>> > >> > On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io>
>> wrote:
>> > >> >
>> > >> > > I think the question is whether we have a truly optimal strategy
>> for
>> > >> > > deriving the partition- and fetch-level configs from the global
>> > >> setting.
>> > >> > If
>> > >> > > we do then we should just get rid of them. If not, then if we can
>> at
>> > >> > least
>> > >> > > derive usually good and never terrible settings from the global
>> > limit
>> > >> at
>> > >> > > initialization time maybe we can set them automatically unless
>> the
>> > user
>> > >> > > overrides with an explicit conifg. Even the latter would let us
>> > mark it
>> > >> > low
>> > >> > > priority which at least takes it off the list of things you have
>> to
>> > >> grok
>> > >> > to
>> > >> > > use the consumer which I suspect would be much appreciated by our
>> > poor
>> > >> > > users.
>> > >> > >
>> > >> > > Regardless it'd be nice to make sure we get an explanation of the
>> > >> > > relationships between the remaining memory configs in the KIP and
>> in
>> > >> the
>> > >> > > docs.
>> > >> > >
>> > >> > > I agree that buffer.memory isn't bad.
>> > >> > >
>> > >> > > -Jay
>> > >> > >
>> > >> > >
>> > >> > > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <
>> > ja...@confluent.io>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > Yeah, that's a good point. Perhaps in retrospect, it would have
>> > been
>> > >> > > better
>> > >> > > > to define `buffer.memory` first and let `fetch.max.bytes` be
>> based
>> > >> off
>> > >> > of
>> > >> > > > it. I like `buffer.memory` since it gives the consumer nice
>> > symmetry
>> > >> > with
>> > >> > > > the producer and its generic naming gives us some flexibility
>> > >> > internally
>> > >> > > > with how we use it. We could still do that I guess, if we're
>> > willing
>> > >> to
>> > >> > > > deprecate `fetch.max.bytes` (one release after adding it!).
>> > >> > > >
>> > >> > > > As for `max.partition.fetch.bytes`, it's noted in KIP-74 that
>> it
>> > is
>> > >> > still
>> > >> > > > useful in Kafka Streams, but I agree it makes sense to lower
>> its
>> > >> > priority
>> > >> > > > in favor of `fetch.max.bytes`.
>> > >> > > >
>> > >> > > > -Jason
>> > >> > > >
>> > >> > > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io>
>> > wrote:
>> > >> > > >
>> > >> > > > > Jason, it's not just decompression but also the conversion
>> from
>> > >> > packed
>> > >> > > > > bytes to java objects, right? That can be even larger than
>> the
>> > >> > > > > decompression blow up. I think this may be okay, the problem
>> may
>> > >> just
>> > >> > > be
>> > >> > > > > that the naming is a bit misleading. In the producer you are
>> > >> > literally
>> > >> > > > > allocating a buffer of that size, so the name buffer.memory
>> > makes
>> > >> > > sense.
>> > >> > > > In
>> > >> > > > > this case it is something more like
>> max.bytes.read.per.poll.call
>> > >> > > > (terrible
>> > >> > > > > name, but maybe something like that?).
>> > >> > > > >
>> > >> > > > > Mickael, I'd second Jason's request for the default and
>> expand
>> > on
>> > >> it.
>> > >> > > We
>> > >> > > > > currently have several consumer-related memory
>> > >> > > > > settings--max.partition.fetch.bytes, fetch.max.bytes. I don't
>> > >> think
>> > >> > it
>> > >> > > > is
>> > >> > > > > clear today how to set these. For example we mark
>> > >> > > > max.partition.fetch.bytes
>> > >> > > > > as high importance and fetch.max.bytes as medium, but it
>> seems
>> > like
>> > >> > it
>> > >> > > > > would be the other way around. Can we think this through from
>> > the
>> > >> > point
>> > >> > > > of
>> > >> > > > > view of a lazy user? I.e. I have 64MB of space to use for my
>> > >> > consumer,
>> > >> > > in
>> > >> > > > > an ideal world I'd say, "hey consumer here is 64MB go use
>> that
>> > as
>> > >> > > > > efficiently as possible" and not have to tune a bunch of
>> > individual
>> > >> > > > things
>> > >> > > > > with complex relationships. Maybe one or both of the existing
>> > >> > settings
>> > >> > > > can
>> > >> > > > > either be eliminated or at the least marked as low priority
>> and
>> > we
>> > >> > can
>> > >> > > > > infer a reasonable default from the new config your
>> introducing?
>> > >> > > > >
>> > >> > > > > -jay
>> > >> > > > >
>> > >> > > > > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <
>> > >> ja...@confluent.io>
>> > >> > > > > wrote:
>> > >> > > > >
>> > >> > > > > > Hi Mickael,
>> > >> > > > > >
>> > >> > > > > > I think the approach looks good, just a few minor
>> questions:
>> > >> > > > > >
>> > >> > > > > > 1. The KIP doesn't say what the default value of
>> > `buffer.memory`
>> > >> > will
>> > >> > > > be.
>> > >> > > > > > Looks like we use 50MB as the default for
>> `fetch.max.bytes`,
>> > so
>> > >> > > perhaps
>> > >> > > > > it
>> > >> > > > > > makes sense to set the default based on that. Might also be
>> > worth
>> > >> > > > > > mentioning somewhere the constraint between the two
>> configs.
>> > >> > > > > > 2. To clarify, this limit only affects the uncompressed
>> size
>> > of
>> > >> the
>> > >> > > > > fetched
>> > >> > > > > > data, right? The consumer may still exceed it in order to
>> > store
>> > >> the
>> > >> > > > > > decompressed record data. We delay decompression until the
>> > >> records
>> > >> > > are
>> > >> > > > > > returned to the user, but because of max.poll.records, we
>> may
>> > end
>> > >> > up
>> > >> > > > > > holding onto the decompressed data from a single partition
>> > for a
>> > >> > few
>> > >> > > > > > iterations. I think this is fine, but probably worth noting
>> in
>> > >> the
>> > >> > > KIP.
>> > >> > > > > > 3. Is there any risk using the MemoryPool that, after we
>> fill
>> > up
>> > >> > the
>> > >> > > > > memory
>> > >> > > > > > with fetch data, we can starve the coordinator's
>> connection?
>> > >> > Suppose,
>> > >> > > > for
>> > >> > > > > > example, that we send a bunch of pre-fetches right before
>> > >> returning
>> > >> > > to
>> > >> > > > > the
>> > >> > > > > > user. These fetches might return before the next call to
>> > poll(),
>> > >> in
>> > >> > > > which
>> > >> > > > > > case we might not have enough memory to receive heartbeats,
>> > which
>> > >> > > would
>> > >> > > > > > block us from sending additional heartbeats until the next
>> > call
>> > >> to
>> > >> > > > > poll().
>> > >> > > > > > Not sure it's a big problem since heartbeats are tiny, but
>> > might
>> > >> be
>> > >> > > > worth
>> > >> > > > > > thinking about.
>> > >> > > > > >
>> > >> > > > > > Thanks,
>> > >> > > > > > Jason
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison <
>> > >> > > > mickael.mai...@gmail.com
>> > >> > > > > >
>> > >> > > > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > It's been a few days since the last comments. KIP-72 vote
>> > seems
>> > >> > to
>> > >> > > > > > > have passed so if I don't get any new comments I'll start
>> > the
>> > >> > vote
>> > >> > > on
>> > >> > > > > > > Monday.
>> > >> > > > > > > Thanks
>> > >> > > > > > >
>> > >> > > > > > > On Mon, Nov 14, 2016 at 6:25 PM, radai <
>> > >> > radai.rosenbl...@gmail.com
>> > >> > > >
>> > >> > > > > > wrote:
>> > >> > > > > > > > +1 - there's is a need for an effective way to control
>> > kafka
>> > >> > > memory
>> > >> > > > > > > > consumption - both on the broker and on clients.
>> > >> > > > > > > > i think we could even reuse the exact same param name -
>> > >> > > > > > > *queued.max.bytes *-
>> > >> > > > > > > > as it would serve the exact same purpose.
>> > >> > > > > > > >
>> > >> > > > > > > > also (and again its the same across the broker and
>> > clients)
>> > >> > this
>> > >> > > > > bound
>> > >> > > > > > > > should also cover decompression, at some point.
>> > >> > > > > > > > the problem with that is that to the best of my
>> knowledge
>> > the
>> > >> > > > current
>> > >> > > > > > > wire
>> > >> > > > > > > > protocol does not declare the final, uncompressed size
>> of
>> > >> > > anything
>> > >> > > > up
>> > >> > > > > > > front
>> > >> > > > > > > > - all we know is the size of the compressed buffer.
>> this
>> > may
>> > >> > > > require
>> > >> > > > > a
>> > >> > > > > > > > format change in the future to properly support?
>> > >> > > > > > > >
>> > >> > > > > > > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <
>> > >> > > > > > > mickael.mai...@gmail.com>
>> > >> > > > > > > > wrote:
>> > >> > > > > > > >
>> > >> > > > > > > >> Thanks for all the replies.
>> > >> > > > > > > >>
>> > >> > > > > > > >> I've updated the KIP:
>> > >> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >> > > > > > > >> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> > >> > > > > > > >> The main point is to selectively read from sockets
>> > instead
>> > >> of
>> > >> > > > > > > >> throttling FetchRequests sends. I also mentioned it
>> will
>> > be
>> > >> > > > reusing
>> > >> > > > > > > >> the MemoryPool implementation created in KIP-72
>> instead
>> > of
>> > >> > > adding
>> > >> > > > > > > >> another memory tracking method.
>> > >> > > > > > > >>
>> > >> > > > > > > >> Please have another look. As always, comments are
>> > welcome !
>> > >> > > > > > > >>
>> > >> > > > > > > >> On Thu, Nov 10, 2016 at 2:47 AM, radai <
>> > >> > > > radai.rosenbl...@gmail.com>
>> > >> > > > > > > wrote:
>> > >> > > > > > > >> > selectively reading from sockets achieves memory
>> > control
>> > >> (up
>> > >> > > to
>> > >> > > > > and
>> > >> > > > > > > not
>> > >> > > > > > > >> > including talk of (de)compression)
>> > >> > > > > > > >> >
>> > >> > > > > > > >> > this is exactly what i (also, even mostly) did for
>> > kip-72
>> > >> -
>> > >> > > > which
>> > >> > > > > i
>> > >> > > > > > > hope
>> > >> > > > > > > >> in
>> > >> > > > > > > >> > itself should be a reason to think about both KIPs
>> at
>> > the
>> > >> > same
>> > >> > > > > time
>> > >> > > > > > > >> because
>> > >> > > > > > > >> > the changes will be similar (at least in intent) and
>> > might
>> > >> > > > result
>> > >> > > > > in
>> > >> > > > > > > >> > duplicated effort.
>> > >> > > > > > > >> >
>> > >> > > > > > > >> > a pool API is a way to "scale" all the way from just
>> > >> > > > maintaining a
>> > >> > > > > > > >> variable
>> > >> > > > > > > >> > holding amount of available memory (which is what my
>> > >> current
>> > >> > > > > kip-72
>> > >> > > > > > > code
>> > >> > > > > > > >> > does and what this kip proposes IIUC) all the way up
>> to
>> > >> > > actually
>> > >> > > > > > > re-using
>> > >> > > > > > > >> > buffers without any changes to the code using the
>> pool
>> > -
>> > >> > just
>> > >> > > > drop
>> > >> > > > > > in
>> > >> > > > > > > a
>> > >> > > > > > > >> > different pool impl.
>> > >> > > > > > > >> >
>> > >> > > > > > > >> > for "edge nodes" (producer/consumer) the performance
>> > gain
>> > >> in
>> > >> > > > > > actually
>> > >> > > > > > > >> > pooling large buffers may be arguable, but i suspect
>> > for
>> > >> > > brokers
>> > >> > > > > > > >> regularly
>> > >> > > > > > > >> > operating on 1MB-sized requests (which is the norm
>> at
>> > >> > > linkedin)
>> > >> > > > > the
>> > >> > > > > > > >> > resulting memory fragmentation is an actual
>> bottleneck
>> > (i
>> > >> > have
>> > >> > > > > > initial
>> > >> > > > > > > >> > micro-benchmark results to back this up but have not
>> > had
>> > >> the
>> > >> > > > time
>> > >> > > > > to
>> > >> > > > > > > do a
>> > >> > > > > > > >> > full profiling run).
>> > >> > > > > > > >> >
>> > >> > > > > > > >> > so basically I'm saying we may be doing (very)
>> similar
>> > >> > things
>> > >> > > in
>> > >> > > > > > > mostly
>> > >> > > > > > > >> the
>> > >> > > > > > > >> > same areas of code.
>> > >> > > > > > > >> >
>> > >> > > > > > > >> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <
>> > >> > > > > > > >> mickael.mai...@gmail.com>
>> > >> > > > > > > >> > wrote:
>> > >> > > > > > > >> >
>> > >> > > > > > > >> >> electively reading from the socket should enable to
>> > >> > > > > > > >> >> control the memory usage without impacting
>> > performance.
>> > >> > I've
>> > >> > > > had
>> > >> > > > > > look
>> > >> > > > > > > >> >> at that today and I can see how that would work.
>> > >> > > > > > > >> >> I'll update the KIP accordingly tomorrow.
>> > >> > > > > > > >> >>
>> > >> > > > > > > >>
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> >
>>
>>
>>
>> --
>> Regards,
>>
>> Rajini
>>
>>
>>
>> Unless stated otherwise above:
>> IBM United Kingdom Limited - Registered in England and Wales with number
>> 741598.
>> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>>

Reply via email to