nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2854235081
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -667,6 +636,21 @@ private boolean isOverflowing(final long value) {
return value < 0;
}
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ try {
+ return cfAccessor.getCommitedOffset(dbAccessor, partition);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error while getting committed offset for partition " + partition, e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public boolean managesOffsets() {
+ return true;
Review Comment:
~I think we'll need to keep this as `false` until KIP-892 lands, because
until we're able to buffer writes between commits, there's no way to guarantee
that the committed offsets reflect the records written to the database.~
~To elaborate: between commits, new records are written to RocksDB. We can't
guarantee when those records will be written to disk by RocksDB (due to
background flushes), so if the application crashes between commits, some of the
records on disk might be newer than the most recently written offsets; this is
a problem even with atomic flush.~
~I still think we want this code, but we can't actually _use_ it until
KIP-892 lands, sadly~
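
A minimal sketch of the concern described in the comment above, assuming a toy in-memory model rather than RocksDB itself. `ToyStore`, `backgroundFlush`, `crash`, and the other names are hypothetical and exist only for illustration; the sketch does not model atomic flush or any real RocksDB or Kafka Streams API. It shows only the ordering problem: a flush between commits can persist a record newer than the last persisted offset.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetDriftSketch {

    /** Hypothetical toy store: "disk" survives a crash, "memtable" does not. */
    static final class ToyStore {
        private final Map<String, String> memtable = new HashMap<>();
        private final Map<String, String> disk = new HashMap<>();
        private long committedOffsetOnDisk = -1L;

        /** Writes go to the in-memory buffer first. */
        void put(final String key, final String value) {
            memtable.put(key, value);
        }

        /** Stand-in for a background flush: persists data, but not the offset. */
        void backgroundFlush() {
            disk.putAll(memtable);
            memtable.clear();
        }

        /** Commit persists all pending data and then records the offset. */
        void commit(final long offset) {
            backgroundFlush();
            committedOffsetOnDisk = offset;
        }

        /** A crash loses whatever was only in memory. */
        void crash() {
            memtable.clear();
        }

        String readFromDisk(final String key) {
            return disk.get(key);
        }

        long committedOffset() {
            return committedOffsetOnDisk;
        }
    }

    /** Returns true if, after a crash, the data on disk is newer than the committed offset. */
    static boolean diskIsAheadOfCommittedOffset() {
        final ToyStore store = new ToyStore();
        store.put("k", "v@0");   // record consumed at offset 0
        store.commit(0L);        // offset 0 is now the committed offset
        store.put("k", "v@1");   // record consumed at offset 1, not yet committed
        store.backgroundFlush(); // flush happens between commits
        store.crash();           // crash before the next commit
        return store.committedOffset() == 0L
            && "v@1".equals(store.readFromDisk("k"));
    }

    public static void main(final String[] args) {
        System.out.println("disk ahead of committed offset: " + diskIsAheadOfCommittedOffset());
    }
}
```

In this toy model, restoring from the committed offset after the crash would replay the offset-1 record on top of state that already contains it. Buffering writes between commits, as the comment says of KIP-892, is what would keep the two in step.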