Got it.
Thanks guys!
Chen

On Wed, Aug 13, 2014 at 9:35 AM, Neha Narkhede <neha.narkh...@gmail.com>
wrote:

> I am using consumer.timeout.ms to force a consumer to jump out of the hasNext
> call, which will throw ConsumerTimeoutException.
>
> Yes, this is the downside of the blocking iterator approach. If you want to
> pull data in batches and process messages, the iterator is not the best API,
> as it can block for longer than your app is comfortable with. To fix this
> and a bunch of other problems with the consumer API, we are working on a
> new consumer client that will replace the existing one and will support
> completely new APIs.
>
> Take a look at the new APIs here:
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>.
> Let us know if you have feedback.
>
>
> On Tue, Aug 12, 2014 at 4:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Chen,
> >
> > The rationale for using the consumer timeout exception is to indicate when
> > there is no more data to be consumed, and hence upon catching the
> > exception the consumer should be closed.
> >
> > If you want to restart the consumer when handling the timeout exception,
> > then you should probably just increase the timeout value in the configs
> > to avoid throwing the timeout exception.
> >
> > Guozhang
> >
> >
> > On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <chen.apache.s...@gmail.com>
> > wrote:
> >
> > > Folks,
> > > I am using consumer.timeout.ms to force a consumer to jump out of the
> > > hasNext call, which will throw ConsumerTimeoutException. It seems that
> > > upon receiving this exception, the consumer is no longer usable and I
> > > need to call .shutdown and recreate it:
> > >
> > > try {
> > >   // loop over the stream iterator; hasNext() is the call that times out
> > > } catch (ConsumerTimeoutException ex) {
> > >   logger.info("consumer timeout, we consider the topic is drained");
> > >   // the connector appears unusable after the timeout: shut down and recreate
> > >   this.consumer.shutdown();
> > >   this.consumer = kafka.consumer.Consumer
> > >       .createJavaConsumerConnector(new ConsumerConfig(this.consumerProperties));
> > > }
> > >
> > >
> > > Is this the expected behavior? I call
> > >
> > > this.consumer = kafka.consumer.Consumer
> > >     .createJavaConsumerConnector(new ConsumerConfig(this.consumerProperties));
> > >
> > > in the thread initialization phase, and hoped to reuse the consumer after
> > > a ConsumerTimeoutException.
> > >
> > > Thanks,
> > >
> > > Chen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
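
Guozhang's suggestion of raising the timeout in the configs could look roughly
like the following. This is only a minimal sketch, assuming the 0.8 high-level
consumer settings zookeeper.connect, group.id and consumer.timeout.ms. The
class name ConsumerTimeoutConfig, the helper build, and the host, group and
timeout values are hypothetical placeholders. Only java.util.Properties is
used, so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: collects the settings discussed in this thread.
class ConsumerTimeoutConfig {

    // Builds consumer properties with an explicit consumer.timeout.ms.
    // A value of -1 (the default) makes hasNext() block indefinitely;
    // a positive value makes it throw ConsumerTimeoutException after
    // that many milliseconds without a new message.
    static Properties build(String zookeeper, String groupId, long timeoutMs) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("consumer.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; pick a timeout longer than the longest
        // expected gap between messages so the exception is not thrown
        // while the topic is still being produced to.
        Properties props = build("localhost:2181", "example-group", 60_000L);
        System.out.println(props.getProperty("consumer.timeout.ms"));
    }
}
```

The resulting Properties object would then be passed to new ConsumerConfig(...)
in place of this.consumerProperties in Chen's snippet.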
