kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r598964030



##########
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##########
@@ -21,14 +21,16 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
  * It indicates the state of the remote log segment. This will be based on the 
action executed on this

Review comment:
       You can drop `It` and start with `Indicates the state...`.

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed 
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+    private final ConcurrentMap<TopicIdPartition, 
RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+            new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<TopicIdPartition, RemoteLogMetadataCache> 
partitionToRemoteLogMetadataCache =
+            new ConcurrentHashMap<>();
+
+    @Override
+    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata)
+            throws RemoteStorageException {
+        log.debug("Adding remote log segment : [{}]", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        // this method is allowed only to add remote log segment with the 
initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
+        // but not to update the existing remote log segment metadata.
+        if (remoteLogSegmentMetadata.state() != 
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {

Review comment:
       Can this be checked inside `RemoteLogMetadataCache.addToInProgress()` 
instead of here?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by 
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+    // map of key to log data, which can be segment or any of its indexes.
+    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+    public InmemoryRemoteStorageManager() {
+    }
+
+    static String generateKeyForSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
".segment";
+    }
+
+    static String generateKeyForIndex(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                      IndexType indexType) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
"." + indexType.toString();
+    }
+
+    // visible for testing.
+    boolean containsKey(String key) {
+        return keyToLogData.containsKey(key);
+    }
+
+    @Override
+    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                   LogSegmentData logSegmentData)
+            throws RemoteStorageException {
+        log.debug("copying log segment and indexes for : {}", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        Objects.requireNonNull(logSegmentData, "logSegmentData can not be 
null");
+        try {
+            keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+                    Files.readAllBytes(logSegmentData.logSegment().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Offset),
+                    Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Timestamp),
+                    Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Transaction),
+                    Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.LeaderEpoch),
+                    logSegmentData.leaderEpochIndex().array());
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.ProducerSnapshot),
+                    
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+        } catch (IOException e) {
+            throw new RemoteStorageException(e.getMessage(), e);

Review comment:
       We could add a c'tor overload to 
[RemoteStorageException](https://github.com/apache/kafka/blob/0d9a95a7d0ab06aecc4480901707e29dd2a3147e/clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageException.java)
 that takes only a `Throwable` as argument; that would avoid the need to pass 2 args here.

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by 
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+    // map of key to log data, which can be segment or any of its indexes.
+    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+    public InmemoryRemoteStorageManager() {
+    }
+
+    static String generateKeyForSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
".segment";
+    }
+
+    static String generateKeyForIndex(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                      IndexType indexType) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
"." + indexType.toString();
+    }
+
+    // visible for testing.
+    boolean containsKey(String key) {
+        return keyToLogData.containsKey(key);
+    }
+
+    @Override
+    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                   LogSegmentData logSegmentData)
+            throws RemoteStorageException {
+        log.debug("copying log segment and indexes for : {}", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        Objects.requireNonNull(logSegmentData, "logSegmentData can not be 
null");
+        try {
+            keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+                    Files.readAllBytes(logSegmentData.logSegment().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Offset),
+                    Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Timestamp),
+                    Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Transaction),
+                    Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.LeaderEpoch),
+                    logSegmentData.leaderEpochIndex().array());
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.ProducerSnapshot),
+                    
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+        } catch (IOException e) {
+            throw new RemoteStorageException(e.getMessage(), e);
+        }
+        log.debug("copied log segment and indexes for : {} successfully.", 
remoteLogSegmentMetadata);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                       int startPosition)
+            throws RemoteStorageException {
+        log.debug("Received fetch segment request at start position: [{}] for 
[{}]", startPosition, remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        return fetchLogSegment(remoteLogSegmentMetadata, startPosition, 
Integer.MAX_VALUE);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                       int startPosition,
+                                       int endPosition) throws 
RemoteStorageException {
+        log.debug("Received fetch segment request at start position: [{}] and 
end position: [{}] for segment [{}]",
+                startPosition, endPosition, remoteLogSegmentMetadata);
+
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        if (startPosition < 0 || endPosition < 0) {
+            throw new IllegalArgumentException("Given start position or end 
position must not be negative.");
+        }
+
+        if (endPosition < startPosition) {
+            throw new IllegalArgumentException("end position must be greater 
than start position");
+        }
+
+        String key = generateKeyForSegment(remoteLogSegmentMetadata);
+        byte[] segment = keyToLogData.get(key);
+
+        if (segment == null) {
+            throw new RemoteResourceNotFoundException("No remote log segment 
found with start offset:"
+                                                      + 
remoteLogSegmentMetadata.startOffset() + " and id: "
+                                                      + 
remoteLogSegmentMetadata.remoteLogSegmentId());
+        }
+
+        if (startPosition >= segment.length) {
+            throw new IllegalArgumentException("start position: " + 
startPosition
+                                               + " must be less than the 
length of the segment: " + segment.length);
+        }
+
+        // check for boundaries like given end position is more than the 
length, length should never be more than the
+        // existing segment size.
+        int length = Math.min(segment.length - 1, endPosition) - startPosition 
+ 1;

Review comment:
       Hmm, do we need to explicitly check if `endPosition` < `segment.length`? 

##########
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteState.java
##########
@@ -83,4 +85,25 @@ public static RemotePartitionDeleteState forId(byte id) {
         return STATE_TYPES.get(id);
     }
 
+    public static boolean isValidTransition(RemotePartitionDeleteState 
srcState,
+                                            RemotePartitionDeleteState 
targetState) {
+        Objects.requireNonNull(targetState, "targetState can not be null");
+
+        if (srcState == null) {
+            // If the source state is null, check the target state as the 
initial state viz DELETE_PARTITION_MARKED
+            // Wanted to keep this logic simple here by taking null for 
srcState, instead of creating one more state like
+            // DELETE_PARTITION_NOT_MARKED and have the null check by caller 
and pass that state.

Review comment:
       IMHO, we can simplify this to say:
   
   ```
   // If the source state is null, check the target state as the initial state viz DELETE_PARTITION_MARKED.
   // This ensures simplicity as we don't have to define an additional state type to represent the initial state.
   ```

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID = 0;
+
+    @Test
+    public void testCacheSegmentsWithDifferentStates() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Add segments with different states and check 
cache.remoteLogSegmentMetadata(int leaderEpoch, long offset)
+        // cache.listRemoteLogSegments(int leaderEpoch), and 
cache.listAllRemoteLogSegments().
+
+        // 
=============================================================================================================
+        // 1.Create a segment with state COPY_SEGMENT_STARTED, and check for 
searching that segment and listing the
+        // segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg0leaderEpochs = Collections.singletonMap(0, 0L);
+        RemoteLogSegmentId seg0Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata segCopyInProgress = new 
RemoteLogSegmentMetadata(seg0Id, 0L, 50L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+        cache.addToInProgress(segCopyInProgress);
+
+        // This segment should not be available as the state is not reached to 
COPY_SEGMENT_FINISHED.
+        Optional<RemoteLogSegmentMetadata> seg0s0e0 = 
cache.remoteLogSegmentMetadata(0, 0);
+        Assertions.assertFalse(seg0s0e0.isPresent());
+
+        // cache.listRemoteLogSegments(0) should not contain the above 
segment, it will be empty.
+        Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+        // But cache.listRemoteLogSegments() should contain the above segment.
+        checkContainsAll(cache.listAllRemoteLogSegments(), 
Collections.singletonList(segCopyInProgress));
+
+        // 
=============================================================================================================
+        // 2.Create a segment and move it to state COPY_SEGMENT_FINISHED. and 
check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg1leaderEpochs = Collections.singletonMap(0, 
101L);
+        RemoteLogSegmentId seg1Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg1 = new RemoteLogSegmentMetadata(seg1Id, 
101L, 200L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg1leaderEpochs);
+        cache.addToInProgress(seg1);
+        RemoteLogSegmentMetadataUpdate seg1Update = new 
RemoteLogSegmentMetadataUpdate(seg1Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg1Update);
+        RemoteLogSegmentMetadata segCopyFinished = 
seg1.createRemoteLogSegmentWithUpdates(seg1Update);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg1S150 = 
cache.remoteLogSegmentMetadata(0, 150);
+        
Assertions.assertEquals(seg1.createRemoteLogSegmentWithUpdates(seg1Update), 
seg1S150.orElse(null));
+
+        // cache.listRemoteLogSegments(0) should not contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Collections.singletonList(segCopyFinished));
+        // But cache.listRemoteLogSegments() should contain both the segments.
+        checkContainsAll(cache.listAllRemoteLogSegments(), 
Arrays.asList(segCopyInProgress, segCopyFinished));
+
+        // 
=============================================================================================================
+        // 3.Create a segment and move it to state DELETE_SEGMENT_STARTED, and 
check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg2leaderEpochs = Collections.singletonMap(0, 
201L);
+        RemoteLogSegmentId seg2Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg2 = new RemoteLogSegmentMetadata(seg2Id, 
201L, 300L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg2leaderEpochs);
+        cache.addToInProgress(seg2);
+        RemoteLogSegmentMetadataUpdate seg2Update = new 
RemoteLogSegmentMetadataUpdate(seg2Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg2Update);
+        RemoteLogSegmentMetadata segDeleteStarted = 
seg2.createRemoteLogSegmentWithUpdates(seg2Update);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg2S250 = 
cache.remoteLogSegmentMetadata(0, 250);
+        
Assertions.assertEquals(seg2.createRemoteLogSegmentWithUpdates(seg2Update), 
seg2S250.orElse(null));
+
+        // cache.listRemoteLogSegments(0) should contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Arrays.asList(segCopyFinished, segDeleteStarted));
+        // But cache.listRemoteLogSegments() should contain all the segments.
+        checkContainsAll(cache.listAllRemoteLogSegments(),
+                Arrays.asList(segCopyInProgress, segCopyFinished, 
segDeleteStarted));
+
+        // 
=============================================================================================================
+        // 4.Create a segment and move it to state DELETE_SEGMENT_FINISHED, 
and check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg3leaderEpochs = Collections.singletonMap(0, 
301L);
+        RemoteLogSegmentId seg3Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg3 = new RemoteLogSegmentMetadata(seg3Id, 
301L, 400L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg3leaderEpochs);
+        cache.addToInProgress(seg3);
+        RemoteLogSegmentMetadataUpdate seg3Update1 = new 
RemoteLogSegmentMetadataUpdate(seg3Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg3Update1);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg3S350 = 
cache.remoteLogSegmentMetadata(0, 350);
+        
Assertions.assertEquals(seg3.createRemoteLogSegmentWithUpdates(seg3Update1), 
seg3S350.orElse(null));
+
+        RemoteLogSegmentMetadataUpdate seg3Update2 = new 
RemoteLogSegmentMetadataUpdate(seg3Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg3Update2);
+
+        // cache.listRemoteLogSegments(0) should not contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Arrays.asList(segCopyFinished, segDeleteStarted));
+        // But cache.listRemoteLogSegments() should not contain both the 
segments as it should have been removed.
+        checkContainsAll(cache.listAllRemoteLogSegments(),
+                Arrays.asList(segCopyInProgress, segCopyFinished, 
segDeleteStarted));
+    }
+
+    private void checkContainsAll(Iterator<RemoteLogSegmentMetadata> 
allSegments,

Review comment:
       The implementation does not verify the ordering of the segments, since it converts the iterator to a set. Is that intentional?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID = 0;
+
+    @Test
+    public void testCacheSegmentsWithDifferentStates() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Add segments with different states and check 
cache.remoteLogSegmentMetadata(int leaderEpoch, long offset)
+        // cache.listRemoteLogSegments(int leaderEpoch), and 
cache.listAllRemoteLogSegments().
+
+        // 
=============================================================================================================
+        // 1.Create a segment with state COPY_SEGMENT_STARTED, and check for 
searching that segment and listing the
+        // segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg0leaderEpochs = Collections.singletonMap(0, 0L);
+        RemoteLogSegmentId seg0Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata segCopyInProgress = new 
RemoteLogSegmentMetadata(seg0Id, 0L, 50L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+        cache.addToInProgress(segCopyInProgress);
+
+        // This segment should not be available as the state is not reached to 
COPY_SEGMENT_FINISHED.
+        Optional<RemoteLogSegmentMetadata> seg0s0e0 = 
cache.remoteLogSegmentMetadata(0, 0);
+        Assertions.assertFalse(seg0s0e0.isPresent());
+
+        // cache.listRemoteLogSegments(0) should not contain the above 
segment, it will be empty.
+        Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+        // But cache.listRemoteLogSegments() should contain the above segment.
+        checkContainsAll(cache.listAllRemoteLogSegments(), 
Collections.singletonList(segCopyInProgress));
+
+        // 
=============================================================================================================
+        // 2.Create a segment and move it to state COPY_SEGMENT_FINISHED. and 
check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg1leaderEpochs = Collections.singletonMap(0, 
101L);
+        RemoteLogSegmentId seg1Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg1 = new RemoteLogSegmentMetadata(seg1Id, 
101L, 200L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg1leaderEpochs);
+        cache.addToInProgress(seg1);
+        RemoteLogSegmentMetadataUpdate seg1Update = new 
RemoteLogSegmentMetadataUpdate(seg1Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg1Update);
+        RemoteLogSegmentMetadata segCopyFinished = 
seg1.createRemoteLogSegmentWithUpdates(seg1Update);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg1S150 = 
cache.remoteLogSegmentMetadata(0, 150);
+        
Assertions.assertEquals(seg1.createRemoteLogSegmentWithUpdates(seg1Update), 
seg1S150.orElse(null));
+
+        // cache.listRemoteLogSegments(0) should not contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Collections.singletonList(segCopyFinished));
+        // But cache.listRemoteLogSegments() should contain both the segments.
+        checkContainsAll(cache.listAllRemoteLogSegments(), 
Arrays.asList(segCopyInProgress, segCopyFinished));
+
+        // 
=============================================================================================================
+        // 3.Create a segment and move it to state DELETE_SEGMENT_STARTED, and 
check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg2leaderEpochs = Collections.singletonMap(0, 
201L);
+        RemoteLogSegmentId seg2Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg2 = new RemoteLogSegmentMetadata(seg2Id, 
201L, 300L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg2leaderEpochs);
+        cache.addToInProgress(seg2);
+        RemoteLogSegmentMetadataUpdate seg2Update = new 
RemoteLogSegmentMetadataUpdate(seg2Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg2Update);
+        RemoteLogSegmentMetadata segDeleteStarted = 
seg2.createRemoteLogSegmentWithUpdates(seg2Update);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg2S250 = 
cache.remoteLogSegmentMetadata(0, 250);
+        
Assertions.assertEquals(seg2.createRemoteLogSegmentWithUpdates(seg2Update), 
seg2S250.orElse(null));
+
+        // cache.listRemoteLogSegments(0) should contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Arrays.asList(segCopyFinished, segDeleteStarted));
+        // But cache.listRemoteLogSegments() should contain all the segments.
+        checkContainsAll(cache.listAllRemoteLogSegments(),
+                Arrays.asList(segCopyInProgress, segCopyFinished, 
segDeleteStarted));
+
+        // 
=============================================================================================================
+        // 4.Create a segment and move it to state DELETE_SEGMENT_FINISHED, 
and check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg3leaderEpochs = Collections.singletonMap(0, 
301L);
+        RemoteLogSegmentId seg3Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg3 = new RemoteLogSegmentMetadata(seg3Id, 
301L, 400L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg3leaderEpochs);
+        cache.addToInProgress(seg3);
+        RemoteLogSegmentMetadataUpdate seg3Update1 = new 
RemoteLogSegmentMetadataUpdate(seg3Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg3Update1);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg3S350 = 
cache.remoteLogSegmentMetadata(0, 350);
+        
Assertions.assertEquals(seg3.createRemoteLogSegmentWithUpdates(seg3Update1), 
seg3S350.orElse(null));

Review comment:
       Could we assert just before this line that `seg3S350` is present? This will simplify the `seg3S350.orElse(null)` argument to `seg3S350.get()`.
   
   (The same comment applies to other places in this test.)

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are 
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata
+            = new ConcurrentHashMap<>();
+
+    // It keeps the segments which are not yet reached to 
COPY_SEGMENT_FINISHED state.
+    private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new 
HashSet<>();

Review comment:
       Unlike the other maps, this set is defined as a plain `HashSet`, which is not 
thread-safe. Is there a reason for that?

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed 
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+    private final ConcurrentMap<TopicIdPartition, 
RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+            new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<TopicIdPartition, RemoteLogMetadataCache> 
partitionToRemoteLogMetadataCache =

Review comment:
       Could we call this `idToRemoteLogMetadataCache` to align with the naming 
of the other attribute, which is called `idToPartitionDeleteMetadata`?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class InmemoryRemoteLogMetadataManagerTest {
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID = 0;
+
+    @Test
+    public void testRLMMFetchSegment() throws Exception {
+        InmemoryRemoteLogMetadataManager rlmm = new 
InmemoryRemoteLogMetadataManager();
+        int brokerId = 0;
+        // Create remote log segment metadata and add them to RLMM.
+
+        // segment 0
+        // 0-100
+        // leader epochs (0,0), (1,20), (2,80)
+        Map<Integer, Long> seg0leaderEpochs = new HashMap<>();
+        seg0leaderEpochs.put(0, 0L);
+        seg0leaderEpochs.put(1, 20L);
+        seg0leaderEpochs.put(2, 80L);
+        RemoteLogSegmentId segIdFooTp0s0e100 = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata segMetFooTp0s0e100 = new 
RemoteLogSegmentMetadata(segIdFooTp0s0e100, 0L, 100L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+        rlmm.addRemoteLogSegmentMetadata(segMetFooTp0s0e100);
+
+        // wWe should not get this as the segment is still gettign copied and 
it is not yet considered successful until

Review comment:
       Typos:
   1. s/wWe/We
   2. s/gettign/getting

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed 
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+    private final ConcurrentMap<TopicIdPartition, 
RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+            new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<TopicIdPartition, RemoteLogMetadataCache> 
partitionToRemoteLogMetadataCache =
+            new ConcurrentHashMap<>();
+
+    @Override
+    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata)
+            throws RemoteStorageException {
+        log.debug("Adding remote log segment : [{}]", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        // this method is allowed only to add remote log segment with the 
initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
+        // but not to update the existing remote log segment metadata.
+        if (remoteLogSegmentMetadata.state() != 
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+            throw new IllegalArgumentException("Given remoteLogSegmentMetadata 
should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED
+                    + " but it contains state as: " + 
remoteLogSegmentMetadata.state());
+        }
+
+        RemoteLogSegmentId remoteLogSegmentId = 
remoteLogSegmentMetadata.remoteLogSegmentId();
+
+        RemoteLogMetadataCache remoteLogMetadataCache = 
partitionToRemoteLogMetadataCache
+                .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> 
new RemoteLogMetadataCache());
+
+        remoteLogMetadataCache.addToInProgress(remoteLogSegmentMetadata);
+    }
+
+    @Override
+    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
+            throws RemoteStorageException {
+        log.debug("Updating remote log segment: [{}]", metadataUpdate);
+        Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be 
null");
+
+        RemoteLogSegmentState targetState = metadataUpdate.state();
+        // Callers should use putRemoteLogSegmentMetadata to add 
RemoteLogSegmentMetadata with state as
+        // RemoteLogSegmentState.COPY_SEGMENT_STARTED.
+        if (targetState == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {

Review comment:
       Similar to the above comment, why not check this inside 
`remoteLogMetadataCache.updateRemoteLogSegmentMetadata()`?

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are 
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata

Review comment:
       Can we add a 1-line doc for this, similar to the other attributes below?

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are 
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata
+            = new ConcurrentHashMap<>();
+
+    // It keeps the segments which are not yet reached to 
COPY_SEGMENT_FINISHED state.
+    private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new 
HashSet<>();
+
+    // It will have all the segments except with state as COPY_SEGMENT_STARTED.
+    private final ConcurrentMap<Integer, NavigableMap<Long, 
RemoteLogSegmentId>> leaderEpochToOffsetToId
+            = new ConcurrentHashMap<>();
+
+    private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        log.debug("Adding remote log segment metadata: [{}]", 
remoteLogSegmentMetadata);
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), 
remoteLogSegmentMetadata);
+        Map<Integer, Long> leaderEpochToOffset = 
remoteLogSegmentMetadata.segmentLeaderEpochs();
+        for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+            leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new 
ConcurrentSkipListMap<>())
+                    .put(entry.getValue(), 
remoteLogSegmentMetadata.remoteLogSegmentId());
+        }
+    }
+
+    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
+        NavigableMap<Long, RemoteLogSegmentId> offsetToId = 
leaderEpochToOffsetToId.get(leaderEpoch);
+        if (offsetToId == null || offsetToId.isEmpty()) {
+            return Optional.empty();
+        }
+
+        // look for floor entry as the given offset may exist in this entry.
+        Map.Entry<Long, RemoteLogSegmentId> entry = 
offsetToId.floorEntry(offset);
+        if (entry == null) {
+            // if the offset is lower than the minimum offset available in 
metadata then return empty.
+            return Optional.empty();
+        }
+
+        RemoteLogSegmentMetadata metadata = 
idToSegmentMetadata.get(entry.getValue());
+        // check whether the given offset with leaderEpoch exists in this 
segment.
+        // check for epoch's offset boundaries with in this segment.
+        //      1. get the next epoch's start offset -1 if exists
+        //      2. if no next epoch exists, then segment end offset can be 
considered as epoch's relative end offset.
+        Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+                .higherEntry(leaderEpoch);
+        long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : 
metadata.endOffset();
+
+        // seek offset should be <= epoch's end offset.
+        return (offset > epochEndOffset) ? Optional.empty() : 
Optional.of(metadata);
+    }
+
+    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
+            throws RemoteResourceNotFoundException {
+        log.debug("Updating remote log segment metadata: [{}]", 
metadataUpdate);
+        RemoteLogSegmentId remoteLogSegmentId = 
metadataUpdate.remoteLogSegmentId();
+        RemoteLogSegmentMetadata existingMetadata = 
idToSegmentMetadata.get(remoteLogSegmentId);
+        if (existingMetadata == null) {
+            throw new RemoteResourceNotFoundException("No remote log segment 
metadata found for : "
+                                                      + remoteLogSegmentId);
+        }
+
+        RemoteLogSegmentState targetState = metadataUpdate.state();
+        RemoteLogSegmentState existingState = existingMetadata.state();
+        if (!RemoteLogSegmentState.isValidTransition(existingMetadata.state(), 
targetState)) {
+            throw new IllegalStateException("Current state: " + existingState 
+ ", target state: " + targetState);
+        }
+

Review comment:
       In this method, we allow existing entries in `idToSegmentMetadata` 
to be replaced, even if the existing and new entries have the same state. 
Is that intentional?

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are 
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata
+            = new ConcurrentHashMap<>();
+
+    // It keeps the segments which are not yet reached to 
COPY_SEGMENT_FINISHED state.
+    private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new 
HashSet<>();
+
+    // It will have all the segments except with state as COPY_SEGMENT_STARTED.
+    private final ConcurrentMap<Integer, NavigableMap<Long, 
RemoteLogSegmentId>> leaderEpochToOffsetToId
+            = new ConcurrentHashMap<>();
+
+    private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        log.debug("Adding remote log segment metadata: [{}]", 
remoteLogSegmentMetadata);
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), 
remoteLogSegmentMetadata);
+        Map<Integer, Long> leaderEpochToOffset = 
remoteLogSegmentMetadata.segmentLeaderEpochs();
+        for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+            leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new 
ConcurrentSkipListMap<>())
+                    .put(entry.getValue(), 
remoteLogSegmentMetadata.remoteLogSegmentId());
+        }
+    }
+
+    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
+        NavigableMap<Long, RemoteLogSegmentId> offsetToId = 
leaderEpochToOffsetToId.get(leaderEpoch);
+        if (offsetToId == null || offsetToId.isEmpty()) {
+            return Optional.empty();
+        }
+
+        // look for floor entry as the given offset may exist in this entry.
+        Map.Entry<Long, RemoteLogSegmentId> entry = 
offsetToId.floorEntry(offset);
+        if (entry == null) {
+            // if the offset is lower than the minimum offset available in 
metadata then return empty.
+            return Optional.empty();
+        }
+
+        RemoteLogSegmentMetadata metadata = 
idToSegmentMetadata.get(entry.getValue());
+        // check whether the given offset with leaderEpoch exists in this 
segment.
+        // check for epoch's offset boundaries with in this segment.
+        //      1. get the next epoch's start offset -1 if exists
+        //      2. if no next epoch exists, then segment end offset can be 
considered as epoch's relative end offset.
+        Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+                .higherEntry(leaderEpoch);
+        long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : 
metadata.endOffset();
+
+        // seek offset should be <= epoch's end offset.
+        return (offset > epochEndOffset) ? Optional.empty() : 
Optional.of(metadata);
+    }
+
+    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
+            throws RemoteResourceNotFoundException {
+        log.debug("Updating remote log segment metadata: [{}]", 
metadataUpdate);
+        RemoteLogSegmentId remoteLogSegmentId = 
metadataUpdate.remoteLogSegmentId();
+        RemoteLogSegmentMetadata existingMetadata = 
idToSegmentMetadata.get(remoteLogSegmentId);
+        if (existingMetadata == null) {
+            throw new RemoteResourceNotFoundException("No remote log segment 
metadata found for : "
+                                                      + remoteLogSegmentId);
+        }
+
+        RemoteLogSegmentState targetState = metadataUpdate.state();
+        RemoteLogSegmentState existingState = existingMetadata.state();
+        if (!RemoteLogSegmentState.isValidTransition(existingMetadata.state(), 
targetState)) {
+            throw new IllegalStateException("Current state: " + existingState 
+ ", target state: " + targetState);
+        }
+
+        RemoteLogSegmentMetadata updatedMetadata = 
existingMetadata.createRemoteLogSegmentWithUpdates(metadataUpdate);
+        idToSegmentMetadata.put(remoteLogSegmentId, updatedMetadata);
+        if (targetState != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+            remoteLogSegmentIdInProgress.remove(remoteLogSegmentId);
+            addRemoteLogSegmentMetadata(updatedMetadata);
+        }
+
+        if (targetState == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED) {
+            log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+            // remove this entry when the state is moved to 
delete_segment_finished
+            Map<Integer, Long> leaderEpochs = 
existingMetadata.segmentLeaderEpochs();

Review comment:
       Hmm, the entry for `existingMetadata` gets overwritten in the call to 
`addRemoteLogSegmentMetadata` in L110. Should we be accounting for the same 
here?

##########
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are 
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata
+            = new ConcurrentHashMap<>();
+
+    // It keeps the segments which are not yet reached to 
COPY_SEGMENT_FINISHED state.
+    private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new 
HashSet<>();
+
+    // It will have all the segments except with state as COPY_SEGMENT_STARTED.
+    private final ConcurrentMap<Integer, NavigableMap<Long, 
RemoteLogSegmentId>> leaderEpochToOffsetToId
+            = new ConcurrentHashMap<>();
+
+    private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        log.debug("Adding remote log segment metadata: [{}]", 
remoteLogSegmentMetadata);
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), 
remoteLogSegmentMetadata);
+        Map<Integer, Long> leaderEpochToOffset = 
remoteLogSegmentMetadata.segmentLeaderEpochs();
+        for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+            leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new 
ConcurrentSkipListMap<>())
+                    .put(entry.getValue(), 
remoteLogSegmentMetadata.remoteLogSegmentId());
+        }
+    }
+
+    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
+        NavigableMap<Long, RemoteLogSegmentId> offsetToId = 
leaderEpochToOffsetToId.get(leaderEpoch);
+        if (offsetToId == null || offsetToId.isEmpty()) {
+            return Optional.empty();
+        }
+
+        // look for floor entry as the given offset may exist in this entry.
+        Map.Entry<Long, RemoteLogSegmentId> entry = 
offsetToId.floorEntry(offset);
+        if (entry == null) {
+            // if the offset is lower than the minimum offset available in 
metadata then return empty.
+            return Optional.empty();
+        }
+
+        RemoteLogSegmentMetadata metadata = 
idToSegmentMetadata.get(entry.getValue());
+        // check whether the given offset with leaderEpoch exists in this 
segment.
+        // check for epoch's offset boundaries with in this segment.
+        //      1. get the next epoch's start offset -1 if exists
+        //      2. if no next epoch exists, then segment end offset can be 
considered as epoch's relative end offset.
+        Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+                .higherEntry(leaderEpoch);
+        long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : 
metadata.endOffset();
+
+        // seek offset should be <= epoch's end offset.
+        return (offset > epochEndOffset) ? Optional.empty() : 
Optional.of(metadata);
+    }
+
+    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
+            throws RemoteResourceNotFoundException {
+        log.debug("Updating remote log segment metadata: [{}]", 
metadataUpdate);
+        RemoteLogSegmentId remoteLogSegmentId = 
metadataUpdate.remoteLogSegmentId();
+        RemoteLogSegmentMetadata existingMetadata = 
idToSegmentMetadata.get(remoteLogSegmentId);
+        if (existingMetadata == null) {
+            throw new RemoteResourceNotFoundException("No remote log segment 
metadata found for : "
+                                                      + remoteLogSegmentId);
+        }
+
+        RemoteLogSegmentState targetState = metadataUpdate.state();
+        RemoteLogSegmentState existingState = existingMetadata.state();
+        if (!RemoteLogSegmentState.isValidTransition(existingMetadata.state(), 
targetState)) {
+            throw new IllegalStateException("Current state: " + existingState 
+ ", target state: " + targetState);
+        }
+
+        RemoteLogSegmentMetadata updatedMetadata = 
existingMetadata.createRemoteLogSegmentWithUpdates(metadataUpdate);
+        idToSegmentMetadata.put(remoteLogSegmentId, updatedMetadata);
+        if (targetState != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+            remoteLogSegmentIdInProgress.remove(remoteLogSegmentId);
+            addRemoteLogSegmentMetadata(updatedMetadata);
+        }
+
+        if (targetState == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED) {
+            log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+            // remove this entry when the state is moved to 
delete_segment_finished
+            Map<Integer, Long> leaderEpochs = 
existingMetadata.segmentLeaderEpochs();
+            for (Map.Entry<Integer, Long> entry : leaderEpochs.entrySet()) {
+                NavigableMap<Long, RemoteLogSegmentId> offsetToIds = 
leaderEpochToOffsetToId.get(entry.getKey());
+                // remove the mappings where this segment is deleted.
+                offsetToIds.values().remove(remoteLogSegmentId);
+            }
+
+            // remove the segment-id mapping.
+            idToSegmentMetadata.remove(remoteLogSegmentId);
+        }
+    }
+
+    public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
+        ArrayList<RemoteLogSegmentMetadata> list = new 
ArrayList<>(idToSegmentMetadata.values());
+        list.addAll(remoteLogSegmentIdInProgress.stream().map(id -> 
idToSegmentMetadata.get(id))
+                .collect(Collectors.toList()));
+        
list.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+        return list.iterator();
+    }
+
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int 
leaderEpoch) {
+        NavigableMap<Long, RemoteLogSegmentId> map = 
leaderEpochToOffsetToId.get(leaderEpoch);
+        return map != null ? map.values().stream().map(id -> 
idToSegmentMetadata.get(id)).iterator()
+                           : Collections.emptyIterator();
+    }
+
+    public Optional<Long> highestLogOffset(int leaderEpoch) {
+        NavigableMap<Long, RemoteLogSegmentId> offsetToSegmentId = 
leaderEpochToOffsetToId.get(leaderEpoch);
+        if (offsetToSegmentId == null) {
+            return Optional.empty();
+        }
+
+        long max = 0L;
+        for (RemoteLogSegmentId id : offsetToSegmentId.values()) {
+            RemoteLogSegmentMetadata metadata = idToSegmentMetadata.get(id);
+            Map.Entry<Integer, Long> nextEntry = 
metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
+            // If there is an higher entry than the given leader epoch, 
end-offset of the given leader epoch is,
+            // (next leader epoch's start-offset) -1.
+            long nextVal = nextEntry != null ? nextEntry.getValue() - 1 : 
metadata.endOffset();
+            max = Math.max(max, nextVal);
+        }
+
+        return Optional.of(max);
+    }
+
+    /**
+     * This will be added to copy_in_progress metadata list. This will be 
removed from that list once it is moved to the
+     * next state which can be COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED.
+     *
+     * @param remoteLogSegmentMetadata
+     */
+    public void addToInProgress(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {

Review comment:
       Before we insert into the map/set, we should check if the provided 
`remoteLogSegmentMetadata.state()` is `COPY_SEGMENT_STARTED`.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##########
@@ -87,4 +89,27 @@ public byte id() {
     public static RemoteLogSegmentState forId(byte id) {
         return STATE_TYPES.get(id);
     }
+
+    public static boolean isValidTransition(RemoteLogSegmentState srcState, 
RemoteLogSegmentState targetState) {
+        Objects.requireNonNull(targetState, "targetState can not be null");
+
+        if (srcState == null) {

Review comment:
       It seems to me that `srcState` is never null in practice. In which scenario does this null check come into play?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class InmemoryRemoteLogMetadataManagerTest {
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID = 0;
+
+    @Test
+    public void testRLMMFetchSegment() throws Exception {

Review comment:
       Can we improve the local variable names? For example, names like `segIdFooTp0s0e100` and `segMetFooTp0s0e100` are not easy to read; simpler names would help.

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {

Review comment:
       Could we add test(s) for the `highestLogOffset` API?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID = 0;
+
+    @Test
+    public void testCacheSegmentsWithDifferentStates() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+        // Add segments with different states and check 
cache.remoteLogSegmentMetadata(int leaderEpoch, long offset)
+        // cache.listRemoteLogSegments(int leaderEpoch), and 
cache.listAllRemoteLogSegments().
+
+        // 
=============================================================================================================
+        // 1.Create a segment with state COPY_SEGMENT_STARTED, and check for 
searching that segment and listing the
+        // segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg0leaderEpochs = Collections.singletonMap(0, 0L);
+        RemoteLogSegmentId seg0Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata segCopyInProgress = new 
RemoteLogSegmentMetadata(seg0Id, 0L, 50L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+        cache.addToInProgress(segCopyInProgress);
+
+        // This segment should not be available as the state is not reached to 
COPY_SEGMENT_FINISHED.
+        Optional<RemoteLogSegmentMetadata> seg0s0e0 = 
cache.remoteLogSegmentMetadata(0, 0);
+        Assertions.assertFalse(seg0s0e0.isPresent());
+
+        // cache.listRemoteLogSegments(0) should not contain the above 
segment, it will be empty.
+        Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+        // But cache.listRemoteLogSegments() should contain the above segment.
+        checkContainsAll(cache.listAllRemoteLogSegments(), 
Collections.singletonList(segCopyInProgress));
+
+        // 
=============================================================================================================
+        // 2.Create a segment and move it to state COPY_SEGMENT_FINISHED. and 
check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg1leaderEpochs = Collections.singletonMap(0, 
101L);
+        RemoteLogSegmentId seg1Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg1 = new RemoteLogSegmentMetadata(seg1Id, 
101L, 200L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg1leaderEpochs);
+        cache.addToInProgress(seg1);
+        RemoteLogSegmentMetadataUpdate seg1Update = new 
RemoteLogSegmentMetadataUpdate(seg1Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg1Update);
+        RemoteLogSegmentMetadata segCopyFinished = 
seg1.createRemoteLogSegmentWithUpdates(seg1Update);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg1S150 = 
cache.remoteLogSegmentMetadata(0, 150);
+        
Assertions.assertEquals(seg1.createRemoteLogSegmentWithUpdates(seg1Update), 
seg1S150.orElse(null));
+
+        // cache.listRemoteLogSegments(0) should not contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Collections.singletonList(segCopyFinished));
+        // But cache.listRemoteLogSegments() should contain both the segments.
+        checkContainsAll(cache.listAllRemoteLogSegments(), 
Arrays.asList(segCopyInProgress, segCopyFinished));
+
+        // 
=============================================================================================================
+        // 3.Create a segment and move it to state DELETE_SEGMENT_STARTED, and 
check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg2leaderEpochs = Collections.singletonMap(0, 
201L);
+        RemoteLogSegmentId seg2Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg2 = new RemoteLogSegmentMetadata(seg2Id, 
201L, 300L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg2leaderEpochs);
+        cache.addToInProgress(seg2);
+        RemoteLogSegmentMetadataUpdate seg2Update = new 
RemoteLogSegmentMetadataUpdate(seg2Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+        cache.updateRemoteLogSegmentMetadata(seg2Update);
+        RemoteLogSegmentMetadata segDeleteStarted = 
seg2.createRemoteLogSegmentWithUpdates(seg2Update);
+
+        // Search should return the above segment.
+        Optional<RemoteLogSegmentMetadata> seg2S250 = 
cache.remoteLogSegmentMetadata(0, 250);
+        
Assertions.assertEquals(seg2.createRemoteLogSegmentWithUpdates(seg2Update), 
seg2S250.orElse(null));
+
+        // cache.listRemoteLogSegments(0) should contain the above segment.
+        checkContainsAll(cache.listRemoteLogSegments(0), 
Arrays.asList(segCopyFinished, segDeleteStarted));
+        // But cache.listRemoteLogSegments() should contain all the segments.
+        checkContainsAll(cache.listAllRemoteLogSegments(),
+                Arrays.asList(segCopyInProgress, segCopyFinished, 
segDeleteStarted));
+
+        // 
=============================================================================================================
+        // 4.Create a segment and move it to state DELETE_SEGMENT_FINISHED, 
and check for searching that segment and
+        // listing the segments.
+        // 
==============================================================================================================
+        Map<Integer, Long> seg3leaderEpochs = Collections.singletonMap(0, 
301L);
+        RemoteLogSegmentId seg3Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata seg3 = new RemoteLogSegmentMetadata(seg3Id, 
301L, 400L, -1L, BROKER_ID,
+                System.currentTimeMillis(), SEG_SIZE, seg3leaderEpochs);
+        cache.addToInProgress(seg3);
+        RemoteLogSegmentMetadataUpdate seg3Update1 = new 
RemoteLogSegmentMetadataUpdate(seg3Id,
+                System.currentTimeMillis(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);

Review comment:
       Should we also vary the other arguments, for example `BROKER_ID` and `eventTimestamp`? It appears that we expect `RemoteLogMetadataCache` to [apply all of the provided updates](https://github.com/apache/kafka/blob/0d9a95a7d0ab06aecc4480901707e29dd2a3147e/clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java#L240-L242), and these updates may include the other fields as well.

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by 
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+    // map of key to log data, which can be segment or any of its indexes.
+    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+    public InmemoryRemoteStorageManager() {
+    }
+
+    static String generateKeyForSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
".segment";
+    }
+
+    static String generateKeyForIndex(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                      IndexType indexType) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
"." + indexType.toString();
+    }
+
+    // visible for testing.
+    boolean containsKey(String key) {
+        return keyToLogData.containsKey(key);
+    }
+
+    @Override
+    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                   LogSegmentData logSegmentData)
+            throws RemoteStorageException {
+        log.debug("copying log segment and indexes for : {}", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        Objects.requireNonNull(logSegmentData, "logSegmentData can not be 
null");
+        try {
+            keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),

Review comment:
       As per the interface, we [mandate](https://github.com/apache/kafka/blob/0d9a95a7d0ab06aecc4480901707e29dd2a3147e/clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L76) that the caller ensures a unique ID, but would it be useful to add a guard that disallows replacing existing values?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by 
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+    // map of key to log data, which can be segment or any of its indexes.
+    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+    public InmemoryRemoteStorageManager() {

Review comment:
       Can we remove this c'tor and rely on the default constructor that the compiler generates?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID = 0;
+
+    @Test
+    public void testCacheSegmentsWithDifferentStates() throws Exception {

Review comment:
       This test checks a number of things at once. Instead, could sections (1) to (4) below each be defined as a separate unit test? Each section operates on a different segment, so the sections appear to be logically independent.

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by 
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+    // map of key to log data, which can be segment or any of its indexes.
+    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+    public InmemoryRemoteStorageManager() {
+    }
+
+    static String generateKeyForSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
".segment";
+    }
+
+    static String generateKeyForIndex(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                      IndexType indexType) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
"." + indexType.toString();
+    }
+
+    // visible for testing.
+    boolean containsKey(String key) {
+        return keyToLogData.containsKey(key);
+    }
+
+    @Override
+    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                   LogSegmentData logSegmentData)
+            throws RemoteStorageException {
+        log.debug("copying log segment and indexes for : {}", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        Objects.requireNonNull(logSegmentData, "logSegmentData can not be 
null");
+        try {
+            keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+                    Files.readAllBytes(logSegmentData.logSegment().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Offset),
+                    Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Timestamp),
+                    Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Transaction),
+                    Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.LeaderEpoch),
+                    logSegmentData.leaderEpochIndex().array());
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.ProducerSnapshot),
+                    
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+        } catch (IOException e) {
+            throw new RemoteStorageException(e.getMessage(), e);
+        }
+        log.debug("copied log segment and indexes for : {} successfully.", 
remoteLogSegmentMetadata);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                       int startPosition)
+            throws RemoteStorageException {
+        log.debug("Received fetch segment request at start position: [{}] for 
[{}]", startPosition, remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        return fetchLogSegment(remoteLogSegmentMetadata, startPosition, 
Integer.MAX_VALUE);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                       int startPosition,
+                                       int endPosition) throws 
RemoteStorageException {
+        log.debug("Received fetch segment request at start position: [{}] and 
end position: [{}] for segment [{}]",
+                startPosition, endPosition, remoteLogSegmentMetadata);
+
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        if (startPosition < 0 || endPosition < 0) {
+            throw new IllegalArgumentException("Given start position or end 
position must not be negative.");
+        }
+
+        if (endPosition < startPosition) {
+            throw new IllegalArgumentException("end position must be greater 
than start position");
+        }
+
+        String key = generateKeyForSegment(remoteLogSegmentMetadata);
+        byte[] segment = keyToLogData.get(key);
+
+        if (segment == null) {
+            throw new RemoteResourceNotFoundException("No remote log segment 
found with start offset:"
+                                                      + 
remoteLogSegmentMetadata.startOffset() + " and id: "
+                                                      + 
remoteLogSegmentMetadata.remoteLogSegmentId());
+        }
+
+        if (startPosition >= segment.length) {
+            throw new IllegalArgumentException("start position: " + 
startPosition
+                                               + " must be less than the 
length of the segment: " + segment.length);
+        }
+
+        // check for boundaries like given end position is more than the 
length, length should never be more than the
+        // existing segment size.
+        int length = Math.min(segment.length - 1, endPosition) - startPosition 
+ 1;
+        log.debug("Length of the segment to be sent: [{}], for segment: [{}]", 
length, remoteLogSegmentMetadata);
+
+        return new ByteArrayInputStream(segment, startPosition, length);
+    }
+
+    @Override
+    public InputStream fetchIndex(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                  IndexType indexType) throws 
RemoteStorageException {
+        log.debug("Received fetch request for index type: [{}], segment [{}]", 
indexType, remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        Objects.requireNonNull(indexType, "indexType can not be null");
+
+        String key = generateKeyForIndex(remoteLogSegmentMetadata, indexType);
+        byte[] index = keyToLogData.get(key);
+        if (index == null) {
+            throw new RemoteResourceNotFoundException("No remote log segment 
index found with start offset:"
+                                                      + 
remoteLogSegmentMetadata.startOffset() + " and id: "
+                                                      + 
remoteLogSegmentMetadata.remoteLogSegmentId());
+        }
+
+        return new ByteArrayInputStream(index);
+    }
+
+    @Override
+    public void deleteLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) throws RemoteStorageException {
+        log.info("Deleting log segment for: [{}]", remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        String segmentKey = generateKeyForSegment(remoteLogSegmentMetadata);
+        keyToLogData.remove(segmentKey);
+        for (IndexType indexType : IndexType.values()) {
+            String key = generateKeyForIndex(remoteLogSegmentMetadata, 
indexType);
+            keyToLogData.remove(key);
+        }
+        log.info("Deleted log segment successfully for: [{}]", 
remoteLogSegmentMetadata);
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyToLogData = Collections.emptyMap();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {

Review comment:
       Is this intentionally left empty?

##########
File path: 
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by 
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+    private static final Logger log = 
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+    // map of key to log data, which can be segment or any of its indexes.
+    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+    public InmemoryRemoteStorageManager() {
+    }
+
+    static String generateKeyForSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
".segment";
+    }
+
+    static String generateKeyForIndex(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                      IndexType indexType) {
+        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + 
"." + indexType.toString();
+    }
+
+    // visible for testing.
+    boolean containsKey(String key) {
+        return keyToLogData.containsKey(key);
+    }
+
+    @Override
+    public void copyLogSegmentData(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                   LogSegmentData logSegmentData)
+            throws RemoteStorageException {
+        log.debug("copying log segment and indexes for : {}", 
remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+        Objects.requireNonNull(logSegmentData, "logSegmentData can not be 
null");
+        try {
+            keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+                    Files.readAllBytes(logSegmentData.logSegment().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Offset),
+                    Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Timestamp),
+                    Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.Transaction),
+                    Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.LeaderEpoch),
+                    logSegmentData.leaderEpochIndex().array());
+            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, 
IndexType.ProducerSnapshot),
+                    
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+        } catch (IOException e) {
+            throw new RemoteStorageException(e.getMessage(), e);
+        }
+        log.debug("copied log segment and indexes for : {} successfully.", 
remoteLogSegmentMetadata);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                       int startPosition)
+            throws RemoteStorageException {
+        log.debug("Received fetch segment request at start position: [{}] for 
[{}]", startPosition, remoteLogSegmentMetadata);
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        return fetchLogSegment(remoteLogSegmentMetadata, startPosition, 
Integer.MAX_VALUE);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
+                                       int startPosition,
+                                       int endPosition) throws 
RemoteStorageException {
+        log.debug("Received fetch segment request at start position: [{}] and 
end position: [{}] for segment [{}]",
+                startPosition, endPosition, remoteLogSegmentMetadata);
+
+        Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+        if (startPosition < 0 || endPosition < 0) {
+            throw new IllegalArgumentException("Given start position or end 
position must not be negative.");
+        }
+
+        if (endPosition < startPosition) {
+            throw new IllegalArgumentException("end position must be greater 
than start position");

Review comment:
       The check only rejects `endPosition < startPosition`, so equal positions are accepted. It is probably better to say `...must be greater than or equal to...`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to