vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1084488305
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ########## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * <pre> + * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp> + * </pre> + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. 
+ */ +final class RocksDBVersionedStoreSegmentValueFormatter { + private static final int TIMESTAMP_SIZE = 8; + private static final int VALUE_SIZE = 4; + + /** + * @return the validTo timestamp of the latest record in the provided segment + */ + static long getNextTimestamp(final byte[] segmentValue) { + return ByteBuffer.wrap(segmentValue).getLong(0); + } + + /** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * <p> + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ + static boolean isEmpty(final byte[] segmentValue) { + return segmentValue.length <= TIMESTAMP_SIZE; + } + + /** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. + * + * @return the timestamp of the earliest record in the provided segment. + */ + static long getMinTimestamp(final byte[] segmentValue) { + return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); + } + + /** + * @return the deserialized segment value + */ + static SegmentValue deserialize(final byte[] segmentValue) { + return new PartiallyDeserializedSegmentValue(segmentValue); + } + + /** + * Creates a new segment value that contains the provided record. 
+ * + * @param value the record value + * @param validFrom the record's timestamp + * @param validTo the record's validTo timestamp + * @return the newly created segment value + */ + static SegmentValue newSegmentValueWithRecord( + final byte[] value, final long validFrom, final long validTo) { + return new PartiallyDeserializedSegmentValue(value, validFrom, validTo); + } + + /** + * Creates a new empty segment value. + * + * @param timestamp the timestamp of the tombstone for this empty segment value + * @return the newly created segment value + */ + static SegmentValue newSegmentValueWithTombstone(final long timestamp) { + return new PartiallyDeserializedSegmentValue(timestamp); + } + + interface SegmentValue { + + /** + * @return whether the segment is empty. See + * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details. + */ + boolean isEmpty(); + + /** + * Finds the latest record in this segment with timestamp not exceeding the provided + * timestamp bound. This method requires that the provided timestamp bound exists in + * this segment, i.e., the segment is not empty, and the provided timestamp bound is + * at least minTimestamp and is smaller than nextTimestamp. + * + * @param timestamp the timestamp to find + * @param includeValue whether the value of the found record should be returned with the result + * @return the record that is found + * @throws IllegalArgumentException if the segment is empty, or if the provided timestamp + * is not contained within this segment + */ + SegmentSearchResult find(long timestamp, boolean includeValue); + + /** + * Inserts the provided record into the segment as the latest record in the segment. + * This operation is allowed even if the segment is empty. + * <p> + * It is the caller's responsibility to enough that this action is desirable. 
In the event + * that the new record's timestamp is smaller than the current {@code nextTimestamp} of the Review Comment: I pushed a commit to update all the javadocs which mention "record timestamp" without specifying validFrom or validTo to say "(validFrom) timestamp" instead, to emphasize that what we typically think of as a record's timestamp (e.g., Kafka record timestamp) is the validFrom timestamp in this context. I think this is preferable to scrapping "record timestamp" entirely and only using validFrom / validTo. I worry that doing the latter would actually be more confusing to people seeing this code for the first time because it may (mistakenly) suggest that "validFrom" and "validTo" timestamps are two new timestamps when they are not. (Rather, they are derived from the same record timestamps that we are already used to.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org