I am using consumer.timeout.ms to force a consumer jump out hasNext call,
which will throw ConsumerTimeoutException.

Yes, this is the downside of the blocking iterator approach. If you want to
pull data in batches and process messages, the iterator is not the best API
as it can block at any time longer than your app is comfortable with. To
fix this and a bunch of other problems with the consumer API, we are
working on a new consumer client, that will replace the existing one and
will support completely new APIs.

Take a look at the new APIs here
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>.
Let us know if you have feedback.


On Tue, Aug 12, 2014 at 4:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Chen,
>
> The rational of using the consumer timeout exception is to indicate when
> there is no more data to be consumed, and hence upon capturing the
> exception the consumer should be closed.
>
> If you want to restart the consumer in handling the timeout exception, then
> you should probably just increasing the timeout value in the configs to
> avoid it throwing timeout exception.
>
> Guozhang
>
>
> On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <chen.apache.s...@gmail.com>
> wrote:
>
> > Folks,
> > I am using consumer.timeout.ms to force a consumer jump out hasNext
> call,
> > which will throw ConsumerTimeoutException. It seems that upon receiving
> > this exception, the consumer is no longer usable and I need to call
> > .shutdown, and recreate:
> >
> > try{
> > } catch (ConsumerTimeoutException ex) {
> >
> >  logger.info("consumer timeout, we consider the topic is drained");
> >
> >  this.consumer.shutdown();
> >
> > this.consumer = kafka.consumer.Consumer
> >
> >   .createJavaConsumerConnector(new ConsumerConfig(
> >
> >    this.consumerProperties));
> >
> >  }
> >
> >
> > Is this the expected behavior? I call
> >
> > this.consumer = kafka.consumer.Consumer
> >
> >   .createJavaConsumerConnector(new ConsumerConfig(
> >
> >    this.consumerProperties));
> >
> > in the thread initialization phase, and hope to reuse it upon
> > ConsumerTimeoutException
> >
> > Thanks,
> >
> > Chen
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to