Hi Chen,

The rationale for the consumer timeout exception is to indicate that
there is no more data to be consumed; hence, upon catching the
exception, the consumer should be closed.
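
As a rough sketch of that pattern (not the actual Kafka classes): below,
TimeoutSignal and TimedIterator are hypothetical stand-ins for
ConsumerTimeoutException and the stream iterator, so the example runs
without a broker. The loop treats the timeout as "drained" and then closes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DrainThenClose {
    /** Hypothetical stand-in for kafka.consumer.ConsumerTimeoutException. */
    public static class TimeoutSignal extends RuntimeException {}

    /** Hypothetical stand-in for a stream iterator: hasNext() throws once no data is left. */
    public static class TimedIterator implements Iterator<String> {
        private final Iterator<String> source;

        public TimedIterator(List<String> messages) {
            this.source = messages.iterator();
        }

        @Override
        public boolean hasNext() {
            // Simulates "no message arrived within the timeout".
            if (!source.hasNext()) {
                throw new TimeoutSignal();
            }
            return true;
        }

        @Override
        public String next() {
            return source.next();
        }
    }

    /** Consumes until the timeout signal fires, then always runs the shutdown hook. */
    public static List<String> drain(Iterator<String> it, Runnable shutdown) {
        List<String> consumed = new ArrayList<>();
        try {
            while (it.hasNext()) {
                consumed.add(it.next());
            }
        } catch (TimeoutSignal e) {
            // Timeout means the topic is considered drained; nothing to retry.
        } finally {
            shutdown.run(); // with the real connector this would be consumer.shutdown()
        }
        return consumed;
    }

    public static void main(String[] args) {
        Iterator<String> it = new TimedIterator(Arrays.asList("m1", "m2", "m3"));
        List<String> got = drain(it, () -> System.out.println("consumer closed"));
        System.out.println("consumed " + got.size() + " messages");
    }
}
```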

If you want to restart the consumer when handling the timeout exception,
then you should probably just increase the timeout value
(consumer.timeout.ms) in the configs so that it does not throw the
timeout exception.
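
A minimal sketch of that config change, using only java.util.Properties.
The helper name withTimeout, the 60000 ms value, and the zookeeper.connect
and group.id values are placeholders I made up for illustration; the
resulting Properties would then go into new ConsumerConfig(...) as in your
code. Setting the value to -1 makes the consumer block indefinitely instead
of timing out.

```java
import java.util.Properties;

public class ConsumerTimeoutConfig {
    /** Returns a copy of the base properties with consumer.timeout.ms set (hypothetical helper). */
    public static Properties withTimeout(Properties base, long timeoutMs) {
        Properties props = new Properties();
        props.putAll(base); // leave the caller's properties untouched
        props.setProperty("consumer.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("zookeeper.connect", "localhost:2181"); // placeholder address
        base.setProperty("group.id", "example-group");           // placeholder group

        // Wait up to 60 seconds for a message before the timeout exception is thrown.
        Properties props = withTimeout(base, 60000L);
        System.out.println("consumer.timeout.ms = " + props.getProperty("consumer.timeout.ms"));
    }
}
```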

Guozhang


On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <chen.apache.s...@gmail.com>
wrote:

> Folks,
> I am using consumer.timeout.ms to force a consumer to break out of the
> hasNext call, which then throws ConsumerTimeoutException. It seems that
> upon receiving this exception, the consumer is no longer usable and I
> need to call .shutdown and recreate it:
>
> try {
> } catch (ConsumerTimeoutException ex) {
>     logger.info("consumer timeout, we consider the topic is drained");
>     this.consumer.shutdown();
>     this.consumer = kafka.consumer.Consumer
>         .createJavaConsumerConnector(new ConsumerConfig(
>             this.consumerProperties));
> }
>
>
> Is this the expected behavior? I call
>
> this.consumer = kafka.consumer.Consumer
>     .createJavaConsumerConnector(new ConsumerConfig(
>         this.consumerProperties));
>
> in the thread initialization phase, and was hoping to reuse it after a
> ConsumerTimeoutException.
>
> Thanks,
>
> Chen
>



-- 
-- Guozhang
