fvaleri commented on code in PR #13038:
URL: https://github.com/apache/kafka/pull/13038#discussion_r1056375171


##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {
+
+    //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage 
module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new 
LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    private final long messageOffset;
+    private final long segmentBaseOffset;
+    private final int relativePositionInSegment;
+
+    public LogOffsetMetadata(long messageOffset) {
+        this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+    }
+
+    public LogOffsetMetadata(long messageOffset,
+                             long segmentBaseOffset,
+                             int relativePositionInSegment) {
+        this.messageOffset = messageOffset;
+        this.segmentBaseOffset = segmentBaseOffset;
+        this.relativePositionInSegment = relativePositionInSegment;
+    }
+
+    public long messageOffset() {
+        return messageOffset;
+    }
+
+    public long segmentBaseOffset() {
+        return segmentBaseOffset;
+    }
+
+    public int relativePositionInSegment() {
+        return relativePositionInSegment;
+    }
+
+    // check if this offset is already on an older segment compared with the 
given offset
+    public boolean onOlderSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info 
with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset < that.segmentBaseOffset;
+    }
+
+    // check if this offset is on the same segment with the given offset
+    private boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info 
with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset == that.segmentBaseOffset;
+    }
+
+    // compute the number of bytes between this offset to the given offset
+    // if they are on the same segment and this offset precedes the given 
offset
+    public int positionDiff(LogOffsetMetadata that) {
+        if (!onSameSegment(that))
+            throw new KafkaException(this + " cannot compare its segment 
position with " + that + " since they are not on the same segment");
+

Review Comment:
   Here you miss the `if(messageOffsetOnly)` check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to