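The consumer has a config property called consumer.timeout.ms. If you set it to a positive integer, a timeout exception is thrown to the consumer when no message becomes available within that many milliseconds. A thread that would otherwise stay blocked in stream.next() on an idle topic then gets control back periodically and can join the barrier.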
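To make the idea concrete, here is a minimal sketch of the pattern, not actual Kafka code. It assumes a few stand-ins: each BlockingQueue plays the role of a message stream, poll() with a timeout plays the role of consumer.timeout.ms (in the 0.8 high-level consumer the timeout should surface as a ConsumerTimeoutException from the stream iterator instead of a null return), and the barrier action only counts where a real application would call commitOffsets(). The class and method names are made up for illustration, and the queues are assumed to be pre-filled and finite so that the sketch terminates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class TimeoutBarrierSketch {

    /** Returns {number of commits, number of messages covered by those commits}. */
    static int[] run(List<BlockingQueue<String>> streams, int commitEvery, long timeoutMs) {
        // Assumption: queues are pre-filled, so the total is known up front.
        final int total = streams.stream().mapToInt(BlockingQueue::size).sum();
        AtomicInteger processed = new AtomicInteger();   // all messages handled so far
        AtomicInteger pending = new AtomicInteger();     // handled but not yet committed
        AtomicBoolean commitRequested = new AtomicBoolean();
        AtomicBoolean done = new AtomicBoolean(total == 0);
        int[] result = new int[2];

        // The barrier action runs once all threads have stopped consuming.
        // A real consumer would call commitOffsets() here.
        CyclicBarrier barrier = new CyclicBarrier(streams.size(), () -> {
            result[0]++;
            result[1] += pending.getAndSet(0);
            commitRequested.set(false);
            if (result[1] >= total) {
                done.set(true);
            }
        });

        Thread[] workers = new Thread[streams.size()];
        for (int i = 0; i < workers.length; i++) {
            BlockingQueue<String> stream = streams.get(i);
            workers[i] = new Thread(() -> {
                try {
                    while (!done.get()) {
                        // Timed wait: returns null on an idle stream instead of blocking forever.
                        String msg = stream.poll(timeoutMs, TimeUnit.MILLISECONDS);
                        if (msg != null) {
                            pending.incrementAndGet();
                            if (processed.incrementAndGet() == total
                                    || pending.get() >= commitEvery) {
                                commitRequested.set(true);
                            }
                        }
                        // Idle threads reach this check after each timeout.
                        if (commitRequested.get()) {
                            barrier.await();
                        }
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        BlockingQueue<String> idle = new LinkedBlockingQueue<>();   // topic with no traffic
        BlockingQueue<String> busy =
                new LinkedBlockingQueue<>(Arrays.asList("m1", "m2", "m3", "m4", "m5"));
        int[] r = run(Arrays.asList(idle, busy), 2, 50);
        System.out.println(r[0] + " commits covering " + r[1] + " messages");
    }
}
```

The thread on the idle stream never holds up a commit for longer than roughly one timeout interval, so the timeout value is a trade-off between commit latency and how often idle threads wake up for nothing.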
Thanks,

Jun

On Fri, Aug 9, 2013 at 9:25 AM, Jan Rudert <rud...@gmail.com> wrote:

> Hi,
>
> I have a consumer application where I have a message stream per topic and
> one thread per stream.
>
> I will do a commitOffsets() when a global shared message counter
> reaches a limit.
>
> I think I need to make sure that no thread is consuming while I call
> commitOffsets() to ensure that no concurrent consuming error happens in one
> of the threads.
>
> Therefore I use a CyclicBarrier in my threads and do the commitOffsets() in
> the barrier action.
>
> The problem arises in case thread A is blocked in stream.next() when there
> is no traffic in the topic. When the other threads are blocked in
> barrier.await() they have to wait until A receives a message. This can
> possibly block all consuming.
>
> Is there a best practice on committing properly in a multithreaded
> consumer?
>
> Thank you!
> Jan
>