vcrfxia commented on code in PR #13431: URL: https://github.com/apache/kafka/pull/13431#discussion_r1143859781
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########

@@ -821,4 +823,20 @@ public Options getOptions() {
     public Position getPosition() {
         return position;
     }
+
+    /**
+     * Same as {@link Bytes#increment(Bytes)} but {@code null} is returned instead of throwing
+     * {@code IndexOutOfBoundsException} in the event of overflow.
+     *
+     * @param input bytes to increment
+     * @return A new copy of the incremented byte array, or {@code null} if incrementing would
+     *         result in overflow.
+     */
+    static Bytes incrementWithoutOverflow(final Bytes input) {

Review Comment:
   Now that negative segment IDs are allowed (only for reserved segments within `LogicalKeyValueSegments`), a valid segment ID can overflow when incremented, and we want to handle this gracefully instead of throwing an exception when it happens.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##########

@@ -16,15 +16,33 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
+/**
+ * A {@link Segments} implementation which uses a single underlying RocksDB instance.
+ * Regular segments with {@code segmentId >= 0} expire according to the specified
+ * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
+ * and are completely separate from regular segments, in that methods such as
+ * {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
+ * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
+ * only return regular segments and not reserved segments.
+ * The methods {@link #flush()} and {@link #close()} flush and close both regular and
+ * reserved segments, because both types of segments share the same physical RocksDB
+ * instance. To create a reserved segment, use {@link #createReservedSegment(long, String)} instead.
+ */
 public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSegment> {
 
     private final RocksDBMetricsRecorder metricsRecorder;
     private final RocksDBStore physicalStore;
 
+    // reserved segments do not expire, and are tracked here separately from regular segments
+    private final Map<Long, LogicalKeyValueSegment> reservedSegments = new HashMap<>();

Review Comment:
   Only one reserved segment is needed for the versioned store implementation (it is used for the latest-value store), but I've chosen to implement support for reserved segments more generally in this class. If we think this is unnecessarily complex, I can update this to a single `Optional<LogicalKeyValueSegment>` (or equivalent) instead.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########

@@ -124,7 +124,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     protected Position position;
     private OffsetCheckpoint positionCheckpoint;
-    // VisibleForTesting

Review Comment:
   This cleanup is unrelated to the changes in this PR -- I happened to notice that this method is labeled as visible for testing but is actually called in a number of places from production code, so I've removed the misleading comment.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
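Editor's note: for readers following the first comment, the overflow-safe increment being discussed can be sketched in isolation as below. This is a hypothetical standalone version that operates on a raw `byte[]` (treated as an unsigned big-endian integer) rather than Kafka's `Bytes` wrapper; the class and method names are illustrative, not the actual PR code.

```java
import java.util.Arrays;

public class IncrementDemo {

    /**
     * Sketch of an overflow-safe increment: add one to the array viewed as an
     * unsigned big-endian integer, returning null (instead of throwing) when
     * every byte is already 0xFF and the carry would overflow.
     */
    static byte[] incrementWithoutOverflow(final byte[] input) {
        final byte[] result = Arrays.copyOf(input, input.length);
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++;          // no further carry needed; done
                return result;
            }
            result[i] = 0x00;         // 0xFF rolls over; carry into the next-higher byte
        }
        return null;                  // all bytes were 0xFF: overflow
    }

    public static void main(final String[] args) {
        // {0x00, 0xFF} increments to {0x01, 0x00}
        System.out.println(Arrays.toString(incrementWithoutOverflow(new byte[]{0x00, (byte) 0xFF})));
        // all-0xFF input overflows, so null is returned instead of an exception
        System.out.println(incrementWithoutOverflow(new byte[]{(byte) 0xFF, (byte) 0xFF}) == null);
    }
}
```

Returning `null` on overflow matches the caller's need described in the comment: a range scan whose upper bound overflows can simply be treated as unbounded rather than failing.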