Got it. Thanks guys!

Chen

On Wed, Aug 13, 2014 at 9:35 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> I am using consumer.timeout.ms to force a consumer to jump out of the
> hasNext call, which will throw ConsumerTimeoutException.
>
> Yes, this is the downside of the blocking iterator approach. If you want to
> pull data in batches and process messages, the iterator is not the best API,
> as it can block at any time for longer than your app is comfortable with. To
> fix this and a bunch of other problems with the consumer API, we are
> working on a new consumer client that will replace the existing one and
> will support completely new APIs.
>
> Take a look at the new APIs here:
> <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>.
> Let us know if you have feedback.
>
> On Tue, Aug 12, 2014 at 4:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Chen,
> >
> > The rationale for the consumer timeout exception is to indicate when
> > there is no more data to be consumed, and hence upon catching the
> > exception the consumer should be closed.
> >
> > If you want to restart the consumer when handling the timeout exception,
> > then you should probably just increase the timeout value in the configs
> > to avoid the timeout exception being thrown.
> >
> > Guozhang
> >
> > On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang <chen.apache.s...@gmail.com>
> > wrote:
> >
> > > Folks,
> > > I am using consumer.timeout.ms to force a consumer to jump out of the
> > > hasNext call, which will throw ConsumerTimeoutException. It seems that
> > > upon receiving this exception, the consumer is no longer usable and I
> > > need to call .shutdown and recreate it:
> > >
> > >     try {
> > >     } catch (ConsumerTimeoutException ex) {
> > >         logger.info("consumer timeout, we consider the topic is drained");
> > >         this.consumer.shutdown();
> > >         this.consumer = kafka.consumer.Consumer
> > >                 .createJavaConsumerConnector(new ConsumerConfig(
> > >                         this.consumerProperties));
> > >     }
> > >
> > > Is this the expected behavior? I call
> > >
> > >     this.consumer = kafka.consumer.Consumer
> > >             .createJavaConsumerConnector(new ConsumerConfig(
> > >                     this.consumerProperties));
> > >
> > > in the thread initialization phase, and hope to reuse it upon
> > > ConsumerTimeoutException.
> > >
> > > Thanks,
> > >
> > > Chen
> >
> > --
> > -- Guozhang
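
The batching concern raised in the thread (an iterator that may block for longer than the application wants) can be illustrated with a bounded-wait drain. What follows is only a minimal sketch using java.util.concurrent: `BatchDrain` and `drainBatch` are hypothetical names, and a `BlockingQueue` stands in for the message source. It is not the Kafka consumer API, old or new. The idea is that a timeout ends the current batch and returns what has arrived, rather than surfacing as an exception the caller has to recover from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a BlockingQueue stands in for the message
// source. None of these names come from Kafka.
final class BatchDrain {

    // Collects up to maxBatch items, waiting at most maxWaitMs in total.
    // Returns whatever arrived in time (possibly an empty list) instead of
    // throwing when the wait runs out.
    static <T> List<T> drainBatch(BlockingQueue<T> source, int maxBatch, long maxWaitMs) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
        while (batch.size() < maxBatch) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // total wait budget used up
            }
            try {
                T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        source.add("m1");
        source.add("m2");
        source.add("m3");
        System.out.println(drainBatch(source, 2, 100)); // [m1, m2]
        System.out.println(drainBatch(source, 2, 100)); // [m3]
        System.out.println(drainBatch(source, 2, 100)); // []
    }
}
```

An empty batch here plays the role that ConsumerTimeoutException plays in the code quoted above: the caller can treat it as "drained for now" and simply call `drainBatch` again later, with no shutdown or re-creation step.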