Jason,
Did you update your config file with the new name of the ZooKeeper setting? It was renamed from zk.connect to zookeeper.connect. You should check all of your settings, because other setting names have changed as well.

Cheers,
Eric Sites

On 6/16/13 5:14 PM, "Jason Rosenberg" <j...@squareup.com> wrote:

>I've started having problems with the latest version of the 0.8 branch.
>The test below has started failing. It was working fine with a prior
>version of 0.8, going back to Apr 30
>(sha 988d4d8e65a14390abd748318a64e281e4a37c19).
>
>I haven't figured out when exactly it started failing, but I saw it with a
>version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
>also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
>
>The test code is essentially this (assume a zk server is running with
>'zkConnect', and a kafka broker running with a metadata port at 'port'):
>
>  Properties pProps = new Properties();
>  pProps.put("metadata.broker.list", "localhost:" + port);
>  pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>  ProducerConfig pConfig = new ProducerConfig(pProps);
>  Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
>  KeyedMessage<Integer, String> data =
>      new KeyedMessage<Integer, String>("test-topic", "test-message");
>  producer.send(data);
>  producer.close();
>
>  Properties cProps = new Properties();
>  cProps.put("zookeeper.connect", zkConnect);
>  cProps.put("group.id", "group1");
>  ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>  ConsumerConnector consumerConnector =
>      Consumer.createJavaConsumerConnector(consumerConfig);
>
>  Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>      consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
>  List<KafkaStream<byte[], byte[]>> streams =
>      topicMessageStreams.get("test-topic");
>  final KafkaStream<byte[], byte[]> stream = streams.get(0);
>  final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>
>  // run in a separate thread
>  final AtomicBoolean success = new AtomicBoolean(false);
>  Thread consumeThread = new Thread(new Runnable() {
>    public void run() {
>      while (iter.hasNext()) {
>        byte[] msg = iter.next().message();
>        String msgStr = new String(msg);
>        success.set(msgStr.equals("test-message"));
>        break;
>      }
>    }
>  });
>
>  consumeThread.start();
>  // this now hangs with the latest code
>  consumeThread.join();
>
>  consumerConnector.shutdown();
>  assertTrue(success.get());
>
>The output looks like this:
>
>912 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching
>metadata [{TopicMetadata for topic test-topic ->
>No partition metadata for topic test-topic due to
>kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
>kafka.common.LeaderNotAvailableException
>922 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching
>metadata [{TopicMetadata for topic test-topic ->
>No partition metadata for topic test-topic due to
>kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
>kafka.common.LeaderNotAvailableException
>923 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to
>collate messages by topic, partition due to: Failed to fetch topic metadata
>for topic: test-topic
>980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint -
>No highwatermark file is found. Returning 0 as the highwatermark for
>partition [test-topic,0]
>
>The consumer never receives a message, and so the test hangs....
>
>This test worked fine as I said with an older version of the branch, but it
>would output exceptions about LeaderNotAvailable, etc...
>
>Thoughts?
>
>Jason
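
P.S. For the settings check suggested at the top of this reply, here is a rough sketch of how old-style keys could be moved to their new names before the Properties are handed to ConsumerConfig. The class name LegacyKeyCheck, the method migrate, and the RENAMED table are made up for illustration and are not part of Kafka. The table contains only the zk.connect rename mentioned above; any other renames would need to be added after checking them against the 0.8 configuration docs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: moves known legacy keys to new names.
class LegacyKeyCheck {
    // Old name -> new name. Only the rename confirmed in this thread is listed;
    // extend the table after verifying other renames in the 0.8 config docs.
    static final Map<String, String> RENAMED = new LinkedHashMap<String, String>();
    static {
        RENAMED.put("zk.connect", "zookeeper.connect");
    }

    /** Returns a copy of props with known legacy keys moved to their new names. */
    static Properties migrate(Properties props) {
        Properties out = new Properties();
        // Copy every setting that is not a known legacy key unchanged.
        for (String key : props.stringPropertyNames()) {
            if (!RENAMED.containsKey(key)) {
                out.setProperty(key, props.getProperty(key));
            }
        }
        // Move legacy keys, but never overwrite an explicit new-style setting.
        for (Map.Entry<String, String> e : RENAMED.entrySet()) {
            String oldValue = props.getProperty(e.getKey());
            if (oldValue != null && out.getProperty(e.getValue()) == null) {
                System.err.println("Renaming legacy setting "
                        + e.getKey() + " -> " + e.getValue());
                out.setProperty(e.getValue(), oldValue);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty("zk.connect", "localhost:2181");
        Properties migrated = migrate(old);
        System.out.println(migrated.getProperty("zookeeper.connect"));
    }
}
```

The warning goes to stderr so that a stale key in a config file is visible in the test output instead of being silently fixed up.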