I've started having problems with the latest version of the 0.8 branch. The test below has started failing. It was working fine with a prior version of 0.8, going back to Apr 30 (sha 988d4d8e65a14390abd748318a64e281e4a37c19).
I haven't figured out when exactly it started failing, but I saw it with a version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).

The test code is essentially this (assume a zk server is running at 'zkConnect', and a kafka broker is running with a metadata port at 'port'):

    Properties pProps = new Properties();
    pProps.put("metadata.broker.list", "localhost:" + port);
    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
    ProducerConfig pConfig = new ProducerConfig(pProps);
    Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
    KeyedMessage<Integer, String> data =
        new KeyedMessage<Integer, String>("test-topic", "test-message");
    producer.send(data);
    producer.close();

    Properties cProps = new Properties();
    cProps.put("zookeeper.connect", zkConnect);
    cProps.put("group.id", "group1");
    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
    ConsumerConnector consumerConnector =
        Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
        consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
    List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test-topic");
    final KafkaStream<byte[], byte[]> stream = streams.get(0);
    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

    // run in a separate thread
    final AtomicBoolean success = new AtomicBoolean(false);
    Thread consumeThread = new Thread(new Runnable() {
        public void run() {
            while (iter.hasNext()) {
                byte[] msg = iter.next().message();
                String msgStr = new String(msg);
                success.set(msgStr.equals("test-message"));
                break;
            }
        }
    });
    consumeThread.start();

    // this now hangs with the latest code
    consumeThread.join();
    consumerConnector.shutdown();
    assertTrue(success.get());

The output looks like this:

    912 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic test-topic -> No partition metadata for topic test-topic due to kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class kafka.common.LeaderNotAvailableException
    922 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic test-topic -> No partition metadata for topic test-topic due to kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class kafka.common.LeaderNotAvailableException
    923 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-topic
    980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint - No highwatermark file is found. Returning 0 as the highwatermark for partition [test-topic,0]

The consumer never receives a message, and so the test hangs.

As I said, this test worked fine with an older version of the branch, but it would output exceptions about LeaderNotAvailable, etc.

Thoughts?

Jason