Forgot to mention, I'm using kafka 2.11 version On 24 March 2016 at 16:55, Ratha v <vijayara...@gmail.com> wrote:
> Hi all; > > I'm new to kafka and wrote a simple multithreaded kafka consumer. when try > to consume the messages,It continuously throwing timeoutexception..How can > i get rid of this? > > I have multiple topics. > > > *Executor* > > > public class MessageListener { > > private Properties properties; > > > private ConsumerConnector consumerConnector; > > private String topic; > > private ExecutorService executor; > > > public MessageListener(String topic) { > > this.topic = topic; > > > KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader(); > > try { > > properties = confLoader.loadConsumerConfig(); > > ConsumerConfig consumerConfig = new ConsumerConfig(properties); > > consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); > > } catch (FileNotFoundException e) { > > e.printStackTrace(); > > } > > } > > > public void start(RawFile file) { > > > Map<String, Integer> topicCountMap = new HashMap<>(); > > topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE)); > > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumerConnector > > .createMessageStreams(topicCountMap); > > List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); > > executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE); > > > for (KafkaStream<byte[], byte[]> stream : streams) { > > executor.submit(new ListenerThread(stream)); > > > } > > } > > > > } > > > > *Thread* > > public class ListenerThread implements Runnable { > > private KafkaStream<byte[], byte[]> stream;; > > > public ListenerThread(KafkaStream<byte[], byte[]> msgStream) { > > this.stream = msgStream; > > > } > > > @Override > > public void run() { > > try { > > > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > > while (it.hasNext()) { > > MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext(); > > String topic = messageAndMetadata.topic(); > > byte[] message = messageAndMetadata.message(); > > System.out.println("111111111111111111111111111"); > > FileProcessor processor = new FileProcessor(); > > processor.processFile(topic, message); > > } > > } catch (ConsumerTimeoutException cte) { > > System.out.println("Consumer timed out"); > > } > > > > Thanks. > > -- > -Ratha > http://vvratha.blogspot.com/ > -- -Ratha http://vvratha.blogspot.com/