vcrfxia commented on code in PR #13431:
URL: https://github.com/apache/kafka/pull/13431#discussion_r1143859781


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -821,4 +823,20 @@ public Options getOptions() {
     public Position getPosition() {
         return position;
     }
+
+    /**
+     * Same as {@link Bytes#increment(Bytes)} but {@code null} is returned instead of throwing
+     * {@code IndexOutOfBoundsException} in the event of overflow.
+     *
+     * @param input bytes to increment
+     * @return A new copy of the incremented byte array, or {@code null} if incrementing would
+     *         result in overflow.
+     */
+    static Bytes incrementWithoutOverflow(final Bytes input) {

Review Comment:
   Now that negative segment IDs are allowed (only for reserved segments within `LogicalKeyValueSegments`), it's possible for a valid segment ID to overflow when incremented, and we want to handle this gracefully rather than throwing an exception when it happens.
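For illustration, here is a minimal standalone sketch of an overflow-safe increment along these lines, operating on a raw `byte[]` rather than Kafka's `Bytes` wrapper; the method name mirrors the PR, but the body is an assumption for illustration, not the actual implementation:

```java
import java.util.Arrays;

public class IncrementSketch {

    /**
     * Increments a big-endian byte array by one and returns a new copy,
     * or returns null if every byte is 0xFF, i.e. incrementing would overflow.
     */
    static byte[] incrementWithoutOverflow(final byte[] input) {
        final byte[] result = Arrays.copyOf(input, input.length);
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++; // no further carry needed
                return result;
            }
            result[i] = 0x00; // 0xFF wraps to 0x00 and the carry propagates left
        }
        return null; // carry propagated past the most significant byte: overflow
    }

    public static void main(final String[] args) {
        // {0x00, 0xFF} + 1 = {0x01, 0x00}
        System.out.println(Arrays.toString(
            incrementWithoutOverflow(new byte[] {0x00, (byte) 0xFF})));
        // all bytes 0xFF: overflow, so null is returned instead of throwing
        System.out.println(incrementWithoutOverflow(new byte[] {(byte) 0xFF, (byte) 0xFF}));
    }
}
```

Returning `null` instead of throwing keeps the caller's range-scan logic simple: an overflowed upper bound can be treated as "unbounded" rather than an error.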



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##########
@@ -16,15 +16,33 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
+/**
+ * A {@link Segments} implementation which uses a single underlying RocksDB instance.
+ * Regular segments with {@code segmentId >= 0} expire according to the specified
+ * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
+ * and are completely separate from regular segments in that methods such as
+ * {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
+ * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
+ * only return regular segments and not reserved segments. The methods {@link #flush()}
+ * and {@link #close()} flush and close both regular and reserved segments, because
+ * both types of segments share the same physical RocksDB instance.
+ * To create a reserved segment, use {@link #createReservedSegment(long, String)} instead.
+ */
public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSegment> {
 
     private final RocksDBMetricsRecorder metricsRecorder;
     private final RocksDBStore physicalStore;
 
+    // reserved segments do not expire, and are tracked here separately from regular segments
+    private final Map<Long, LogicalKeyValueSegment> reservedSegments = new HashMap<>();

Review Comment:
   Only one reserved segment is needed for the versioned store implementation (it's used for the latest value store), but I've chosen to implement support for reserved segments more generally in this class. If we think this is unnecessarily complex, I can update this to be a single `Optional<LogicalKeyValueSegment>` (or equivalent) instead.
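As a toy illustration of the map-based bookkeeping described above (not the actual class: the real implementation manages `LogicalKeyValueSegment` objects backed by one shared RocksDB store), here is a hypothetical sketch using plain `String` placeholders for segments:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: segments are plain Strings here for simplicity.
public class ReservedSegmentsSketch {

    // reserved segments (segmentId < 0) never expire; tracked apart from regular ones
    private final Map<Long, String> reservedSegments = new HashMap<>();
    private final Map<Long, String> regularSegments = new HashMap<>();

    String createReservedSegment(final long segmentId, final String name) {
        if (segmentId >= 0) {
            throw new IllegalArgumentException("reserved segment IDs must be negative: " + segmentId);
        }
        if (reservedSegments.putIfAbsent(segmentId, name) != null) {
            throw new IllegalStateException("reserved segment already exists: " + segmentId);
        }
        return name;
    }

    String getOrCreateSegment(final long segmentId) {
        if (segmentId < 0) {
            throw new IllegalArgumentException("regular segment IDs must be non-negative: " + segmentId);
        }
        // regular segments are created on demand and would expire per the retention period
        return regularSegments.computeIfAbsent(segmentId, id -> "segment-" + id);
    }

    // reserved segments are looked up directly, never via the expiry-aware accessors
    Optional<String> getReservedSegment(final long segmentId) {
        return Optional.ofNullable(reservedSegments.get(segmentId));
    }
}
```

Keeping the two maps separate means the expiry logic never has to filter out reserved IDs, at the cost of a second lookup path.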



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -124,7 +124,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     protected Position position;
     private OffsetCheckpoint positionCheckpoint;
 
-    // VisibleForTesting

Review Comment:
   This cleanup is unrelated to the changes in this PR -- I happened to notice that this method is labeled as visible for testing but is actually called from production code in a number of places, so I've removed the misleading comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
