The consumer has a config property called consumer.timeout.ms. If you set
it to a positive integer, the stream iterator throws a
ConsumerTimeoutException when no message becomes available for consumption
within that many milliseconds. By default the iterator blocks indefinitely.
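
A rough sketch of how a timeout could fit the barrier approach described
below. It uses only java.util.concurrent so it runs without a broker, and
everything in it is a stand-in rather than Kafka API: BlockingQueue.poll()
with a timeout plays the role of the stream iterator with
consumer.timeout.ms set, the barrier action plays the role of
commitOffsets(), and the class and method names are made up for
illustration. With the real consumer, the worker would catch the timeout
exception instead of checking for null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a multithreaded consumer; not Kafka code.
class BarrierCommitSketch {

    // Runs one worker per stream. Returns {commits, consumed}.
    static int[] run(List<BlockingQueue<String>> streams, int rounds, long timeoutMs)
            throws Exception {
        final AtomicInteger commits = new AtomicInteger();
        final AtomicInteger consumed = new AtomicInteger();

        // The barrier action runs once per round, while every worker is
        // parked in await(). This is where commitOffsets() would go.
        final CyclicBarrier barrier =
                new CyclicBarrier(streams.size(), () -> commits.incrementAndGet());

        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        try {
            List<Future<?>> workers = new ArrayList<>();
            for (final BlockingQueue<String> stream : streams) {
                workers.add(pool.submit(() -> {
                    for (int r = 0; r < rounds; r++) {
                        // Bounded wait: an idle stream yields null after
                        // timeoutMs instead of blocking forever.
                        String msg = stream.poll(timeoutMs, TimeUnit.MILLISECONDS);
                        if (msg != null) {
                            consumed.incrementAndGet();
                        }
                        // Reached whether or not a message arrived, so an
                        // idle stream cannot stall the other workers.
                        barrier.await();
                    }
                    return null;
                }));
            }
            for (Future<?> worker : workers) {
                worker.get(); // propagate any worker failure
            }
        } finally {
            pool.shutdown();
        }
        return new int[] {commits.get(), consumed.get()};
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> busy = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        BlockingQueue<String> idle = new LinkedBlockingQueue<>(); // no traffic
        int[] result = run(Arrays.asList(busy, idle), 3, 50);
        System.out.println("commits=" + result[0] + " consumed=" + result[1]);
    }
}
```

The point of the sketch is only that the wait for a message is bounded, so
the thread on the quiet topic still arrives at the barrier each round. How
long the timeout should be is a trade-off between commit latency and
spinning on idle topics.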

Thanks,
Jun


On Fri, Aug 9, 2013 at 9:25 AM, Jan Rudert <rud...@gmail.com> wrote:

> Hi,
>
> I have a consumer application where I have a message stream per topic and
> one thread per stream.
>
> I call commitOffsets() when a globally shared message counter reaches a
> limit.
>
> I think I need to make sure that no thread is consuming while I call
> commitOffsets() to ensure that no concurrent consuming error happens in one
> of the threads.
>
> Therefore I use a CyclicBarrier in my threads and do the commitOffsets() in
> the barrier action.
>
> The problem arises in case thread A is blocked in stream.next() when there
> is no traffic in the topic. When the other threads are blocked in
> barrier.await() they have to wait until A receives a message. This can
> possibly block all consuming.
>
> Is there a best practice on committing properly in a multithreaded
> consumer?
>
> Thank you!
> Jan
>
