Hi Chen,

The rationale for the consumer timeout exception is to indicate that there is no more data to be consumed; hence, upon catching the exception, the consumer should be closed.
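For reference, the timeout in question is the consumer.timeout.ms setting mentioned in the original message. Below is a minimal sketch of building the consumer properties with an explicit timeout. The class name, the helper method, the ZooKeeper address, and the group id are placeholders invented for illustration, not taken from Chen's code. As far as I recall, a negative value (the default is -1) makes hasNext block indefinitely instead of throwing.

```java
import java.util.Properties;

public class ConsumerTimeoutConfig {

    // Hypothetical helper: builds the properties that would be passed to
    // new ConsumerConfig(...) for the high-level consumer.
    static Properties build(String zkConnect, String groupId, long timeoutMs) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect); // placeholder address supplied by the caller
        props.setProperty("group.id", groupId);            // placeholder group supplied by the caller
        // How long hasNext() waits for a message before throwing
        // ConsumerTimeoutException; a negative value means wait indefinitely.
        props.setProperty("consumer.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: 30 seconds instead of a short timeout.
        Properties props = build("localhost:2181", "example-group", 30000L);
        System.out.println(props.getProperty("consumer.timeout.ms"));
    }
}
```

The resulting Properties object would take the place of this.consumerProperties when the connector is created.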
If you want to restart the consumer when handling the timeout exception, then you should probably just increase the timeout value in the configs so that it does not throw the timeout exception in the first place.

Guozhang

On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

> Folks,
> I am using consumer.timeout.ms to force a consumer jump out hasNext call,
> which will throw ConsumerTimeoutException. It seems that upon receiving
> this exception, the consumer is no longer usable and I need to call
> .shutdown, and recreate:
>
> try{
> } catch (ConsumerTimeoutException ex) {
>     logger.info("consumer timeout, we consider the topic is drained");
>     this.consumer.shutdown();
>     this.consumer = kafka.consumer.Consumer
>         .createJavaConsumerConnector(new ConsumerConfig(
>             this.consumerProperties));
> }
>
> Is this the expected behavior? I call
>
> this.consumer = kafka.consumer.Consumer
>     .createJavaConsumerConnector(new ConsumerConfig(
>         this.consumerProperties));
>
> in the thread initialization phase, and hope to reuse it upon
> ConsumerTimeoutException
>
> Thanks,
> Chen

--
-- Guozhang