junrao commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r609073411



##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed by an in-memory store.

Review comment:
       It would be useful to add a comment on whether the methods in this class are thread-safe or not.
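   For example, something along these lines could be added to the class javadoc (a sketch only; it assumes the class is in fact thread-safe because its mutable state is kept in concurrent collections):

       /**
        * This class is an implementation of {@link RemoteLogMetadataManager} backed by an in-memory store.
        * <p>
        * This class is thread-safe: all mutable state is held in concurrent collections, so its methods can
        * be invoked concurrently without external synchronization.
        */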

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This class represents the in-memory state of segments associated with a leader epoch. This includes the mapping of offset to
+ * segment ids and unreferenced segments which are not mapped to any offset but still exist in remote storage.
+ * <p>
+ * This is used by {@link RemoteLogMetadataCache} to track the segments for each leader epoch.
+ */
+class RemoteLogLeaderEpochState {
+
+    // It contains the offset to segment id mapping, with the segment state as COPY_SEGMENT_FINISHED.
+    private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new ConcurrentSkipListMap<>();
+
+    /**
+     * It represents unreferenced segments for this leader epoch. It contains the segments still in COPY_SEGMENT_STARTED
+     * or DELETE_SEGMENT_STARTED state, or segments that have been replaced by callers with other segments having the same
+     * start offset for the leader epoch. These will be returned by {@link RemoteLogMetadataCache#listAllRemoteLogSegments()}
+     * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int leaderEpoch)} so that callers can clean them up if
+     * they still exist. These will be cleaned from the cache once they reach the DELETE_SEGMENT_FINISHED state.
+     */
+    private final Set<RemoteLogSegmentId> unreferencedSegmentIds = ConcurrentHashMap.newKeySet();
+
+    // It represents the highest log offset of the segments that were updated with updateHighestLogOffset.
+    private volatile Long highestLogOffset;
+
+    /**
+     * Returns all the segments associated with this leader epoch sorted by start offset in ascending order.
+     *
+     * @param idToSegmentMetadata mapping of id to segment metadata. This will be used to get RemoteLogSegmentMetadata
+     *                            for an id to be used for sorting.
+     * @return an iterator over the segment metadata, sorted by start offset in ascending order.
+     */
+    Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata) {
+        // Return all the segments including unreferenced metadata.
+        int size = offsetToId.size() + unreferencedSegmentIds.size();
+        if (size == 0) {
+            return Collections.emptyIterator();
+        }
+
+        ArrayList<RemoteLogSegmentMetadata> metadataList = new ArrayList<>(size);
+        for (RemoteLogSegmentId id : offsetToId.values()) {
+            metadataList.add(idToSegmentMetadata.get(id));
+        }
+
+        if (!unreferencedSegmentIds.isEmpty()) {
+            for (RemoteLogSegmentId id : unreferencedSegmentIds) {
+                metadataList.add(idToSegmentMetadata.get(id));
+            }
+
+            // Sort only when unreferenced entries exist, as the entries in offsetToId are already sorted.
+            metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+        }
+
+        return metadataList.iterator();
+    }
+
+    void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
+                                                   Long leaderEpochEndOffset) {
+        // Add the segment epochs mapping as the segment is copied successfully.
+        RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, remoteLogSegmentId);
+
+        // Remove the metadata from unreferenced entries as it is successfully copied and added to the offset mapping.
+        unreferencedSegmentIds.remove(remoteLogSegmentId);
+
+        // Add the old entry to unreferenced entries as the mapping is removed for the old entry.
+        if (oldEntry != null) {
+            unreferencedSegmentIds.add(oldEntry);
+        }
+
+        // Update the highest offset entry for this leader epoch as we added a new mapping.
+        maybeUpdateHighestLogOffset(leaderEpochEndOffset);
+    }
+
+    void handleSegmentWithDeleteSegmentStartedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
+                                                    Long leaderEpochEndOffset) {
+        // Remove the offset mappings as this segment is getting deleted.
+        offsetToId.remove(startOffset, remoteLogSegmentId);
+
+        // Add this entry to the unreferenced set for the leader epoch as it is being deleted.
+        // This allows any retries of deletion as these are returned from listAllSegments and listSegments(leaderEpoch).
+        unreferencedSegmentIds.add(remoteLogSegmentId);
+
+        // Update the highest offset entry for this leader epoch. This needs to be done as a segment can reach this
+        // state without going through the COPY_SEGMENT_FINISHED state.
+        maybeUpdateHighestLogOffset(leaderEpochEndOffset);

Review comment:
       It seems inconsistent that we update the highest log offset here but not in handleSegmentWithCopySegmentStartedState().
   
   Could we comment on whether highestLogOffset reflects the segments that have reached COPY_SEGMENT_FINISHED or not?
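   For example, the field could carry a comment like this (a sketch only; the exact semantics need to be confirmed by the author):

       // highestLogOffset reflects the highest end offset of segments that have reached
       // COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED for this leader epoch. Segments still in
       // COPY_SEGMENT_STARTED are excluded because their copy may never complete.
       private volatile Long highestLogOffset;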

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class RemoteLogMetadataCacheTest {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class);
+
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testSegmentsLifeCycleInCache() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+        // Create remote log segment metadata and add them to RemoteLogMetadataCache.
+
+        // segment 0
+        // 0-100
+        // leader epochs (0,0), (1,20), (2,80)
+        Map<Integer, Long> segment0LeaderEpochs = new HashMap<>();
+        segment0LeaderEpochs.put(0, 0L);
+        segment0LeaderEpochs.put(1, 20L);
+        segment0LeaderEpochs.put(2, 80L);
+        RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L,
+                -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs);
+        cache.addCopyInProgressSegment(segment0Metadata);
+
+        // We should not get this as the segment is still getting copied and it is not yet considered successful until
+        // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED.
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent());
+
+        RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
+                segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        cache.updateRemoteLogSegmentMetadata(segment0Update);
+        RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
+
+        // segment 1
+        // 101 - 200
+        // no changes in leadership within this segment
+        // leader epochs (2, 101)
+        Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L);
+        RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 2
+        // 201 - 300
+        // moved to epoch 3 in between
+        // leader epochs (2, 201), (3, 240)
+        Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
+        segment2LeaderEpochs.put(2, 201L);
+        segment2LeaderEpochs.put(3, 240L);
+        RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 3
+        // 250 - 400
+        // leader epochs (3, 250), (4, 370)
+        Map<Integer, Long> segment3LeaderEpochs = new HashMap<>();
+        segment3LeaderEpochs.put(3, 250L);
+        segment3LeaderEpochs.put(4, 370L);
+        RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        //////////////////////////////////////////////////////////////////////////////////////////
+        // Four segments are added with different boundaries and leader epochs.
+        // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different
+        // epochs and offsets.
+        //////////////////////////////////////////////////////////////////////////////////////////
+
+        HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>();
+        // Existing metadata entries.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata);
+
+        // Non-existing metadata entries.
+        // Search for offset 110, epoch 1, and it should not exist.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null);
+        // Search for non-existing offset 401, epoch 4.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null);
+        // Search for non-existing epoch 5.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null);
+
+        for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) {
+            EpochOffset epochOffset = entry.getKey();
+            Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset);
+            RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue();
+            log.info("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata,
+                    expectedSegmentMetadata);
+            if (expectedSegmentMetadata != null) {
+                Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata);
+            } else {
+                Assertions.assertFalse(segmentMetadata.isPresent());
+            }
+        }
+
+        // Update segment with state as DELETE_SEGMENT_STARTED.
+        // It should not be available when we search for that segment.
+        cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
+                time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent());
+
+        // Update segment with state as DELETE_SEGMENT_FINISHED.
+        // It should not be available when we search for that segment.
+        cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
+                time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent());
+
+        //////////////////////////////////////////////////////////////////////////////////////////
+        // Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs.
+        //////////////////////////////////////////////////////////////////////////////////////////
+
+        Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>();
+        expectedEpochToHighestOffset.put(0, 19L);

Review comment:
       It's kind of weird that the segment with epoch 0 is already deleted and yet we still expect the highest offset for epoch 0 to be returned.
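   To make the behavior under discussion concrete, with the values above the test effectively asserts (sketch):

       // segment 0, the only segment covering epoch 0, has reached DELETE_SEGMENT_FINISHED, yet:
       Assertions.assertEquals(Optional.of(19L), cache.highestOffsetForEpoch(0));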

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class RemoteLogMetadataCacheTest {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class);
+
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testSegmentsLifeCycleInCache() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+        // Create remote log segment metadata and add them to RemoteLogMetadataCache.
+
+        // segment 0
+        // 0-100
+        // leader epochs (0,0), (1,20), (2,80)
+        Map<Integer, Long> segment0LeaderEpochs = new HashMap<>();
+        segment0LeaderEpochs.put(0, 0L);
+        segment0LeaderEpochs.put(1, 20L);
+        segment0LeaderEpochs.put(2, 80L);
+        RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L,
+                -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs);
+        cache.addCopyInProgressSegment(segment0Metadata);
+
+        // We should not get this as the segment is still getting copied and it is not yet considered successful until
+        // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED.
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent());
+
+        RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
+                segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        cache.updateRemoteLogSegmentMetadata(segment0Update);
+        RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
+
+        // segment 1
+        // 101 - 200
+        // no changes in leadership within this segment
+        // leader epochs (2, 101)
+        Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L);
+        RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 2
+        // 201 - 300
+        // moved to epoch 3 in between
+        // leader epochs (2, 201), (3, 240)
+        Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
+        segment2LeaderEpochs.put(2, 201L);
+        segment2LeaderEpochs.put(3, 240L);
+        RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 3
+        // 250 - 400
+        // leader epochs (3, 250), (4, 370)
+        Map<Integer, Long> segment3LeaderEpochs = new HashMap<>();
+        segment3LeaderEpochs.put(3, 250L);
+        segment3LeaderEpochs.put(4, 370L);
+        RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        //////////////////////////////////////////////////////////////////////////////////////////
+        // Four segments are added with different boundaries and leader epochs.
+        // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different
+        // epochs and offsets.
+        //////////////////////////////////////////////////////////////////////////////////////////
+
+        HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>();
+        // Existing metadata entries.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata);
+
+        // Non-existing metadata entries.
+        // Search for offset 110, epoch 1, and it should not exist.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null);
+        // Search for non-existing offset 401, epoch 4.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null);
+        // Search for non-existing epoch 5.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null);
+
+        for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) {
+            EpochOffset epochOffset = entry.getKey();
+            Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset);
+            RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue();
+            log.info("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata,
+                    expectedSegmentMetadata);
+            if (expectedSegmentMetadata != null) {
+                Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadata);
+            } else {
+                Assertions.assertFalse(segmentMetadata.isPresent());
+            }
+        }
+
+        // Update segment with state as DELETE_SEGMENT_STARTED.
+        // It should not be available when we search for that segment.
+        cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
+                time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent());
+
+        // Update segment with state as DELETE_SEGMENT_FINISHED.
+        // It should not be available when we search for that segment.
+        cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
+                time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 10).isPresent());
+
+        //////////////////////////////////////////////////////////////////////////////////////////
+        // Search for cache.highestLogOffset(leaderEpoch) for all the leader epochs.
+        //////////////////////////////////////////////////////////////////////////////////////////
+
+        Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>();
+        expectedEpochToHighestOffset.put(0, 19L);
+        expectedEpochToHighestOffset.put(1, 79L);
+        expectedEpochToHighestOffset.put(2, 239L);
+        expectedEpochToHighestOffset.put(3, 369L);
+        expectedEpochToHighestOffset.put(4, 400L);
+
+        for (Map.Entry<Integer, Long> entry : expectedEpochToHighestOffset.entrySet()) {
+            Integer epoch = entry.getKey();
+            Long expectedOffset = entry.getValue();
+            Optional<Long> offset = cache.highestOffsetForEpoch(epoch);
+            log.info("Fetching highest offset for epoch: {} , returned: {} , expected: {}", epoch, offset, expectedOffset);
+            Assertions.assertEquals(Optional.of(expectedOffset), offset);
+        }
+
+        // Search for a non-existing leader epoch.
+        Optional<Long> highestOffsetForEpoch5 = cache.highestOffsetForEpoch(5);
+        Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
+    }
+
+    private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogMetadataCache cache,
+                                                                  Map<Integer, Long> segmentLeaderEpochs,
+                                                                  long startOffset,
+                                                                  long endOffset,
+                                                                  RemoteLogSegmentState state)
+            throws RemoteResourceNotFoundException {
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L,
+                BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
+        cache.addCopyInProgressSegment(segmentMetadata);
+
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
+                time.milliseconds(), state, BROKER_ID_1);
+        cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
+
+        return segmentMetadata.createWithUpdates(segMetadataUpdate);
+    }
+
+    @Test
+    public void testCacheSegmentWithCopySegmentStartedState() {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
+        // segments.
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        cache.addCopyInProgressSegment(segmentMetadata);
+
+        // This segment should not be available as the state has not reached COPY_SEGMENT_FINISHED.
+        Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 = cache.remoteLogSegmentMetadata(0, 0);
+        Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent());
+
+        // cache.listRemoteLogSegments APIs should contain the above segment.
+        checkListSegments(cache, 0, segmentMetadata);
+    }
+
+    @Test
+    public void testCacheSegmentWithCopySegmentFinishedState() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Create a segment and move it to state COPY_SEGMENT_FINISHED, and check for searching that segment and
+        // listing the segments.
+        RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 101L),
+                101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 = cache.remoteLogSegmentMetadata(0, 150);
+        Assertions.assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150);
+
+        // cache.listRemoteLogSegments should contain the above segments.
+        checkListSegments(cache, 0, segmentMetadata);
+    }
+
+    @Test
+    public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and
+        // listing the segments.
+        RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 201L),
+                201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+
+        // Search should not return the above segment as its leader epoch state is cleared.
+        Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 = cache.remoteLogSegmentMetadata(0, 250);
+        Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
+
+        checkListSegments(cache, 0, segmentMetadata);
+    }
+
+    @Test
+    public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and
+        // listing the segments.
+        RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 301L),
+                301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+
+        // Search should not return the above segment as its leader epoch state is cleared.
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 350).isPresent());
+
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
+                time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1);
+        cache.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+
+        // listRemoteLogSegments(0) and listAllRemoteLogSegments() should not contain the above segment.
+        Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+        Assertions.assertFalse(cache.listAllRemoteLogSegments().hasNext());
+    }
+
+    @Test
+    public void testCacheListSegments() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Create a few segments and add them to the cache.
+        RemoteLogSegmentMetadata segment0 = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0, 100,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        RemoteLogSegmentMetadata segment1 = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 101L), 101, 200,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
+        segment2LeaderEpochs.put(0, 201L);
+        segment2LeaderEpochs.put(1, 301L);
+        RemoteLogSegmentMetadata segment2 = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201, 400,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments.
+        List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2);
+        Assertions.assertTrue(TestUtils.sameElementsWithOrder(cache.listRemoteLogSegments(0),
+                expectedSegmentsForEpoch0.iterator()));
+        Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(cache.listAllRemoteLogSegments(),
+                expectedSegmentsForEpoch0.iterator()));
+
+        // listRemoteLogSegments(0) should contain only segment2.

Review comment:
       listRemoteLogSegments(0) => listRemoteLogSegments(1)
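   i.e., the comment should presumably read (sketch; the assertion below is a guess at the lines that follow, which are not shown in this hunk):

       // listRemoteLogSegments(1) should contain only segment2.
       Assertions.assertTrue(TestUtils.sameElementsWithOrder(cache.listRemoteLogSegments(1),
               Collections.singletonList(segment2).iterator()));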

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class InmemoryRemoteStorageManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class);
+
+    private static final TopicPartition TP = new TopicPartition("foo", 1);
+    private static final File DIR = TestUtils.tempDirectory("inmem-rsm-");
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testCopyLogSegment() throws Exception {
+        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
+        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
+        LogSegmentData logSegmentData = createLogSegmentData();
+        // Copy all the segment data.
+        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
+
+        // Check that the segment data exists in the in-memory RSM.
+        boolean containsSegment = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata));
+        Assertions.assertTrue(containsSegment);
+
+        // Check that the indexes exist in the in-memory RSM.
+        for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
+            boolean containsIndex = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata, indexType));
+            Assertions.assertTrue(containsIndex);
+        }
+    }
+
+    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
+        TopicIdPartition topicPartition = new TopicIdPartition(Uuid.randomUuid(), TP);
+        RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, Uuid.randomUuid());
+        return new RemoteLogSegmentMetadata(id, 100L, 200L, System.currentTimeMillis(), 0,
+                System.currentTimeMillis(), 100, Collections.singletonMap(1, 100L));
+    }
+
+    @Test
+    public void testFetchLogSegmentIndexes() throws Exception {
+        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
+        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
+        int segSize = 100;
+        LogSegmentData logSegmentData = createLogSegmentData(segSize);
+
+        // Copy the segment
+        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
+
+        // Check segment data exists for the copied segment.
+        try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
+            checkContentSame(segmentStream, logSegmentData.logSegment());
+        }
+
+        HashMap<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>();
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, logSegmentData.offsetIndex());
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, logSegmentData.timeIndex());
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, logSegmentData.txnIndex());
+        expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, logSegmentData.producerSnapshotIndex());
+
+        // Check all segment indexes exist for the copied segment.
+        for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) {
+            RemoteStorageManager.IndexType indexType = entry.getKey();
+            Path indexPath = entry.getValue();
+            log.info("Fetching index type: {}, indexPath: {}", indexType, indexPath);

Review comment:
       Is this logging needed? Does it need to be at info level?
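   If some trace is worth keeping, one option is to downgrade it (sketch):

       log.debug("Fetching index type: {}, indexPath: {}", indexType, indexPath);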

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCacheTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class covers basic unit tests for {@link InmemoryRemoteLogMetadataManager}. InmemoryRemoteLogMetadataManager is
+ * used only in integration tests but not in production code. It mostly uses {@link RemoteLogMetadataCache}, and it has
+ * broad test coverage through {@link RemoteLogMetadataCacheTest}.
+ */
+public class InmemoryRemoteLogMetadataManagerTest {
+
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testFetchSegments() throws Exception {
+        InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager();
+
+        // 1. Create a segment with state COPY_SEGMENT_STARTED; this segment should not be available.
+        Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0,
+                time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
+        rlmm.addRemoteLogSegmentMetadata(segmentMetadata);
+
+        // Search should not return the above segment.
+        Assertions.assertFalse(rlmm.remoteLogSegmentMetadata(TP0, 0, 150).isPresent());
+
+        // 2. Move that segment to COPY_SEGMENT_FINISHED state; this segment should be available.
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+        RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = rlmm.remoteLogSegmentMetadata(TP0, 0, 150);
+        Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150);
+    }
+
+    @Test
+    public void testRemotePartitionDeletion() throws Exception {
+        InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager();
+
+        // Create remote log segment metadata and add them to RLMM.
+
+        // segment 0
+        // 0-100
+        // leader epochs (0,0), (1,20), (2,50), (3,80)
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        segmentLeaderEpochs.put(1, 20L);
+        segmentLeaderEpochs.put(2, 50L);
+        segmentLeaderEpochs.put(3, 80L);
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L,
+                -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
+        rlmm.addRemoteLogSegmentMetadata(segmentMetadata);
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+
+        RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+        // Check that the segment exists in RLMM.
+        Optional<RemoteLogSegmentMetadata> segMetadataForOffset30Epoch1 = rlmm.remoteLogSegmentMetadata(TP0, 1, 30L);
+        Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1);
+
+        // Mark the partition for deletion. RLMM should clear all its internal state for that partition.

Review comment:
       At this point, RLMM hasn't cleared all its internal state yet.
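   Perhaps reword it along these lines (a sketch, assuming deletion is only initiated here and the state is cleared later in the test):

       // Mark the partition for deletion. RLMM will clear all of its internal state for that
       // partition only once the deletion reaches its terminal state.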

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class RemoteLogMetadataCacheTest {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class);
+
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testSegmentsLifeCycleInCache() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+        // Create remote log segment metadata and add them to RemoteLogMetadataCache.
+
+        // segment 0
+        // 0-100

Review comment:
       Could we make it clear that this is the offset range?
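   e.g. (sketch):

       // segment 0
       // offset range: 0-100
       // leader epochs (0,0), (1,20), (2,80)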

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCacheTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class covers basic unit tests for {@link InmemoryRemoteLogMetadataManager}. InmemoryRemoteLogMetadataManager is
+ * used only in integration tests but not in production code. It mostly uses {@link RemoteLogMetadataCache}, and it has
+ * broad test coverage through {@link RemoteLogMetadataCacheTest}.
+ */
+public class InmemoryRemoteLogMetadataManagerTest {
+
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testFetchSegments() throws Exception {
+        InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager();
+
+        // 1. Create a segment with state COPY_SEGMENT_STARTED; this segment should not be available.
+        Map<Integer, Long> segmentLeaderEpochs = Collections.singletonMap(0, 101L);
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0,
+                time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
+        rlmm.addRemoteLogSegmentMetadata(segmentMetadata);
+
+        // Search should not return the above segment.
+        Assertions.assertFalse(rlmm.remoteLogSegmentMetadata(TP0, 0, 150).isPresent());
+
+        // 2. Move that segment to COPY_SEGMENT_FINISHED state; this segment should be available.
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        rlmm.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+        RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset150 = rlmm.remoteLogSegmentMetadata(TP0, 0, 150);
+        Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150);
+    }
+
+    @Test
+    public void testRemotePartitionDeletion() throws Exception {
+        InmemoryRemoteLogMetadataManager rlmm = new InmemoryRemoteLogMetadataManager();
+
+        // Create remote log segment metadata and add them to RLMM.
+
+        // segment 0
+        // 0-100

Review comment:
       Could we make it clear that this is the offset range?
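   Same suggestion as in RemoteLogMetadataCacheTest, e.g. (sketch):

       // segment 0
       // offset range: 0-100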

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. It maintains the lineage of segments
+ * with respect to leader epochs.
+ * <p>
+ * A remote log segment can go through the state transitions mentioned in {@link RemoteLogSegmentState}.
+ * <p>
+ * This class holds all the segments that have not reached the terminal state, viz. DELETE_SEGMENT_FINISHED. That means any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata, like {@link #remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, and {@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics for fetching a segment based on its state.
+ * <p>
+ * <ul>
+ * <li>
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * <br>
+ * A segment in this state indicates it has not yet been copied successfully. So, these segments will not be
+ * accessible for reads, but they are considered for cleanup when a partition is deleted.
+ * </li>
+ * <li>
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * <br>
+ * A segment in this state indicates it has been copied successfully, so it is accessible for reads. It should also
+ * be available for any cleanup activity, like deleting segments, by the caller of this class.
+ * </li>
+ * <li>
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * A segment in this state indicates it is being deleted. That means it is not available for reads, but it should be
+ * available for any cleanup activity, like deleting segments, by the caller of this class.
+ * </li>
+ * <li>
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * A segment in this state indicates it has already been deleted. That means it is not available for any activity,
+ * including reads or cleanup. This cache will clear entries containing this state.
+ * </li>
+ * </ul>
+ *
+ * <p>

Review comment:
       It would be useful to document the meaning of the following table.
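   For example, a short preamble above the table (a sketch; the table itself is not visible in this hunk, so the wording below is an assumption about its intent):

       * The table below summarizes, for each {@link RemoteLogSegmentState}, whether a segment in that state
       * is returned by remoteLogSegmentMetadata(), whether it is included in listRemoteLogSegments() and
       * listAllRemoteLogSegments(), and whether it contributes to highestOffsetForEpoch().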

##########
File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class RemoteLogMetadataCacheTest {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class);
+
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testSegmentsLifeCycleInCache() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+        // Create remote log segment metadata and add them to RemoteLogMetadataCache.
+
+        // segment 0
+        // 0-100
+        // leader epochs (0,0), (1,20), (2,80)
+        Map<Integer, Long> segment0LeaderEpochs = new HashMap<>();
+        segment0LeaderEpochs.put(0, 0L);
+        segment0LeaderEpochs.put(1, 20L);
+        segment0LeaderEpochs.put(2, 80L);
+        RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segment0Metadata = new RemoteLogSegmentMetadata(segment0Id, 0L, 100L,
+                -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segment0LeaderEpochs);
+        cache.addCopyInProgressSegment(segment0Metadata);
+
+        // We should not get this as the segment is still getting copied and it is not yet considered successful until
+        // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED.
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 1).isPresent());
+
+        RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
+                segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        cache.updateRemoteLogSegmentMetadata(segment0Update);
+        RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
+
+        // segment 1
+        // 101 - 200
+        // no changes in leadership within this segment
+        // leader epochs (2, 101)
+        Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 101L);
+        RemoteLogSegmentMetadata segment1Metadata = createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 2
+        // 201 - 300
+        // moved to epoch 3 in between
+        // leader epochs (2, 201), (3, 240)
+        Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
+        segment2LeaderEpochs.put(2, 201L);
+        segment2LeaderEpochs.put(3, 240L);
+        RemoteLogSegmentMetadata segment2Metadata = createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 3
+        // 250 - 400
+        // leader epochs (3, 250), (4, 370)
+        Map<Integer, Long> segment3LeaderEpochs = new HashMap<>();
+        segment3LeaderEpochs.put(3, 250L);
+        segment3LeaderEpochs.put(4, 370L);
+        RemoteLogSegmentMetadata segment3Metadata = createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        //////////////////////////////////////////////////////////////////////////////////////////
+        // Four segments are added with different boundaries and leader epochs.
+        // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset) for different
+        // epochs and offsets.
+        //////////////////////////////////////////////////////////////////////////////////////////
+
+        HashMap<EpochOffset, RemoteLogSegmentMetadata> expectedEpochOffsetToSegmentMetadata = new HashMap<>();
+        // Existing metadata entries.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), expectedSegment0Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), segment1Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), segment2Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), segment3Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), segment3Metadata);
+
+        // Non-existing metadata entries.
+        // Search for offset 110, epoch 1, and it should not exist.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), null);
+        // Search for non-existing offset 401, epoch 4.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), null);
+        // Search for non-existing epoch 5.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), null);
+
+        for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : expectedEpochOffsetToSegmentMetadata.entrySet()) {
+            EpochOffset epochOffset = entry.getKey();
+            Optional<RemoteLogSegmentMetadata> segmentMetadata = cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset);
+            RemoteLogSegmentMetadata expectedSegmentMetadata = entry.getValue();
+            log.info("Searching for {} , result: {}, expected: {} ", epochOffset, segmentMetadata,

Review comment:
       Is this logging needed? Ditto below.
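   If some trace is worth keeping here, debug level would be quieter (sketch):

       log.debug("Searching for {}, result: {}, expected: {}", epochOffset, segmentMetadata, expectedSegmentMetadata);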




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

