Folks,
I am using consumer.timeout.ms to force a consumer to break out of a blocking
hasNext() call; when the timeout expires, hasNext() throws
ConsumerTimeoutException. It seems that once this exception has been thrown,
the consumer is no longer usable, so I have to call shutdown() and recreate it:

try {
    // ... loop on hasNext() / next() here ...
} catch (ConsumerTimeoutException ex) {
    logger.info("consumer timeout, we consider the topic is drained");
    // Tear down the connector and build a fresh one from the same properties.
    this.consumer.shutdown();
    this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            new ConsumerConfig(this.consumerProperties));
}


Is this the expected behavior? I create the connector once, with

this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
        new ConsumerConfig(this.consumerProperties));

in the thread initialization phase, and I was hoping to keep reusing that same
instance after a ConsumerTimeoutException.
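
To make the pattern I am hoping for concrete, here is a minimal, self-contained
sketch. Note that FakeStream and TimeoutSignal are hypothetical stand-ins I made
up for illustration (they are not Kafka classes), and whether the real connector
and its iterator behave this way after a timeout is exactly what I am asking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DrainLoopSketch {

    /** Hypothetical stand-in for kafka.consumer.ConsumerTimeoutException. */
    static class TimeoutSignal extends RuntimeException {}

    /** Hypothetical stand-in for a message iterator that times out when idle. */
    static class FakeStream {
        private final Deque<String> pending = new ArrayDeque<>();

        void publish(String message) {
            pending.addLast(message);
        }

        // Mimics hasNext() with a timeout configured: instead of blocking
        // forever on an empty stream, it signals a timeout.
        boolean hasNext() {
            if (pending.isEmpty()) {
                throw new TimeoutSignal();
            }
            return true;
        }

        String next() {
            return pending.removeFirst();
        }
    }

    /** Reads until the timeout fires and treats the timeout as "drained". */
    static List<String> drain(FakeStream stream) {
        List<String> batch = new ArrayList<>();
        try {
            while (stream.hasNext()) {
                batch.add(stream.next());
            }
        } catch (TimeoutSignal ex) {
            // No message arrived in time; keep the same stream object
            // for the next pass instead of shutting it down.
        }
        return batch;
    }

    public static void main(String[] args) {
        FakeStream stream = new FakeStream();
        stream.publish("a");
        stream.publish("b");
        System.out.println(drain(stream)); // first pass

        stream.publish("c");
        System.out.println(drain(stream)); // second pass, same stream object
    }
}
```

In other words, I would like the catch block to do nothing more than log, and
then call hasNext() again on the same iterator later.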

Thanks,

Chen
