Nice catch. Do you have a JIRA for this? I can submit a patch right away; it should be a small one.
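For reference, here is a rough sketch of the kind of change being discussed. It is not the actual BufferPool code: SimpleBufferPool is a made-up, stripped-down stand-in, and IllegalStateException is used in place of Kafka's own timeout exception so the snippet compiles with only the JDK. The one point it illustrates is removing the condition from the waiters deque in a finally block, so a timed-out thread does not leave its condition behind.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a blocking memory pool (not the Kafka class).
class SimpleBufferPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory;

    SimpleBufferPool(long totalMemory) {
        this.availableMemory = totalMemory;
    }

    // Reserve `size` bytes, waiting at most maxTimeToBlockMs for memory to free up.
    void allocate(long size, long maxTimeToBlockMs) {
        lock.lock();
        try {
            if (availableMemory >= size) {
                availableMemory -= size;
                return;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            try {
                while (availableMemory < size) {
                    if (remainingNs <= 0) {
                        // Stand-in for the producer's timeout exception.
                        throw new IllegalStateException(
                            "Failed to allocate memory within the configured max blocking time");
                    }
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
                availableMemory -= size;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for memory", e);
            } finally {
                // Runs on success, timeout, and interrupt, so no waiter is leaked.
                waiters.remove(moreMemory);
            }
        } finally {
            lock.unlock();
        }
    }

    // Return memory to the pool and wake the longest-waiting thread, if any.
    void deallocate(long size) {
        lock.lock();
        try {
            availableMemory += size;
            Condition next = waiters.peekFirst();
            if (next != null) {
                next.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // Number of threads currently blocked waiting for memory.
    int queued() {
        lock.lock();
        try {
            return waiters.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(10);
        pool.allocate(10, 10); // uses up all memory
        try {
            pool.allocate(5, 50); // nothing is freed, so this times out
        } catch (IllegalStateException e) {
            System.out.println("timed out, queued = " + pool.queued());
        }
    }
}
```

With the removal in the finally block, queued() goes back to 0 after a timeout, so a check like "free.queued() > 0" would not stay true forever.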
Thanks,
Mayuresh

On Tue, May 3, 2016 at 1:56 PM, Prabhu V <vpra...@gmail.com> wrote:

> Whenever the BufferPool throws a "Failed to allocate memory within the
> configured max blocking time" exception, it should also remove the condition
> object from the waiters deque. Otherwise the condition object stays
> in the deque forever.
>
> i.e., "this.waiters.remove(moreMemory);" should happen before the exception
> is thrown.
>
> Otherwise the waiting thread count will never get back to 0 after the
> exception, and batching will not occur. This is because in the
> RecordAccumulator.ready method the exhausted flag is set as
>
> boolean exhausted = this.free.queued() > 0
>
> where free.queued() returns waiters.size().
>
> I reported an issue with the producer on this thread
> http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/browser
>
> and it was caused by the issue above.
>
>
> Thanks
>

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125