Hi all;

I'm new to kafka and wrote a simple multithreaded kafka consumer. when try
to consume the messages,It continuously throwing timeoutexception..How can
i get rid of this?

I have multiple topics.


*Executor*


public class MessageListener {

private Properties properties;


private ConsumerConnector consumerConnector;

private String topic;

private ExecutorService executor;


public MessageListener(String topic) {

this.topic = topic;


KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();

try {

properties = confLoader.loadConsumerConfig();

ConsumerConfig consumerConfig = new ConsumerConfig(properties);

consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

} catch (FileNotFoundException e) {

e.printStackTrace();

}

}


public void start(RawFile file) {


Map<String, Integer> topicCountMap = new HashMap<>();

topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));


Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector

.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);


for (KafkaStream<byte[], byte[]> stream : streams) {

executor.submit(new ListenerThread(stream));


}

}



}



*Thread*

public class ListenerThread implements Runnable {

private KafkaStream<byte[], byte[]> stream;;


public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {

this.stream = msgStream;


}


@Override

public void run() {

try {


ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {

MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext();

String topic = messageAndMetadata.topic();

byte[] message = messageAndMetadata.message();

System.out.println("111111111111111111111111111");

FileProcessor processor = new FileProcessor();

processor.processFile(topic, message);

}

} catch (ConsumerTimeoutException cte) {

System.out.println("Consumer timed out");

}



Thanks.

-- 
-Ratha
http://vvratha.blogspot.com/

Reply via email to