I'm looking at implementing a synchronous auto offset commit solution. People have discussed the need for this in previous threads. Basically, in my consumer loop, I want to make sure a message has actually been processed before allowing its offset to be committed. But I don't want to commit after every message, since that would be too expensive. So I want to use 'auto.commit.interval.ms' to call commitOffsets periodically, but only after a message has been processed and before the next message has been fetched via a call to 'next()' on the ConsumerIterator.
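For illustration, here is a minimal Java sketch of that loop, written against plain java.util types so it runs without Kafka on the classpath. The `Committer` interface is a hypothetical stand-in for the connector's `commitOffsets()` call, and the `Iterator<T>` stands in for the `ConsumerIterator`. The interval plays the role of 'auto.commit.interval.ms', with 'auto.commit.enable' assumed to be turned off. It also assumes a single consuming thread, since (as far as I understand) a commit on the connector covers every stream it owns.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

public class ProcessedOffsetCommitter {

    /** Hypothetical stand-in for the connector's commitOffsets() call. */
    public interface Committer {
        void commitOffsets();
    }

    private final Committer committer;
    private final long intervalMs;     // plays the role of auto.commit.interval.ms
    private final LongSupplier clock;  // injectable clock, e.g. System::currentTimeMillis
    private long lastCommitMs;
    private boolean pending;           // true if a processed message is not yet committed

    public ProcessedOffsetCommitter(Committer committer, long intervalMs, LongSupplier clock) {
        this.committer = committer;
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.lastCommitMs = clock.getAsLong();
    }

    /**
     * Processes every message, committing at most once per interval and only
     * between "handler finished" and the next call to next().
     */
    public <T> void run(Iterator<T> messages, Consumer<T> handler) {
        while (messages.hasNext()) {
            T message = messages.next();
            handler.accept(message);  // if this throws, nothing below runs
            pending = true;
            long now = clock.getAsLong();
            if (now - lastCommitMs >= intervalMs) {
                commit(now);
            }
        }
        // Deliberately not in a finally block: with the real ConsumerIterator,
        // next() has already advanced the consumed offset past a message whose
        // handler threw, so committing here would mark that message as done.
        flush();
    }

    /** Commits if anything processed is still uncommitted; call before shutdown. */
    public void flush() {
        if (pending) {
            commit(clock.getAsLong());
        }
    }

    private void commit(long now) {
        committer.commitOffsets();
        pending = false;
        lastCommitMs = now;
    }
}
```

With a real connector one would presumably pass `connector::commitOffsets` and `System::currentTimeMillis`, and call `flush()` before `shutdown()`. One caveat: if the iterator blocks waiting for new messages, the last processed message stays uncommitted until another message arrives. Setting 'consumer.timeout.ms' should make `hasNext()` throw a `ConsumerTimeoutException` instead of blocking indefinitely, which gives the loop a chance to flush.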
The built-in 'auto.commit.enable' feature unfortunately allows a commit to happen for any message that has been returned by ConsumerIterator.next(). If the consumer goes down before actually processing the message, or hangs indefinitely for some reason, that message's offset can get committed even though it was never consumed successfully.

I think there are some issues with implementing this on top of the high-level consumer API. First, I need to worry about multiple threads consuming in the same connector (so for now I'm limiting this to a single thread). Also, when shutting down the connector, I need to make sure the offsets of any pending messages are committed before allowing the connector to shut down. Both of those seem easy enough to handle.

What I'm more concerned about is what happens during a consumer rebalance. Looking at the ZookeeperConsumerConnector code, there appear to be explicit calls to commitOffsets during the rebalance. I'm not sure how to handle that from the high-level API (or whether I need to worry about it at all).

Thanks for any insight.
Jason