Whenever the BufferPool throws a "Failed to allocate memory within the
configured max blocking time" excepion, it should also remove the condition
object from the waiters deque. Otherwise the condition object is stays
forever in the deque.

(i.e) "this.waiters.remove(moreMemory);" should happen before the exception
is thrown.

.Otherwise the waiting thread count will never get to 0 after the exception
and batching will not occur. This is because in the RecordAccumulator.ready
method the exhausted flat is set as

boolean exhausted = this.free.queued() > 0 where free.queued() returns the
waiters.size().

I reported a issue with the producer on this thread
http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/browser

and this was because of above issue.


Thanks

Reply via email to