Enrico Olivelli created KAFKA-3041:
--------------------------------------

             Summary: NullPointerException in new Consumer API on broker restart
                 Key: KAFKA-3041
                 URL: https://issues.apache.org/jira/browse/KAFKA-3041
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.0
            Reporter: Enrico Olivelli
            Assignee: Neha Narkhede
            Priority: Blocker


I'm running a brand new Kafka cluster (version 0.9.0.0). During my tests I 
noticed this error at Consumer.partitionsFor during a full cluster restart.
My DEV cluster is made of 4 brokers.

I cannot reproduce the problem consistently, but it sometimes happens during the 
restart of the brokers.

This is my code:

        this.properties = new Properties();
        properties.put("bootstrap.servers", "list of servers");
        // Producer-side settings (not used by the consumer)
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        // Consumer settings
        properties.put("group.id", "test");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        String topic = "xxx";
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            ….
        }

This is the error:
java.lang.NullPointerException
        at org.apache.kafka.common.requests.MetadataResponse.<init>(MetadataResponse.java:130)
        at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:203)
        at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1143)
        at magnews.datastream.KafkaDataStreamConsumer.fetchNewData(KafkaDataStreamConsumer.java:44
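
Until the root cause is identified, a possible caller-side stopgap is to retry the metadata lookup when it fails with a NullPointerException. The sketch below is only an illustration under that assumption, not a fix for the bug itself: the class name MetadataRetry, the method retryOnNpe, and the attempt/backoff parameters are hypothetical and are not part of the Kafka API. It uses only the JDK, so the Kafka call is passed in as a Supplier.

```java
import java.util.function.Supplier;

/** Hypothetical helper (not part of Kafka): retries a call that may throw NullPointerException. */
public final class MetadataRetry {
    private MetadataRetry() {}

    /**
     * Invokes the supplier up to maxAttempts times. Only NullPointerException
     * triggers a retry; any other exception propagates immediately.
     * Sleeps backoffMs between attempts and rethrows the last NPE if all attempts fail.
     */
    public static <T> T retryOnNpe(Supplier<T> call, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        NullPointerException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (NullPointerException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while retrying", ie);
                    }
                }
            }
        }
        throw last;
    }
}
```

With the consumer from the code above, the call would look like MetadataRetry.retryOnNpe(() -> consumer.partitionsFor(topic), 5, 1000L), where the values 5 and 1000L are arbitrary. Catching NullPointerException is normally bad practice, so this only makes sense as a temporary measure while the broker restart case is investigated.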




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
