I am running a simple producer like this:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class FirstKafkaTester {

        // Builds a producer from a minimal set of properties.
        public Producer<Integer, String> initProducer() {
            Properties props = new Properties();
            props.put("zk.connect", "127.0.0.1:2181");
            props.put("broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            return new Producer<Integer, String>(new ProducerConfig(props));
        }

        // Sends a single message to the given topic.
        public void executeProducer(String topic) {
            int messageNo = 1;
            Producer<Integer, String> producer = initProducer();
            String messageStr = "Message_" + messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
        }

        public static void main(String[] args) {
            FirstKafkaTester firstKafkaTester = new FirstKafkaTester();
            firstKafkaTester.executeProducer("topic_1");
        }
    }

I use Kafka 0.8. I installed the Kafka jar manually into my local Maven repository and added the Yammer metrics dependency:

    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>

I got this exception:

    2013-04-04 11:09:56,909 WARN async.DefaultEventHandler (Logging.scala:warn(89)) - Failed to send producer request with correlation id 2 to broker 3 with data for partitions [topic_1,0]
    java.lang.NoSuchMethodError: com.yammer.metrics.core.TimerContext.stop()J
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:243)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:105)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:74)
        at kafka.javaapi.producer.Producer.send(Producer.scala:32)
        at kafka_eval.FirstKafkaTester.executeProducer(FirstKafkaTester.java:26)
        at kafka_eval.FirstKafkaTester.main(FirstKafkaTester.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Am I doing something wrong? What should I do to resolve the issue?

Thanks,
Oleg
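In case it helps narrow this down: the descriptor `stop()J` in the error refers to a no-argument `stop` method that returns `long`. Below is a minimal diagnostic sketch, not part of the original program, that reports where a class was loaded from and what its no-argument method returns at runtime. The class name `ClasspathProbe` and the helper `describe` are hypothetical names chosen for illustration; only standard JDK reflection calls are used.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical helper for illustration: reports the origin of a class
// and the return type of one of its public no-argument methods.
public class ClasspathProbe {

    static String describe(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className);

            // Classes loaded by the bootstrap loader have no code source.
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                    ? "the JDK/bootstrap classpath"
                    : src.getLocation().toString();

            // Look up the public no-argument method by name.
            Method method = cls.getMethod(methodName);
            return className + " loaded from " + location + "; "
                    + methodName + "() returns " + method.getReturnType().getName();
        } catch (ClassNotFoundException e) {
            return className + " not found on classpath";
        } catch (NoSuchMethodException e) {
            return className + " has no public no-arg " + methodName + "()";
        }
    }

    public static void main(String[] args) {
        // Class and method named in the NoSuchMethodError above.
        System.out.println(describe("com.yammer.metrics.core.TimerContext", "stop"));
    }
}
```

Run with the same classpath as the producer, this prints the jar that actually supplies `TimerContext`, which can be compared against the `metrics-core` version declared in the POM.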