Vadim,

If configured properly, Kafka should garbage collect objects fairly regularly, since request/response objects are typically short-lived. It seems that the only thing that would cause memory usage to increase in proportion to the number of topics is metrics. One issue with garbage collection in Kafka is the request purgatory: if the consumer clients use very large request timeouts, the fetch request objects sit in the internal purgatory data structure for a long time without being garbage collected. Do you have consumer clients that have set a large value for "fetch.wait.max.ms"?
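One way to answer that question across many consumers is to scan their configuration for the setting. The following is only a rough sketch, not part of Kafka: the class name `FetchWaitCheck`, the 5000 ms threshold, and the 60000 ms sample value are made up for illustration, and the 100 ms fallback is an assumption about the 0.8 consumer default that should be checked against the documentation for the version in use.

```java
import java.util.Properties;

/**
 * Illustrative helper (hypothetical, not a Kafka API): flags consumer
 * configurations whose "fetch.wait.max.ms" is large enough that fetch
 * requests may linger in the broker's request purgatory.
 */
final class FetchWaitCheck {
    // Assumed default for the 0.8 consumer when the property is not set.
    static final long ASSUMED_DEFAULT_FETCH_WAIT_MS = 100L;

    /** Returns the configured fetch.wait.max.ms, or the assumed default when unset. */
    static long fetchWaitMs(Properties consumerProps) {
        String raw = consumerProps.getProperty("fetch.wait.max.ms");
        return raw == null ? ASSUMED_DEFAULT_FETCH_WAIT_MS : Long.parseLong(raw.trim());
    }

    /** True when the wait exceeds a threshold chosen by the operator. */
    static boolean isLongFetchWait(Properties consumerProps, long thresholdMs) {
        return fetchWaitMs(consumerProps) > thresholdMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("fetch.wait.max.ms", "60000"); // hypothetical consumer setting
        long thresholdMs = 5000L; // arbitrary cut-off for this sketch
        System.out.println("fetch.wait.max.ms = " + fetchWaitMs(props)
                + ", long wait: " + isLongFetchWait(props, thresholdMs));
    }
}
```

What counts as "large" depends on how many consumers and partitions are fetching at once, so the threshold here is a placeholder rather than a recommendation.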
Thanks,
Neha

On Mon, Sep 2, 2013 at 10:54 PM, Vadim Keylis <vkeylis2...@gmail.com> wrote:

> Jun, I will try to do a heap dump once I am able to duplicate the exception.
> We have 300 topics, which I estimate will grow rapidly. Each topic has 36
> partitions to allow greater parallelism.
> How does Kafka heap memory usage change, in your experience, with the
> addition of new topics and a large number of partitions?
>
> Thanks,
> Vadim
>
>
> On Mon, Sep 2, 2013 at 9:28 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > For the OOME, could you do a heap dump and see what type of objects takes
> > most of the space?
> >
> > The second ERROR exposed a bug in Kafka. Filed KAFKA-1038 to fix that.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 30, 2013 at 10:39 PM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
> >
> > > I followed the LinkedIn setup example in the docs and allocated 3g for
> > > heap size.
> > >
> > > java -Xmx3G -Xms3G -server -XX:+UseCompressedOops -XX:+UseParNewGC
> > > -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> > > -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -
> > >
> > > After a day of normal running I discovered the following errors
> > > flooding the error log. I can increase the heap size, that is not a
> > > problem, but I want to be able to properly estimate how much memory Kafka
> > > will use in order to predict system limits as we add topics, consumers, etc.
> > >
> > > Thanks so much in advance,
> > > Vadim
> > >
> > > [2013-08-29 23:57:14,072] ERROR [ReplicaFetcherThread--1-6], Error due to (kafka.server.ReplicaFetcherThread)
> > > java.lang.OutOfMemoryError: Java heap space
> > >         at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:114)
> > >         at java.io.OutputStreamWriter.write(OutputStreamWriter.java:203)
> > >         at java.io.Writer.write(Writer.java:140)
> > >         at org.apache.log4j.helpers.QuietWriter.write(QuietWriter.java:48)
> > >         at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:302)
> > >         at com.ordersets.utils.logging.CustodianDailyRollingFileAppender.subAppend(CustodianDailyRollingFileAppender.java:299)
> > >         at org.apache.log4j.WriterAppender.append(WriterAppender.java:160)
> > >         at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > >         at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > >         at org.apache.log4j.Category.callAppenders(Category.java:206)
> > >         at org.apache.log4j.Category.forcedLog(Category.java:391)
> > >         at org.apache.log4j.Category.warn(Category.java:1060)
> > >         at kafka.utils.Logging$class.warn(Logging.scala:88)
> > >         at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
> > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:100)
> > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > >
> > > [2013-08-30 10:21:44,932] ERROR [Kafka Request Handler 4 on Broker 5], Exception when handling request (kafka.server.KafkaRequestHandler)
> > > java.lang.NullPointerException
> > >         at kafka.api.FetchResponsePartitionData.<init>(FetchResponse.scala:46)
> > >         at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:158)
> > >         at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:156)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >         at scala.collection.immutable.HashMap.map(HashMap.scala:38)
> > >         at kafka.api.FetchRequest.handleError(FetchRequest.scala:156)
> > >         at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
> > >         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >         at java.lang.Thread.run(Thread.java:662)
> > > [2013-08-30 10:05:17,214] ERROR [Kafka Request Handler 6 on Broker 5], Exception when handling request (kafka.server.KafkaRequestHandler)
> > > java.lang.NullPointerException
> > >         at kafka.api.FetchResponsePartitionData.<init>(FetchResponse.scala:46)
> > >         at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:158)
> > >         at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:156)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >         at scala.collection.immutable.HashMap.map(HashMap.scala:38)
> > >         at kafka.api.FetchRequest.handleError(FetchRequest.scala:156)
> > >         at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
> > >         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >         at java.lang.Thread.run(Thread.java:662)
> > > [2013-08-30 10:06:55,929] INFO Closing socket connection to /10.15.11.19. (kafka.network.Processor)