junrao commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r620461362
########## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadata.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Base class for remote log metadata objects like {@link RemoteLogSegmentMetadata}, {@link RemoteLogSegmentMetadataUpdate}, + * and {@link RemotePartitionDeleteMetadata}. + */ +@InterfaceStability.Evolving +public class RemoteLogMetadata { Review comment: Should this class be abstract? ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataSerde { + private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); + private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); + private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + + private static final Map<String, Short> REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap(); + private static final Map<Short, RemoteLogMetadataTransform> KEY_TO_TRANSFORM = createRemoteLogMetadataTransforms(); + + private static final BytesApiMessageSerde BYTES_API_MESSAGE_SERDE = new BytesApiMessageSerde() { + @Override + public ApiMessage apiMessageFor(short apiKey) { Review comment: Could we generate this mapping using MetadataRecordTypeGenerator like what raft did? ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class RemoteLogMetadataSerdeTest { + + public static final String TOPIC = "foo"; + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); + private final Time time = new MockTime(1); + + @Test + public void testRemoteLogSegmentMetadataSerde() { + RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); + + doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); + } + + @Test + public void testRemoteLogSegmentMetadataUpdateSerde() { + // Create RemoteLogSegmentMetadataUpdate for RemoteLogSegmentMetadataUpdate + RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); + + doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); + } + + @Test + public void testRemotePartitionDeleteMetadataSerde() { + // Create RemoteLogMetadataContext for RemotePartitionDeleteMetadata + RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata(); + + doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata); + } + + private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { + Map<Integer, Long> segLeaderEpochs = new HashMap<>(); + segLeaderEpochs.put(0, 0L); + segLeaderEpochs.put(1, 20L); + segLeaderEpochs.put(2, 80L); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, + time.milliseconds(), 1024, segLeaderEpochs); + } + + private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), + RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2); + } + + private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { + return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED, + time.milliseconds(), 0); + } + + private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { + RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + // Serialize metadata and get the bytes. + byte[] metadataBytes = serde.serialize(remoteLogMetadata); + + // Deserialize the bytes and check the RemoteLogMetadata object is as expected. + RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde(); + RemoteLogMetadata deserializedRlmContext = deserializer.deserialize(metadataBytes); Review comment: Should we name deserializedRlmContext differently since there is no longer a RlmContext? ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataSerde { + private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey(); + private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey(); + private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); + + private static final Map<String, Short> REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap(); Review comment: Perhaps we could just use Class as the key instead of class name? ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class RemoteLogMetadataSerdeTest { + + public static final String TOPIC = "foo"; + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); + private final Time time = new MockTime(1); + + @Test + public void testRemoteLogSegmentMetadataSerde() { + RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); + + doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); + } + + @Test + public void testRemoteLogSegmentMetadataUpdateSerde() { + // Create RemoteLogSegmentMetadataUpdate for RemoteLogSegmentMetadataUpdate + RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); + + doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); + } + + @Test + public void testRemotePartitionDeleteMetadataSerde() { + // Create RemoteLogMetadataContext for RemotePartitionDeleteMetadata Review comment: There is no longer RemoteLogMetadataContext. ########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataSerde { Review comment: Since we just need a single instance of this class, perhaps we could have a static public method to return the singleton and make the public constructor private? ########## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class RemoteLogMetadataSerdeTest { + + public static final String TOPIC = "foo"; + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); + private final Time time = new MockTime(1); + + @Test + public void testRemoteLogSegmentMetadataSerde() { + RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); + + doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); + } + + @Test + public void testRemoteLogSegmentMetadataUpdateSerde() { + // Create RemoteLogSegmentMetadataUpdate for RemoteLogSegmentMetadataUpdate Review comment: Why do we need to mention RemoteLogSegmentMetadataUpdate twice? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org