Jason,

Did you update your config file with the new name of the zookeeper
settings:

It was renamed from zk.connect to zookeeper.connect.

You should check all of the settings because other setting names have
changed as well.

Cheers,
Eric Sites

On 6/16/13 5:14 PM, "Jason Rosenberg" <j...@squareup.com> wrote:

>I've started having problems with the latest version of the 0.8 branch.
> The test below has started failing.  It was working fine with a prior
>version of 0.8, going back to Apr 30
>(sha 988d4d8e65a14390abd748318a64e281e4a37c19).
>
>I haven't figured out when exactly it started failing, but I saw it with a
>version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
>also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
>
>The test code is essentially this (assume a zk server is running with
>'zkConnect', and a kafka broker running with a metadata port at 'port':
>
>    Properties pProps = new Properties();
>    pProps.put("metadata.broker.list", "localhost:" + port);
>    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>    ProducerConfig pConfig = new ProducerConfig(pProps);
>    Producer<Integer, String> producer = new Producer<Integer,
>String>(pConfig);
>    KeyedMessage<Integer, String> data =
>        new KeyedMessage<Integer, String>("test-topic", "test-message");
>    producer.send(data);
>    producer.close();
>
>    Properties cProps = new Properties();
>    cProps.put("zookeeper.connect", zkConnect);
>    cProps.put("group.id", "group1");
>    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>    ConsumerConnector consumerConnector =
>Consumer.createJavaConsumerConnector(consumerConfig);
>
>    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>
>consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
>    List<KafkaStream<byte[], byte[]>> streams =
>topicMessageStreams.get("test-topic");
>    final KafkaStream<byte[], byte[]> stream = streams.get(0);
>    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>
>    // run in a separate thread
>    final AtomicBoolean success = new AtomicBoolean(false);
>    Thread consumeThread = new Thread(new Runnable() {
>      public void run() {
>        while (iter.hasNext()) {
>          byte[] msg = iter.next().message();
>          String msgStr = new String(msg);
>          success.set(msgStr.equals("test-message"));
>          break;
>        }
>      }
>    });
>
>    consumeThread.start();
>    // this now hangs with the latest code
>    consumeThread.join();
>
>    consumerConnector.shutdown();
>    assertTrue(success.get());
>
>The output looks like this:
>
>912 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
>metadata [{TopicMetadata for topic test-topic ->
>No partition metadata for topic test-topic due to
>kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
>kafka.common.LeaderNotAvailableException
>922 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
>metadata [{TopicMetadata for topic test-topic ->
>No partition metadata for topic test-topic due to
>kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
>kafka.common.LeaderNotAvailableException
>923 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
>collate messages by topic, partition due to: Failed to fetch topic
>metadata
>for topic: test-topic
>980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint  -
>No highwatermark file is found. Returning 0 as the highwatermark for
>partition [test-topic,0]
>
>The consumer never receives a message, and so the test hangs....
>
>This test worked fine as I said with an older version of the branch, but
>it
>would output exceptions about LeaderNotAvailable, etc...
>
>Thoughts?
>
>Jason

Reply via email to