ijuma commented on code in PR #13038: URL: https://github.com/apache/kafka/pull/13038#discussion_r1056421122
########## storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. the physical position on the located segment + */ +public class LogOffsetMetadata { + + //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module + private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L; + + public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0); + + private static final int UNKNOWN_FILE_POSITION = -1; + + private final long messageOffset; + private final long segmentBaseOffset; + private final int relativePositionInSegment; Review Comment: I think we should just make these public since they're final fields. We can then remove the accessors. For internal classes, it doesn't seem particularly useful to keep immutable fields private along with accessors that are public. 
########## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ########## @@ -190,7 +190,7 @@ final class KafkaMetadataLog private ( override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = { offsetMetadata.metadata.asScala match { case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark( - new kafka.server.LogOffsetMetadata( + new org.apache.kafka.server.log.internals.LogOffsetMetadata( Review Comment: Why do we need to use the fully qualified name here? ########## storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. 
the physical position on the located segment + */ +public class LogOffsetMetadata { + + //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module + private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L; + + public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0); + + private static final int UNKNOWN_FILE_POSITION = -1; + + private final long messageOffset; + private final long segmentBaseOffset; + private final int relativePositionInSegment; + + public LogOffsetMetadata(long messageOffset) { + this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION); + } + + public LogOffsetMetadata(long messageOffset, + long segmentBaseOffset, + int relativePositionInSegment) { + this.messageOffset = messageOffset; + this.segmentBaseOffset = segmentBaseOffset; + this.relativePositionInSegment = relativePositionInSegment; + } + + public long messageOffset() { + return messageOffset; + } + + public long segmentBaseOffset() { + return segmentBaseOffset; + } + + public int relativePositionInSegment() { + return relativePositionInSegment; + } + + // check if this offset is already on an older segment compared with the given offset + public boolean onOlderSegment(LogOffsetMetadata that) { + if (messageOffsetOnly()) + throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info"); + + return this.segmentBaseOffset < that.segmentBaseOffset; + } + + // check if this offset is on the same segment with the given offset + private boolean onSameSegment(LogOffsetMetadata that) { + if (messageOffsetOnly()) + throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info"); + + return this.segmentBaseOffset == that.segmentBaseOffset; + } + + // compute the number of bytes between this offset to the given offset + // if they are on the same segment and this offset precedes the given offset + public 
int positionDiff(LogOffsetMetadata that) { + if (!onSameSegment(that)) + throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment"); + + return this.relativePositionInSegment - that.relativePositionInSegment; + } + + // decide if the offset metadata only contains message offset info + public boolean messageOffsetOnly() { + return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION; + } + + @Override + public String toString() { + return "LogOffsetMetadata{" + + "messageOffset=" + messageOffset + + ", segmentBaseOffset=" + segmentBaseOffset + + ", relativePositionInSegment=" + relativePositionInSegment + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LogOffsetMetadata that = (LogOffsetMetadata) o; + return messageOffset == that.messageOffset && segmentBaseOffset == that.segmentBaseOffset && relativePositionInSegment == that.relativePositionInSegment; + } + + @Override + public int hashCode() { + return Objects.hash(messageOffset, segmentBaseOffset, relativePositionInSegment); Review Comment: `Objects.hash` is a varargs method, so every call boxes each primitive field and allocates an array to hold them. To avoid a performance regression, we should compute the hash directly from the primitive fields, without any boxing or array allocation.
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -444,13 +444,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ def maybeUpdateHighWatermark(hw: Long): Option[Long] = { lock.synchronized { - val oldHighWatermark = highWatermarkMetadata - updateHighWatermark(LogOffsetMetadata(hw)) match { - case oldHighWatermark.messageOffset => - None - case newHighWatermark => - Some(newHighWatermark) - } + val oldHighWatermark = highWatermarkMetadata.messageOffset() + val updatedHighWatermark = updateHighWatermark(new LogOffsetMetadata(hw)) + if (updatedHighWatermark == oldHighWatermark) + None + else + Some(updatedHighWatermark) Review Comment: Is this change actually required? ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -939,14 +938,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, maybeDuplicate match { case Some(duplicate) => - appendInfo.firstOffset = Some(LogOffsetMetadata(duplicate.firstOffset)) + appendInfo.firstOffset = Some(new LogOffsetMetadata(duplicate.firstOffset)) appendInfo.lastOffset = duplicate.lastOffset appendInfo.logAppendTime = duplicate.timestamp appendInfo.logStartOffset = logStartOffset case None => // Before appending update the first offset metadata to include segment information appendInfo.firstOffset = appendInfo.firstOffset.map { offsetMetadata => - offsetMetadata.copy(segmentBaseOffset = segment.baseOffset, relativePositionInSegment = segment.size) + new LogOffsetMetadata(offsetMetadata.messageOffset(), segment.baseOffset, segment.size) Review Comment: Is it worth adding a method that keeps the same message offset, but updates the other two values? Something like `updateSegment`? ########## storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. the physical position on the located segment + */ +public class LogOffsetMetadata { Review Comment: Let's make this final. ########## storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. the physical position on the located segment + */ +public class LogOffsetMetadata { + + //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module + private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L; + + public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0); + + private static final int UNKNOWN_FILE_POSITION = -1; + + private final long messageOffset; + private final long segmentBaseOffset; + private final int relativePositionInSegment; + + public LogOffsetMetadata(long messageOffset) { + this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION); + } + + public LogOffsetMetadata(long messageOffset, + long segmentBaseOffset, + int relativePositionInSegment) { + this.messageOffset = messageOffset; + this.segmentBaseOffset = segmentBaseOffset; + this.relativePositionInSegment = relativePositionInSegment; + } + + public long messageOffset() { + return messageOffset; + } + + public long segmentBaseOffset() { + return segmentBaseOffset; + } + + public int relativePositionInSegment() { + return relativePositionInSegment; + } + + // check if this offset is already on an older segment compared with the given offset + public boolean onOlderSegment(LogOffsetMetadata that) { + if (messageOffsetOnly()) + throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info"); + + return this.segmentBaseOffset < that.segmentBaseOffset; + } + + // check if this offset is on the same segment with the given offset + private boolean onSameSegment(LogOffsetMetadata that) { + if 
(messageOffsetOnly()) + throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info"); + + return this.segmentBaseOffset == that.segmentBaseOffset; + } + + // compute the number of bytes between this offset to the given offset + // if they are on the same segment and this offset precedes the given offset + public int positionDiff(LogOffsetMetadata that) { + if (!onSameSegment(that)) + throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment"); + + return this.relativePositionInSegment - that.relativePositionInSegment; + } + + // decide if the offset metadata only contains message offset info + public boolean messageOffsetOnly() { + return segmentBaseOffset == UNIFIED_LOG_UNKNOWN_OFFSET && relativePositionInSegment == UNKNOWN_FILE_POSITION; + } + + @Override + public String toString() { + return "LogOffsetMetadata{" + Review Comment: This differs from the previous `toString` implementation. Is that intentional? ########## storage/src/main/java/org/apache/kafka/server/log/internals/LogOffsetMetadata.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. the physical position on the located segment + */ +public class LogOffsetMetadata { + + //TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module + private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L; + + public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0); + + private static final int UNKNOWN_FILE_POSITION = -1; + + private final long messageOffset; + private final long segmentBaseOffset; + private final int relativePositionInSegment; + + public LogOffsetMetadata(long messageOffset) { + this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION); + } + + public LogOffsetMetadata(long messageOffset, + long segmentBaseOffset, + int relativePositionInSegment) { + this.messageOffset = messageOffset; + this.segmentBaseOffset = segmentBaseOffset; + this.relativePositionInSegment = relativePositionInSegment; + } + + public long messageOffset() { + return messageOffset; + } + + public long segmentBaseOffset() { + return segmentBaseOffset; + } + + public int relativePositionInSegment() { + return relativePositionInSegment; + } + + // check if this offset is already on an older segment compared with the given offset + public boolean onOlderSegment(LogOffsetMetadata that) { + if (messageOffsetOnly()) + throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info"); + + return this.segmentBaseOffset < that.segmentBaseOffset; + } + + // check if this offset is on the same segment with the given offset + private boolean onSameSegment(LogOffsetMetadata that) { + if (messageOffsetOnly()) + throw new KafkaException(this + " cannot compare its segment info with " + that + " since 
it only has message offset info"); + + return this.segmentBaseOffset == that.segmentBaseOffset; + } + + // compute the number of bytes between this offset to the given offset + // if they are on the same segment and this offset precedes the given offset + public int positionDiff(LogOffsetMetadata that) { + if (!onSameSegment(that)) + throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment"); + Review Comment: Let's check whether any test fails with this change. If none does, please add test coverage for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org