[ https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13704434#comment-13704434 ]
Chris Curtin commented on KAFKA-966: ------------------------------------ Not to be dense, but wouldn't managing the offsets that way remove the ability to easily multi-thread the consumer? The commitOffsets method is on the ConsumerConnector not the KafkaStream, so to do a multi-threaded client I'd need to write logic to checkpoint all the threads to make sure they are all okay before committing back to Kafka. commitOffsets would also require that all the messages on all partitions succeed or be rolled back together, so a failure on one message could stop everything. In a multi-partition model where the partitions end up in different Shards, databases etc. that makes the consumer a lot more complicated. > Allow high level consumer to 'nak' a message and force Kafka to close the > KafkaStream without losing that message > ----------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-966 > URL: https://issues.apache.org/jira/browse/KAFKA-966 > Project: Kafka > Issue Type: Improvement > Components: consumer > Affects Versions: 0.8 > Reporter: Chris Curtin > Assignee: Neha Narkhede > Priority: Minor > > Enhancement request. > The high level consumer is very close to handling a lot of situations a > 'typical' client would need. Except for when the message received from Kafka > is valid, but the business logic that wants to consume it has a problem. > For example if I want to write the value to a MongoDB or Cassandra database > and the database is not available. I won't know until I go to do the write > that the database isn't available, but by then it is too late to NOT read the > message from Kafka. Thus if I call shutdown() to stop reading, that message > is lost since the offset Kafka writes to ZooKeeper is the next offset. > Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the > next offset to read for this partition to this message when I start up again. > And if there are any messages in the BlockingQueue for other partitions, find > the lowest # and use it for that partitions offset since I haven't consumed > them yet. > Thus I can cleanly shutdown my processing, resolve whatever the issue is and > restart the process. > Another idea might be to allow a 'peek' into the next message and if I > succeed in writing to the database call 'next' to remove it from the queue. > I understand this won't deal with a 'kill -9' or hard failure of the JVM > leading to the latest offsets not being written to ZooKeeper but it addresses > a likely common scenario for consumers. Nor will it add true transactional > support since the ZK update could fail. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira