Hi all,

I'm new to Kafka and wrote a simple multithreaded Kafka consumer. When I try to consume messages, it continuously throws a timeout exception. How can I get rid of this?
I have multiple topics.

*Executor*

    import java.io.FileNotFoundException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MessageListener {

        private Properties properties;
        private ConsumerConnector consumerConnector;
        private String topic;
        private ExecutorService executor;

        public MessageListener(String topic) {
            this.topic = topic;
            KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
            try {
                properties = confLoader.loadConsumerConfig();
                ConsumerConfig consumerConfig = new ConsumerConfig(properties);
                consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }

        public void start(RawFile file) {
            // Request one stream per worker thread for this topic.
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(topic, CoreConstants.THREAD_SIZE);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumerConnector.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

            // Hand each stream to its own listener thread.
            executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
            for (KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new ListenerThread(stream));
            }
        }
    }

*Thread*

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;

    public class ListenerThread implements Runnable {

        private KafkaStream<byte[], byte[]> stream;

        public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
            this.stream = msgStream;
        }

        @Override
        public void run() {
            try {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    // Use next(), not makeNext(): hasNext() has already fetched
                    // the element, so calling makeNext() directly would skip it.
                    MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                    String topic = messageAndMetadata.topic();
                    byte[] message = messageAndMetadata.message();
                    System.out.println("111111111111111111111111111");
                    FileProcessor processor = new FileProcessor();
                    processor.processFile(topic, message);
                }
            } catch (ConsumerTimeoutException cte) {
                System.out.println("Consumer timed out");
            }
        }
    }

Thanks.

--
-Ratha
http://vvratha.blogspot.com/