Re: Reading messages offset in Apache Kafka

2014-08-04 Thread anand jain
Hi Guozhang, By explicitly stopping consumer, do you mean *System.exit(-1)* ?? Also, as per the solution provided by you, the code loops over the ack variable..is it correct? because even if ack received from writeToDB is true, there is a possibility that next message may have come making ack vari

Re: Reading messages offset in Apache Kafka

2014-08-04 Thread Guozhang Wang
Hi, In your case the exceptions thrown from the database operation may not stop the consumer automatically, hence you may need to catch the exception and explicitly stop consumer if you wanted it to behave so. With iter.hasNext() the code becomes while (iter.hasNext()) { message = consumer.it

Re: Reading messages offset in Apache Kafka

2014-08-04 Thread anand jain
Also, is there any need of checking any acknowledgement variable? as on any exception, while dealing with database, would make the consumer program stop and hence *consumer.commit()* wouldn't have been called..right?? The above question is for single topic.Now, let's assume there are 5 topics. Fir

Re: Reading messages offset in Apache Kafka

2014-08-04 Thread anand jain
Thanks Guozhang!! Below is the code for iterating over log messages: . . for (final KafkaStream stream : streams) { ConsumerIterator consumerIte = stream.iterator();

Re: Reading messages offset in Apache Kafka

2014-08-01 Thread Guozhang Wang
Hi Anand, You can use the high-level consumer and turn of auto.offset.commit, and do sth. like: message = consumer.iter.next(); bool acked = false while (!acked) { process(message) acked = writeToDB(); } consumer.commit() Guozhang On Fri, Aug 1, 2014 at 3:30 AM, anand jain wrote: > I am ver