Hi All, I am trying to get a multi-threaded high-level (HL) consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic.
The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely, feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *Topic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put("metadata.broker.list", args[0]); props.put("zk.connect", args[1]); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>( config); finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|" + driverIds[0] + "|" + events[random .nextInt(evtCnt)] + "|" + getLatLong(arrayroute17[i]); try { KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, finalEvent); LOG.info("Sending Messge #: " + routeName[0] + ": " + i +", msg:" + finalEvent); producer.send(data); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } *Consumer Code:* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber 
= a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()){ System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Shutting down Thread: " + m_threadNumber); } } public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", 
"1000"); props.put("consumer.timeout.ms", "-1"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } }