Just to clarify, are the consumer threads you are referring to the number
passed into the map along with the topic when instantiating the connector, or is
it the fetcher thread count?  This formula must specify a maximum memory usage
and not a working usage, or we would still be getting OOMEs.  Under what
conditions would the max memory usage be reached?

-drew
________________________________________
From: Drew Daugherty [drew.daughe...@returnpath.com]
Sent: Friday, August 16, 2013 9:12 AM
To: users@kafka.apache.org
Subject: RE: Kafka Consumer Threads Stalled

Thanks Jun, that would explain why I was running out of memory.

-drew
________________________________________
From: Jun Rao [jun...@gmail.com]
Sent: Friday, August 16, 2013 8:37 AM
To: users@kafka.apache.org
Subject: Re: Kafka Consumer Threads Stalled

The more accurate formula is the following since fetch size is per
partition.

<consumer threads> * <queuedchunks.max> * <fetch size> * #partitions
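
As a rough illustration of the formula above, the sketch below plugs in the
consumer settings quoted later in this thread (fetch_size=52428800,
queuedchunks.max=6, 8 partitions on each of 3 brokers). The class and method
names are hypothetical, the thread count of 4 is an assumed value that the
thread does not state, and treating #partitions as the total across all
brokers is also an assumption.

```java
// Hypothetical helper, not part of Kafka: evaluates the upper-bound formula
// <consumer threads> * <queuedchunks.max> * <fetch size> * #partitions.
class ConsumerMemoryEstimate {

    // Returns the worst-case number of bytes held in queued fetch chunks.
    // Math.multiplyExact throws ArithmeticException on overflow instead of
    // silently wrapping around.
    static long upperBoundBytes(int consumerThreads, int queuedChunksMax,
                                long fetchSizeBytes, int partitions) {
        long chunksPerPartition =
            Math.multiplyExact((long) consumerThreads, (long) queuedChunksMax);
        long bytesPerChunkSet =
            Math.multiplyExact(fetchSizeBytes, (long) partitions);
        return Math.multiplyExact(chunksPerPartition, bytesPerChunkSet);
    }

    public static void main(String[] args) {
        int consumerThreads = 4;          // assumption: not stated in the thread
        int queuedChunksMax = 6;          // queuedchunks.max from the thread
        long fetchSizeBytes = 52428800L;  // fetch size from the thread (50 MiB)
        int partitions = 8 * 3;           // assumption: total across 3 brokers

        long bytes = upperBoundBytes(consumerThreads, queuedChunksMax,
                                     fetchSizeBytes, partitions);
        double gib = bytes / (1024.0 * 1024.0 * 1024.0);
        System.out.printf("Upper bound: %d bytes (%.2f GiB)%n", bytes, gib);
    }
}
```

Under these assumptions even a single consumer thread has a worst case of
7,549,747,200 bytes (about 7 GiB), which would fit the OOME reported in this
thread if the heap is smaller than that.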

Thanks,

Jun


On Thu, Aug 15, 2013 at 9:40 PM, Drew Daugherty <
drew.daughe...@returnpath.com> wrote:

> Thank you Jun. It turned out an OOME was thrown in one of the consumer
> fetcher threads.  Speaking of which, what is the best method for
> determining the consumer memory usage?  I had read that the formula below
> would suffice, but I am questioning it:
>
> <consumer threads> * <queuedchunks.max> * <fetch size>
>
> -drew
> ________________________________________
> From: Jun Rao [jun...@gmail.com]
> Sent: Wednesday, August 14, 2013 8:42 AM
> To: users@kafka.apache.org
> Subject: Re: Kafka Consumer Threads Stalled
>
> In that case, have you looked at
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped%2Cwhy%3F
> ?
>
> Thanks,
>
> Jun
>
>
> On Wed, Aug 14, 2013 at 7:38 AM, Drew Daugherty <
> drew.daughe...@returnpath.com> wrote:
>
> > The problem is not the fact that the timeout exceptions are being thrown.
> > We have tried with and without the timeout setting and, in both cases, we
> > end up with threads that are stalled and not consuming data. Thus the
> > problem is consumers that are registered but not consuming, and no
> > rebalancing is done. We suspected a problem with zookeeper but we have run
> > smoke and latency tests and got reasonable results.
> >
> > -drew
> >
> >
> > ----- Original Message -----
> > From: Jun Rao <jun...@gmail.com>
> > To: "users@kafka.apache.org" <users@kafka.apache.org>
> > Sent: 8/13/2013 10:17 PM
> > Subject: Re: Kafka Consumer Threads Stalled
> >
> >
> >
> > If you don't want to see ConsumerTimeoutException, just set
> > consumer.timeout.ms to -1. If you do need consumer.timeout.ms larger than
> > 0, make sure that on ConsumerTimeoutException, your consumer thread loops
> > back and calls hasNext() on the iterator to resume the consumption.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Aug 13, 2013 at 4:57 PM, Drew Daugherty <
> > drew.daughe...@returnpath.com> wrote:
> >
> > > Hi,
> > >
> > > We are using zookeeper 3.3.6 with kafka 0.7.2. We have a topic with 8
> > > partitions on each of 3 brokers that we are consuming with a consumer
> > > group with multiple threads.  We are using the following settings for our
> > > consumers:
> > > zk.connectiontimeout.ms=12000000
> > > fetch_size=52428800
> > > queuedchunks.max=6
> > > consumer.timeout.ms=5000
> > >
> > > Our brokers have the following configuration:
> > > socket.send.buffer=1048576
> > > socket.receive.buffer=1048576
> > > max.socket.request.bytes=104857600
> > > log.flush.interval=10000
> > > log.default.flush.interval.ms=1000
> > > log.default.flush.scheduler.interval.ms=1000
> > > log.retention.hours=4
> > > log.file.size=536870912
> > > enable.zookeeper=true
> > > zk.connectiontimeout.ms=6000
> > > zk.sessiontimeout.ms=6000
> > > max.message.size=52428800
> > >
> > > We are noticing that after the consumer runs for a short while, some
> > > threads stop consuming and start throwing the following timeout
> > > exceptions:
> > > kafka.consumer.ConsumerTimeoutException
> > >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:66)
> > >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> > >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > >
> > > When this happens, message consumption on the affected partitions doesn't
> > > recover but stalls and the consumer offset remains frozen.  The exceptions
> > > also continue to be thrown in the logs as the thread logic logs the error
> > > then tries to create another iterator from the stream and consume from it.
> > > We also notice that consumption tends to freeze on 2/3 brokers but there
> > > is one that always seems to keep the consumers fed.  Are there settings or
> > > logic we can use to avoid or recover from such exceptions?
> > >
> > > -drew
> > >
> >
>
