Nice catch. Do you have a jira for this?
I can submit a patch right away. This should be a small patch.

Thanks,

Mayuresh

On Tue, May 3, 2016 at 1:56 PM, Prabhu V <vpra...@gmail.com> wrote:

> Whenever the BufferPool throws a "Failed to allocate memory within the
> configured max blocking time" exception, it should also remove the condition
> object from the waiters deque. Otherwise the condition object stays
> forever in the deque.
>
> That is, "this.waiters.remove(moreMemory);" should happen before the exception
> is thrown.
>
> Otherwise the waiting thread count will never get to 0 after the exception
> and batching will not occur. This is because in the RecordAccumulator.ready
> method the exhausted flag is set as
>
> boolean exhausted = this.free.queued() > 0 where free.queued() returns the
> waiters.size().
>
> I reported an issue with the producer on this thread
> http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/browser
>
> and this was because of the above issue.
>
>
> Thanks
>
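
For reference, here is a minimal sketch of the cleanup described in the quoted message. It is not the actual BufferPool source: the class WaiterCleanupSketch, its fields, and the use of IllegalStateException for the timeout are simplified assumptions made for illustration. Only the names waiters, moreMemory and queued() come from the message above. The idea is that the condition is removed in a finally block, so a timed-out waiter never stays in the deque and queued() can return to 0.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for the producer's buffer pool (not the Kafka source).
public class WaiterCleanupSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory;

    public WaiterCleanupSketch(long availableMemory) {
        this.availableMemory = availableMemory;
    }

    // Blocks up to maxTimeToBlockMs for `size` bytes; throws if the wait times out.
    public void allocate(long size, long maxTimeToBlockMs) throws InterruptedException {
        lock.lock();
        try {
            if (availableMemory >= size) {
                availableMemory -= size;
                return;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            try {
                long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                while (availableMemory < size) {
                    if (remainingNs <= 0) {
                        // Assumption: an unchecked exception stands in for the real timeout error.
                        throw new IllegalStateException(
                            "Failed to allocate memory within the configured max blocking time");
                    }
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
                availableMemory -= size;
            } finally {
                // The fix: always drop this waiter, on success, timeout, or interrupt.
                waiters.remove(moreMemory);
            }
        } finally {
            lock.unlock();
        }
    }

    // Returns memory to the pool and wakes the longest-waiting thread, if any.
    public void deallocate(long size) {
        lock.lock();
        try {
            availableMemory += size;
            Condition next = waiters.peekFirst();
            if (next != null) {
                next.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // Number of threads currently waiting; mirrors waiters.size() from the message.
    public int queued() {
        lock.lock();
        try {
            return waiters.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaiterCleanupSketch pool = new WaiterCleanupSketch(0);
        try {
            pool.allocate(10, 20); // no memory available, so this times out
        } catch (IllegalStateException e) {
            System.out.println("timed out: " + e.getMessage());
        }
        System.out.println("queued after timeout: " + pool.queued());
    }
}
```

Without the finally block, the timed-out condition would remain in waiters, and a check such as "queued() > 0" would stay true even though no thread is waiting any more.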



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
