vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1093907117


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which
+ * this tombstone is to be stored contains currently no record versions, then this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.
+     *
+     * @return whether the segment is "empty"
+     */
+    static boolean isEmpty(final byte[] segmentValue) {
+        return segmentValue.length <= TIMESTAMP_SIZE;
+    }
+
+    /**
+     * Requires that the segment is not empty. Caller is responsible for verifying that this
+     * is the case before calling this method.
+     *
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(

Review Comment:
   This actually is "record" in the sense of a Kafka message: the method 
creates a new segment row containing a single record (version). Is the 
confusing part that these record versions also carry a validTo timestamp, 
which Kafka messages don't really have?
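
For illustration, here is a minimal sketch of what a segment row holding a single record (version) might look like under the value format described in the Javadoc above. The class name `SegmentLayoutSketch` and the methods `singleRecordSegment` and `emptySegment` are hypothetical and are not taken from the PR. The three accessors mirror the ones in the diff. Writing `validFrom` twice (once as `min_timestamp`, once as the record's own timestamp) is my reading of the documented layout, not a confirmed detail of the implementation.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: hypothetical names, not the PR's implementation.
final class SegmentLayoutSketch {
    private static final int TIMESTAMP_SIZE = 8;
    private static final int VALUE_SIZE = 4;

    // Layout for one record version:
    // <next_timestamp> + <min_timestamp> + <timestamp, value_size> + <value>
    static byte[] singleRecordSegment(final byte[] value, final long validFrom, final long validTo) {
        final boolean tombstone = value == null;
        final int valueLength = tombstone ? 0 : value.length;
        final ByteBuffer buffer = ByteBuffer.allocate(3 * TIMESTAMP_SIZE + VALUE_SIZE + valueLength);
        buffer.putLong(validTo);                    // next_timestamp
        buffer.putLong(validFrom);                  // min_timestamp
        buffer.putLong(validFrom);                  // timestamp of the only record version
        buffer.putInt(tombstone ? -1 : valueLength); // negative size marks a tombstone
        if (!tombstone) {
            buffer.put(value);
        }
        return buffer.array();
    }

    // "Empty" segment: only the tombstone's timestamp is serialized.
    static byte[] emptySegment(final long tombstoneTimestamp) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE).putLong(tombstoneTimestamp).array();
    }

    static long getNextTimestamp(final byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(0);
    }

    static boolean isEmpty(final byte[] segmentValue) {
        return segmentValue.length <= TIMESTAMP_SIZE;
    }

    // Only meaningful for non-empty segments.
    static long getMinTimestamp(final byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
    }
}
```

In this sketch the validTo timestamp lives in the segment header as `next_timestamp` rather than next to the record itself, which is one way to see how a record version differs from a plain Kafka message.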



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
