Subbu Srinivasan created KAFKA-973:
--------------------------------------

             Summary: Messages From Producer Not being Partitioned 
                 Key: KAFKA-973
                 URL: https://issues.apache.org/jira/browse/KAFKA-973
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8
         Environment: Linux
            Reporter: Subbu Srinivasan
            Assignee: Neha Narkhede


I created a two-node cluster:

2 ZooKeeper nodes
2 brokers
1 topic with a replication factor of 2 and 2 partitions
1 consumer group with two threads

1) From my Java client, I send a few messages to the topic. I have configured
multiple brokers: kafka2:9092,kafka1:9092.

Only one thread in my consumer ever gets the messages. It looks like the
producer is not partitioning the requests properly.

2) However, if I send some sample messages using the simple console producer,
I see multiple threads receiving messages, and the load is balanced.

What am I doing wrong in my client?


import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// ConfigurationUtility is an application helper class that is not shown here.
public class KafkaProducer {

    private final Properties props = new Properties();

    private static AtomicLong counter = new AtomicLong(0);
    private final Producer<Integer, String> producer;

    public KafkaProducer()
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
        producer = new Producer<Integer, String>(new ProducerConfig(props));
    }

    // Sends the message to the configured topic without a key.
    public void sendMessage(String msg) throws Exception
    {
        producer.send(new KeyedMessage<Integer, String>(
                ConfigurationUtility.getTopicName(), msg));
    }

    public static void main(String[] args) throws Exception
    {
        ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
        ConfigurationUtility.setTopicName("dnslog");
        ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
        ConfigurationUtility.setConsumerGroupId("dnslog");

        // A new producer instance is created for each message.
        for (int i = 0; i < 2; ++i)
        {
            (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
        }
    }
}
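
One possible explanation, stated as an assumption and not a confirmed
diagnosis: sendMessage builds each KeyedMessage without a key. As far as I
understand the 0.8 producer, a keyed message is routed by hashing the key
modulo the partition count, while an unkeyed message goes to a randomly chosen
partition that the producer keeps reusing until its next topic metadata
refresh. The sketch below is a plain-Java model of those two rules, not Kafka's
code. PartitionSketch, partitionForKey and StickyChooser are hypothetical names
introduced only for illustration.

```java
import java.util.Random;

// Illustrative model only; this is not Kafka's implementation.
final class PartitionSketch {

    // Keyed rule (assumed): non-negative hash of the key, modulo the
    // number of partitions.
    static int partitionForKey(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Unkeyed rule (assumed): choose one random partition, then keep
    // returning it until refresh() is called. refresh() stands in for a
    // topic metadata refresh.
    static final class StickyChooser {
        private final Random random;
        private final int numPartitions;
        private int cached = -1;

        StickyChooser(int numPartitions, long seed) {
            this.numPartitions = numPartitions;
            this.random = new Random(seed);
        }

        int next() {
            if (cached < 0) {
                cached = random.nextInt(numPartitions);
            }
            return cached;
        }

        void refresh() {
            cached = -1;
        }
    }
}
```

Under this model, the integer keys 0 and 1 land on partitions 0 and 1 of a
two-partition topic, whereas every call to next() on one StickyChooser returns
the same partition until refresh() is called. If the assumption holds, passing
a key to KeyedMessage (with a key serializer that matches the key type) would
be one way to spread messages across both partitions.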




--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
