Hi Siddharth, The error log shows that the kafka broker the consumer talks to is no longer hosting the leader replica of the topic-partition, is there any broker failures during that time (you may find it in server and controller logs)?
Guozhang On Thu, Sep 4, 2014 at 12:04 AM, siddharth ubale <siddharth.ub...@gmail.com> wrote: > Hi, > > I am using a kafka spout to read data into Storm 0.9.2. > whenever i run my program and check the console consumer, i can see that > there is an aerror saying > *" Failed to add leader for partitions [Json1234,0]; will retry > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread"* > along with a log : > kafka.common.NotLeaderForPartitionException > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at java.lang.Class.newInstance(Class.java:374) > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) > at > > kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) > at > > kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) > at > > kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179) > at > > kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174) > at > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > at > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > > kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174) > at > > kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86) > at > > kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76) > at > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > at > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > > kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76) > at > > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > ^C[2014-09-04 12:27:45,186] WARN Reconnect due to socket error: null > (kafka.consumer.SimpleConsumer) > [2014-09-04 12:27:45,223] WARN Reconnect due to socket error: null > (kafka.consumer.SimpleConsumer) > [2014-09-04 12:27:45,237] WARN Reconnect due to socket error: null > (kafka.consumer.SimpleConsumer) > > has any one encountered this issue. > I am using *kafka_2.10-0.8.1.1 and storm 0.9.2 incubating with zookeeper > 3.4.6.* > > *do let me know. also i see this very often whenever i have restarted my > topic .* > > Thanks, > Siddharth ubale > -- -- Guozhang