I should probably have been a little clearer about what I'm asking, so here's an example.
Let's say that I have a consumer on a topic partition, and I'm doing manual commits. Because I'm doing manual commits and the messages are processed asynchronously from the consumer poll loop, I keep track of which offsets I have received from Kafka and mark them as done once they have been processed. I don't commit offset X until messages X-1, X-2, ... have all been marked as done. Say that I've previously received offsets 0-10 and realize that I need to retry offset 2. I seek the consumer to offset 2 on that partition, poll once, check the resulting records, and find that the earliest received message has offset 5. Is it now correct to assume that offsets 2-4 must have been compacted away, so I should forget about those messages, stop trying to fetch them, and mark them as done in my offset tracker?

2017-11-03 19:50 GMT+01:00 Stig Rohde Døssing <stigdoess...@gmail.com>:
> Hi,
>
> I'm working on the Kafka connector for Apache Storm, which pulls messages
> from Kafka and emits them into a Storm topology. The connector uses manual
> offset control since message processing happens asynchronously to pulling
> messages from Kafka, and we hit an issue a while back related to topic
> compaction. I think we can solve it, but I'd like confirmation that the way
> we're going about it isn't wrong.
>
> The connector keeps track of which offsets have been emitted into the
> topology, along with other information such as how many times they've been
> retried. When an offset should be retried the connector fetches the message
> from Kafka again (it is not kept in-memory once emitted). We only clean up
> the state for an offset once it is fully processed.
>
> The issue we hit is that if topic compaction is enabled, we need to know
> that the offset is no longer available so we can delete the corresponding
> state. Would the approach described here
> https://issues.apache.org/jira/browse/STORM-2546?focusedCommentId=16151172&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16151172
> be reasonable for this, or is there another way to check if an offset has
> been deleted?
>
> Thanks.
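For illustration, the offset bookkeeping described in the question can be sketched in Java. This is a minimal sketch, not Storm's or Kafka's code: `OffsetTracker`, `emitted`, `markDone`, `onRetryFetch` and `committableOffset` are hypothetical names, and the actual Kafka calls (`KafkaConsumer.seek` followed by `poll`) are left out so the example runs without a broker. It takes the question's premise as an assumption: if a poll after seeking to offset 2 returns 5 as the first offset for that partition, offsets 2-4 are treated as gone and dropped from the pending set. One caveat to check before relying on that premise: if the seek target is below the partition's log start offset (for example because of retention), the consumer applies its `auto.offset.reset` policy instead, so the first returned offset may not be the next surviving one.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Hypothetical per-partition tracker (not part of the Kafka or Storm APIs).
 * It remembers which emitted offsets are still pending and computes the
 * offset that is safe to commit, i.e. the first offset not yet done.
 */
class OffsetTracker {
    private final NavigableSet<Long> pending = new TreeSet<>();
    private long maxEmitted;

    OffsetTracker(long committedOffset) {
        // Nothing emitted yet, so the committable offset starts at committedOffset.
        this.maxEmitted = committedOffset - 1;
    }

    /** Record that the message at this offset was emitted for processing. */
    void emitted(long offset) {
        pending.add(offset);
        maxEmitted = Math.max(maxEmitted, offset);
    }

    /** Record that processing of this offset has finished. */
    void markDone(long offset) {
        pending.remove(offset);
    }

    /**
     * Called after seeking to requestedOffset to retry it and polling.
     * firstReturned is the smallest offset returned for this partition; only
     * call this when the poll returned at least one record for it.
     * Assumption under discussion: offsets in [requestedOffset, firstReturned)
     * no longer exist in the log (e.g. compacted away), so stop tracking them.
     */
    void onRetryFetch(long requestedOffset, long firstReturned) {
        if (firstReturned <= requestedOffset) {
            return; // The requested offset came back; nothing was skipped.
        }
        // Clearing the half-open subSet view removes those offsets from pending.
        pending.subSet(requestedOffset, firstReturned).clear();
    }

    /** Offset to commit: every offset below it is done or was never emitted. */
    long committableOffset() {
        return pending.isEmpty() ? maxEmitted + 1 : pending.first();
    }
}

public class OffsetTrackerDemo {
    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker(0);
        for (long o = 0; o <= 10; o++) {
            tracker.emitted(o);
        }
        // Everything except offset 2 finishes; offset 2 needs a retry.
        for (long o = 0; o <= 10; o++) {
            if (o != 2) {
                tracker.markDone(o);
            }
        }
        System.out.println(tracker.committableOffset()); // 2

        // Seek to 2 and poll; suppose the first record returned has offset 5.
        tracker.onRetryFetch(2, 5);
        System.out.println(tracker.committableOffset()); // 11
    }
}
```

With offsets 0-10 emitted and everything except 2 done, the committable offset stays at 2; once the retry fetch reports 5 as the first returned offset, nothing is pending and the committable offset advances to 11, following Kafka's convention that the committed offset is the next one to consume. Because the tracker only stores offsets it actually emitted, gaps that were compacted before the first fetch need no special handling.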