Hello,

I wrote a simple JUnit test to test a Kafka producer:

public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString());

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port);
        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port, "kafka.producer.DefaultPartitioner");
        ProducerConfig pConfig = new ProducerConfig(properties);
        Producer producer = new Producer(pConfig);

        // send message
        KeyedMessage<Integer, String> data = new KeyedMessage(topic, "test-message");
        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);
        producer.send(scala.collection.JavaConversions.asBuffer(messages));

        // cleanup
        producer.close();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}

However, when I run the test, I get the following error messages:

[2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:74)
    at de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

[2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)

I tried to write my unit test following the Scala unit tests in the Kafka core, but it seems I'm still missing something basic to make it work. Can someone help me with that?

I'm developing on Mac OS X 10.8.3 and compiled the latest Kafka (plus the TestUtils) from the git repository using Scala 2.9.2.

Regards,
Andreas Maier