Hi,
looking at the stack trace It seems to me you are using Kafka 0.7.0.
Well, I think the issue has something to do with broker(s) registration
(i.e Zookeeper) so you should check the startup messages on Kafka and
eventually the Zookeeper logs, I'm quite sure you will find something there.
The null value
/java.lang.NumberFormatException: null/
//that causes the NumberFormatException should be the number of topic
partitions that a given broker owns. So it seems that you Zookeeper has
a node like this
//brokers/topics/<topic name>/<broker id> //
/
which doesn't have any data (i.e. number of partitions) associated.
Cheers,
Andrea
On 06/07/2013 12:26 AM, Ashish Nigam wrote:
Hi,
I am getting exception when Producer tries to send messages to Kafka queue.
It tries to connect to zookeeper instances and then throws exception.
Here's initial log -
06 Jun 2013 15:17:29,541 [DEBUG] ZkConnection | Creating new
ZookKeeper instance to connect to 172.16.1.104:2181
06 Jun 2013 15:17:29,541 [INFO ] ZkEventThread | Starting
ZkClient event thread.
06 Jun 2013 15:17:31,610 [DEBUG] ZkClient | Awaiting
connection to Zookeeper server
06 Jun 2013 15:17:31,611 [DEBUG] ZkClient | Waiting for
keeper state SyncConnected
06 Jun 2013 15:17:32,787 [DEBUG] ZkClient | Received event:
WatchedEvent state:SyncConnected type:None path:null
06 Jun 2013 15:17:32,787 [INFO ] ZkClient | zookeeper state
changed (SyncConnected)
06 Jun 2013 15:17:32,787 [DEBUG] ZkClient | Leaving process
event
06 Jun 2013 15:17:32,787 [DEBUG] ZkClient | State is
SyncConnected
After this it throws NumberFormat exception -
java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:417)
at java.lang.Integer.parseInt(Integer.java:499)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:231)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:171)
at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:171)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:660)
at
scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at
scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at
scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:615)
at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:171)
at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:167)
at scala.collection.Iterator$class.foreach(Iterator.scala:660)
at
scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at
scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
at
kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo(ZKBrokerPartitionInfo.scala:167)
at
kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:66)
at kafka.producer.Producer.<init>(Producer.scala:53)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
at
com.shn.analytics.shneventprocessor.KafkaTestClient.test(KafkaTestClient.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Any idea why would this happen.
Thanks
Ashish