As discussed in the KIP call, I have updated the kip-72 page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
to record both configuration validations and implementation concerns.
I've also implemented channel muting/unmuting in response to memory
pressure. its available as a separate branch here -
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
. the implementation without muting is here -
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.

the mute/unmute happens just before poll(), which means as a worst case
there will be no reads for 300ms if memory was unavailable (thats the
timeout untill the next poll). perhaps a design with dedicated read threads
could do better (such a thread could actually block waiting for memory),
but that would be a giant change.

On Tue, Sep 13, 2016 at 9:20 AM, radai <radai.rosenbl...@gmail.com> wrote:

> the specific memory pool implementation i wrote will allocate _any_ amount
> you request if it has _any_ memory available (so if it has 1 byte available
> and you ask for 1MB you will get 1MB and the counter will go negative).
> this was done to avoid issues with starvation of large requests. other
> implementations may be more strict. to me this means that generally its not
> a simple "have memory" vs "no memory" split (which gets worse under a
> hypothetical tiered pool scheme for QoS).
>
> to allow this flexibility in pool implementation i must preserve the
> amount of memory required. once read from the channel i cant put it back,
> so i store it?
>
> On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> Is there any value in allowing the 4-byte size to be read even when the
>> request memory limit has been reached? If not, you can disable OP_READ
>> interest for all channels that are ready inside Selector.poll() when
>> memory
>> limit has been reached and re-enable before returning from poll(). Perhaps
>> a listener that is invoked when MemoryPool moves from unavailable to
>> available state can wakeup the selector. The changes for this should be
>> fairly contained without any additional channel state. And it would avoid
>> the overhead of polls that return immediately even when progress cannot be
>> made because memory limit has been reached.
>>
>> On Tue, Sep 13, 2016 at 12:31 AM, radai <radai.rosenbl...@gmail.com>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Yes, youre right - right now the next select() call will return
>> immediately
>> > with the same set of keys as earlier (at least) as they were not
>> previously
>> > handled (no memory).
>> > My assumption is that this happens under considerable load - something
>> has
>> > to be occupying all this memory. also, this happens in the context of
>> > SocketServer.Processor.run():
>> >
>> > while (isRunning) {
>> >    configureNewConnections()
>> >    processNewResponses()
>> >    poll()   <------ HERE
>> >    processCompletedReceives()
>> >    processCompletedSends()
>> >    processDisconnected()
>> > }
>> >
>> > even within poll(), things like finishConnection(), prepare(), and
>> write()s
>> > can still make progress under low memory conditions. and given the load,
>> > there's probably progress to be made in processCompletedReceives(),
>> > processCompletedSends() and processDisconnected().
>> >
>> > if there's progress to be made in other things its likely that the next
>> > call to poll() will not happen immediately and so the loop wont be that
>> > tight. in order for this to devolve into true busy waiting you would
>> need a
>> > situation where no progress can be made on any in-progress requests and
>> no
>> > responses to send out ?
>> >
>> > if my assumption does not hold then you are correct, and
>> selector.poll(300)
>> > currently hardcoded in SocketServer.Processor.poll() would need to be
>> > replaced with something more complicated. my biggest point of concern
>> > though is that the resulting code would be complicated and would couple
>> > Selector to the memory pool very tightly. undey my current patch
>> Selector
>> > needs the memory pool only to pass to channels when they are built. this
>> > would allow different memory pools relatively easily for things like
>> > reserving memory for cross-broker replication and high-SLA connections.
>> a
>> > tighter coupling would make any such future modification hard.
>> >
>> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Radai,
>> > >
>> > > Thanks for the reply. I still have a followup question on #2.
>> > >
>> > > My understanding is that in your proposal, selector will now first
>> read
>> > the
>> > > size of the Receive. If there is not enough memory, it has to turn off
>> > the
>> > > READ interest bit for the corresponding KafkaChannel. Otherwise,
>> > subsequent
>> > > selector.poll() call will always return immediately, adding
>> unnecessary
>> > > overhead. If you do that, the  Selector will need to know when to
>> turn on
>> > > the READ interest bit again. It may not be enough to do this check
>> until
>> > > the next poll call since the timeout used by poll() could be
>> arbitrarily
>> > > large. So, it seems that some kind of coordination between the
>> Selector
>> > and
>> > > the bufferpool is needed?
>> > >
>> > > Jun
>> > >
>> > > On Thu, Sep 8, 2016 at 7:02 PM, radai <radai.rosenbl...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > 1. yes, it is my own personal opinion that people use
>> > queued.max.requests
>> > > > as an indirect way to bound memory consumption. once a more direct
>> > memory
>> > > > bound mechanism exists (and works) i dont think queued.max.requests
>> > woul
>> > > > dbe required. having said that I was not planning on making any
>> changes
>> > > > w.r.t queued.max.requests support (so I was aiming to get to a
>> > situation
>> > > > where both configs are supported) to allow gathering enough
>> > > data/feedback.
>> > > >
>> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
>> > > > NetworkReceive. multiple such read() calls may be required until a
>> > > Receive
>> > > > is produced already in the current code base. my pool
>> implementation is
>> > > > non-blocking so if there's no memory available the read() call will
>> > > return
>> > > > null. poll() would then move on to try and service other selection
>> > keys.
>> > > > the pool will be checked for available memory again the next time
>> the
>> > > > SocketServer.run() loop gets to poll(). and so right now I dont
>> > > communicate
>> > > > memory becoming available to the selector - it will just go on to
>> try
>> > and
>> > > > make progress elsewhere and come back again. i never block it or
>> send
>> > it
>> > > to
>> > > > sleep. I think for efficiency what could maybe be done is if there's
>> > not
>> > > > enough memory to service a readable selection key we may want to
>> skip
>> > all
>> > > > other read-ready selection keys for that iteration of
>> > > pollSelectionKeys().
>> > > > that would require rather invasive changes around
>> > > > Selector.pollSelectionKeys() that I'd rather avoid. also different
>> > > > KafkaChannels may be backed by different memory pool (under some
>> sort
>> > of
>> > > > future QoS scheme?), which would complicate such an optimization
>> > further.
>> > > >
>> > > > 3. i added the pool interface and implementation under
>> > > kafka.common.memory,
>> > > > and the API is "thin" enough to be generally useful (currently its
>> > > > non-blocking only, but a get(long maxWait) is definitely doable).
>> > having
>> > > > said that, I'm not really familiar enough with the code to say....
>> > > >
>> > > >
>> > > >
>> > > > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <j...@confluent.io> wrote:
>> > > >
>> > > > > Hi, Radi,
>> > > > >
>> > > > > Thanks for the update. At the high level, this looks promising. A
>> few
>> > > > > comments below.
>> > > > >
>> > > > > 1. If we can bound the requests by bytes, it seems that we don't
>> need
>> > > > > queued.max.requests
>> > > > > any more? Could we just deprecate the config and make the queue
>> size
>> > > > > unbounded?
>> > > > > 2. How do we communicate back to the selector when some memory is
>> > freed
>> > > > up?
>> > > > > We probably need to wake up the selector. For efficiency, perhaps
>> we
>> > > only
>> > > > > need to wake up the selector if the bufferpool is full?
>> > > > > 3. We talked about bounding the consumer's memory before. To fully
>> > > > support
>> > > > > that, we will need to bound the memory used by different fetch
>> > > responses
>> > > > in
>> > > > > the consumer. Do you think the changes that you propose here can
>> be
>> > > > > leveraged to bound the memory in the consumer as well?
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > >
>> > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai <
>> radai.rosenbl...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > My apologies for the delay in response.
>> > > > > >
>> > > > > > I agree with the concerns about OOM reading from the actual
>> sockets
>> > > and
>> > > > > > blocking the network threads - messing with the request queue
>> > itself
>> > > > > would
>> > > > > > not do.
>> > > > > >
>> > > > > > I propose instead a memory pool approach - the broker would
>> have a
>> > > non
>> > > > > > blocking memory pool. upon reading the first 4 bytes out of a
>> > socket
>> > > an
>> > > > > > attempt would be made to acquire enough memory and if that
>> attempt
>> > > > fails
>> > > > > > the processing thread will move on to try and make progress with
>> > > other
>> > > > > > tasks.
>> > > > > >
>> > > > > > I think Its simpler than mute/unmute because using mute/unmute
>> > would
>> > > > > > require differentiating between sockets muted due to a request
>> in
>> > > > > progress
>> > > > > > (normal current operation) and sockets muted due to lack of
>> memory.
>> > > > > sockets
>> > > > > > of the 1st kind would be unmuted at the end of request
>> processing
>> > (as
>> > > > it
>> > > > > > happens right now) but the 2nd kind would require some sort of
>> > > "unmute
>> > > > > > watchdog" which is (i claim) more complicated than a memory
>> pool.
>> > > also
>> > > > a
>> > > > > > memory pool is a more generic solution.
>> > > > > >
>> > > > > > I've updated the KIP page (
>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+
>> > Incoming+requests)
>> > > > > > to reflect the new proposed implementation, and i've also put
>> up an
>> > > > > inital
>> > > > > > implementation proposal on github -
>> > > > > > https://github.com/radai-rosenblatt/kafka/commits/
>> > broker-memory-pool
>> > > .
>> > > > > the
>> > > > > > proposed code is not complete and tested yet (so probably buggy)
>> > but
>> > > > does
>> > > > > > include the main points of modification.
>> > > > > >
>> > > > > > the specific implementation of the pool on that branch also has
>> a
>> > > built
>> > > > > in
>> > > > > > safety net where memory that is acquired but not released (which
>> > is a
>> > > > > bug)
>> > > > > > is discovered when the garbage collector frees it and the
>> capacity
>> > is
>> > > > > > reclaimed.
>> > > > > >
>> > > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <j...@confluent.io>
>> wrote:
>> > > > > >
>> > > > > > > Radi,
>> > > > > > >
>> > > > > > > Yes, I got the benefit of bounding the request queue by
>> bytes. My
>> > > > > concern
>> > > > > > > is the following if we don't change the behavior of processor
>> > > > blocking
>> > > > > on
>> > > > > > > queue full.
>> > > > > > >
>> > > > > > > If the broker truly doesn't have enough memory for buffering
>> > > > > outstanding
>> > > > > > > requests from all connections, we have to either hit OOM or
>> block
>> > > the
>> > > > > > > processor. Both will be bad. I am not sure if one is clearly
>> > better
>> > > > > than
>> > > > > > > the other. In this case, the solution is probably to expand
>> the
>> > > > cluster
>> > > > > > to
>> > > > > > > reduce the per broker request load.
>> > > > > > >
>> > > > > > > If the broker actually has enough memory, we want to be able
>> to
>> > > > > configure
>> > > > > > > the request queue in such a way that it never blocks. You can
>> > tell
>> > > > > people
>> > > > > > > to just set the request queue to be unbounded, which may scare
>> > > them.
>> > > > If
>> > > > > > we
>> > > > > > > do want to put a bound, it seems it's easier to configure the
>> > queue
>> > > > > size
>> > > > > > > based on # requests. Basically, we can tell people to set the
>> > queue
>> > > > > size
>> > > > > > > based on number of connections. If the queue is based on
>> bytes,
>> > > it's
>> > > > > not
>> > > > > > > clear how people should set it w/o causing the processor to
>> > block.
>> > > > > > >
>> > > > > > > Finally, Rajini has a good point. The ByteBuffer in the
>> request
>> > > > object
>> > > > > is
>> > > > > > > allocated as soon as we see the first 4 bytes from the socket.
>> > So,
>> > > I
>> > > > am
>> > > > > > not
>> > > > > > > sure if just bounding the request queue itself is enough to
>> bound
>> > > the
>> > > > > > > memory related to requests.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai <
>> > radai.rosenbl...@gmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > I agree that filling up the request queue can cause clients
>> to
>> > > time
>> > > > > out
>> > > > > > > > (and presumably retry?). However, for the workloads where we
>> > > expect
>> > > > > > this
>> > > > > > > > configuration to be useful the alternative is currently an
>> OOM
>> > > > crash.
>> > > > > > > > In my opinion an initial implementation of this feature
>> could
>> > be
>> > > > > > > > constrained to a simple drop-in replacement of
>> > ArrayBlockingQueue
>> > > > > > > > (conditional, opt-in) and further study of behavior patterns
>> > > under
>> > > > > load
>> > > > > > > can
>> > > > > > > > drive future changes to the API later when those behaviors
>> are
>> > > > better
>> > > > > > > > understood (like back-pressure, nop filler responses to
>> avoid
>> > > > client
>> > > > > > > > timeouts or whatever).
>> > > > > > > >
>> > > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
>> > > > > > > > gharatmayures...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Nice write up Radai.
>> > > > > > > > > I think what Jun said is a valid concern.
>> > > > > > > > > If I am not wrong as per the proposal, we are depending on
>> > the
>> > > > > entire
>> > > > > > > > > pipeline to flow smoothly from accepting requests to
>> handling
>> > > it,
>> > > > > > > calling
>> > > > > > > > > KafkaApis and handing back the responses.
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Mayuresh
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy <
>> > > jjkosh...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > .
>> > > > > > > > > > >>
>> > > > > > > > > > >>
>> > > > > > > > > > > Hi Becket,
>> > > > > > > > > > >
>> > > > > > > > > > > I don't think progress can be made in the processor's
>> run
>> > > > loop
>> > > > > if
>> > > > > > > the
>> > > > > > > > > > > queue fills up. i.e., I think Jun's point is that if
>> the
>> > > > queue
>> > > > > is
>> > > > > > > > full
>> > > > > > > > > > > (either due to the proposed max.bytes or today due to
>> > > > > > max.requests
>> > > > > > > > > > hitting
>> > > > > > > > > > > the limit) then processCompletedReceives will block
>> and
>> > no
>> > > > > > further
>> > > > > > > > > > progress
>> > > > > > > > > > > can be made.
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > I'm sorry - this isn't right. There will be progress as
>> > long
>> > > as
>> > > > > the
>> > > > > > > API
>> > > > > > > > > > handlers are able to pick requests off the request queue
>> > and
>> > > > add
>> > > > > > the
>> > > > > > > > > > responses to the response queues (which are effectively
>> > > > > unbounded).
>> > > > > > > > > > However, the point is valid that blocking in the request
>> > > > > channel's
>> > > > > > > put
>> > > > > > > > > has
>> > > > > > > > > > the effect of exacerbating the pressure on the socket
>> > server.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >>
>> > > > > > > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao <
>> > > j...@confluent.io>
>> > > > > > > wrote:
>> > > > > > > > > > >>
>> > > > > > > > > > >> > Radai,
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > Thanks for the proposal. A couple of comments on
>> this.
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > 1. Since we store request objects in the request
>> > queue,
>> > > > how
>> > > > > do
>> > > > > > > we
>> > > > > > > > > get
>> > > > > > > > > > an
>> > > > > > > > > > >> > accurate size estimate for those requests?
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > 2. Currently, it's bad if the processor blocks on
>> > > adding a
>> > > > > > > request
>> > > > > > > > > to
>> > > > > > > > > > >> the
>> > > > > > > > > > >> > request queue. Once blocked, the processor can't
>> > process
>> > > > the
>> > > > > > > > sending
>> > > > > > > > > > of
>> > > > > > > > > > >> > responses of other socket keys either. This will
>> cause
>> > > all
>> > > > > > > clients
>> > > > > > > > > in
>> > > > > > > > > > >> this
>> > > > > > > > > > >> > processor with an outstanding request to eventually
>> > > > timeout.
>> > > > > > > > > > Typically,
>> > > > > > > > > > >> > this will trigger client-side retries, which will
>> add
>> > > more
>> > > > > > load
>> > > > > > > on
>> > > > > > > > > the
>> > > > > > > > > > >> > broker and cause potentially more congestion in the
>> > > > request
>> > > > > > > queue.
>> > > > > > > > > > With
>> > > > > > > > > > >> > queued.max.requests, to prevent blocking on the
>> > request
>> > > > > queue,
>> > > > > > > our
>> > > > > > > > > > >> > recommendation is to configure queued.max.requests
>> to
>> > be
>> > > > the
>> > > > > > > same
>> > > > > > > > as
>> > > > > > > > > > the
>> > > > > > > > > > >> > number of socket connections on the broker. Since
>> the
>> > > > broker
>> > > > > > > never
>> > > > > > > > > > >> > processes more than 1 request per connection at a
>> > time,
>> > > > the
>> > > > > > > > request
>> > > > > > > > > > >> queue
>> > > > > > > > > > >> > will never be blocked. With queued.max.bytes, it's
>> > going
>> > > > to
>> > > > > be
>> > > > > > > > > harder
>> > > > > > > > > > to
>> > > > > > > > > > >> > configure the value properly to prevent blocking.
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > So, while adding queued.max.bytes is potentially
>> > useful
>> > > > for
>> > > > > > > memory
>> > > > > > > > > > >> > management, for it to be truly useful, we probably
>> > need
>> > > to
>> > > > > > > address
>> > > > > > > > > the
>> > > > > > > > > > >> > processor blocking issue for it to be really
>> useful in
>> > > > > > practice.
>> > > > > > > > One
>> > > > > > > > > > >> > possibility is to put back-pressure to the client
>> when
>> > > the
>> > > > > > > request
>> > > > > > > > > > >> queue is
>> > > > > > > > > > >> > blocked. For example, if the processor notices that
>> > the
>> > > > > > request
>> > > > > > > > > queue
>> > > > > > > > > > is
>> > > > > > > > > > >> > full, it can turn off the interest bit for read for
>> > all
>> > > > > socket
>> > > > > > > > keys.
>> > > > > > > > > > >> This
>> > > > > > > > > > >> > will allow the processor to continue handling
>> > responses.
>> > > > > When
>> > > > > > > the
>> > > > > > > > > > >> request
>> > > > > > > > > > >> > queue has space again, it can indicate the new
>> state
>> > to
>> > > > the
>> > > > > > > > process
>> > > > > > > > > > and
>> > > > > > > > > > >> > wake up the selector. Not sure how this will work
>> with
>> > > > > > multiple
>> > > > > > > > > > >> processors
>> > > > > > > > > > >> > though since the request queue is shared across all
>> > > > > > processors.
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > Thanks,
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > Jun
>> > > > > > > > > > >> >
>> > > > > > > > > > >> >
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai <
>> > > > > > > > radai.rosenbl...@gmail.com>
>> > > > > > > > > > >> wrote:
>> > > > > > > > > > >> >
>> > > > > > > > > > >> > > Hello,
>> > > > > > > > > > >> > >
>> > > > > > > > > > >> > > I'd like to initiate a discussion about
>> > > > > > > > > > >> > > https://cwiki.apache.org/
>> > > confluence/display/KAFKA/KIP-
>> > > > > > > > > > >> > > 72%3A+Allow+Sizing+Incoming+Re
>> quest+Queue+in+Bytes
>> > > > > > > > > > >> > >
>> > > > > > > > > > >> > > The goal of the KIP is to allow configuring a
>> bound
>> > on
>> > > > the
>> > > > > > > > > capacity
>> > > > > > > > > > >> (as
>> > > > > > > > > > >> > in
>> > > > > > > > > > >> > > bytes of memory used) of the incoming request
>> queue,
>> > > in
>> > > > > > > addition
>> > > > > > > > > to
>> > > > > > > > > > >> the
>> > > > > > > > > > >> > > current bound on the number of messages.
>> > > > > > > > > > >> > >
>> > > > > > > > > > >> > > This comes after several incidents at Linkedin
>> > where a
>> > > > > > sudden
>> > > > > > > > > > "spike"
>> > > > > > > > > > >> of
>> > > > > > > > > > >> > > large message batches caused an out of memory
>> > > exception.
>> > > > > > > > > > >> > >
>> > > > > > > > > > >> > > Thank you,
>> > > > > > > > > > >> > >
>> > > > > > > > > > >> > >    Radai
>> > > > > > > > > > >> > >
>> > > > > > > > > > >> >
>> > > > > > > > > > >>
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > -Regards,
>> > > > > > > > > Mayuresh R. Gharat
>> > > > > > > > > (862) 250-7125
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> Regards,
>>
>> Rajini
>>
>
>

Reply via email to