I am executing a simple code like this:

public class FirstKafkaTester {
    public Producer<Integer, String> initProducer(){
        Properties props = new Properties();
        props.put("zk.connect", "127.0.0.1:2181");
        props.put("broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return new Producer<Integer, String>(new ProducerConfig(props));
    }

    public void executeProducer(String topic){
        int messageNo = 1;
        Producer<Integer, String> producer = initProducer();
        String messageStr = new String("Message_" + messageNo);
        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
    }
    public static void main(String[] args) {
        FirstKafkaTester firstKafkaTester = new FirstKafkaTester();
        firstKafkaTester.executeProducer("topic_1");
    }
}
I use Kafka 0.8. I put manually to the maven in my local repository and add
dependency of yammer

        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>

I got this exception:

2013-04-04 11:09:56,909 WARN  async.DefaultEventHandler
(Logging.scala:warn(89)) - Failed to send producer request with correlation
id 2 to broker 3 with data for partitions [topic_1,0]
java.lang.NoSuchMethodError: com.yammer.metrics.core.TimerContext.stop()J
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
 at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
 at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:243)
 at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:105)
 at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:99)
 at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
 at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
 at scala.collection.Iterator$class.foreach(Iterator.scala:631)
 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
 at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
 at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
 at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
 at kafka.producer.Producer.send(Producer.scala:74)
 at kafka.javaapi.producer.Producer.send(Producer.scala:32)
 at kafka_eval.FirstKafkaTester.executeProducer(FirstKafkaTester.java:26)
 at kafka_eval.FirstKafkaTester.main(FirstKafkaTester.java:34)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Is this something wrong that I am doing?
What should I do to resolve the issue.

Thanks
Oleg.

Reply via email to