vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1093907117
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ########## @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * <pre> + * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp> + * </pre> + * where: + * <ul> + * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this + * segment,</li> + * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored + * in this segment, and</li> + * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone, + * in order to distinguish from empty array which has {@code value_size} of zero. In practice, + * {@code value_size} is always set to -1 for the tombstone case, though this need not be true + * in general.</li> + * </ul> + * <p> + * Note that the value format above does not store the number of record versions contained in the + * segment. It is not necessary to store this information separately because this information is + * never required on its own. Record versions are always deserialized in order, and we can + * determine when we have reached the end of the list based on whether the (validFrom) timestamp of + * the record version equals the {@code min_timestamp}. + * <p> + * Additionally, there is one edge case / exception to the segment value format described above. + * If the latest record version (for a particular key) is a tombstone, and the segment in which + * this tombstone is to be stored contains currently no record versions, then this will result in + * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo + * timestamp associated with it. When this happens, the serialized segment will contain the + * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the + * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty + * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal + * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing + * the same timestamp twice, it is only serialized once and stored as the first timestamp of the + * segment, which happens to be {@code next_timestamp}.) + */ +final class RocksDBVersionedStoreSegmentValueFormatter { + private static final int TIMESTAMP_SIZE = 8; + private static final int VALUE_SIZE = 4; + + /** + * @return the validTo timestamp of the latest record in the provided segment + */ + static long getNextTimestamp(final byte[] segmentValue) { + return ByteBuffer.wrap(segmentValue).getLong(0); + } + + /** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * <p> + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ + static boolean isEmpty(final byte[] segmentValue) { + return segmentValue.length <= TIMESTAMP_SIZE; + } + + /** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. + * + * @return the (validFrom) timestamp of the earliest record in the provided segment. + */ + static long getMinTimestamp(final byte[] segmentValue) { + return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); + } + + /** + * @return the deserialized segment value + */ + static SegmentValue deserialize(final byte[] segmentValue) { + return new PartiallyDeserializedSegmentValue(segmentValue); + } + + /** + * Creates a new segment value that contains the provided record. + * + * @param value the record value + * @param validFrom the record's (validFrom) timestamp + * @param validTo the record's validTo timestamp + * @return the newly created segment value + */ + static SegmentValue newSegmentValueWithRecord( Review Comment: This actually is "record" in the sense of a Kafka message -- it's how we create a new segment row with a single record (version) in it. Is the confusing part that these record versions also contain a validTo, whereas Kafka messages don't really? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org