Folks, I am using consumer.timeout.ms to force the consumer to break out of the hasNext() call, which then throws ConsumerTimeoutException. It seems that once this exception has been thrown, the consumer is no longer usable, and I need to call shutdown() and recreate it:

    try {
        // ... loop calling hasNext() ...
    } catch (ConsumerTimeoutException ex) {
        logger.info("consumer timeout, we consider the topic is drained");
        this.consumer.shutdown();
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                new ConsumerConfig(this.consumerProperties));
    }

Is this the expected behavior? I call

    this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            new ConsumerConfig(this.consumerProperties));

in the thread initialization phase, and was hoping to reuse that same consumer after a ConsumerTimeoutException.

Thanks,
Chen
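
P.S. For reference, the shutdown-and-recreate control flow described above can be sketched in isolation. This is only a minimal illustration and does not use the Kafka classes: FakeConsumer, TimeoutSignal, and drain are made-up stand-ins (for the connector plus its iterator, for ConsumerTimeoutException, and for the consuming thread's loop), so it says nothing about how the real consumer behaves after a timeout.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

public class DrainSketch {
    // Hypothetical stand-in for kafka.consumer.ConsumerTimeoutException.
    static class TimeoutSignal extends RuntimeException {}

    // Hypothetical stand-in for the consumer connector and its stream iterator.
    static class FakeConsumer {
        private final Queue<String> pending;
        private boolean closed = false;

        FakeConsumer(List<String> messages) {
            this.pending = new ArrayDeque<>(messages);
        }

        // Mimics hasNext() with a timeout configured: throws once nothing is left.
        boolean hasNext() {
            if (closed) throw new IllegalStateException("consumer already shut down");
            if (pending.isEmpty()) throw new TimeoutSignal();
            return true;
        }

        String next() {
            return pending.remove();
        }

        void shutdown() {
            closed = true;
        }
    }

    // Consumes for `rounds` rounds, recreating the consumer after every timeout.
    static List<String> drain(Supplier<FakeConsumer> factory, int rounds) {
        List<String> seen = new ArrayList<>();
        FakeConsumer consumer = factory.get(); // thread initialization phase
        for (int round = 0; round < rounds; round++) {
            try {
                while (consumer.hasNext()) {
                    seen.add(consumer.next());
                }
            } catch (TimeoutSignal ex) {
                // Timeout is treated as "drained": shut down, then recreate.
                consumer.shutdown();
                consumer = factory.get();
            }
        }
        consumer.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        // Each newly created consumer sees the next batch of messages, if any.
        Iterator<List<String>> batches =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")).iterator();
        Supplier<FakeConsumer> factory = () -> new FakeConsumer(
                batches.hasNext() ? batches.next() : Collections.<String>emptyList());
        System.out.println(drain(factory, 2));
    }
}
```

The factory is passed in as a Supplier so that the recreate step is explicit; reusing the consumer instead would mean dropping the shutdown() and factory.get() calls from the catch block.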