Hi,
I am getting exception when Producer tries to send messages to Kafka queue.

It tries to connect to zookeeper instances and then throws exception.
Here's initial log -

06 Jun 2013 15:17:29,541 [DEBUG] ZkConnection             | Creating new
ZookKeeper instance to connect to 172.16.1.104:2181

06 Jun 2013 15:17:29,541 [INFO ] ZkEventThread            | Starting
ZkClient event thread.

06 Jun 2013 15:17:31,610 [DEBUG] ZkClient                 | Awaiting
connection to Zookeeper server

06 Jun 2013 15:17:31,611 [DEBUG] ZkClient                 | Waiting for
keeper state SyncConnected

06 Jun 2013 15:17:32,787 [DEBUG] ZkClient                 | Received event:
WatchedEvent state:SyncConnected type:None path:null

06 Jun 2013 15:17:32,787 [INFO ] ZkClient                 | zookeeper state
changed (SyncConnected)

06 Jun 2013 15:17:32,787 [DEBUG] ZkClient                 | Leaving process
event

06 Jun 2013 15:17:32,787 [DEBUG] ZkClient                 | State is
SyncConnected


After this it throws NumberFormat exception -


java.lang.NumberFormatException: null

at java.lang.Integer.parseInt(Integer.java:417)

at java.lang.Integer.parseInt(Integer.java:499)

at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:231)

at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)

at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:171)

at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:171)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)

at scala.collection.Iterator$class.foreach(Iterator.scala:660)

at
scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)

at
scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)

at
scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:615)

at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:171)

at
kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:167)

at scala.collection.Iterator$class.foreach(Iterator.scala:660)

at
scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)

at
scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)

at
kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo(ZKBrokerPartitionInfo.scala:167)

at
kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:66)

at kafka.producer.Producer.<init>(Producer.scala:53)

at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)

at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)

at
com.shn.analytics.shneventprocessor.KafkaTestClient.test(KafkaTestClient.java:86)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

at java.lang.reflect.Method.invoke(Method.java:597)

at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)

at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)

at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)

at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)

at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)

at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)

at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)

at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)

at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)

at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)

at org.junit.runners.ParentRunner.run(ParentRunner.java:236)

at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)

at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)

at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)

at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)

at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)

at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

Any idea why would this happen.

Thanks
Ashish

Reply via email to