Hi Guozhang,
By explicitly stopping consumer, do you mean *System.exit(-1)* ?? Also, as
per the solution provided by you, the code loops over the ack variable..is
it correct? because even if ack received from writeToDB is true, there is a
possibility that next message may have come making ack vari
Hi,
In your case the exceptions thrown from the database operation may not stop
the consumer automatically, hence you may need to catch the exception and
explicitly stop consumer if you wanted it to behave so.
With iter.hasNext() the code becomes
while (iter.hasNext()) {
message = consumer.it
Also, is there any need of checking any acknowledgement variable? as on any
exception, while dealing with database, would make the consumer program
stop and hence *consumer.commit()* wouldn't have been called..right??
The above question is for single topic.Now, let's assume there are 5
topics. Fir
Thanks Guozhang!!
Below is the code for iterating over log messages:
.
.
for (final KafkaStream stream : streams) {
ConsumerIterator consumerIte =
stream.iterator();
Hi Anand,
You can use the high-level consumer and turn of auto.offset.commit, and do
sth. like:
message = consumer.iter.next();
bool acked = false
while (!acked) {
process(message)
acked = writeToDB();
}
consumer.commit()
Guozhang
On Fri, Aug 1, 2014 at 3:30 AM, anand jain wrote:
> I am ver
I am very much new to Kafka and we are using Kafka 0.8.1.
What I need to do is to consume a message from topic. For that, I will have
to write one consumer in Java which will consume a message from topic and
then save that message to database. After a message is saved, some
acknowledgement will be