Hi Chen,
You may find some tips in kafka.tools.KafkaMigrationTool.java; it is also a
multi-threaded use case.
Reusing the same ConsumerConnector every time is OK. If you repeatedly create
a ConsumerConnector with the same consumer.id, a conflict will occur in
ZooKeeper (ZK).
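
To illustrate the reuse advice above, here is a minimal sketch of a "create once, reuse, shut down once" holder. SharedResource is a hypothetical helper, not part of Kafka, and the Kafka calls named in its comments are only an assumption about how it might be wired up.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Kafka): creates a resource once,
 * returns the same instance on every call, and shuts it down once.
 *
 * Assumed wiring for the 0.8 high-level consumer, with consumerConfig
 * built elsewhere:
 *   factory:  () -> kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig)
 *   shutdown: ConsumerConnector::shutdown
 */
final class SharedResource<T> {
    private final Supplier<T> factory;
    private final Consumer<T> shutdown;
    private T instance;      // guarded by "this"
    private boolean closed;  // guarded by "this"

    SharedResource(Supplier<T> factory, Consumer<T> shutdown) {
        this.factory = factory;
        this.shutdown = shutdown;
    }

    /** Returns the single instance, creating it on first use. */
    synchronized T get() {
        if (closed) {
            throw new IllegalStateException("resource already shut down");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** Shuts the instance down at most once; later calls are no-ops. */
    synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (instance != null) {
            shutdown.accept(instance);
        }
    }
}
```

With something like this, the timer callback would call get() instead of constructing a connector, and close() would run once when the application stops, so only one consumer.id is ever registered by the process.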

Yuanjia Li

From: Chen Wang
Date: 2014-12-11 04:19
To: users
Subject: Reuse ConsumerConnector

Hey Guys,
I have a use case in which my thread reads from different Kafka topics
periodically through a timer. This is how I read from Kafka in the timer
callback:

try {
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(kafkaTopic);

    int index = 0;
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        this.subscriberExecutor.execute(new Subscriber(stream,
                namespace, sendTopic, producerDataChannel, index));
        index++;
    }
} catch (Exception ex) {
    logger.error("failed to create message stream", ex);
}

As you can see, I am reusing consumerConnector and never call shutdown.
My question is: should I create a new ConsumerConnector each time in my timer
callback and shut down the consumer when I see a ConsumerTimeoutException? Or
is reusing the same ConsumerConnector every time OK? Is there any performance
impact from creating a ConsumerConnector repeatedly?

When using the same consumer to read from the same topic twice, I am seeing:
"I wrote this conflicted ephemeral node
[{"version":1,"subscription":{"My_topic_1":3},"pattern":"static","timestamp":"1418241613803"}]
at /consumers/my_subscriber/my_subscriber_dev-trgt02-1418241313609-bbc26e95
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry"

What does this message actually mean?

Thanks,
Chen
