*Kafka: *Apache Kafka 0.8.1.1
*SImplePartitioner.java* public int partition(Object key, int a_numPartitions) { int partition = Integer.parseInt((String)key); LOG.debug("SimplePartitioner Partion: " + partition); return partition; } On Sun, Dec 21, 2014 at 10:54 PM, Pramod Deshmukh <dpram...@gmail.com> wrote: > I have a requirement to prove kafka producer can produce 1 million > events/second to Kafka cluster. > > So far, best I could achieve is 200k events/sec on topic with 2 > partitions. The latency increases with adding more partitions so I want to > test with 2 partitions for now. > > Below are the details along with produce code (java). How can I achieve > produce 1million event/sec.? I went thru kafka benchmarking blog as well. > > https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines > > *Kafka cluster:* 3 brokers on 3 servers. Each sever is 16 TB (16 JBODs), > 64GB RAM. > *Broker:* Allocated 6GB, 16 io.threads, 8 network threads. > *Topic: 2* partition, replication factor of 1 (Get high latency) > *Zookeepers: *3 zk instances running individually on master nodes (not > co-located with kafka broker/servers) > > > *Producer Code:* > public class TestProducer { > > private static String msg = "TEST KAFKA PERFORMANCE"; > private static Logger LOG = Logger.getLogger(TestProducer.class); > > public static void main(String... args){ > System.out.println("START - Test Producer"); > > long messageCount = Long.parseLong(args[0]); > long messageCountForStat = Long.parseLong(args[0]); > String topic = args[1]; > String brokerList = args[2]; > int batchCount = Integer.parseInt(args[3]); > int topicPartions = Integer.parseInt(args[4]); > Producer<String, String> producer = getProducer(brokerList, > batchCount); > Date startTime = new Date(System.currentTimeMillis()); > Random rnd = new Random(); > String partition = ""; > //Produce messages. > while (messageCount != 0) { > partition = ""+(int)messageCount%topicPartions; > KeyedMessage<String, String> message = > new KeyedMessage<String, String>(topic, partition, > msg); > producer.send(message); > messageCount--; > } > > Date endTime = new Date(System.currentTimeMillis()); > System.out.println("#########################################"); > System.out.println("MESSAGES SENT: " + messageCountForStat); > System.out.println("START TIME: " + startTime); > System.out.println("END TIME: " + endTime); > System.out.println("#########################################"); > System.out.println("END - Test Producer"); > } > > public static Producer<String, String> getProducer(String brokerList, > int batchSize) { > > props.put("metadata.broker.list", brokerList); > props.put("serializer.class", "kafka.serializer.StringEncoder"); > props.put("partitioner.class", "com.my.SimplePartitioner"); > props.put("request.required.acks", "0"); > props.put("producer.type", "async"); > props.put("compression.codec", "snappy"); > props.put("batch.num.messages", Integer.toString(batchSize)); > > ProducerConfig config = new ProducerConfig(props); > > Producer<String, String> producer = new Producer<String, > String>(config); > return producer; > } > > } >