junrao commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r609073411
########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is an implementation of {@link RemoteLogMetadataManager} backed by in-memory store. Review comment: It would be useful to add a comment on whether the methods in this class are thread-safe or not. ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * This class represents the in-memory state of segments associated with a leader epoch. This includes the mapping of offset to + * segment ids and unreferenced segments which are not mapped to any offset but they exist in remote storage. + * <p> + * This is used by {@link RemoteLogMetadataCache} to track the segments for each leader epoch. 
+ */ +class RemoteLogLeaderEpochState { + + // It contains offset to segment ids mapping with the segment state as COPY_SEGMENT_FINISHED. + private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new ConcurrentSkipListMap<>(); + + /** + * It represents unreferenced segments for this leader epoch. It contains the segments still in COPY_SEGMENT_STARTED + * and DELETE_SEGMENT_STARTED state or these have been replaced by callers with other segments having the same + * start offset for the leader epoch. These will be returned by {@link RemoteLogMetadataCache#listAllRemoteLogSegments()} + * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int leaderEpoch)} so that callers can clean them up if + * they still exist. These will be cleaned from the cache once they reach DELETE_SEGMENT_FINISHED state. + */ + private final Set<RemoteLogSegmentId> unreferencedSegmentIds = ConcurrentHashMap.newKeySet(); + + // It represents the highest log offset of the segments that were updated with updateHighestLogOffset. + private volatile Long highestLogOffset; + + /** + * Returns all the segments associated with this leader epoch sorted by start offset in ascending order. + * + * @param idToSegmentMetadata mapping of id to segment metadata. This will be used to get RemoteLogSegmentMetadata + * for an id to be used for sorting. + * @return + */ + Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata) { + // Return all the segments including unreferenced metadata. + int size = offsetToId.size() + unreferencedSegmentIds.size(); + if (size == 0) { + return Collections.emptyIterator(); + } + + ArrayList<RemoteLogSegmentMetadata> metadataList = new ArrayList<>(size); + for (RemoteLogSegmentId id : offsetToId.values()) { + metadataList.add(idToSegmentMetadata.get(id)); + } + + if (!unreferencedSegmentIds.isEmpty()) { + for (RemoteLogSegmentId id : unreferencedSegmentIds) { + metadataList.add(idToSegmentMetadata.get(id)); + } + + // sort only when unreferenced entries exist as they are already sorted in offsetToId. + metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset)); + } + + return metadataList.iterator(); + } + + void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, + Long leaderEpochEndOffset) { + // Add the segment epochs mapping as the segment is copied successfully. + RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, remoteLogSegmentId); + + // Remove the metadata from unreferenced entries as it is successfully copied and added to the offset mapping. + unreferencedSegmentIds.remove(remoteLogSegmentId); + + // Add the old entry to unreferenced entries as the mapping is removed for the old entry. + if (oldEntry != null) { + unreferencedSegmentIds.add(oldEntry); + } + + // Update the highest offset entry for this leader epoch as we added a new mapping. + maybeUpdateHighestLogOffset(leaderEpochEndOffset); + } + + void handleSegmentWithDeleteSegmentStartedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId, + Long leaderEpochEndOffset) { + // Remove the offset mappings as this segment is getting deleted. + offsetToId.remove(startOffset, remoteLogSegmentId); + + // Add this entry to unreferenced set for the leader epoch as it is being deleted. + // This allows any retries of deletion as these are returned from listAllSegments and listSegments(leaderEpoch). 
+ unreferencedSegmentIds.add(remoteLogSegmentId); + + // Update the highest offset entry for this leader epoch. This needs to be done as a segment can reach this + // state without going through COPY_SEGMENT_FINISHED state. + maybeUpdateHighestLogOffset(leaderEpochEndOffset); Review comment: It seems that it's inconsistent that we update highest log offset here but not in handleSegmentWithCopySegmentStartedState(). Could we comment on whether highestLogOffset reflects the segments that have reached COPY_SEGMENT_FINISHED or not? ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class RemoteLogMetadataCacheTest { + private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class); + + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), + new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1024 * 1024; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; + + private final Time time = new MockTime(1); + + @Test + public void testSegmentsLifeCycleInCache() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + // Create remote log segment metadata and add them to RemoteLogMetadataCache. 
+ + // segment 0 + // 0-100 + // leader epochs (0,0), (1,20), (2,80) + Map<Integer, Long> segment0LeaderEpochs = new HashMap<>(); + segment0LeaderEpochs.put(0, 0L); + segment0LeaderEpochs.put(1, 20L); + segment0LeaderEpochs.put(2, 80L); + RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L, + -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs); + cache.addCopyInProgressSegment(segment0Metadata); + + // We should not get this as the segment is still getting copied and it is not yet considered successful until + // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED. + Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent()); + + RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate( + segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + cache.updateRemoteLogSegmentMetadata(segment0Update); + RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update); + + // segment 1 + // 101 - 200 + // no changes in leadership with in this segment + // leader epochs (2, 101) + Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L); + RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // segment 2 + // 201 - 300 + // moved to epoch 3 in between + // leader epochs (2, 201), (3, 240) + Map<Integer, Long> segment2LeaderEpochs = new HashMap<>(); + segment2LeaderEpochs.put(2, 201L); + segment2LeaderEpochs.put(3, 240L); + RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // segment 3 + // 250 - 400 + // leader epochs (3, 250), (4, 370) + Map<Integer, Long> segment3LeaderEpochs = new HashMap<>(); + segment3LeaderEpochs.put(3, 250L); + segment3LeaderEpochs.put(4, 370L); + RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + ////////////////////////////////////////////////////////////////////////////////////////// + // Four segments are added with different boundaries and leader epochs. + // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different + // epochs and offsets + ////////////////////////////////////////////////////////////////////////////////////////// + + HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>(); + // Existing metadata entries. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata); + + // Non existing metadata entries. + // Search for offset 110, epoch 1, and it should not exist. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null); + // Search for non existing offset 401, epoch 4. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null); + // Search for non existing epoch 5. 
+ expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null); + + for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) { + EpochOffset epochOffset = entry.getKey(); + Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset); + RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue(); + log.info("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata, + expectedSegmentMetadata); + if (expectedSegmentMetadata != null) { + Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata); + } else { + Assertions.assertFalse(segmentMetadata.isPresent()); + } + } + + // Update segment with state as DELETE_SEGMENT_STARTED. + // It should not be available when we search for that segment. + cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(), + time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1)); + Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent()); + + // Update segment with state as DELETE_SEGMENT_FINISHED. + // It should not be available when we search for that segment. + cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(), + time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1)); + Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent()); + + ////////////////////////////////////////////////////////////////////////////////////////// + // Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs + ////////////////////////////////////////////////////////////////////////////////////////// + + Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>(); + expectedEpochToHighestOffset.put(0, 19L); Review comment: It's kind of weird that the segment with epoch 0 is already deleted and yet we still expect the highest offset for epoch 0 to be returned. ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class RemoteLogMetadataCacheTest { + private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class); + + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), + new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1024 * 1024; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; + + private final Time time = new MockTime(1); + + @Test + public void testSegmentsLifeCycleInCache() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + // Create remote log segment metadata and add them to RemoteLogMetadataCache. + + // segment 0 + // 0-100 + // leader epochs (0,0), (1,20), (2,80) + Map<Integer, Long> segment0LeaderEpochs = new HashMap<>(); + segment0LeaderEpochs.put(0, 0L); + segment0LeaderEpochs.put(1, 20L); + segment0LeaderEpochs.put(2, 80L); + RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L, + -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs); + cache.addCopyInProgressSegment(segment0Metadata); + + // We should not get this as the segment is still getting copied and it is not yet considered successful until + // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED. 
+ Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent()); + + RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate( + segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + cache.updateRemoteLogSegmentMetadata(segment0Update); + RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update); + + // segment 1 + // 101 - 200 + // no changes in leadership with in this segment + // leader epochs (2, 101) + Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L); + RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // segment 2 + // 201 - 300 + // moved to epoch 3 in between + // leader epochs (2, 201), (3, 240) + Map<Integer, Long> segment2LeaderEpochs = new HashMap<>(); + segment2LeaderEpochs.put(2, 201L); + segment2LeaderEpochs.put(3, 240L); + RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // segment 3 + // 250 - 400 + // leader epochs (3, 250), (4, 370) + Map<Integer, Long> segment3LeaderEpochs = new HashMap<>(); + segment3LeaderEpochs.put(3, 250L); + segment3LeaderEpochs.put(4, 370L); + RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + ////////////////////////////////////////////////////////////////////////////////////////// + // Four segments are added with different boundaries and leader epochs. + // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different + // epochs and offsets + ////////////////////////////////////////////////////////////////////////////////////////// + + HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>(); + // Existing metadata entries. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata); + + // Non existing metadata entries. + // Search for offset 110, epoch 1, and it should not exist. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null); + // Search for non existing offset 401, epoch 4. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null); + // Search for non existing epoch 5. 
+ expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null); + + for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) { + EpochOffset epochOffset = entry.getKey(); + Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset); + RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue(); + log.info("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata, + expectedSegmentMetadata); + if (expectedSegmentMetadata != null) { + Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata); + } else { + Assertions.assertFalse(segmentMetadata.isPresent()); + } + } + + // Update segment with state as DELETE_SEGMENT_STARTED. + // It should not be available when we search for that segment. + cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(), + time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1)); + Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent()); + + // Update segment with state as DELETE_SEGMENT_FINISHED. + // It should not be available when we search for that segment. + cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(), + time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1)); + Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent()); + + ////////////////////////////////////////////////////////////////////////////////////////// + // Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs + ////////////////////////////////////////////////////////////////////////////////////////// + + Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>(); + expectedEpochToHighestOffset.put(0, 19L); + expectedEpochToHighestOffset.put(1, 79L); + expectedEpochToHighestOffset.put(2, 239L); + expectedEpochToHighestOffset.put(3, 369L); + expectedEpochToHighestOffset.put(4, 400L); + + for (Map.Entry<Integer, Long> entry : expectedEpochToHighestOffset.entrySet()) { + Integer epoch = entry.getKey(); + Long expectedOffset = entry.getValue(); + Optional<Long> offset = cache.highestOffsetForEpoch(epoch); + log.info("Fetching highest offset for epoch: {} , returned: {} , expected: {}", epoch, offset, expectedOffset); + Assertions.assertEquals(Optional.of(expectedOffset), offset); + } + + // Search for non existing leader epoch + Optional<Long> highestOffsetForEpoch5 = cache.highestOffsetForEpoch(5); + Assertions.assertFalse(highestOffsetForEpoch5.isPresent()); + } + + private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogMetadataCache cache, + Map<Integer, Long> segmentLeaderEpochs, + long startOffset, + long endOffset, + RemoteLogSegmentState state) + throws RemoteResourceNotFoundException { + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, + BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); + cache.addCopyInProgressSegment(segmentMetadata); + + RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, + time.milliseconds(), state, BROKER_ID_1); + cache.updateRemoteLogSegmentMetadata(segMetadataUpdate); + + return segmentMetadata.createWithUpdates(segMetadataUpdate); + } + + 
@Test + public void testCacheSegmentWithCopySegmentStartedState() { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + + // Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the + // segments. + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + cache.addCopyInProgressSegment(segmentMetadata); + + // This segment should not be available as the state is not reached to COPY_SEGMENT_FINISHED. + Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 = cache.remoteLogSegmentMetadata(0, 0); + Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent()); + + // cache.listRemoteLogSegments APIs should contain the above segment. + checkListSegments(cache, 0, segmentMetadata); + } + + @Test + public void testCacheSegmentWithCopySegmentFinishedState() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + + // Create a segment and move it to state COPY_SEGMENT_FINISHED. and check for searching that segment and + // listing the segments. + RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 101L), + 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // Search should return the above segment. + Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 = cache.remoteLogSegmentMetadata(0, 150); + Assertions.assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150); + + // cache.listRemoteLogSegments should contain the above segments. + checkListSegments(cache, 0, segmentMetadata); + } + + @Test + public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + + // Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and + // listing the segments. + RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 201L), + 201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED); + + // Search should not return the above segment as their leader epoch state is cleared. + Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 = cache.remoteLogSegmentMetadata(0, 250); + Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent()); + + checkListSegments(cache, 0, segmentMetadata); + } + + @Test + public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + + // Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and + // listing the segments. + RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 301L), + 301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED); + + // Search should not return the above segment as their leader epoch state is cleared. 
+ Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 350).isPresent()); + + RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), + time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1); + cache.updateRemoteLogSegmentMetadata(segmentMetadataUpdate); + + // listRemoteLogSegments(0) and listRemoteLogSegments() should not contain the above segment. + Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext()); + Assertions.assertFalse(cache.listAllRemoteLogSegments().hasNext()); + } + + @Test + public void testCacheListSegments() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + + // Create a few segments and add them to the cache. + RemoteLogSegmentMetadata segment0 = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0, 100, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + RemoteLogSegmentMetadata segment1 = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 101L), 101, 200, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + Map<Integer, Long> segment2LeaderEpochs = new HashMap<>(); + segment2LeaderEpochs.put(0, 201L); + segment2LeaderEpochs.put(1, 301L); + RemoteLogSegmentMetadata segment2 = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201, 400, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments. + List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2); + Assertions.assertTrue(TestUtils.sameElementsWithOrder(cache.listRemoteLogSegments(0), + expectedSegmentsForEpoch0.iterator())); + Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(cache.listAllRemoteLogSegments(), + expectedSegmentsForEpoch0.iterator())); + + // listRemoteLogSegments(0) should contain only segment2. Review comment: listRemoteLogSegments(0) => listRemoteLogSegments(1) ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class InmemoryRemoteStorageManagerTest { + private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class); + + private static final TopicPartition TP = new TopicPartition("foo", 1); + private static final File DIR = TestUtils.tempDirectory("inmem-rsm-"); + private static final Random RANDOM = new Random(); + + @Test + public void testCopyLogSegment() throws Exception { + InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager(); + RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(); + LogSegmentData logSegmentData = createLogSegmentData(); + // Copy all the segment data. + rsm.copyLogSegmentData(segmentMetadata, logSegmentData); + + // Check that the segment data exists in in-memory RSM. + boolean containsSegment = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata)); + Assertions.assertTrue(containsSegment); + + // Check that the indexes exist in in-memory RSM. + for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) { + boolean containsIndex = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata, indexType)); + Assertions.assertTrue(containsIndex); + } + } + + private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { + TopicIdPartition topicPartition = new TopicIdPartition(Uuid.randomUuid(), TP); + RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, Uuid.randomUuid()); + return new RemoteLogSegmentMetadata(id, 100L, 200L, System.currentTimeMillis(), 0, + System.currentTimeMillis(), 100, Collections.singletonMap(1, 100L)); + } + + @Test + public void testFetchLogSegmentIndexes() throws Exception { + InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager(); + RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(); + int segSize = 100; + LogSegmentData logSegmentData = createLogSegmentData(segSize); + + // Copy the segment + rsm.copyLogSegmentData(segmentMetadata, logSegmentData); + + // Check segment data exists for the copied segment. + try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) { + checkContentSame(segmentStream, logSegmentData.logSegment()); + } + + HashMap<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>(); + expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, logSegmentData.offsetIndex()); + expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, logSegmentData.timeIndex()); + expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, logSegmentData.txnIndex()); + expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, logSegmentData.producerSnapshotIndex()); + + // Check all segment indexes exist for the copied segment. 
+ for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) { + RemoteStorageManager.IndexType indexType = entry.getKey(); + Path indexPath = entry.getValue(); + log.info("Fetching index type: {}, indexPath: {}", indexType, indexPath); Review comment: Is this logging needed? Does it need to be in info level? ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCacheTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * This class covers basic unit tests for {@link InmemoryRemoteLogMetadataManager}. InmemoryRemoteLogMetadataManager is + * used only in integration tests but not in production code. It mostly uses {@link RemoteLogMetadataCache} and it has + * broad test coverage with {@link RemoteLogMetadataCacheTest}. + */ +public class InmemoryRemoteLogMetadataManagerTest { + + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), + new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1024 * 1024; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; + + private final Time time = new MockTime(1); + + @Test + public void testFetchSegments() throws Exception { + InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager(); + + // 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available. + Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0, + time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); + rlmm.addRemoteLogSegmentMetadata(segmentMetadata); + + // Search should not return the above segment. + Assertions.assertFalse(rlmm.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); + + // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available. 
+ RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), + RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate); + RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); + + // Search should return the above segment. + Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = rlmm.remoteLogSegmentMetadata(TP0, 0, 150); + Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); + } + + @Test + public void testRemotePartitionDeletion() throws Exception { + InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager(); + + // Create remote log segment metadata and add them to RLMM. + + // segment 0 + // 0-100 + // leader epochs (0,0), (1,20), (2,80) + Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(); + segmentLeaderEpochs.put(0, 0L); + segmentLeaderEpochs.put(1, 20L); + segmentLeaderEpochs.put(2, 50L); + segmentLeaderEpochs.put(3, 80L); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L, + -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); + rlmm.addRemoteLogSegmentMetadata(segmentMetadata); + RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( + segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate); + + RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); + + // Check that the seg exists in RLMM + Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = rlmm.remoteLogSegmentMetadata(TP0, 1, 30L); + Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); + + // Mark the partition for deletion. RLMM should clear all its internal state for that partition. Review comment: At this point, RLMM hasn't cleared all its internal state yet. ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class RemoteLogMetadataCacheTest { + private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class); + + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), + new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1024 * 1024; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; + + private final Time time = new MockTime(1); + + @Test + public void testSegmentsLifeCycleInCache() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + // Create remote log segment metadata and add them to RemoteLogMetadataCache. + + // segment 0 + // 0-100 Review comment: Could we make it clear this is for offset range? ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCacheTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * This class covers basic unit tests for {@link InmemoryRemoteLogMetadataManager}. 
InmemoryRemoteLogMetadataManager is + * used only in integration tests but not in production code. It mostly uses {@link RemoteLogMetadataCache} and it has + * broad test coverage with {@link RemoteLogMetadataCacheTest}. + */ +public class InmemoryRemoteLogMetadataManagerTest { + + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), + new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1024 * 1024; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; + + private final Time time = new MockTime(1); + + @Test + public void testFetchSegments() throws Exception { + InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager(); + + // 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available. + Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0, + time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); + rlmm.addRemoteLogSegmentMetadata(segmentMetadata); + + // Search should not return the above segment. + Assertions.assertFalse(rlmm.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); + + // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available. + RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), + RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate); + RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); + + // Search should return the above segment. + Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = rlmm.remoteLogSegmentMetadata(TP0, 0, 150); + Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); + } + + @Test + public void testRemotePartitionDeletion() throws Exception { + InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager(); + + // Create remote log segment metadata and add them to RLMM. + + // segment 0 + // 0-100 Review comment: Could we make it clear this is for offset range? ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments + * with respect to leader epochs. + * <p> + * Remote log segment can go through the state transitions as mentioned in {@link RemoteLogSegmentState}. + * <p> + * This class will have all the segments which did not reach terminal state viz DELETE_SEGMENT_FINISHED. That means,any + * segment reaching the terminal state will get cleared from this instance. + * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)}, + * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those + * methods have different semantics to fetch the segment based on its state. + * <p> + * <ul> + * <li> + * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}: + * <br> + * Segment in this state indicates it is not yet copied successfully. So, these segments will not be + * accessible for reads but these are considered for cleanups when a partition is deleted. + * </li> + * <li> + * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}: + * <br> + * Segment in this state indicates it is successfully copied and it is available for reads. So, these segments + * will be accessible for reads. But this should be available for any cleanup activity like deleting segments by the + * caller of this class. + * </li> + * <li> + * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}: + * Segment in this state indicates it is getting deleted. That means, it is not available for reads. But it should be + * available for any cleanup activity like deleting segments by the caller of this class. + * </li> + * <li> + * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}: + * Segment in this state indicate it is already deleted. That means, it is not available for any activity including + * reads or cleanup activity. This cache will clear entries containing this state. + * </li> + * </ul> + * + * <p> Review comment: It would be useful to document the meaning of the following table. ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class RemoteLogMetadataCacheTest { + private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class); + + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), + new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1024 * 1024; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; + + private final Time time = new MockTime(1); + + @Test + public void testSegmentsLifeCycleInCache() throws Exception { + RemoteLogMetadataCache cache = new RemoteLogMetadataCache(); + // Create remote log segment metadata and add them to RemoteLogMetadataCache. + + // segment 0 + // 0-100 + // leader epochs (0,0), (1,20), (2,80) + Map<Integer, Long> segment0LeaderEpochs = new HashMap<>(); + segment0LeaderEpochs.put(0, 0L); + segment0LeaderEpochs.put(1, 20L); + segment0LeaderEpochs.put(2, 80L); + RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L, + -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs); + cache.addCopyInProgressSegment(segment0Metadata); + + // We should not get this as the segment is still getting copied and it is not yet considered successful until + // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED. 
+ Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent()); + + RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate( + segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + cache.updateRemoteLogSegmentMetadata(segment0Update); + RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update); + + // segment 1 + // 101 - 200 + // no changes in leadership with in this segment + // leader epochs (2, 101) + Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L); + RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // segment 2 + // 201 - 300 + // moved to epoch 3 in between + // leader epochs (2, 201), (3, 240) + Map<Integer, Long> segment2LeaderEpochs = new HashMap<>(); + segment2LeaderEpochs.put(2, 201L); + segment2LeaderEpochs.put(3, 240L); + RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + // segment 3 + // 250 - 400 + // leader epochs (3, 250), (4, 370) + Map<Integer, Long> segment3LeaderEpochs = new HashMap<>(); + segment3LeaderEpochs.put(3, 250L); + segment3LeaderEpochs.put(4, 370L); + RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L, + RemoteLogSegmentState.COPY_SEGMENT_FINISHED); + + ////////////////////////////////////////////////////////////////////////////////////////// + // Four segments are added with different boundaries and leader epochs. + // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different + // epochs and offsets + ////////////////////////////////////////////////////////////////////////////////////////// + + HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>(); + // Existing metadata entries. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata); + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata); + + // Non existing metadata entries. + // Search for offset 110, epoch 1, and it should not exist. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null); + // Search for non existing offset 401, epoch 4. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null); + // Search for non existing epoch 5. + expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null); + + for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) { + EpochOffset epochOffset = entry.getKey(); + Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset); + RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue(); + log.info("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata, Review comment: Is this logging needed? Ditto below. -- This is an automated message from the Apache Git Service. 
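For reference, a minimal sketch of the kind of documentation the first two review comments ask for (a thread-safety note, and a statement of what highestLogOffset actually tracks) might look like the following. The class name, the thread-safety wording, and the stated highestLogOffset semantics are illustrative assumptions only and would need to be confirmed against the actual implementation in the PR; this is not part of the change under review.

/**
 * Hypothetical documentation sketch -- not part of the PR. It shows one possible way to
 * answer the review questions about thread safety and about when highestLogOffset is updated.
 */
class LeaderEpochStateDocSketch {

    /**
     * Assumption to confirm: this offset is advanced both when a segment reaches
     * COPY_SEGMENT_FINISHED and when a segment moves to DELETE_SEGMENT_STARTED (a segment
     * can reach that state without ever having been COPY_SEGMENT_FINISHED), so it does not
     * reflect only successfully copied segments.
     */
    private volatile Long highestLogOffset;

    /**
     * Thread safety (assumed wording): instances are safe for concurrent access because the
     * backing collections are concurrent, but callers must not rely on compound operations
     * being atomic.
     */
    Long highestLogOffset() {
        return highestLogOffset;
    }
}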