As discussed in the KIP call, I have updated the KIP-72 page ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to record both configuration validations and implementation concerns. I've also implemented channel muting/unmuting in response to memory pressure. It's available as a separate branch here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting . The implementation without muting is here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
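To make the muting idea concrete: as described in the next paragraph, the check happens just before poll(). A rough sketch of that shape is below. This is illustrative only - the MemoryPool and NetSelector interfaces here are simplified stand-ins, not the API of the branches linked above.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class MemoryAwareProcessor {

    /** Simplified stand-in for the broker's memory pool. */
    interface MemoryPool {
        boolean isOutOfMemory();
    }

    /** Simplified stand-in for Kafka's network Selector. */
    interface NetSelector {
        Set<String> connectedChannelIds();
        void mute(String channelId);    // clear OP_READ interest for the channel
        void unmute(String channelId);  // restore OP_READ interest for the channel
        void poll(long timeoutMs) throws IOException;
    }

    private final NetSelector selector;
    private final MemoryPool pool;
    private final Set<String> mutedForMemory = new HashSet<>();

    public MemoryAwareProcessor(NetSelector selector, MemoryPool pool) {
        this.selector = selector;
        this.pool = pool;
    }

    /** Called once per iteration of the processor's run() loop, just before poll(). */
    public void maybeMuteThenPoll(long timeoutMs) throws IOException {
        if (pool.isOutOfMemory()) {
            // stop reading new requests until memory frees up
            for (String id : selector.connectedChannelIds()) {
                if (mutedForMemory.add(id)) {
                    selector.mute(id);
                }
            }
        } else if (!mutedForMemory.isEmpty()) {
            // memory is available again: resume reading
            for (String id : mutedForMemory) {
                selector.unmute(id);
            }
            mutedForMemory.clear();
        }
        // worst case: if memory was unavailable, reads resume only after this poll returns
        selector.poll(timeoutMs);
    }
}
```

A real implementation also has to distinguish channels muted for memory reasons from channels muted because a request from them is already in flight - exactly the extra complexity discussed further down in this thread.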
The mute/unmute happens just before poll(), which means in the worst case there will be no reads for 300ms if memory was unavailable (that's the timeout until the next poll). Perhaps a design with dedicated read threads could do better (such a thread could actually block waiting for memory), but that would be a giant change.

On Tue, Sep 13, 2016 at 9:20 AM, radai <radai.rosenbl...@gmail.com> wrote:

> The specific memory pool implementation I wrote will allocate _any_ amount you request if it has _any_ memory available (so if it has 1 byte available and you ask for 1MB you will get 1MB and the counter will go negative). This was done to avoid issues with starvation of large requests. Other implementations may be more strict. To me this means that generally it's not a simple "have memory" vs "no memory" split (which gets worse under a hypothetical tiered pool scheme for QoS).
>
> To allow this flexibility in pool implementation I must preserve the amount of memory required. Once read from the channel I can't put it back, so I store it?
>
> On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>
>> Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when the memory limit has been reached and re-enable before returning from poll(). Perhaps a listener that is invoked when MemoryPool moves from unavailable to available state can wake up the selector. The changes for this should be fairly contained without any additional channel state. And it would avoid the overhead of polls that return immediately even when progress cannot be made because the memory limit has been reached.
>>
>> On Tue, Sep 13, 2016 at 12:31 AM, radai <radai.rosenbl...@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least) as they were not previously handled (no memory).
>> > My assumption is that this happens under considerable load - something has to be occupying all this memory. Also, this happens in the context of SocketServer.Processor.run():
>> >
>> > while (isRunning) {
>> >   configureNewConnections()
>> >   processNewResponses()
>> >   poll() <------ HERE
>> >   processCompletedReceives()
>> >   processCompletedSends()
>> >   processDisconnected()
>> > }
>> >
>> > Even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low memory conditions. And given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().
>> >
>> > If there's progress to be made in other things it's likely that the next call to poll() will not happen immediately, and so the loop won't be that tight. In order for this to devolve into true busy waiting you would need a situation where no progress can be made on any in-progress requests and no responses to send out?
>> >
>> > If my assumption does not hold then you are correct, and selector.poll(300), currently hardcoded in SocketServer.Processor.poll(), would need to be replaced with something more complicated. My biggest point of concern though is that the resulting code would be complicated and would couple Selector to the memory pool very tightly. Under my current patch Selector needs the memory pool only to pass to channels when they are built. This would allow different memory pools relatively easily for things like reserving memory for cross-broker replication and high-SLA connections. A tighter coupling would make any such future modification hard.
>> >
>> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Radai,
>> > >
>> > > Thanks for the reply. I still have a followup question on #2.
>> > >
>> > > My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again. It may not be enough to do this check until the next poll call since the timeout used by poll() could be arbitrarily large. So, it seems that some kind of coordination between the Selector and the bufferpool is needed?
>> > >
>> > > Jun
>> > >
>> > > On Thu, Sep 8, 2016 at 7:02 PM, radai <radai.rosenbl...@gmail.com> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > 1. Yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption. Once a more direct memory bound mechanism exists (and works) I don't think queued.max.requests would be required. Having said that, I was not planning on making any changes w.r.t queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback.
>> > > >
>> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. Multiple such read() calls may be required until a Receive is produced already in the current code base. My pool implementation is non-blocking, so if there's no memory available the read() call will return null. poll() would then move on to try and service other selection keys. The pool will be checked for available memory again the next time the SocketServer.run() loop gets to poll(). And so right now I don't communicate memory becoming available to the selector - it will just go on to try and make progress elsewhere and come back again. I never block it or send it to sleep. I think for efficiency what could maybe be done is: if there's not enough memory to service a readable selection key we may want to skip all other read-ready selection keys for that iteration of pollSelectionKeys(). That would require rather invasive changes around Selector.pollSelectionKeys() that I'd rather avoid. Also, different KafkaChannels may be backed by different memory pools (under some sort of future QoS scheme?), which would complicate such an optimization further.
>> > > >
>> > > > 3. I added the pool interface and implementation under kafka.common.memory, and the API is "thin" enough to be generally useful (currently it's non-blocking only, but a get(long maxWait) is definitely doable).
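To make the pool semantics described in this exchange concrete (non-blocking; satisfies any request as long as some memory remains, letting the counter go negative so large requests are not starved), a minimal sketch could look like the following. The class and method names are illustrative and not the kafka.common.memory API from the patch.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class LenientMemoryPool {
    private final AtomicLong available;

    public LenientMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    /** Non-blocking: returns null only when no memory at all is left. */
    public ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long current = available.get();
            if (current <= 0) {
                return null;                  // truly out of memory; caller retries on a later poll()
            }
            // grant the full request even if it exceeds what is left, letting the
            // counter go negative, so large requests are never starved
            if (available.compareAndSet(current, current - sizeBytes)) {
                return ByteBuffer.allocate(sizeBytes);
            }
        }
    }

    /** Must be called exactly once for every buffer handed out by tryAllocate(). */
    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    public boolean isOutOfMemory() {
        return available.get() <= 0;
    }
}
```

A stricter pool could instead refuse any request larger than what remains; the point made above is that callers (channels, Selector) should not need to care which policy is in use.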
>> > having >> > > > said that, I'm not really familiar enough with the code to say.... >> > > > >> > > > >> > > > >> > > > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <j...@confluent.io> wrote: >> > > > >> > > > > Hi, Radi, >> > > > > >> > > > > Thanks for the update. At the high level, this looks promising. A >> few >> > > > > comments below. >> > > > > >> > > > > 1. If we can bound the requests by bytes, it seems that we don't >> need >> > > > > queued.max.requests >> > > > > any more? Could we just deprecate the config and make the queue >> size >> > > > > unbounded? >> > > > > 2. How do we communicate back to the selector when some memory is >> > freed >> > > > up? >> > > > > We probably need to wake up the selector. For efficiency, perhaps >> we >> > > only >> > > > > need to wake up the selector if the bufferpool is full? >> > > > > 3. We talked about bounding the consumer's memory before. To fully >> > > > support >> > > > > that, we will need to bound the memory used by different fetch >> > > responses >> > > > in >> > > > > the consumer. Do you think the changes that you propose here can >> be >> > > > > leveraged to bound the memory in the consumer as well? >> > > > > >> > > > > Jun >> > > > > >> > > > > >> > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai < >> radai.rosenbl...@gmail.com> >> > > > > wrote: >> > > > > >> > > > > > My apologies for the delay in response. >> > > > > > >> > > > > > I agree with the concerns about OOM reading from the actual >> sockets >> > > and >> > > > > > blocking the network threads - messing with the request queue >> > itself >> > > > > would >> > > > > > not do. >> > > > > > >> > > > > > I propose instead a memory pool approach - the broker would >> have a >> > > non >> > > > > > blocking memory pool. upon reading the first 4 bytes out of a >> > socket >> > > an >> > > > > > attempt would be made to acquire enough memory and if that >> attempt >> > > > fails >> > > > > > the processing thread will move on to try and make progress with >> > > other >> > > > > > tasks. >> > > > > > >> > > > > > I think Its simpler than mute/unmute because using mute/unmute >> > would >> > > > > > require differentiating between sockets muted due to a request >> in >> > > > > progress >> > > > > > (normal current operation) and sockets muted due to lack of >> memory. >> > > > > sockets >> > > > > > of the 1st kind would be unmuted at the end of request >> processing >> > (as >> > > > it >> > > > > > happens right now) but the 2nd kind would require some sort of >> > > "unmute >> > > > > > watchdog" which is (i claim) more complicated than a memory >> pool. >> > > also >> > > > a >> > > > > > memory pool is a more generic solution. >> > > > > > >> > > > > > I've updated the KIP page ( >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > > > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+ >> > Incoming+requests) >> > > > > > to reflect the new proposed implementation, and i've also put >> up an >> > > > > inital >> > > > > > implementation proposal on github - >> > > > > > https://github.com/radai-rosenblatt/kafka/commits/ >> > broker-memory-pool >> > > . >> > > > > the >> > > > > > proposed code is not complete and tested yet (so probably buggy) >> > but >> > > > does >> > > > > > include the main points of modification. 
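The receive path sketched below illustrates the proposal above: the 4-byte size prefix is read without consulting the pool, and the payload buffer is allocated only if the pool grants the memory; otherwise nothing further is read and the attempt is simply repeated on a later poll(). It reuses the LenientMemoryPool sketch from earlier and is not the actual NetworkReceive change from the branch.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class PooledReceive {
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4); // the size prefix bypasses the pool
    private int requestSize = -1;
    private ByteBuffer payload;                                   // granted by the pool, or null so far

    /** Returns true once the whole request has been read; false means "try again on a later poll()". */
    public boolean readFrom(ReadableByteChannel channel, LenientMemoryPool pool) throws IOException {
        if (requestSize < 0) {
            channel.read(sizeBuffer);                 // EOF/error handling omitted for brevity
            if (sizeBuffer.hasRemaining()) {
                return false;                         // size prefix not fully read yet
            }
            sizeBuffer.flip();
            requestSize = sizeBuffer.getInt();
        }
        if (payload == null) {
            payload = pool.tryAllocate(requestSize);  // null under memory pressure
            if (payload == null) {
                return false;                         // no memory: nothing consumed from the socket
            }
        }
        channel.read(payload);
        return !payload.hasRemaining();               // complete when the payload buffer is full
    }
}
```

Releasing the payload back to the pool once the request has been fully processed, and handling oversized or truncated requests, are left out of this sketch.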
>> > > > > > >> > > > > > the specific implementation of the pool on that branch also has >> a >> > > built >> > > > > in >> > > > > > safety net where memory that is acquired but not released (which >> > is a >> > > > > bug) >> > > > > > is discovered when the garbage collector frees it and the >> capacity >> > is >> > > > > > reclaimed. >> > > > > > >> > > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <j...@confluent.io> >> wrote: >> > > > > > >> > > > > > > Radi, >> > > > > > > >> > > > > > > Yes, I got the benefit of bounding the request queue by >> bytes. My >> > > > > concern >> > > > > > > is the following if we don't change the behavior of processor >> > > > blocking >> > > > > on >> > > > > > > queue full. >> > > > > > > >> > > > > > > If the broker truly doesn't have enough memory for buffering >> > > > > outstanding >> > > > > > > requests from all connections, we have to either hit OOM or >> block >> > > the >> > > > > > > processor. Both will be bad. I am not sure if one is clearly >> > better >> > > > > than >> > > > > > > the other. In this case, the solution is probably to expand >> the >> > > > cluster >> > > > > > to >> > > > > > > reduce the per broker request load. >> > > > > > > >> > > > > > > If the broker actually has enough memory, we want to be able >> to >> > > > > configure >> > > > > > > the request queue in such a way that it never blocks. You can >> > tell >> > > > > people >> > > > > > > to just set the request queue to be unbounded, which may scare >> > > them. >> > > > If >> > > > > > we >> > > > > > > do want to put a bound, it seems it's easier to configure the >> > queue >> > > > > size >> > > > > > > based on # requests. Basically, we can tell people to set the >> > queue >> > > > > size >> > > > > > > based on number of connections. If the queue is based on >> bytes, >> > > it's >> > > > > not >> > > > > > > clear how people should set it w/o causing the processor to >> > block. >> > > > > > > >> > > > > > > Finally, Rajini has a good point. The ByteBuffer in the >> request >> > > > object >> > > > > is >> > > > > > > allocated as soon as we see the first 4 bytes from the socket. >> > So, >> > > I >> > > > am >> > > > > > not >> > > > > > > sure if just bounding the request queue itself is enough to >> bound >> > > the >> > > > > > > memory related to requests. >> > > > > > > >> > > > > > > Thanks, >> > > > > > > >> > > > > > > Jun >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai < >> > radai.rosenbl...@gmail.com> >> > > > > > wrote: >> > > > > > > >> > > > > > > > I agree that filling up the request queue can cause clients >> to >> > > time >> > > > > out >> > > > > > > > (and presumably retry?). However, for the workloads where we >> > > expect >> > > > > > this >> > > > > > > > configuration to be useful the alternative is currently an >> OOM >> > > > crash. >> > > > > > > > In my opinion an initial implementation of this feature >> could >> > be >> > > > > > > > constrained to a simple drop-in replacement of >> > ArrayBlockingQueue >> > > > > > > > (conditional, opt-in) and further study of behavior patterns >> > > under >> > > > > load >> > > > > > > can >> > > > > > > > drive future changes to the API later when those behaviors >> are >> > > > better >> > > > > > > > understood (like back-pressure, nop filler responses to >> avoid >> > > > client >> > > > > > > > timeouts or whatever). 
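As a rough illustration of the "drop-in replacement" idea just mentioned - a request queue bounded by bytes rather than by entry count - consider the sketch below. It is deliberately simple and exhibits exactly the blocking behaviour debated elsewhere in this thread when the byte budget is exhausted; the names are illustrative, not the KIP's.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Anything placed on the queue must report its (estimated) size in bytes. */
interface Sized {
    int sizeInBytes();
}

public class ByteBoundedRequestQueue<T extends Sized> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(); // unbounded by count
    private final long maxBytes;
    private final Object lock = new Object();
    private long queuedBytes = 0;

    public ByteBoundedRequestQueue(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Blocks while the queue already holds maxBytes worth of requests -
     *  the "processor may block" behaviour debated in this thread. */
    public void put(T request) throws InterruptedException {
        synchronized (lock) {
            while (queuedBytes >= maxBytes) {
                lock.wait();
            }
            queuedBytes += request.sizeInBytes();
        }
        queue.put(request);
    }

    public T take() throws InterruptedException {
        T request = queue.take();
        synchronized (lock) {
            queuedBytes -= request.sizeInBytes();
            lock.notifyAll(); // wake any producer waiting for byte budget
        }
        return request;
    }
}
```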
>> > > > > > > > >> > > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < >> > > > > > > > gharatmayures...@gmail.com> >> > > > > > > > wrote: >> > > > > > > > >> > > > > > > > > Nice write up Radai. >> > > > > > > > > I think what Jun said is a valid concern. >> > > > > > > > > If I am not wrong as per the proposal, we are depending on >> > the >> > > > > entire >> > > > > > > > > pipeline to flow smoothly from accepting requests to >> handling >> > > it, >> > > > > > > calling >> > > > > > > > > KafkaApis and handing back the responses. >> > > > > > > > > >> > > > > > > > > Thanks, >> > > > > > > > > >> > > > > > > > > Mayuresh >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy < >> > > jjkosh...@gmail.com >> > > > > >> > > > > > > wrote: >> > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > . >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > > > Hi Becket, >> > > > > > > > > > > >> > > > > > > > > > > I don't think progress can be made in the processor's >> run >> > > > loop >> > > > > if >> > > > > > > the >> > > > > > > > > > > queue fills up. i.e., I think Jun's point is that if >> the >> > > > queue >> > > > > is >> > > > > > > > full >> > > > > > > > > > > (either due to the proposed max.bytes or today due to >> > > > > > max.requests >> > > > > > > > > > hitting >> > > > > > > > > > > the limit) then processCompletedReceives will block >> and >> > no >> > > > > > further >> > > > > > > > > > progress >> > > > > > > > > > > can be made. >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > I'm sorry - this isn't right. There will be progress as >> > long >> > > as >> > > > > the >> > > > > > > API >> > > > > > > > > > handlers are able to pick requests off the request queue >> > and >> > > > add >> > > > > > the >> > > > > > > > > > responses to the response queues (which are effectively >> > > > > unbounded). >> > > > > > > > > > However, the point is valid that blocking in the request >> > > > > channel's >> > > > > > > put >> > > > > > > > > has >> > > > > > > > > > the effect of exacerbating the pressure on the socket >> > server. >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> >> > > > > > > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao < >> > > j...@confluent.io> >> > > > > > > wrote: >> > > > > > > > > > >> >> > > > > > > > > > >> > Radai, >> > > > > > > > > > >> > >> > > > > > > > > > >> > Thanks for the proposal. A couple of comments on >> this. >> > > > > > > > > > >> > >> > > > > > > > > > >> > 1. Since we store request objects in the request >> > queue, >> > > > how >> > > > > do >> > > > > > > we >> > > > > > > > > get >> > > > > > > > > > an >> > > > > > > > > > >> > accurate size estimate for those requests? >> > > > > > > > > > >> > >> > > > > > > > > > >> > 2. Currently, it's bad if the processor blocks on >> > > adding a >> > > > > > > request >> > > > > > > > > to >> > > > > > > > > > >> the >> > > > > > > > > > >> > request queue. Once blocked, the processor can't >> > process >> > > > the >> > > > > > > > sending >> > > > > > > > > > of >> > > > > > > > > > >> > responses of other socket keys either. This will >> cause >> > > all >> > > > > > > clients >> > > > > > > > > in >> > > > > > > > > > >> this >> > > > > > > > > > >> > processor with an outstanding request to eventually >> > > > timeout. 
>> > > > > > > > > > Typically, >> > > > > > > > > > >> > this will trigger client-side retries, which will >> add >> > > more >> > > > > > load >> > > > > > > on >> > > > > > > > > the >> > > > > > > > > > >> > broker and cause potentially more congestion in the >> > > > request >> > > > > > > queue. >> > > > > > > > > > With >> > > > > > > > > > >> > queued.max.requests, to prevent blocking on the >> > request >> > > > > queue, >> > > > > > > our >> > > > > > > > > > >> > recommendation is to configure queued.max.requests >> to >> > be >> > > > the >> > > > > > > same >> > > > > > > > as >> > > > > > > > > > the >> > > > > > > > > > >> > number of socket connections on the broker. Since >> the >> > > > broker >> > > > > > > never >> > > > > > > > > > >> > processes more than 1 request per connection at a >> > time, >> > > > the >> > > > > > > > request >> > > > > > > > > > >> queue >> > > > > > > > > > >> > will never be blocked. With queued.max.bytes, it's >> > going >> > > > to >> > > > > be >> > > > > > > > > harder >> > > > > > > > > > to >> > > > > > > > > > >> > configure the value properly to prevent blocking. >> > > > > > > > > > >> > >> > > > > > > > > > >> > So, while adding queued.max.bytes is potentially >> > useful >> > > > for >> > > > > > > memory >> > > > > > > > > > >> > management, for it to be truly useful, we probably >> > need >> > > to >> > > > > > > address >> > > > > > > > > the >> > > > > > > > > > >> > processor blocking issue for it to be really >> useful in >> > > > > > practice. >> > > > > > > > One >> > > > > > > > > > >> > possibility is to put back-pressure to the client >> when >> > > the >> > > > > > > request >> > > > > > > > > > >> queue is >> > > > > > > > > > >> > blocked. For example, if the processor notices that >> > the >> > > > > > request >> > > > > > > > > queue >> > > > > > > > > > is >> > > > > > > > > > >> > full, it can turn off the interest bit for read for >> > all >> > > > > socket >> > > > > > > > keys. >> > > > > > > > > > >> This >> > > > > > > > > > >> > will allow the processor to continue handling >> > responses. >> > > > > When >> > > > > > > the >> > > > > > > > > > >> request >> > > > > > > > > > >> > queue has space again, it can indicate the new >> state >> > to >> > > > the >> > > > > > > > process >> > > > > > > > > > and >> > > > > > > > > > >> > wake up the selector. Not sure how this will work >> with >> > > > > > multiple >> > > > > > > > > > >> processors >> > > > > > > > > > >> > though since the request queue is shared across all >> > > > > > processors. 
>> > > > > > > > > > >> > >> > > > > > > > > > >> > Thanks, >> > > > > > > > > > >> > >> > > > > > > > > > >> > Jun >> > > > > > > > > > >> > >> > > > > > > > > > >> > >> > > > > > > > > > >> > >> > > > > > > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai < >> > > > > > > > radai.rosenbl...@gmail.com> >> > > > > > > > > > >> wrote: >> > > > > > > > > > >> > >> > > > > > > > > > >> > > Hello, >> > > > > > > > > > >> > > >> > > > > > > > > > >> > > I'd like to initiate a discussion about >> > > > > > > > > > >> > > https://cwiki.apache.org/ >> > > confluence/display/KAFKA/KIP- >> > > > > > > > > > >> > > 72%3A+Allow+Sizing+Incoming+Re >> quest+Queue+in+Bytes >> > > > > > > > > > >> > > >> > > > > > > > > > >> > > The goal of the KIP is to allow configuring a >> bound >> > on >> > > > the >> > > > > > > > > capacity >> > > > > > > > > > >> (as >> > > > > > > > > > >> > in >> > > > > > > > > > >> > > bytes of memory used) of the incoming request >> queue, >> > > in >> > > > > > > addition >> > > > > > > > > to >> > > > > > > > > > >> the >> > > > > > > > > > >> > > current bound on the number of messages. >> > > > > > > > > > >> > > >> > > > > > > > > > >> > > This comes after several incidents at Linkedin >> > where a >> > > > > > sudden >> > > > > > > > > > "spike" >> > > > > > > > > > >> of >> > > > > > > > > > >> > > large message batches caused an out of memory >> > > exception. >> > > > > > > > > > >> > > >> > > > > > > > > > >> > > Thank you, >> > > > > > > > > > >> > > >> > > > > > > > > > >> > > Radai >> > > > > > > > > > >> > > >> > > > > > > > > > >> > >> > > > > > > > > > >> >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > -- >> > > > > > > > > -Regards, >> > > > > > > > > Mayuresh R. Gharat >> > > > > > > > > (862) 250-7125 >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> >> >> >> -- >> Regards, >> >> Rajini >> > >
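Finally, the "safety net" radai describes earlier in the thread (memory that is acquired but never released being reclaimed when the garbage collector frees the buffer) can be approximated with weak references and a reference queue, roughly as sketched below. This shows the general technique only and is not the code in the linked branch.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class GcBackedMemoryPool {
    private final AtomicLong available;
    private final ReferenceQueue<ByteBuffer> collected = new ReferenceQueue<>();
    private final Map<Reference<ByteBuffer>, Integer> outstanding = new ConcurrentHashMap<>();

    public GcBackedMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    /** Non-blocking; same lenient semantics as sketched earlier in the thread. */
    public ByteBuffer tryAllocate(int sizeBytes) {
        reclaimLeaked();
        if (available.get() <= 0) {
            return null;                         // out of memory right now
        }
        available.addAndGet(-sizeBytes);         // may go negative (check-then-act race ignored here)
        ByteBuffer buffer = ByteBuffer.allocate(sizeBytes);
        outstanding.put(new WeakReference<>(buffer, collected), sizeBytes);
        return buffer;
    }

    /** The well-behaved path: callers release explicitly when done with the request. */
    public void release(ByteBuffer buffer) {
        outstanding.entrySet().removeIf(e -> e.getKey().get() == buffer);
        available.addAndGet(buffer.capacity());
    }

    /** The safety net: reclaim capacity for buffers the GC freed without an explicit release(). */
    private void reclaimLeaked() {
        Reference<? extends ByteBuffer> ref;
        while ((ref = collected.poll()) != null) {
            Integer size = outstanding.remove(ref);
            if (size != null) {                  // null means the buffer was already released properly
                available.addAndGet(size);
            }
        }
    }
}
```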