Here is the source code of the program:

package kafka.examples;
import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.consumer.ConsumerIterator; import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ExecutionException; import java.io.*; import java.util.Date; import java.text.DateFormat; import java.text.SimpleDateFormat; public class KafkaConsumerMultithread { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public static DateFormat dateFormat = new SimpleDateFormat("yyyyMMddhhmmss"); //get current date time with Date() public static Date date = new Date(); public static String filename = dateFormat.format(date)+"-topicname.csv"; public static long starttime = 0; public static long endtime = 0; public List<ConsumerPrintMessage> cpm_list; public static class ConsumerPrintMessage implements Runnable { ConsumerIterator<Message> it; int threadNumber; public long messageCount=0; ConsumerPrintMessage(KafkaStream<Message> stream, int threadNumber) { it = stream.iterator(); this.threadNumber = threadNumber; } public void run() { try { while(it.hasNext()) { System.out.println("\nThread Consuming Data : "+threadNumber+"\n"); String msg = ""; msg = ExampleUtils.getMessage(it.next().message()) ; this.messageCount++; if(System.currentTimeMillis() >= starttime+36000000) { System.out.println("\n\n Code to Send data to hdfs is commented - uncomment \n\n"); System.out.println("\n\nNext Hour Data \n\n"); date = new Date(); filename = dateFormat.format(date)+"-topicname.csv"; starttime = System.currentTimeMillis(); } } } catch(Exception ex) { ex.printStackTrace(); System.out.println("\n\t\t\tError : IO Exception : Unable to write data to temp.\n"); } } } public 
KafkaConsumerMultithread(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } /* To shutdown consumer and producer */ public void shutdown() { int count = 0; // get the total message count for(int i = 0; i<cpm_list.size();i++){ ConsumerPrintMessage cpm = cpm_list.get(i); count+=cpm.messageCount; } System.out.println("MessageCount : "+count); //System.out.println("consumer : "+consumer); //System.out.println("executer : "+executor); System.out.println("thread job"); System.out.println("end time :"+ ( System.currentTimeMillis() - starttime ) ); if (consumer != null) { consumer.shutdown(); } if (executor != null) { executor.shutdown(); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<Message>> streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; //Future< ? 
> fut_1 = null; starttime = System.currentTimeMillis(); cpm_list = new ArrayList<ConsumerPrintMessage>(); for (final KafkaStream<Message> stream : streams) { //fut_1 = ConsumerPrintMessage cx = new ConsumerPrintMessage(stream, threadNumber); cpm_list.add(cx); executor.submit(cx); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); //props.put("broker.list", "1:192.168.0.46:9092,2:192.168.0.47:9092"); props.put("zk.connect", a_zookeeper); props.put("groupid", a_groupId); props.put("zk.sessiontimeout.ms", "400"); props.put("zk.synctime.ms", "200"); props.put("autocommit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { /* Note : This part specifies your consumer properties - dynamic Uncomment following lines for specifying values dynamically execution : KafkaConsumerMultithread <zk: arg[0]> <gid: arg[1]> <topic: arg[2]> <num_threads: arg[3]> */ /* String zooKeeper = args[0]; // First argument is your zookeer connection address String groupId = args[1]; // Second argument will be your groupid String topic = args[2]; // Third argument topic to consume int threads = Integer.parseInt(args[3]); // Fourth argument Number of threads to run */ /* Note : This part specifies your consumer properties - hard coded */ String zooKeeper = "zoo1:2181,zoo2:2181,zoo3:2181"; String groupId = "group1"; String topic = "topic1"; int threads = Integer.parseInt(args[4]); //Process mktemp = Runtime.getRuntime().exec("mkdir temp"); KafkaConsumerMultithread example = new KafkaConsumerMultithread(zooKeeper, groupId, topic); example.run(threads); try { System.out.println("main thread going to sleep for 10 seconds"); Thread.sleep(900); } catch (InterruptedException ie) { System.out.println("\n\nNote : Execution Interrupted..!!\n\n"); } System.out.println("main thread woke up"); example.shutdown(); } } On Mon, Jul 8, 2013 at 8:43 PM, Anurup Raveendran < 
anurup.raveend...@fluturasolutions.com> wrote: > 305 bytes > > > On Mon, Jul 8, 2013 at 8:41 PM, Tom Brown <tombrow...@gmail.com> wrote: > >> What is the size of each message? >> >> --Tom >> >> >> On Mon, Jul 8, 2013 at 9:04 AM, Anurup Raveendran < >> anurup.raveend...@fluturasolutions.com> wrote: >> >> > I have 2 Kafka brokers running on two systems with the same >> configuration >> > >> > CPU - Dual Core >> > RAM - 4 GB >> > >> > I'm trying to benchmark my Kafka setup >> > Number of messages - 10000 >> > >> > For 2 partitions & 2 threads configuration - consumer consumes in 1.175 >> > seconds >> > >> > For 1 partition & 1 thread configuration - consumer consumes in 1.253 >> > seconds >> > >> > Is there some specific configuration which would enable better >> throughput? >> > >> > >> > >> > >> > >