kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r598964030
##########
File path:
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##########
@@ -21,14 +21,16 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* It indicates the state of the remote log segment. This will be based on the
action executed on this
Review comment:
You can drop `It` and start with `Indicates the state...`.
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements
RemoteLogMetadataManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+ private final ConcurrentMap<TopicIdPartition,
RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<TopicIdPartition, RemoteLogMetadataCache>
partitionToRemoteLogMetadataCache =
+ new ConcurrentHashMap<>();
+
+ @Override
+ public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata)
+ throws RemoteStorageException {
+ log.debug("Adding remote log segment : [{}]",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ // this method is allowed only to add remote log segment with the
initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
+ // but not to update the existing remote log segment metadata.
+ if (remoteLogSegmentMetadata.state() !=
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
Review comment:
Can this be checked inside `RemoteLogMetadataCache.addToInProgress()`
instead of here?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+ // map of key to log data, which can be segment or any of its indexes.
+ private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+ public InmemoryRemoteStorageManager() {
+ }
+
+ static String generateKeyForSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
".segment";
+ }
+
+ static String generateKeyForIndex(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ IndexType indexType) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
"." + indexType.toString();
+ }
+
+ // visible for testing.
+ boolean containsKey(String key) {
+ return keyToLogData.containsKey(key);
+ }
+
+ @Override
+ public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
+ throws RemoteStorageException {
+ log.debug("copying log segment and indexes for : {}",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ Objects.requireNonNull(logSegmentData, "logSegmentData can not be
null");
+ try {
+ keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+ Files.readAllBytes(logSegmentData.logSegment().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Offset),
+ Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Timestamp),
+ Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Transaction),
+ Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.LeaderEpoch),
+ logSegmentData.leaderEpochIndex().array());
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.ProducerSnapshot),
+
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+ } catch (IOException e) {
+ throw new RemoteStorageException(e.getMessage(), e);
Review comment:
We could add a c'tor overload to
[RemoteStorageException](https://github.com/apache/kafka/blob/0d9a95a7d0ab06aecc4480901707e29dd2a3147e/clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageException.java)
that takes a `Throwable` as argument; it would eliminate the need to pass 2 args here.
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+ // map of key to log data, which can be segment or any of its indexes.
+ private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+ public InmemoryRemoteStorageManager() {
+ }
+
+ static String generateKeyForSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
".segment";
+ }
+
+ static String generateKeyForIndex(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ IndexType indexType) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
"." + indexType.toString();
+ }
+
+ // visible for testing.
+ boolean containsKey(String key) {
+ return keyToLogData.containsKey(key);
+ }
+
+ @Override
+ public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
+ throws RemoteStorageException {
+ log.debug("copying log segment and indexes for : {}",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ Objects.requireNonNull(logSegmentData, "logSegmentData can not be
null");
+ try {
+ keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+ Files.readAllBytes(logSegmentData.logSegment().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Offset),
+ Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Timestamp),
+ Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Transaction),
+ Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.LeaderEpoch),
+ logSegmentData.leaderEpochIndex().array());
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.ProducerSnapshot),
+
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+ } catch (IOException e) {
+ throw new RemoteStorageException(e.getMessage(), e);
+ }
+ log.debug("copied log segment and indexes for : {} successfully.",
remoteLogSegmentMetadata);
+ }
+
+ @Override
+ public InputStream fetchLogSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ int startPosition)
+ throws RemoteStorageException {
+ log.debug("Received fetch segment request at start position: [{}] for
[{}]", startPosition, remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ return fetchLogSegment(remoteLogSegmentMetadata, startPosition,
Integer.MAX_VALUE);
+ }
+
+ @Override
+ public InputStream fetchLogSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ int startPosition,
+ int endPosition) throws
RemoteStorageException {
+ log.debug("Received fetch segment request at start position: [{}] and
end position: [{}] for segment [{}]",
+ startPosition, endPosition, remoteLogSegmentMetadata);
+
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ if (startPosition < 0 || endPosition < 0) {
+ throw new IllegalArgumentException("Given start position or end
position must not be negative.");
+ }
+
+ if (endPosition < startPosition) {
+ throw new IllegalArgumentException("end position must be greater
than start position");
+ }
+
+ String key = generateKeyForSegment(remoteLogSegmentMetadata);
+ byte[] segment = keyToLogData.get(key);
+
+ if (segment == null) {
+ throw new RemoteResourceNotFoundException("No remote log segment
found with start offset:"
+ +
remoteLogSegmentMetadata.startOffset() + " and id: "
+ +
remoteLogSegmentMetadata.remoteLogSegmentId());
+ }
+
+ if (startPosition >= segment.length) {
+ throw new IllegalArgumentException("start position: " +
startPosition
+ + " must be less than the
length of the segment: " + segment.length);
+ }
+
+ // check for boundaries like given end position is more than the
length, length should never be more than the
+ // existing segment size.
+ int length = Math.min(segment.length - 1, endPosition) - startPosition
+ 1;
Review comment:
Hmm, do we need to explicitly check if `endPosition` < `segment.length`?
##########
File path:
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteState.java
##########
@@ -83,4 +85,25 @@ public static RemotePartitionDeleteState forId(byte id) {
return STATE_TYPES.get(id);
}
+ public static boolean isValidTransition(RemotePartitionDeleteState
srcState,
+ RemotePartitionDeleteState
targetState) {
+ Objects.requireNonNull(targetState, "targetState can not be null");
+
+ if (srcState == null) {
+ // If the source state is null, check the target state as the
initial state viz DELETE_PARTITION_MARKED
+ // Wanted to keep this logic simple here by taking null for
srcState, instead of creating one more state like
+ // DELETE_PARTITION_NOT_MARKED and have the null check by caller
and pass that state.
Review comment:
IMHO, we can simplify this to say:
```
// If the source state is null, check the target state as the initial state
viz DELETE_PARTITION_MARKED.
// This ensures simplicity as we don't have to define an additional state
type to represent the initial state.
```
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+ private static final int SEG_SIZE = 1024 * 1024;
+ private static final int BROKER_ID = 0;
+
+ @Test
+ public void testCacheSegmentsWithDifferentStates() throws Exception {
+ RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+ // Add segments with different states and check
cache.remoteLogSegmentMetadata(int leaderEpoch, long offset)
+ // cache.listRemoteLogSegments(int leaderEpoch), and
cache.listAllRemoteLogSegments().
+
+ //
=============================================================================================================
+ // 1.Create a segment with state COPY_SEGMENT_STARTED, and check for
searching that segment and listing the
+ // segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg0leaderEpochs = Collections.singletonMap(0, 0L);
+ RemoteLogSegmentId seg0Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata segCopyInProgress = new
RemoteLogSegmentMetadata(seg0Id, 0L, 50L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+ cache.addToInProgress(segCopyInProgress);
+
+ // This segment should not be available as the state is not reached to
COPY_SEGMENT_FINISHED.
+ Optional<RemoteLogSegmentMetadata> seg0s0e0 =
cache.remoteLogSegmentMetadata(0, 0);
+ Assertions.assertFalse(seg0s0e0.isPresent());
+
+ // cache.listRemoteLogSegments(0) should not contain the above
segment, it will be empty.
+ Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+ // But cache.listRemoteLogSegments() should contain the above segment.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
Collections.singletonList(segCopyInProgress));
+
+ //
=============================================================================================================
+ // 2.Create a segment and move it to state COPY_SEGMENT_FINISHED. and
check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg1leaderEpochs = Collections.singletonMap(0,
101L);
+ RemoteLogSegmentId seg1Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg1 = new RemoteLogSegmentMetadata(seg1Id,
101L, 200L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg1leaderEpochs);
+ cache.addToInProgress(seg1);
+ RemoteLogSegmentMetadataUpdate seg1Update = new
RemoteLogSegmentMetadataUpdate(seg1Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg1Update);
+ RemoteLogSegmentMetadata segCopyFinished =
seg1.createRemoteLogSegmentWithUpdates(seg1Update);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg1S150 =
cache.remoteLogSegmentMetadata(0, 150);
+
Assertions.assertEquals(seg1.createRemoteLogSegmentWithUpdates(seg1Update),
seg1S150.orElse(null));
+
+ // cache.listRemoteLogSegments(0) should not contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Collections.singletonList(segCopyFinished));
+ // But cache.listRemoteLogSegments() should contain both the segments.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
Arrays.asList(segCopyInProgress, segCopyFinished));
+
+ //
=============================================================================================================
+ // 3.Create a segment and move it to state DELETE_SEGMENT_STARTED, and
check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg2leaderEpochs = Collections.singletonMap(0,
201L);
+ RemoteLogSegmentId seg2Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg2 = new RemoteLogSegmentMetadata(seg2Id,
201L, 300L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg2leaderEpochs);
+ cache.addToInProgress(seg2);
+ RemoteLogSegmentMetadataUpdate seg2Update = new
RemoteLogSegmentMetadataUpdate(seg2Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg2Update);
+ RemoteLogSegmentMetadata segDeleteStarted =
seg2.createRemoteLogSegmentWithUpdates(seg2Update);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg2S250 =
cache.remoteLogSegmentMetadata(0, 250);
+
Assertions.assertEquals(seg2.createRemoteLogSegmentWithUpdates(seg2Update),
seg2S250.orElse(null));
+
+ // cache.listRemoteLogSegments(0) should contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Arrays.asList(segCopyFinished, segDeleteStarted));
+ // But cache.listRemoteLogSegments() should contain all the segments.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
+ Arrays.asList(segCopyInProgress, segCopyFinished,
segDeleteStarted));
+
+ //
=============================================================================================================
+ // 4.Create a segment and move it to state DELETE_SEGMENT_FINISHED,
and check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg3leaderEpochs = Collections.singletonMap(0,
301L);
+ RemoteLogSegmentId seg3Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg3 = new RemoteLogSegmentMetadata(seg3Id,
301L, 400L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg3leaderEpochs);
+ cache.addToInProgress(seg3);
+ RemoteLogSegmentMetadataUpdate seg3Update1 = new
RemoteLogSegmentMetadataUpdate(seg3Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg3Update1);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg3S350 =
cache.remoteLogSegmentMetadata(0, 350);
+
Assertions.assertEquals(seg3.createRemoteLogSegmentWithUpdates(seg3Update1),
seg3S350.orElse(null));
+
+ RemoteLogSegmentMetadataUpdate seg3Update2 = new
RemoteLogSegmentMetadataUpdate(seg3Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg3Update2);
+
+ // cache.listRemoteLogSegments(0) should not contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Arrays.asList(segCopyFinished, segDeleteStarted));
+ // But cache.listRemoteLogSegments() should not contain both the
segments as it should have been removed.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
+ Arrays.asList(segCopyInProgress, segCopyFinished,
segDeleteStarted));
+ }
+
+ private void checkContainsAll(Iterator<RemoteLogSegmentMetadata>
allSegments,
Review comment:
The implementation compromises on the ordering, since it converts the
iterator to a set. Is that intentional?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+ private static final int SEG_SIZE = 1024 * 1024;
+ private static final int BROKER_ID = 0;
+
+ @Test
+ public void testCacheSegmentsWithDifferentStates() throws Exception {
+ RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+ // Add segments with different states and check
cache.remoteLogSegmentMetadata(int leaderEpoch, long offset)
+ // cache.listRemoteLogSegments(int leaderEpoch), and
cache.listAllRemoteLogSegments().
+
+ //
=============================================================================================================
+ // 1.Create a segment with state COPY_SEGMENT_STARTED, and check for
searching that segment and listing the
+ // segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg0leaderEpochs = Collections.singletonMap(0, 0L);
+ RemoteLogSegmentId seg0Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata segCopyInProgress = new
RemoteLogSegmentMetadata(seg0Id, 0L, 50L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+ cache.addToInProgress(segCopyInProgress);
+
+ // This segment should not be available as the state is not reached to
COPY_SEGMENT_FINISHED.
+ Optional<RemoteLogSegmentMetadata> seg0s0e0 =
cache.remoteLogSegmentMetadata(0, 0);
+ Assertions.assertFalse(seg0s0e0.isPresent());
+
+ // cache.listRemoteLogSegments(0) should not contain the above
segment, it will be empty.
+ Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+ // But cache.listRemoteLogSegments() should contain the above segment.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
Collections.singletonList(segCopyInProgress));
+
+ //
=============================================================================================================
+ // 2.Create a segment and move it to state COPY_SEGMENT_FINISHED. and
check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg1leaderEpochs = Collections.singletonMap(0,
101L);
+ RemoteLogSegmentId seg1Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg1 = new RemoteLogSegmentMetadata(seg1Id,
101L, 200L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg1leaderEpochs);
+ cache.addToInProgress(seg1);
+ RemoteLogSegmentMetadataUpdate seg1Update = new
RemoteLogSegmentMetadataUpdate(seg1Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg1Update);
+ RemoteLogSegmentMetadata segCopyFinished =
seg1.createRemoteLogSegmentWithUpdates(seg1Update);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg1S150 =
cache.remoteLogSegmentMetadata(0, 150);
+
Assertions.assertEquals(seg1.createRemoteLogSegmentWithUpdates(seg1Update),
seg1S150.orElse(null));
+
+ // cache.listRemoteLogSegments(0) should not contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Collections.singletonList(segCopyFinished));
+ // But cache.listRemoteLogSegments() should contain both the segments.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
Arrays.asList(segCopyInProgress, segCopyFinished));
+
+ //
=============================================================================================================
+ // 3.Create a segment and move it to state DELETE_SEGMENT_STARTED, and
check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg2leaderEpochs = Collections.singletonMap(0,
201L);
+ RemoteLogSegmentId seg2Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg2 = new RemoteLogSegmentMetadata(seg2Id,
201L, 300L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg2leaderEpochs);
+ cache.addToInProgress(seg2);
+ RemoteLogSegmentMetadataUpdate seg2Update = new
RemoteLogSegmentMetadataUpdate(seg2Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg2Update);
+ RemoteLogSegmentMetadata segDeleteStarted =
seg2.createRemoteLogSegmentWithUpdates(seg2Update);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg2S250 =
cache.remoteLogSegmentMetadata(0, 250);
+
Assertions.assertEquals(seg2.createRemoteLogSegmentWithUpdates(seg2Update),
seg2S250.orElse(null));
+
+ // cache.listRemoteLogSegments(0) should contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Arrays.asList(segCopyFinished, segDeleteStarted));
+ // But cache.listRemoteLogSegments() should contain all the segments.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
+ Arrays.asList(segCopyInProgress, segCopyFinished,
segDeleteStarted));
+
+ //
=============================================================================================================
+ // 4.Create a segment and move it to state DELETE_SEGMENT_FINISHED,
and check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg3leaderEpochs = Collections.singletonMap(0,
301L);
+ RemoteLogSegmentId seg3Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg3 = new RemoteLogSegmentMetadata(seg3Id,
301L, 400L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg3leaderEpochs);
+ cache.addToInProgress(seg3);
+ RemoteLogSegmentMetadataUpdate seg3Update1 = new
RemoteLogSegmentMetadataUpdate(seg3Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg3Update1);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg3S350 =
cache.remoteLogSegmentMetadata(0, 350);
+
Assertions.assertEquals(seg3.createRemoteLogSegmentWithUpdates(seg3Update1),
seg3S350.orElse(null));
Review comment:
Could we assert just before this line that `seg3S350` is not empty? This
will simplify the `seg3S350.orElse(null)` argument to `seg3S350.get()`.
(The same comment applies to other places in this test.)
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+ private static final Logger log =
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+ private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata>
idToSegmentMetadata
+ = new ConcurrentHashMap<>();
+
+ // It keeps the segments which are not yet reached to
COPY_SEGMENT_FINISHED state.
+ private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new
HashSet<>();
Review comment:
This set is not thread-safe, unlike the other maps. Is there
any reason for that?
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements
RemoteLogMetadataManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+ private final ConcurrentMap<TopicIdPartition,
RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<TopicIdPartition, RemoteLogMetadataCache>
partitionToRemoteLogMetadataCache =
Review comment:
Could we call this `idToRemoteLogMetadataCache` to align with the naming
of the other attribute that's called `idToPartitionDeleteMetadata`?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class InmemoryRemoteLogMetadataManagerTest {
+
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+ private static final int SEG_SIZE = 1024 * 1024;
+ private static final int BROKER_ID = 0;
+
+ @Test
+ public void testRLMMFetchSegment() throws Exception {
+ InmemoryRemoteLogMetadataManager rlmm = new
InmemoryRemoteLogMetadataManager();
+ int brokerId = 0;
+ // Create remote log segment metadata and add them to RLMM.
+
+ // segment 0
+ // 0-100
+ // leader epochs (0,0), (1,20), (2,80)
+ Map<Integer, Long> seg0leaderEpochs = new HashMap<>();
+ seg0leaderEpochs.put(0, 0L);
+ seg0leaderEpochs.put(1, 20L);
+ seg0leaderEpochs.put(2, 80L);
+ RemoteLogSegmentId segIdFooTp0s0e100 = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata segMetFooTp0s0e100 = new
RemoteLogSegmentMetadata(segIdFooTp0s0e100, 0L, 100L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+ rlmm.addRemoteLogSegmentMetadata(segMetFooTp0s0e100);
+
+ // wWe should not get this as the segment is still gettign copied and
it is not yet considered successful until
Review comment:
Typos:
1. s/wWe/We
2. s/gettign/getting
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class is an implementation of {@link RemoteLogMetadataManager} backed
by inmemory store.
+ */
+public class InmemoryRemoteLogMetadataManager implements
RemoteLogMetadataManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
+
+ private final ConcurrentMap<TopicIdPartition,
RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<TopicIdPartition, RemoteLogMetadataCache>
partitionToRemoteLogMetadataCache =
+ new ConcurrentHashMap<>();
+
+ @Override
+ public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata)
+ throws RemoteStorageException {
+ log.debug("Adding remote log segment : [{}]",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ // this method is allowed only to add remote log segment with the
initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
+ // but not to update the existing remote log segment metadata.
+ if (remoteLogSegmentMetadata.state() !=
RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+ throw new IllegalArgumentException("Given remoteLogSegmentMetadata
should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED
+ + " but it contains state as: " +
remoteLogSegmentMetadata.state());
+ }
+
+ RemoteLogSegmentId remoteLogSegmentId =
remoteLogSegmentMetadata.remoteLogSegmentId();
+
+ RemoteLogMetadataCache remoteLogMetadataCache =
partitionToRemoteLogMetadataCache
+ .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id ->
new RemoteLogMetadataCache());
+
+ remoteLogMetadataCache.addToInProgress(remoteLogSegmentMetadata);
+ }
+
+ @Override
+ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
+ throws RemoteStorageException {
+ log.debug("Updating remote log segment: [{}]", metadataUpdate);
+ Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be
null");
+
+ RemoteLogSegmentState targetState = metadataUpdate.state();
+ // Callers should use putRemoteLogSegmentMetadata to add
RemoteLogSegmentMetadata with state as
+ // RemoteLogSegmentState.COPY_SEGMENT_STARTED.
+ if (targetState == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
Review comment:
Similar to the above comment, why not check this inside
`remoteLogMetadataCache.updateRemoteLogSegmentMetadata()`?
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+ private static final Logger log =
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+ private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata>
idToSegmentMetadata
Review comment:
Can we add a 1-line doc for this similar to other attributes below?
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+ private static final Logger log =
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+ private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata>
idToSegmentMetadata
+ = new ConcurrentHashMap<>();
+
+ // It keeps the segments which are not yet reached to
COPY_SEGMENT_FINISHED state.
+ private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new
HashSet<>();
+
+ // It will have all the segments except with state as COPY_SEGMENT_STARTED.
+ private final ConcurrentMap<Integer, NavigableMap<Long,
RemoteLogSegmentId>> leaderEpochToOffsetToId
+ = new ConcurrentHashMap<>();
+
+ private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ log.debug("Adding remote log segment metadata: [{}]",
remoteLogSegmentMetadata);
+ idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(),
remoteLogSegmentMetadata);
+ Map<Integer, Long> leaderEpochToOffset =
remoteLogSegmentMetadata.segmentLeaderEpochs();
+ for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+ leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new
ConcurrentSkipListMap<>())
+ .put(entry.getValue(),
remoteLogSegmentMetadata.remoteLogSegmentId());
+ }
+ }
+
+ public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch, long offset) {
+ NavigableMap<Long, RemoteLogSegmentId> offsetToId =
leaderEpochToOffsetToId.get(leaderEpoch);
+ if (offsetToId == null || offsetToId.isEmpty()) {
+ return Optional.empty();
+ }
+
+ // look for floor entry as the given offset may exist in this entry.
+ Map.Entry<Long, RemoteLogSegmentId> entry =
offsetToId.floorEntry(offset);
+ if (entry == null) {
+ // if the offset is lower than the minimum offset available in
metadata then return empty.
+ return Optional.empty();
+ }
+
+ RemoteLogSegmentMetadata metadata =
idToSegmentMetadata.get(entry.getValue());
+ // check whether the given offset with leaderEpoch exists in this
segment.
+ // check for epoch's offset boundaries with in this segment.
+ // 1. get the next epoch's start offset -1 if exists
+ // 2. if no next epoch exists, then segment end offset can be
considered as epoch's relative end offset.
+ Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+ .higherEntry(leaderEpoch);
+ long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 :
metadata.endOffset();
+
+ // seek offset should be <= epoch's end offset.
+ return (offset > epochEndOffset) ? Optional.empty() :
Optional.of(metadata);
+ }
+
+ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
+ throws RemoteResourceNotFoundException {
+ log.debug("Updating remote log segment metadata: [{}]",
metadataUpdate);
+ RemoteLogSegmentId remoteLogSegmentId =
metadataUpdate.remoteLogSegmentId();
+ RemoteLogSegmentMetadata existingMetadata =
idToSegmentMetadata.get(remoteLogSegmentId);
+ if (existingMetadata == null) {
+ throw new RemoteResourceNotFoundException("No remote log segment
metadata found for : "
+ + remoteLogSegmentId);
+ }
+
+ RemoteLogSegmentState targetState = metadataUpdate.state();
+ RemoteLogSegmentState existingState = existingMetadata.state();
+ if (!RemoteLogSegmentState.isValidTransition(existingMetadata.state(),
targetState)) {
+ throw new IllegalStateException("Current state: " + existingState
+ ", target state: " + targetState);
+ }
+
Review comment:
In this method, we allow for existing entries in `idToSegmentMetadata`
to be replaced, even if the state of the existing and new entries are the same.
Is that intentional?
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+ private static final Logger log =
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+ private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata>
idToSegmentMetadata
+ = new ConcurrentHashMap<>();
+
+ // It keeps the segments which are not yet reached to
COPY_SEGMENT_FINISHED state.
+ private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new
HashSet<>();
+
+ // It will have all the segments except with state as COPY_SEGMENT_STARTED.
+ private final ConcurrentMap<Integer, NavigableMap<Long,
RemoteLogSegmentId>> leaderEpochToOffsetToId
+ = new ConcurrentHashMap<>();
+
+ private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ log.debug("Adding remote log segment metadata: [{}]",
remoteLogSegmentMetadata);
+ idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(),
remoteLogSegmentMetadata);
+ Map<Integer, Long> leaderEpochToOffset =
remoteLogSegmentMetadata.segmentLeaderEpochs();
+ for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+ leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new
ConcurrentSkipListMap<>())
+ .put(entry.getValue(),
remoteLogSegmentMetadata.remoteLogSegmentId());
+ }
+ }
+
+ public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch, long offset) {
+ NavigableMap<Long, RemoteLogSegmentId> offsetToId =
leaderEpochToOffsetToId.get(leaderEpoch);
+ if (offsetToId == null || offsetToId.isEmpty()) {
+ return Optional.empty();
+ }
+
+ // look for floor entry as the given offset may exist in this entry.
+ Map.Entry<Long, RemoteLogSegmentId> entry =
offsetToId.floorEntry(offset);
+ if (entry == null) {
+ // if the offset is lower than the minimum offset available in
metadata then return empty.
+ return Optional.empty();
+ }
+
+ RemoteLogSegmentMetadata metadata =
idToSegmentMetadata.get(entry.getValue());
+ // check whether the given offset with leaderEpoch exists in this
segment.
+ // check for epoch's offset boundaries with in this segment.
+ // 1. get the next epoch's start offset -1 if exists
+ // 2. if no next epoch exists, then segment end offset can be
considered as epoch's relative end offset.
+ Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+ .higherEntry(leaderEpoch);
+ long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 :
metadata.endOffset();
+
+ // seek offset should be <= epoch's end offset.
+ return (offset > epochEndOffset) ? Optional.empty() :
Optional.of(metadata);
+ }
+
+ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
+ throws RemoteResourceNotFoundException {
+ log.debug("Updating remote log segment metadata: [{}]",
metadataUpdate);
+ RemoteLogSegmentId remoteLogSegmentId =
metadataUpdate.remoteLogSegmentId();
+ RemoteLogSegmentMetadata existingMetadata =
idToSegmentMetadata.get(remoteLogSegmentId);
+ if (existingMetadata == null) {
+ throw new RemoteResourceNotFoundException("No remote log segment
metadata found for : "
+ + remoteLogSegmentId);
+ }
+
+ RemoteLogSegmentState targetState = metadataUpdate.state();
+ RemoteLogSegmentState existingState = existingMetadata.state();
+ if (!RemoteLogSegmentState.isValidTransition(existingMetadata.state(),
targetState)) {
+ throw new IllegalStateException("Current state: " + existingState
+ ", target state: " + targetState);
+ }
+
+ RemoteLogSegmentMetadata updatedMetadata =
existingMetadata.createRemoteLogSegmentWithUpdates(metadataUpdate);
+ idToSegmentMetadata.put(remoteLogSegmentId, updatedMetadata);
+ if (targetState != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+ remoteLogSegmentIdInProgress.remove(remoteLogSegmentId);
+ addRemoteLogSegmentMetadata(updatedMetadata);
+ }
+
+ if (targetState == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED) {
+ log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+ // remove this entry when the state is moved to
delete_segment_finished
+ Map<Integer, Long> leaderEpochs =
existingMetadata.segmentLeaderEpochs();
Review comment:
Hmm, the entry for `existingMetadata` gets overwritten in the call to
`addRemoteLogSegmentMetadata` in L110. Should we be accounting for the same
here?
##########
File path:
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+ private static final Logger log =
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+ private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata>
idToSegmentMetadata
+ = new ConcurrentHashMap<>();
+
+ // It keeps the segments which are not yet reached to
COPY_SEGMENT_FINISHED state.
+ private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new
HashSet<>();
+
+ // It will have all the segments except with state as COPY_SEGMENT_STARTED.
+ private final ConcurrentMap<Integer, NavigableMap<Long,
RemoteLogSegmentId>> leaderEpochToOffsetToId
+ = new ConcurrentHashMap<>();
+
+ private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ log.debug("Adding remote log segment metadata: [{}]",
remoteLogSegmentMetadata);
+ idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(),
remoteLogSegmentMetadata);
+ Map<Integer, Long> leaderEpochToOffset =
remoteLogSegmentMetadata.segmentLeaderEpochs();
+ for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+ leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new
ConcurrentSkipListMap<>())
+ .put(entry.getValue(),
remoteLogSegmentMetadata.remoteLogSegmentId());
+ }
+ }
+
+ public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch, long offset) {
+ NavigableMap<Long, RemoteLogSegmentId> offsetToId =
leaderEpochToOffsetToId.get(leaderEpoch);
+ if (offsetToId == null || offsetToId.isEmpty()) {
+ return Optional.empty();
+ }
+
+ // look for floor entry as the given offset may exist in this entry.
+ Map.Entry<Long, RemoteLogSegmentId> entry =
offsetToId.floorEntry(offset);
+ if (entry == null) {
+ // if the offset is lower than the minimum offset available in
metadata then return empty.
+ return Optional.empty();
+ }
+
+ RemoteLogSegmentMetadata metadata =
idToSegmentMetadata.get(entry.getValue());
+ // check whether the given offset with leaderEpoch exists in this
segment.
+ // check for epoch's offset boundaries with in this segment.
+ // 1. get the next epoch's start offset -1 if exists
+ // 2. if no next epoch exists, then segment end offset can be
considered as epoch's relative end offset.
+ Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+ .higherEntry(leaderEpoch);
+ long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 :
metadata.endOffset();
+
+ // seek offset should be <= epoch's end offset.
+ return (offset > epochEndOffset) ? Optional.empty() :
Optional.of(metadata);
+ }
+
+ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
+ throws RemoteResourceNotFoundException {
+ log.debug("Updating remote log segment metadata: [{}]",
metadataUpdate);
+ RemoteLogSegmentId remoteLogSegmentId =
metadataUpdate.remoteLogSegmentId();
+ RemoteLogSegmentMetadata existingMetadata =
idToSegmentMetadata.get(remoteLogSegmentId);
+ if (existingMetadata == null) {
+ throw new RemoteResourceNotFoundException("No remote log segment
metadata found for : "
+ + remoteLogSegmentId);
+ }
+
+ RemoteLogSegmentState targetState = metadataUpdate.state();
+ RemoteLogSegmentState existingState = existingMetadata.state();
+ if (!RemoteLogSegmentState.isValidTransition(existingMetadata.state(),
targetState)) {
+ throw new IllegalStateException("Current state: " + existingState
+ ", target state: " + targetState);
+ }
+
+ RemoteLogSegmentMetadata updatedMetadata =
existingMetadata.createRemoteLogSegmentWithUpdates(metadataUpdate);
+ idToSegmentMetadata.put(remoteLogSegmentId, updatedMetadata);
+ if (targetState != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
+ remoteLogSegmentIdInProgress.remove(remoteLogSegmentId);
+ addRemoteLogSegmentMetadata(updatedMetadata);
+ }
+
+ if (targetState == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED) {
+ log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+ // remove this entry when the state is moved to
delete_segment_finished
+ Map<Integer, Long> leaderEpochs =
existingMetadata.segmentLeaderEpochs();
+ for (Map.Entry<Integer, Long> entry : leaderEpochs.entrySet()) {
+ NavigableMap<Long, RemoteLogSegmentId> offsetToIds =
leaderEpochToOffsetToId.get(entry.getKey());
+ // remove the mappings where this segment is deleted.
+ offsetToIds.values().remove(remoteLogSegmentId);
+ }
+
+ // remove the segment-id mapping.
+ idToSegmentMetadata.remove(remoteLogSegmentId);
+ }
+ }
+
+ public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
+ ArrayList<RemoteLogSegmentMetadata> list = new
ArrayList<>(idToSegmentMetadata.values());
+ list.addAll(remoteLogSegmentIdInProgress.stream().map(id ->
idToSegmentMetadata.get(id))
+ .collect(Collectors.toList()));
+
list.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+ return list.iterator();
+ }
+
+ public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int
leaderEpoch) {
+ NavigableMap<Long, RemoteLogSegmentId> map =
leaderEpochToOffsetToId.get(leaderEpoch);
+ return map != null ? map.values().stream().map(id ->
idToSegmentMetadata.get(id)).iterator()
+ : Collections.emptyIterator();
+ }
+
+ public Optional<Long> highestLogOffset(int leaderEpoch) {
+ NavigableMap<Long, RemoteLogSegmentId> offsetToSegmentId =
leaderEpochToOffsetToId.get(leaderEpoch);
+ if (offsetToSegmentId == null) {
+ return Optional.empty();
+ }
+
+ long max = 0L;
+ for (RemoteLogSegmentId id : offsetToSegmentId.values()) {
+ RemoteLogSegmentMetadata metadata = idToSegmentMetadata.get(id);
+ Map.Entry<Integer, Long> nextEntry =
metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
+ // If there is an higher entry than the given leader epoch,
end-offset of the given leader epoch is,
+ // (next leader epoch's start-offset) -1.
+ long nextVal = nextEntry != null ? nextEntry.getValue() - 1 :
metadata.endOffset();
+ max = Math.max(max, nextVal);
+ }
+
+ return Optional.of(max);
+ }
+
+ /**
+ * This will be added to copy_in_progress metadata list. This will be
removed from that list once it is moved to the
+ * next state which can be COPY_SEGMENT_FINISHED or DELETE_SEGMENT_STARTED.
+ *
+ * @param remoteLogSegmentMetadata
+ */
+ public void addToInProgress(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
Review comment:
Before we insert into the map/set, we should check if the provided
`remoteLogSegmentMetadata.state()` is `COPY_SEGMENT_STARTED`.
##########
File path:
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##########
@@ -87,4 +89,27 @@ public byte id() {
public static RemoteLogSegmentState forId(byte id) {
return STATE_TYPES.get(id);
}
+
+ public static boolean isValidTransition(RemoteLogSegmentState srcState,
RemoteLogSegmentState targetState) {
+ Objects.requireNonNull(targetState, "targetState can not be null");
+
+ if (srcState == null) {
Review comment:
It seems to me that `srcState` is never null in practice. Where does
this check come into play?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManagerTest.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class InmemoryRemoteLogMetadataManagerTest {
+
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+ private static final int SEG_SIZE = 1024 * 1024;
+ private static final int BROKER_ID = 0;
+
+ @Test
+ public void testRLMMFetchSegment() throws Exception {
Review comment:
Can we improve the local variable names? For example,
`segIdFooTp0s0e100`, `segMetFooTp0s0e100`, etc. are not easy to read. We can use
simpler names.
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
Review comment:
Could we add test(s) for `highestLogOffset` API?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+ private static final int SEG_SIZE = 1024 * 1024;
+ private static final int BROKER_ID = 0;
+
+ @Test
+ public void testCacheSegmentsWithDifferentStates() throws Exception {
+ RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+
+ // Add segments with different states and check
cache.remoteLogSegmentMetadata(int leaderEpoch, long offset)
+ // cache.listRemoteLogSegments(int leaderEpoch), and
cache.listAllRemoteLogSegments().
+
+ //
=============================================================================================================
+ // 1.Create a segment with state COPY_SEGMENT_STARTED, and check for
searching that segment and listing the
+ // segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg0leaderEpochs = Collections.singletonMap(0, 0L);
+ RemoteLogSegmentId seg0Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata segCopyInProgress = new
RemoteLogSegmentMetadata(seg0Id, 0L, 50L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg0leaderEpochs);
+ cache.addToInProgress(segCopyInProgress);
+
+ // This segment should not be available as the state is not reached to
COPY_SEGMENT_FINISHED.
+ Optional<RemoteLogSegmentMetadata> seg0s0e0 =
cache.remoteLogSegmentMetadata(0, 0);
+ Assertions.assertFalse(seg0s0e0.isPresent());
+
+ // cache.listRemoteLogSegments(0) should not contain the above
segment, it will be empty.
+ Assertions.assertFalse(cache.listRemoteLogSegments(0).hasNext());
+ // But cache.listRemoteLogSegments() should contain the above segment.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
Collections.singletonList(segCopyInProgress));
+
+ //
=============================================================================================================
+ // 2.Create a segment and move it to state COPY_SEGMENT_FINISHED. and
check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg1leaderEpochs = Collections.singletonMap(0,
101L);
+ RemoteLogSegmentId seg1Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg1 = new RemoteLogSegmentMetadata(seg1Id,
101L, 200L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg1leaderEpochs);
+ cache.addToInProgress(seg1);
+ RemoteLogSegmentMetadataUpdate seg1Update = new
RemoteLogSegmentMetadataUpdate(seg1Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg1Update);
+ RemoteLogSegmentMetadata segCopyFinished =
seg1.createRemoteLogSegmentWithUpdates(seg1Update);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg1S150 =
cache.remoteLogSegmentMetadata(0, 150);
+
Assertions.assertEquals(seg1.createRemoteLogSegmentWithUpdates(seg1Update),
seg1S150.orElse(null));
+
+ // cache.listRemoteLogSegments(0) should not contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Collections.singletonList(segCopyFinished));
+ // But cache.listRemoteLogSegments() should contain both the segments.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
Arrays.asList(segCopyInProgress, segCopyFinished));
+
+ //
=============================================================================================================
+ // 3.Create a segment and move it to state DELETE_SEGMENT_STARTED, and
check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg2leaderEpochs = Collections.singletonMap(0,
201L);
+ RemoteLogSegmentId seg2Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg2 = new RemoteLogSegmentMetadata(seg2Id,
201L, 300L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg2leaderEpochs);
+ cache.addToInProgress(seg2);
+ RemoteLogSegmentMetadataUpdate seg2Update = new
RemoteLogSegmentMetadataUpdate(seg2Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
+ cache.updateRemoteLogSegmentMetadata(seg2Update);
+ RemoteLogSegmentMetadata segDeleteStarted =
seg2.createRemoteLogSegmentWithUpdates(seg2Update);
+
+ // Search should return the above segment.
+ Optional<RemoteLogSegmentMetadata> seg2S250 =
cache.remoteLogSegmentMetadata(0, 250);
+
Assertions.assertEquals(seg2.createRemoteLogSegmentWithUpdates(seg2Update),
seg2S250.orElse(null));
+
+ // cache.listRemoteLogSegments(0) should contain the above segment.
+ checkContainsAll(cache.listRemoteLogSegments(0),
Arrays.asList(segCopyFinished, segDeleteStarted));
+ // But cache.listRemoteLogSegments() should contain all the segments.
+ checkContainsAll(cache.listAllRemoteLogSegments(),
+ Arrays.asList(segCopyInProgress, segCopyFinished,
segDeleteStarted));
+
+ //
=============================================================================================================
+ // 4.Create a segment and move it to state DELETE_SEGMENT_FINISHED,
and check for searching that segment and
+ // listing the segments.
+ //
==============================================================================================================
+ Map<Integer, Long> seg3leaderEpochs = Collections.singletonMap(0,
301L);
+ RemoteLogSegmentId seg3Id = new RemoteLogSegmentId(TP0,
Uuid.randomUuid());
+ RemoteLogSegmentMetadata seg3 = new RemoteLogSegmentMetadata(seg3Id,
301L, 400L, -1L, BROKER_ID,
+ System.currentTimeMillis(), SEG_SIZE, seg3leaderEpochs);
+ cache.addToInProgress(seg3);
+ RemoteLogSegmentMetadataUpdate seg3Update1 = new
RemoteLogSegmentMetadataUpdate(seg3Id,
+ System.currentTimeMillis(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID);
Review comment:
Should we alter the other arguments too, for example `BROKER_ID` and
`eventTimestamp`? It appears that we expect `RemoteLogMetadataCache` to [apply
all of the provided
updates](https://github.com/apache/kafka/blob/0d9a95a7d0ab06aecc4480901707e29dd2a3147e/clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java#L240-L242),
and this may include the other fields as well.
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+ // map of key to log data, which can be segment or any of its indexes.
+ private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+ public InmemoryRemoteStorageManager() {
+ }
+
+ static String generateKeyForSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
".segment";
+ }
+
+ static String generateKeyForIndex(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ IndexType indexType) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
"." + indexType.toString();
+ }
+
+ // visible for testing.
+ boolean containsKey(String key) {
+ return keyToLogData.containsKey(key);
+ }
+
+ @Override
+ public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
+ throws RemoteStorageException {
+ log.debug("copying log segment and indexes for : {}",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ Objects.requireNonNull(logSegmentData, "logSegmentData can not be
null");
+ try {
+ keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
Review comment:
As per the interface we
[mandate](https://github.com/apache/kafka/blob/0d9a95a7d0ab06aecc4480901707e29dd2a3147e/clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L76)
the caller to ensure a unique ID, but is it useful to add a guard that disallows
replacing existing values?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+ // map of key to log data, which can be segment or any of its indexes.
+ private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+ public InmemoryRemoteStorageManager() {
Review comment:
Can we remove this c'tor in exchange for the default generated c'tor?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RemoteLogMetadataCacheTest {
+
+ private static final TopicIdPartition TP0 = new
TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+ private static final int SEG_SIZE = 1024 * 1024;
+ private static final int BROKER_ID = 0;
+
+ @Test
+ public void testCacheSegmentsWithDifferentStates() throws Exception {
Review comment:
This particular test checks a number of things together in one test.
Instead, could sections (1) to (4) from below each be defined as a separate
unit test? Especially since each section seems to operate on a different
segment, so it seems logically independent.
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+ // map of key to log data, which can be segment or any of its indexes.
+ private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+ public InmemoryRemoteStorageManager() {
+ }
+
+ static String generateKeyForSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
".segment";
+ }
+
+ static String generateKeyForIndex(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ IndexType indexType) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
"." + indexType.toString();
+ }
+
+ // visible for testing.
+ boolean containsKey(String key) {
+ return keyToLogData.containsKey(key);
+ }
+
+ @Override
+ public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
+ throws RemoteStorageException {
+ log.debug("copying log segment and indexes for : {}",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ Objects.requireNonNull(logSegmentData, "logSegmentData can not be
null");
+ try {
+ keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+ Files.readAllBytes(logSegmentData.logSegment().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Offset),
+ Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Timestamp),
+ Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Transaction),
+ Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.LeaderEpoch),
+ logSegmentData.leaderEpochIndex().array());
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.ProducerSnapshot),
+
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+ } catch (IOException e) {
+ throw new RemoteStorageException(e.getMessage(), e);
+ }
+ log.debug("copied log segment and indexes for : {} successfully.",
remoteLogSegmentMetadata);
+ }
+
+ @Override
+ public InputStream fetchLogSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ int startPosition)
+ throws RemoteStorageException {
+ log.debug("Received fetch segment request at start position: [{}] for
[{}]", startPosition, remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ return fetchLogSegment(remoteLogSegmentMetadata, startPosition,
Integer.MAX_VALUE);
+ }
+
+ @Override
+ public InputStream fetchLogSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ int startPosition,
+ int endPosition) throws
RemoteStorageException {
+ log.debug("Received fetch segment request at start position: [{}] and
end position: [{}] for segment [{}]",
+ startPosition, endPosition, remoteLogSegmentMetadata);
+
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ if (startPosition < 0 || endPosition < 0) {
+ throw new IllegalArgumentException("Given start position or end
position must not be negative.");
+ }
+
+ if (endPosition < startPosition) {
+ throw new IllegalArgumentException("end position must be greater
than start position");
+ }
+
+ String key = generateKeyForSegment(remoteLogSegmentMetadata);
+ byte[] segment = keyToLogData.get(key);
+
+ if (segment == null) {
+ throw new RemoteResourceNotFoundException("No remote log segment
found with start offset:"
+ +
remoteLogSegmentMetadata.startOffset() + " and id: "
+ +
remoteLogSegmentMetadata.remoteLogSegmentId());
+ }
+
+ if (startPosition >= segment.length) {
+ throw new IllegalArgumentException("start position: " +
startPosition
+ + " must be less than the
length of the segment: " + segment.length);
+ }
+
+ // check for boundaries like given end position is more than the
length, length should never be more than the
+ // existing segment size.
+ int length = Math.min(segment.length - 1, endPosition) - startPosition
+ 1;
+ log.debug("Length of the segment to be sent: [{}], for segment: [{}]",
length, remoteLogSegmentMetadata);
+
+ return new ByteArrayInputStream(segment, startPosition, length);
+ }
+
+ @Override
+ public InputStream fetchIndex(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ IndexType indexType) throws
RemoteStorageException {
+ log.debug("Received fetch request for index type: [{}], segment [{}]",
indexType, remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ Objects.requireNonNull(indexType, "indexType can not be null");
+
+ String key = generateKeyForIndex(remoteLogSegmentMetadata, indexType);
+ byte[] index = keyToLogData.get(key);
+ if (index == null) {
+ throw new RemoteResourceNotFoundException("No remote log segment
index found with start offset:"
+ +
remoteLogSegmentMetadata.startOffset() + " and id: "
+ +
remoteLogSegmentMetadata.remoteLogSegmentId());
+ }
+
+ return new ByteArrayInputStream(index);
+ }
+
+ @Override
+ public void deleteLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) throws RemoteStorageException {
+ log.info("Deleting log segment for: [{}]", remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ String segmentKey = generateKeyForSegment(remoteLogSegmentMetadata);
+ keyToLogData.remove(segmentKey);
+ for (IndexType indexType : IndexType.values()) {
+ String key = generateKeyForIndex(remoteLogSegmentMetadata,
indexType);
+ keyToLogData.remove(key);
+ }
+ log.info("Deleted log segment successfully for: [{}]",
remoteLogSegmentMetadata);
+ }
+
+ @Override
+ public void close() throws IOException {
+ keyToLogData = Collections.emptyMap();
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
Review comment:
Is this intentionally left empty?
##########
File path:
remote-storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is an implementation of {@link RemoteStorageManager} backed by
inmemory store.
+ */
+public class InmemoryRemoteStorageManager implements RemoteStorageManager {
+ private static final Logger log =
LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
+
+ // map of key to log data, which can be segment or any of its indexes.
+ private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
+
+ public InmemoryRemoteStorageManager() {
+ }
+
+ static String generateKeyForSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
".segment";
+ }
+
+ static String generateKeyForIndex(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ IndexType indexType) {
+ return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() +
"." + indexType.toString();
+ }
+
+ // visible for testing.
+ boolean containsKey(String key) {
+ return keyToLogData.containsKey(key);
+ }
+
+ @Override
+ public void copyLogSegmentData(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
+ throws RemoteStorageException {
+ log.debug("copying log segment and indexes for : {}",
remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+ Objects.requireNonNull(logSegmentData, "logSegmentData can not be
null");
+ try {
+ keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
+ Files.readAllBytes(logSegmentData.logSegment().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Offset),
+ Files.readAllBytes(logSegmentData.offsetIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Timestamp),
+ Files.readAllBytes(logSegmentData.timeIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.Transaction),
+ Files.readAllBytes(logSegmentData.txnIndex().toPath()));
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.LeaderEpoch),
+ logSegmentData.leaderEpochIndex().array());
+ keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata,
IndexType.ProducerSnapshot),
+
Files.readAllBytes(logSegmentData.producerSnapshotIndex().toPath()));
+ } catch (IOException e) {
+ throw new RemoteStorageException(e.getMessage(), e);
+ }
+ log.debug("copied log segment and indexes for : {} successfully.",
remoteLogSegmentMetadata);
+ }
+
+ @Override
+ public InputStream fetchLogSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ int startPosition)
+ throws RemoteStorageException {
+ log.debug("Received fetch segment request at start position: [{}] for
[{}]", startPosition, remoteLogSegmentMetadata);
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ return fetchLogSegment(remoteLogSegmentMetadata, startPosition,
Integer.MAX_VALUE);
+ }
+
+ @Override
+ public InputStream fetchLogSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata,
+ int startPosition,
+ int endPosition) throws
RemoteStorageException {
+ log.debug("Received fetch segment request at start position: [{}] and
end position: [{}] for segment [{}]",
+ startPosition, endPosition, remoteLogSegmentMetadata);
+
+ Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
+
+ if (startPosition < 0 || endPosition < 0) {
+ throw new IllegalArgumentException("Given start position or end
position must not be negative.");
+ }
+
+ if (endPosition < startPosition) {
+ throw new IllegalArgumentException("end position must be greater
than start position");
Review comment:
Probably better to say `...must be greater than or equal to...` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]