hi Chen, Maybe you can take some tips from kafka.tools.KafkaMigrationTool.java, it's also a multi-threads using case. Reusing the same ConsumerConnector every time is ok. If you create ConsumerConnector repeatedly with the same consumer.id, the conflict will happen in ZK.
Yuanjia Li From: Chen Wang Date: 2014-12-11 04:19 To: users Subject: Reuse ConsumerConnector Hey Guys, I have a user case that my thread reads from different kafka topic periodically through a timer. The way I am reading from kafka in the timer callback is the following: try { Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector .createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streamList = consumerMap .get(kafkaTopic); int index = 0; for (KafkaStream<byte[], byte[]> stream : streamList) { this.subscriberExecutor.execute(new Subscriber(stream, namespace, sendTopic, producerDataChannel, index)); index++; } } catch (Exception ex) { logger.error("failed to create message stream", ex); } As you can see, I am reusing consumerConnector, and doesn't call shutdown. My question is, should I create a new consumerConnector each time in my timer callback, and shutdown the consumer when seeing ConsumerTimeoutException? Or reusing the same ConsumerConnector every time is ok? Is there any performance impact for creating ConsumerConnector repeatedly? When using the same consumer to read from the same topic twice, I am seeing: I wrote this conflicted ephemeral node [{"version":1,"subscription":{"My_topic_1":3},"pattern":"static","timestamp":"1418241613803"}] at /consumers/my_subscriber/my_subscriber_dev-trgt02-1418241313609-bbc26e95 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry" What does this message actually mean? Thanks, Chen