[ https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao closed KAFKA-973. ------------------------- > Messages From Producer Not being Partitioned > --------------------------------------------- > > Key: KAFKA-973 > URL: https://issues.apache.org/jira/browse/KAFKA-973 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.8 > Environment: Linux > Reporter: Subbu Srinivasan > Assignee: Neha Narkhede > Labels: newbie > > I created a two node cluster. > 2 zoo keepers > 2 brokers > 1 topic with replication factor (2) and no of partition 2. > my consumer group has two threads > 1) From my Java client - I send few messages to the topic. I have set > multiple brokers > kafka2:9092,kafka1:9092. > Only one thread in my consumer always gets the messages. It looks like > producer is not > partitioning the requests properly. > 2) However if I send some sample using the simple console producer, I see > multiple threads getting > requests and is load balanced. > What am I doing wrong in my client? > public class KafkaProducer { > > private final Properties props = new Properties(); > private static AtomicLong counter = new AtomicLong(0); > kafka.javaapi.producer.Producer<Integer, String> producer = null; > > public KafkaProducer() > { > props.put("serializer.class", "kafka.serializer.StringEncoder"); > props.put("metadata.broker.list", > ConfigurationUtility.getKafkaHost()); > producer = new kafka.javaapi.producer.Producer<Integer, String>(new > ProducerConfig(props)); > } > > public void sendMessage(String msg) throws Exception > { > producer.send(new KeyedMessage<Integer, > String>(ConfigurationUtility.getTopicName(), msg)); > } > > > public static void main(String arg[]) throws Exception > { > > ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092"); > ConfigurationUtility.setTopicName("dnslog"); > > ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181"); > ConfigurationUtility.setConsumerGroupId("dnslog"); > > for(int i = 0 ; i < 2 ; ++i) > { > (new > KafkaProducer()).sendMessage(UUID.randomUUID().toString()); > } > } > } -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira