ijuma commented on code in PR #13038:
URL: https://github.com/apache/kafka/pull/13038#discussion_r1056421122


##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {
+
+    //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    private final long messageOffset;
+    private final long segmentBaseOffset;
+    private final int relativePositionInSegment;

Review Comment:
   I think we should just make these public since they're final fields. We can 
then remove the accessors. For internal classes, it doesn't seem particularly 
useful to keep immutable fields private along with accessors that are public.
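A minimal sketch of what the suggestion could look like, assuming the class is also made `final` and callers read the fields directly. The class below is a hypothetical, trimmed-down stand-in for `LogOffsetMetadata` (only the constructors and `messageOffsetOnly()` are kept), and the constant name `UNKNOWN_SEGMENT_BASE_OFFSET` is invented for the sketch.

```java
// Hypothetical, trimmed-down LogOffsetMetadata illustrating the suggestion:
// immutable state is exposed through public final fields instead of getters.
final class LogOffsetMetadata {

    // -1 sentinels mirror the "unknown" values used in the PR.
    static final long UNKNOWN_SEGMENT_BASE_OFFSET = -1L;
    static final int UNKNOWN_FILE_POSITION = -1;

    // final fields cannot be reassigned after construction, so making them
    // public does not let callers mutate an instance.
    public final long messageOffset;
    public final long segmentBaseOffset;
    public final int relativePositionInSegment;

    public LogOffsetMetadata(long messageOffset) {
        this(messageOffset, UNKNOWN_SEGMENT_BASE_OFFSET, UNKNOWN_FILE_POSITION);
    }

    public LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    // True when only the message offset is known (no segment information).
    public boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_SEGMENT_BASE_OFFSET
            && relativePositionInSegment == UNKNOWN_FILE_POSITION;
    }
}
```

With public fields, Scala call sites such as `highWatermarkMetadata.messageOffset` would read the same as they did against the old case class.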



##########
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##########
@@ -190,7 +190,7 @@ final class KafkaMetadataLog private (
   override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
     offsetMetadata.metadata.asScala match {
       case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark(
-        new kafka.server.LogOffsetMetadata(
+        new org.apache.kafka.server.log.internals.LogOffsetMetadata(

Review Comment:
   Why do we need to use the fully qualified name here?



##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {
+
+    //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    private final long messageOffset;
+    private final long segmentBaseOffset;
+    private final int relativePositionInSegment;
+
+    public LogOffsetMetadata(long messageOffset) {
+        this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+    }
+
+    public LogOffsetMetadata(long messageOffset,
+                             long segmentBaseOffset,
+                             int relativePositionInSegment) {
+        this.messageOffset = messageOffset;
+        this.segmentBaseOffset = segmentBaseOffset;
+        this.relativePositionInSegment = relativePositionInSegment;
+    }
+
+    public long messageOffset() {
+        return messageOffset;
+    }
+
+    public long segmentBaseOffset() {
+        return segmentBaseOffset;
+    }
+
+    public int relativePositionInSegment() {
+        return relativePositionInSegment;
+    }
+
+    // check if this offset is already on an older segment compared with the given offset
+    public boolean onOlderSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset < that.segmentBaseOffset;
+    }
+
+    // check if this offset is on the same segment with the given offset
+    private boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset == that.segmentBaseOffset;
+    }
+
+    // compute the number of bytes between this offset to the given offset
+    // if they are on the same segment and this offset precedes the given offset
+    public int positionDiff(LogOffsetMetadata that) {
+        if (!onSameSegment(that))
+            throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");
+
+        return this.relativePositionInSegment - that.relativePositionInSegment;
+    }
+
+    // decide if the offset metadata only contains message offset info
+    public boolean messageOffsetOnly() {
+        return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION;
+    }
+
+    @Override
+    public String toString() {
+        return "LogOffsetMetadata{" +
+                "messageOffset=" + messageOffset +
+                ", segmentBaseOffset=" + segmentBaseOffset +
+                ", relativePositionInSegment=" + relativePositionInSegment +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        LogOffsetMetadata that = (LogOffsetMetadata) o;
+        return messageOffset == that.messageOffset && segmentBaseOffset == that.segmentBaseOffset && relativePositionInSegment == that.relativePositionInSegment;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(messageOffset, segmentBaseOffset, relativePositionInSegment);

Review Comment:
   `Objects.hash` is a varargs method, so this implementation boxes all three
values and allocates an array on every call. To avoid a performance regression,
we should compute the hash directly from the primitive fields instead.
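A minimal sketch of an allocation-free alternative. `OffsetHashSketch` is a hypothetical holder with the same three primitive fields, kept separate from the real class so the two hashing approaches can sit side by side. Starting from `1` and multiplying by `31` reproduces the value `Objects.hash` returns (it delegates to `Arrays.hashCode`), so switching implementations would not change any hash values; that is a convenience, not a requirement of the `hashCode` contract.

```java
import java.util.Objects;

// Hypothetical holder with the same three primitive fields as LogOffsetMetadata,
// used to compare a varargs Objects.hash call with a hand-written hash.
final class OffsetHashSketch {
    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    OffsetHashSketch(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    // Objects.hash(Object...) autoboxes each primitive and allocates an Object[]
    // for the varargs call every time it runs.
    int boxingHash() {
        return Objects.hash(messageOffset, segmentBaseOffset, relativePositionInSegment);
    }

    // Same 31-multiplier combination Arrays.hashCode uses, computed straight
    // from the primitives: no boxing and no array allocation.
    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + Long.hashCode(messageOffset);
        result = 31 * result + Long.hashCode(segmentBaseOffset);
        result = 31 * result + Integer.hashCode(relativePositionInSegment);
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OffsetHashSketch)) return false;
        OffsetHashSketch that = (OffsetHashSketch) o;
        return messageOffset == that.messageOffset
            && segmentBaseOffset == that.segmentBaseOffset
            && relativePositionInSegment == that.relativePositionInSegment;
    }
}
```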



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -444,13 +444,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   def maybeUpdateHighWatermark(hw: Long): Option[Long] = {
     lock.synchronized {
-      val oldHighWatermark = highWatermarkMetadata
-      updateHighWatermark(LogOffsetMetadata(hw)) match {
-        case oldHighWatermark.messageOffset =>
-          None
-        case newHighWatermark =>
-          Some(newHighWatermark)
-      }
+      val oldHighWatermark = highWatermarkMetadata.messageOffset()
+      val updatedHighWatermark = updateHighWatermark(new LogOffsetMetadata(hw))
+      if (updatedHighWatermark == oldHighWatermark)
+        None
+      else
+        Some(updatedHighWatermark)

Review Comment:
   Is this change actually required?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -939,14 +938,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
           maybeDuplicate match {
             case Some(duplicate) =>
-              appendInfo.firstOffset = Some(LogOffsetMetadata(duplicate.firstOffset))
+              appendInfo.firstOffset = Some(new LogOffsetMetadata(duplicate.firstOffset))
               appendInfo.lastOffset = duplicate.lastOffset
               appendInfo.logAppendTime = duplicate.timestamp
               appendInfo.logStartOffset = logStartOffset
             case None =>
               // Before appending update the first offset metadata to include segment information
               appendInfo.firstOffset = appendInfo.firstOffset.map { offsetMetadata =>
-                offsetMetadata.copy(segmentBaseOffset = segment.baseOffset, relativePositionInSegment = segment.size)
+                new LogOffsetMetadata(offsetMetadata.messageOffset(), segment.baseOffset, segment.size)

Review Comment:
   Is it worth adding a method that keeps the same message offset, but updates 
the other two values? Something like `updateSegment`?
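A minimal sketch of such a helper. `updateSegment` is only the name floated in this comment, not an existing API, and the class is a hypothetical, trimmed-down stand-in for `LogOffsetMetadata`. Because the type is immutable, the method returns a new instance rather than modifying the receiver.

```java
// Hypothetical, trimmed-down LogOffsetMetadata with the suggested helper.
final class LogOffsetMetadata {
    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    // Keeps this message offset but points at the given segment and position.
    // The receiver is left unchanged; a fresh instance is returned.
    LogOffsetMetadata updateSegment(long segmentBaseOffset, int relativePositionInSegment) {
        return new LogOffsetMetadata(messageOffset, segmentBaseOffset, relativePositionInSegment);
    }
}
```

With a helper like this, the `None` branch above could read `appendInfo.firstOffset.map(_.updateSegment(segment.baseOffset, segment.size))`, which states the intent (same offset, new segment) more directly than repeating `offsetMetadata.messageOffset()` in a constructor call.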



##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {

Review Comment:
   Let's make this final.



##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {
+
+    //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    private final long messageOffset;
+    private final long segmentBaseOffset;
+    private final int relativePositionInSegment;
+
+    public LogOffsetMetadata(long messageOffset) {
+        this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+    }
+
+    public LogOffsetMetadata(long messageOffset,
+                             long segmentBaseOffset,
+                             int relativePositionInSegment) {
+        this.messageOffset = messageOffset;
+        this.segmentBaseOffset = segmentBaseOffset;
+        this.relativePositionInSegment = relativePositionInSegment;
+    }
+
+    public long messageOffset() {
+        return messageOffset;
+    }
+
+    public long segmentBaseOffset() {
+        return segmentBaseOffset;
+    }
+
+    public int relativePositionInSegment() {
+        return relativePositionInSegment;
+    }
+
+    // check if this offset is already on an older segment compared with the given offset
+    public boolean onOlderSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset < that.segmentBaseOffset;
+    }
+
+    // check if this offset is on the same segment with the given offset
+    private boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset == that.segmentBaseOffset;
+    }
+
+    // compute the number of bytes between this offset to the given offset
+    // if they are on the same segment and this offset precedes the given offset
+    public int positionDiff(LogOffsetMetadata that) {
+        if (!onSameSegment(that))
+            throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");
+
+        return this.relativePositionInSegment - that.relativePositionInSegment;
+    }
+
+    // decide if the offset metadata only contains message offset info
+    public boolean messageOffsetOnly() {
+        return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION;
+    }
+
+    @Override
+    public String toString() {
+        return "LogOffsetMetadata{" +

Review Comment:
   This differs from the previous `toString` implementation. Is that 
intentional?
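If the old output is meant to be preserved, a sketch could look like the following. It assumes the previous Scala case class rendered as `(offset=<messageOffset> segment=[<segmentBaseOffset>:<relativePositionInSegment>])`; that format is recalled rather than taken from this diff, so it should be checked against the removed Scala file before relying on it. The class is a hypothetical, trimmed-down stand-in.

```java
// Hypothetical, trimmed-down LogOffsetMetadata that keeps the toString format
// assumed for the previous Scala case class.
final class LogOffsetMetadata {
    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    // Assumed previous format:
    // (offset=<messageOffset> segment=[<segmentBaseOffset>:<relativePositionInSegment>])
    @Override
    public String toString() {
        return "(offset=" + messageOffset + " segment=[" + segmentBaseOffset + ":"
            + relativePositionInSegment + "])";
    }
}
```

Keeping the format stable may matter for anyone searching broker logs for these strings, and it also shapes the `KafkaException` messages in this class, which embed `this`.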



##########
storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+public class LogOffsetMetadata {
+
+    //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
+    private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
+
+    public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);
+
+    private static final int UNKNOWN_FILE_POSITION = -1;
+
+    private final long messageOffset;
+    private final long segmentBaseOffset;
+    private final int relativePositionInSegment;
+
+    public LogOffsetMetadata(long messageOffset) {
+        this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
+    }
+
+    public LogOffsetMetadata(long messageOffset,
+                             long segmentBaseOffset,
+                             int relativePositionInSegment) {
+        this.messageOffset = messageOffset;
+        this.segmentBaseOffset = segmentBaseOffset;
+        this.relativePositionInSegment = relativePositionInSegment;
+    }
+
+    public long messageOffset() {
+        return messageOffset;
+    }
+
+    public long segmentBaseOffset() {
+        return segmentBaseOffset;
+    }
+
+    public int relativePositionInSegment() {
+        return relativePositionInSegment;
+    }
+
+    // check if this offset is already on an older segment compared with the given offset
+    public boolean onOlderSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset < that.segmentBaseOffset;
+    }
+
+    // check if this offset is on the same segment with the given offset
+    private boolean onSameSegment(LogOffsetMetadata that) {
+        if (messageOffsetOnly())
+            throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");
+
+        return this.segmentBaseOffset == that.segmentBaseOffset;
+    }
+
+    // compute the number of bytes between this offset to the given offset
+    // if they are on the same segment and this offset precedes the given offset
+    public int positionDiff(LogOffsetMetadata that) {
+        if (!onSameSegment(that))
+            throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");
+

Review Comment:
   Let's check if any test failed. If it did not, let's add coverage, please.
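One way to add coverage, sketched in plain Java so it runs without a test framework: exercise each branch of `positionDiff` (same segment, different segments, and a receiver that only has a message offset). In the storage module these would more likely be JUnit tests using `assertEquals`/`assertThrows` against the real class and `KafkaException`; the copy of the class and the `PositionDiffCoverage` harness here are hypothetical stand-ins, with `IllegalStateException` substituted for `KafkaException` to avoid the Kafka dependency.

```java
// Hypothetical, self-contained copy of the positionDiff logic under review;
// IllegalStateException stands in for KafkaException.
final class LogOffsetMetadata {
    static final long UNKNOWN_SEGMENT_BASE_OFFSET = -1L;
    static final int UNKNOWN_FILE_POSITION = -1;

    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    LogOffsetMetadata(long messageOffset) {
        this(messageOffset, UNKNOWN_SEGMENT_BASE_OFFSET, UNKNOWN_FILE_POSITION);
    }

    LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_SEGMENT_BASE_OFFSET
            && relativePositionInSegment == UNKNOWN_FILE_POSITION;
    }

    private boolean onSameSegment(LogOffsetMetadata that) {
        if (messageOffsetOnly())
            throw new IllegalStateException(this + " only has message offset info");
        return segmentBaseOffset == that.segmentBaseOffset;
    }

    int positionDiff(LogOffsetMetadata that) {
        if (!onSameSegment(that))
            throw new IllegalStateException(this + " is not on the same segment as " + that);
        return relativePositionInSegment - that.relativePositionInSegment;
    }
}

// Plain-Java stand-in for JUnit-style checks covering each branch of positionDiff.
final class PositionDiffCoverage {

    // Returns true if running the action throws IllegalStateException.
    static boolean throwsIllegalState(Runnable action) {
        try {
            action.run();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    static void runAll() {
        LogOffsetMetadata later = new LogOffsetMetadata(10L, 0L, 300);
        LogOffsetMetadata earlier = new LogOffsetMetadata(5L, 0L, 100);
        LogOffsetMetadata otherSegment = new LogOffsetMetadata(20L, 50L, 0);
        LogOffsetMetadata offsetOnly = new LogOffsetMetadata(7L);

        // Same segment: the byte distance is the difference of the positions.
        if (later.positionDiff(earlier) != 200)
            throw new AssertionError("expected a 200-byte difference");
        // Different segments: positionDiff must refuse to compare.
        if (!throwsIllegalState(() -> later.positionDiff(otherSegment)))
            throw new AssertionError("expected failure across segments");
        // Message-offset-only receiver: there is no segment info to compare.
        if (!throwsIllegalState(() -> offsetOnly.positionDiff(later)))
            throw new AssertionError("expected failure without segment info");
    }
}
```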



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
