Thanks a lot Gwen. I bumped up the JVM to 1g on the consumer side and it works :)
All the consumers belong to the same group, and I am using the high-level group API to consume from Kafka. It seems there is some initial metadata exchange in which information about all the partitions is sent to every consumer. Also, I launch 10 consumers from each machine at a time and keep adding until I reach 200 consumers. The consumers launched first seem to need a lot more CPU and memory initially. Should I launch all the consumers in one go instead of adding 10 at a time?

On a different issue, I couldn't find any way of keeping track of the current read offsets while using the high-level API (I am using the gem mentioned in an earlier mail). Is there any way to record the current read offsets periodically so I can monitor the progress of the consumers? Further, every time a consumer dies and restarts, it seems to start reading from the beginning. Is there any way to resume from the last read offsets only?

Thanks
-Pranay

On Mon, Jan 19, 2015 at 6:54 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> Two things:
>
> 1. The OOM happened on the consumer, right? So the memory that matters
> is the RAM on the consumer machine, not on the Kafka cluster nodes.
>
> 2. If the consumers belong to the same consumer group, each will
> consume a subset of the partitions and will only need to allocate
> memory for those partitions.
>
> So, assuming all your consumers belong to the same group:
> 2 consumers -> each has 500 partitions -> each uses 500MB.
>
> The total remains 1GB no matter how many consumers you have, as long
> as they are all in the same group.
>
> If the consumers belong to different groups (i.e. they read copies of
> the same messages from the same partitions), then yes, you are limited
> to 8 per server (probably less, because there is other stuff on the
> server).
>
> Gwen
>
> On Mon, Jan 19, 2015 at 3:06 PM, Pranay Agarwal
> <agarwalpran...@gmail.com> wrote:
> > Thanks a lot Natty.
> >
> > I am using this Ruby gem on the client side with all the default config:
> > https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb
> > and the value fetch.message.max.bytes is set to 1MB.
> >
> > Currently I only have 3 nodes set up in the Kafka cluster (with 8 GB RAM),
> > and if 1 consumer is going to take 1000 partitions x 1MB ~ 1GB, does it
> > mean 1 Kafka node can at best support 8 consumers? Also, when I do
> > top/free on the Kafka cluster nodes (both Zookeeper and Kafka are deployed
> > on each of the 3 nodes), I don't see a lot of memory being used on the
> > machines. Also, even with this calculation, I shouldn't be facing any
> > issue with only 1 consumer, as I have 8GB of JVM space given to the Kafka
> > nodes, right?
> >
> > Thanks
> > -Pranay
> >
> > On Mon, Jan 19, 2015 at 2:53 PM, Jonathan Natkins <na...@streamsets.com>
> > wrote:
> >
> >> fetch.message.max.bytes is actually a client-side configuration. With
> >> regard to increasing the number of threads, I think the calculation may
> >> be a little more subtle than what you're proposing, and frankly, it's
> >> unlikely that your servers can handle allocating 200MB x 1000 threads =
> >> 200GB of memory at a single time.
> >>
> >> I believe that if you have every partition on a single broker, and all
> >> of your consumer threads are requesting data simultaneously, then yes,
> >> the broker would attempt to allocate 200GB of heap, and probably you'll
> >> hit an OOME.
> >> However, since each consumer is only reading from one partition, those
> >> 1000 threads should be making requests that are spread out over the
> >> entire Kafka cluster. Depending on the memory on your servers, you may
> >> need to increase the number of brokers in your cluster to support the
> >> 1000 threads. For example, I would expect that you can support this with
> >> 10 brokers if each broker has something north of 20GB of heap allocated.
> >>
> >> Some of this is a little bit of guesswork on my part, and I'm not super
> >> confident of my numbers... Can anybody else on the list validate my math?
> >>
> >> Thanks,
> >> Natty
> >>
> >> Jonathan "Natty" Natkins
> >> StreamSets | Customer Engagement Engineer
> >> mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> >>
> >> On Mon, Jan 19, 2015 at 2:34 PM, Pranay Agarwal <agarwalpran...@gmail.com>
> >> wrote:
> >>
> >> > Thanks Natty.
> >> >
> >> > Is there any config which I need to change on the client side as well?
> >> > Also, currently I am trying with only 1 consumer thread. Does the
> >> > equation change to (#partitions) * (fetch size) * (#consumer_threads)
> >> > in case I try to read with 1000 threads from topic2 (1000 partitions)?
> >> >
> >> > -Pranay
> >> >
> >> > On Mon, Jan 19, 2015 at 2:26 PM, Jonathan Natkins <na...@streamsets.com>
> >> > wrote:
> >> >
> >> > > Hi Pranay,
> >> > >
> >> > > I think the JIRA you're referencing is a bit orthogonal to the OOME
> >> > > that you're experiencing. Based on the stacktrace, it looks like your
> >> > > OOME is coming from a consumer request, which is attempting to
> >> > > allocate 200MB. There was a thread (relatively recently) that
> >> > > discussed what I think is your issue:
> >> > >
> >> > > http://mail-archives.apache.org/mod_mbox/kafka-users/201412.mbox/%3CCAG1fNJDHHGSL-x3wp=pPZS1asOdOBrQ-Ge3kiA3Bk_iz7o=5...@mail.gmail.com%3E
> >> > >
> >> > > I suspect that the takeaway is that the way Kafka determines the
> >> > > required memory for a consumer request is (#partitions in the topic)
> >> > > x (replica.fetch.max.bytes), and seemingly you don't have enough
> >> > > memory allocated to handle that request. The solution is likely to
> >> > > increase the heap size on your brokers or to decrease your max fetch
> >> > > size.
> >> > >
> >> > > Thanks,
> >> > > Natty
> >> > >
> >> > > Jonathan "Natty" Natkins
> >> > > StreamSets | Customer Engagement Engineer
> >> > > mobile: 609.577.1600 | linkedin <http://www.linkedin.com/in/nattyice>
> >> > >
> >> > > On Mon, Jan 19, 2015 at 2:10 PM, Pranay Agarwal
> >> > > <agarwalpran...@gmail.com> wrote:
> >> > >
> >> > > > Hi All,
> >> > > >
> >> > > > I have a Kafka cluster setup which has 2 topics:
> >> > > >
> >> > > > topic1 with 10 partitions
> >> > > > topic2 with 1000 partitions
> >> > > >
> >> > > > While I am able to consume messages from topic1 just fine, I get
> >> > > > the following error from topic2. There is a resolved issue here on
> >> > > > the same thing:
> >> > > > https://issues.apache.org/jira/browse/KAFKA-664
> >> > > >
> >> > > > I am using the latest Kafka server version, and I have used the
> >> > > > Kafka command line tools to consume from the topics.
> >> > > >
> >> > > > >>>>
> >> > > > [2015-01-19 22:08:10,758] ERROR OOME with size 201332748
> >> > > > (kafka.network.BoundedByteBufferReceive)
> >> > > >
> >> > > > java.lang.OutOfMemoryError: Java heap space
> >> > > > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >> > > > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >> > > > at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> >> > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> >> > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >> > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >> > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >> > > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> >> > > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >> > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >> > > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >> > > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >> > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >> > > > <<<
> >> > > >
> >> > > > Thanks
> >> > > > -Pranay
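
For reference, a minimal sketch of the offset handling asked about at the top of this thread, assuming the 0.8.x high-level consumer that the jruby-kafka gem wraps. The ZooKeeper address, group id, topic name, and interval values below are illustrative assumptions, not values taken from this thread:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class OffsetTrackingConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk-host:2181");    // illustrative address
            props.put("group.id", "topic2-readers");           // same group => partitions are shared
            props.put("auto.commit.enable", "true");           // commit read offsets to ZooKeeper...
            props.put("auto.commit.interval.ms", "10000");     // ...every 10s, so a restart resumes near the last commit
            props.put("auto.offset.reset", "largest");         // only applies when the group has no committed offset yet
            props.put("fetch.message.max.bytes", "1048576");   // 1MB per-partition fetch size discussed in this thread

            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // One stream (thread) reading from topic2; the group assigns partitions across consumers.
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("topic2", 1));

            ConsumerIterator<byte[], byte[]> it = streams.get("topic2").get(0).iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                // process(message);
                // consumer.commitOffsets();  // alternative: commit explicitly after processing
            }
        }
    }

With auto.commit.enable set, the group's read offsets are written to ZooKeeper on the commit interval, so a consumer that dies and restarts under the same group.id should resume near its last committed offset; auto.offset.reset is only consulted when the group has no committed offset at all. Starting from the beginning on every restart usually points to no offsets being committed for that group, or a different group.id being used on each run. For periodically monitoring progress, the ConsumerOffsetChecker tool shipped with 0.8.x-era Kafka can report a group's committed offsets and per-partition lag. If the gem exposes these consumer properties, the same settings should map through it, but that mapping is an assumption here.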