mimaison commented on code in PR #13038:
URL: https://github.com/apache/kafka/pull/13038#discussion_r1057271751


##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {
+
+    //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    private final long messageOffset;
+    private final long segmentBaseOffset;
+    private final int relativePositionInSegment;
+
+    public LogOffsetMetadata(long messageOffset) {
+        this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+    }
+
+    public LogOffsetMetadata(long messageOffset,
+                             long segmentBaseOffset,
+                             int relativePositionInSegment) {
+        this.messageOffset = messageOffset;
+        this.segmentBaseOffset = segmentBaseOffset;
+        this.relativePositionInSegment = relativePositionInSegment;
+    }
+
+    public long messageOffset() {
+        return messageOffset;
+    }
+
+    public long segmentBaseOffset() {
+        return segmentBaseOffset;
+    }
+
+    public int relativePositionInSegment() {
+        return relativePositionInSegment;
+    }
+
+    // check if this offset is already on an older segment compared with the given offset
+    public boolean onOlderSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset < that.segmentBaseOffset;
+    }
+
+    // check if this offset is on the same segment with the given offset
+    private boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset == that.segmentBaseOffset;
+    }
+
+    // compute the number of bytes between this offset to the given offset
+    // if they are on the same segment and this offset precedes the given offset
+    public int positionDiff(LogOffsetMetadata that) {
+        if (!onSameSegment(that))
+            throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");
+

Review Comment:
   The `if (messageOffsetOnly())` check here was effectively unreachable code. If
the condition were true, `onSameSegment()` would already have thrown (with the
exact same message).
   
   Since `onSameSegment()` was not marked private, I guess it made sense to
have the check there too, "just in case". But this method is only called from
`positionDiff()`, so I've marked it private and removed the redundant,
unreachable check.
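
To illustrate the control flow described above, here is a minimal sketch. It assumes a simplified, hypothetical `OffsetSketch` class in place of `LogOffsetMetadata`, uses `IllegalStateException` as a stand-in for `KafkaException`, and assumes that `messageOffsetOnly()` means both the segment base offset and the file position are unknown. The quoted diff is cut off before the `return` in `positionDiff()`, so the subtraction order below is also an assumption.

```java
// Simplified, hypothetical stand-in for LogOffsetMetadata (not the actual Kafka class).
final class OffsetSketch {
    private static final long UNKNOWN_SEGMENT_BASE = -1L;
    private static final int UNKNOWN_FILE_POSITION = -1;

    private final long messageOffset;
    private final long segmentBaseOffset;
    private final int relativePositionInSegment;

    OffsetSketch(long messageOffset) {
        this(messageOffset, UNKNOWN_SEGMENT_BASE, UNKNOWN_FILE_POSITION);
    }

    OffsetSketch(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    // Assumption: "message offset only" means segment base and position are both unknown.
    boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_SEGMENT_BASE
            && relativePositionInSegment == UNKNOWN_FILE_POSITION;
    }

    // Private: the only caller is positionDiff(), so this guard covers every path.
    private boolean onSameSegment(OffsetSketch that) {
        if (messageOffsetOnly())
            throw new IllegalStateException(this + " only has message offset info");
        return this.segmentBaseOffset == that.segmentBaseOffset;
    }

    int positionDiff(OffsetSketch that) {
        if (!onSameSegment(that))
            throw new IllegalStateException(this + " and " + that + " are not on the same segment");
        // A second messageOffsetOnly() check here could never fire:
        // onSameSegment() has already thrown if this instance has only a message offset.
        // Assumed direction: this position minus the given position.
        return this.relativePositionInSegment - that.relativePositionInSegment;
    }

    @Override
    public String toString() {
        return "(offset=" + messageOffset + ", segment=" + segmentBaseOffset
            + ", position=" + relativePositionInSegment + ")";
    }

    public static void main(String[] args) {
        OffsetSketch later = new OffsetSketch(20L, 0L, 250);
        OffsetSketch earlier = new OffsetSketch(10L, 0L, 100);
        System.out.println(later.positionDiff(earlier));
        try {
            new OffsetSketch(5L).positionDiff(earlier);
        } catch (IllegalStateException e) {
            // Thrown from inside onSameSegment(), before positionDiff() proceeds.
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch, an instance built with only a message offset fails inside `onSameSegment()`, so any later `messageOffsetOnly()` check in `positionDiff()` would be dead code.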


