Hi Mayuresh, Staying on the BufferPool.java, could you tell me why we need the following piece of code
if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } As far as I see, the there are 2 threads, the Producer and the Sender. The producer attemps to append the record and if there is no memory in the buffer it "awaits" on the "condition". When the Sender has sent some data, it deallocates the buffer and "signals" the "condition". In this scenario that there never will be more than one element in the "waiters" deque and the producer thread is blocked. If we have a multithreaded producer then it makes sense. Please let me know if this is not the case. Thanks, Prabhu On Tue, May 3, 2016 at 1:56 PM, Prabhu V <vpra...@gmail.com> wrote: > > Whenever the BufferPool throws a "Failed to allocate memory within the > configured max blocking time" excepion, it should also remove the condition > object from the waiters deque. Otherwise the condition object is stays > forever in the deque. > > (i.e) "this.waiters.remove(moreMemory);" should happen before the > exception is thrown. > > .Otherwise the waiting thread count will never get to 0 after the > exception and batching will not occur. This is because in the > RecordAccumulator.ready method the exhausted flat is set as > > boolean exhausted = this.free.queued() > 0 where free.queued() returns the > waiters.size(). > > I reported a issue with the producer on this thread > http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/browser > > and this was because of above issue. > > > Thanks >