nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2854587001


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -667,6 +636,21 @@ private boolean isOverflowing(final long value) {
         return value < 0;
     }
 
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        try {
+            return cfAccessor.getCommitedOffset(dbAccessor, partition);
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while getting committed offset for partition " + partition, e);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public boolean managesOffsets() {
+        return true;

Review Comment:
   The key issue is that, at present, under EOS we detect a crash by checking 
whether the `.checkpoint` file is missing, and wipe state if it is. With 
offsets stored in the store itself, we can't rely on that file, so this 
strategy no longer works.
   
   I wonder if we can emulate it by having `ProcessorStateManager` write a 
`DIRTY` file during initialization and delete it only during a clean 
shutdown? A `DIRTY` file found at startup would then indicate a crash. That 
should resolve this issue, and enable us to use StateStore-managed offsets 
before KIP-892 lands.
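
   A rough sketch of the marker-file idea, to make the lifecycle concrete. 
Everything here is hypothetical: the `DirtyMarker` class, its method names, and 
the assumption that the marker lives in the task's state directory are mine, 
not existing Kafka Streams code. A real implementation would probably also 
need to make the marker durable (e.g. by syncing the directory) before any 
state is written.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper; not part of Kafka Streams. */
public final class DirtyMarker {
    // Assumed file name, taken from the suggestion in this comment.
    static final String DIRTY_FILE_NAME = "DIRTY";

    private final Path marker;

    public DirtyMarker(final Path taskDir) {
        this.marker = taskDir.resolve(DIRTY_FILE_NAME);
    }

    /**
     * True if a previous run left the marker behind, i.e. it initialized
     * but never reached a clean shutdown. Check this BEFORE markDirty().
     */
    public boolean crashedLastRun() {
        return Files.exists(marker);
    }

    /** Call during initialization, after any required state wipe. */
    public void markDirty() throws IOException {
        try {
            Files.createFile(marker);
        } catch (final FileAlreadyExistsException e) {
            // Marker already present from an earlier unclean run; keep it.
        }
    }

    /** Call only at the end of a clean shutdown, once state is flushed. */
    public void markClean() throws IOException {
        Files.deleteIfExists(marker);
    }
}
```

   Under EOS, initialization would check `crashedLastRun()` first, wipe the 
store if it returns true, and then call `markDirty()`; `markClean()` would run 
as the last step of a clean close.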



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
