This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-snapshot
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/dledger-controller-snapshot by 
this push:
     new 28f2d0826 [ISSUE #5585]  Implement controller statemachine snapshot 
file generator (#5642)
28f2d0826 is described below

commit 28f2d0826288faee798942973e8ba8c5b70445e4
Author: hzh0425 <[email protected]>
AuthorDate: Mon Dec 5 15:13:21 2022 +0800

    [ISSUE #5585]  Implement controller statemachine snapshot file generator 
(#5642)
    
    * implement Statemachine snapshot file generator
    
    * Using filechannel to replace fileoutputstream
---
 .../controller/impl/DLedgerController.java         |  26 +--
 .../impl/manager/ReplicasInfoManager.java          |   3 +-
 .../DLedgerControllerStateMachine.java             |   4 +-
 .../StatemachineSnapshotFileGenerator.java         | 177 +++++++++++++++++++++
 .../StatemachineSnapshotFileGeneratorTest.java     |  89 +++++++++++
 5 files changed, 283 insertions(+), 16 deletions(-)

diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 71e8e465c..27a75b2ea 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -24,18 +24,6 @@ import io.openmessaging.storage.dledger.MemberState;
 import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
 import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
 import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiPredicate;
-import java.util.function.Supplier;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -47,6 +35,7 @@ import 
org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.event.EventSerializer;
 import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import 
org.apache.rocketmq.controller.impl.statemachine.DLedgerControllerStateMachine;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -64,6 +53,19 @@ import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataRespon
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
+import java.util.function.Supplier;
+
 /**
  * The implementation of controller, based on DLedger (raft).
  */
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 9636f63a0..f54709572 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -477,13 +477,12 @@ public class ReplicasInfoManager implements 
SnapshotAbleMetadataManager {
     @Override
     public byte[] encodeMetadata() {
         String json = RemotingSerializable.toJson(this, true);
-        System.out.println(json);
         return json.getBytes(StandardCharsets.UTF_8);
     }
 
     @Override
     public boolean loadMetadata(byte[] data) {
-        String json = new String(data);
+        String json = new String(data, StandardCharsets.UTF_8);
         ReplicasInfoManager obj = RemotingSerializable.fromJson(json, 
ReplicasInfoManager.class);
         this.syncStateSetInfoTable.putAll(obj.syncStateSetInfoTable);
         this.replicaInfoTable.putAll(obj.replicaInfoTable);
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/DLedgerControllerStateMachine.java
similarity index 95%
rename from 
controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
rename to 
controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/DLedgerControllerStateMachine.java
index 4f1408b37..ee10775d6 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/DLedgerControllerStateMachine.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.controller.impl;
+package org.apache.rocketmq.controller.impl.statemachine;
 
 import io.openmessaging.storage.dledger.entry.DLedgerEntry;
 import io.openmessaging.storage.dledger.exception.DLedgerException;
@@ -39,7 +39,7 @@ public class DLedgerControllerStateMachine implements 
StateMachine {
     private final String dLedgerId;
 
     public DLedgerControllerStateMachine(final ReplicasInfoManager 
replicasInfoManager,
-        final EventSerializer eventSerializer, final String dLedgerId) {
+                                         final EventSerializer 
eventSerializer, final String dLedgerId) {
         this.replicasInfoManager = replicasInfoManager;
         this.eventSerializer = eventSerializer;
         this.dLedgerId = dLedgerId;
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/StatemachineSnapshotFileGenerator.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/StatemachineSnapshotFileGenerator.java
new file mode 100644
index 000000000..d65c1eaea
--- /dev/null
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/statemachine/StatemachineSnapshotFileGenerator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.statemachine;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.impl.manager.MetadataManagerType;
+import org.apache.rocketmq.controller.impl.manager.SnapshotAbleMetadataManager;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Writes the state of the registered {@link SnapshotAbleMetadataManager}s to a snapshot file and
+ * loads it back.
+ *
+ * <p>File layout: one {@link SnapshotFileHeader} followed by one section per registered manager.
+ * Each section is {@code <manager type id: short><payload length: int><payload bytes>}.
+ *
+ * <p>Both {@link #generateSnapshot(String)} and {@link #loadSnapshot(String)} are
+ * {@code synchronized} on this instance.
+ */
+public class StatemachineSnapshotFileGenerator {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+    /** Size in bytes of a section header: manager type id (short) + payload length (int). */
+    private static final int SECTION_HEADER_LENGTH = 6;
+
+    /**
+     * Fixed-size header at the start of every snapshot file.
+     */
+    static class SnapshotFileHeader {
+        // Magic (4 bytes) + Version (2 bytes) + TotalSections (4 bytes) + Reserved (8 bytes)
+        public static final int HEADER_LENGTH = 18;
+        public static final Integer MAGIC = -626843481;
+        public static final Short VERSION = 1;
+        // Reserved padding written after TotalSections; ignored when reading.
+        public static final Long REVERSED = 0L;
+        public final int totalSections;
+
+        public SnapshotFileHeader(int totalSections) {
+            this.totalSections = totalSections;
+        }
+
+        /**
+         * Parses a header from the buffer's current position.
+         *
+         * @param header buffer positioned at the first header byte
+         * @return the parsed header, or {@code null} if the buffer is {@code null}, holds fewer than
+         *         {@link #HEADER_LENGTH} readable bytes, or carries an unexpected magic or version
+         */
+        public static SnapshotFileHeader from(ByteBuffer header) {
+            // Check the readable bytes, not the capacity: a buffer that was only partially filled
+            // must not be parsed from its zero padding.
+            if (header == null || header.remaining() < HEADER_LENGTH) {
+                return null;
+            }
+            int magic = header.getInt();
+            if (magic != MAGIC) {
+                return null;
+            }
+            short version = header.getShort();
+            if (version != VERSION) {
+                return null;
+            }
+
+            int totalSections = header.getInt();
+            return new SnapshotFileHeader(totalSections);
+        }
+
+        /**
+         * Serializes this header.
+         *
+         * @return a buffer of {@link #HEADER_LENGTH} bytes, flipped and ready to be written
+         */
+        public ByteBuffer build() {
+            ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH);
+            buffer.putInt(MAGIC);
+            buffer.putShort(VERSION);
+            buffer.putInt(this.totalSections);
+            buffer.putLong(REVERSED);
+            buffer.flip();
+            return buffer;
+        }
+    }
+
+    private final Map<Short/*MetadataManagerId*/, SnapshotAbleMetadataManager> metadataManagerTable;
+
+    /**
+     * @param managers the managers whose metadata is snapshotted, keyed internally by the id of
+     *                 their metadata manager type
+     */
+    public StatemachineSnapshotFileGenerator(final List<SnapshotAbleMetadataManager> managers) {
+        this.metadataManagerTable = new HashMap<>();
+        managers.forEach(manager ->
+            this.metadataManagerTable.put(manager.getMetadataManagerType().getId(), manager));
+    }
+
+    /**
+     * Generate snapshot and write the data to snapshot file.
+     *
+     * <p>An existing file at {@code snapshotPath} is truncated first, so no bytes of an older,
+     * longer snapshot can survive behind the newly written sections.
+     *
+     * @param snapshotPath path of the snapshot file to create or overwrite
+     * @throws IOException if the file cannot be opened, written or forced to storage
+     */
+    public synchronized void generateSnapshot(final String snapshotPath) throws IOException {
+        try (final FileChannel fileChannel = FileChannel.open(Paths.get(snapshotPath),
+                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
+                StandardOpenOption.TRUNCATE_EXISTING)) {
+            // Write Snapshot Header
+            SnapshotFileHeader header = new SnapshotFileHeader(this.metadataManagerTable.size());
+            writeFully(fileChannel, header.build());
+
+            // Write each section
+            ByteBuffer sectionHeader = ByteBuffer.allocate(SECTION_HEADER_LENGTH);
+            for (Map.Entry<Short, SnapshotAbleMetadataManager> section : this.metadataManagerTable.entrySet()) {
+                byte[] serializedBytes = section.getValue().encodeMetadata();
+                // Section format: <Section-MetadataManagerType><Section-Length><Section-Bytes>
+
+                // Write section header
+                sectionHeader.clear();
+                sectionHeader.putShort(section.getKey());
+                sectionHeader.putInt(serializedBytes.length);
+                sectionHeader.flip();
+                writeFully(fileChannel, sectionHeader);
+
+                // Write section bytes
+                writeFully(fileChannel, ByteBuffer.wrap(serializedBytes));
+            }
+
+            fileChannel.force(true);
+        }
+    }
+
+    /**
+     * Read snapshot from snapshot file and load the metadata into corresponding metadataManager.
+     *
+     * <p>Sections whose type id has no registered manager are read and ignored.
+     *
+     * @param snapshotPath path of the snapshot file to read
+     * @return {@code true} if every registered manager loaded its section; {@code false} if the
+     *         header is missing, truncated or invalid, if a manager rejects its data, or if the
+     *         number of loaded sections differs from the number of registered managers
+     * @throws IOException if the file cannot be read, or a section header or body is truncated or
+     *                     declares a negative length
+     */
+    public synchronized boolean loadSnapshot(final String snapshotPath) throws IOException {
+        try (ReadableByteChannel channel = Channels.newChannel(Files.newInputStream(Paths.get(snapshotPath)))) {
+            // Read snapshot Header
+            ByteBuffer header = ByteBuffer.allocate(SnapshotFileHeader.HEADER_LENGTH);
+            if (readFully(channel, header) < SnapshotFileHeader.HEADER_LENGTH) {
+                // Empty file or truncated header: not a usable snapshot.
+                return false;
+            }
+            header.flip();
+
+            SnapshotFileHeader fileHeader = SnapshotFileHeader.from(header);
+            if (fileHeader == null) {
+                return false;
+            }
+
+            // Read each section
+            ByteBuffer sectionHeader = ByteBuffer.allocate(SECTION_HEADER_LENGTH);
+            int successLoadCnt = 0;
+            while (true) {
+                sectionHeader.clear();
+                int readSize = readFully(channel, sectionHeader);
+                if (readSize == 0) {
+                    // Clean end of file: no further sections.
+                    break;
+                }
+                if (readSize != SECTION_HEADER_LENGTH) {
+                    throw new IOException("Invalid amount of data read for the header of a section");
+                }
+                sectionHeader.flip();
+
+                // Section format: <Section-MetadataManagerType><Section-Length><Section-Bytes>
+                short sectionType = sectionHeader.getShort();
+                int length = sectionHeader.getInt();
+                if (length < 0) {
+                    // Report corruption as an IOException instead of letting ByteBuffer.allocate
+                    // fail with an IllegalArgumentException.
+                    throw new IOException("Invalid section length: " + length);
+                }
+
+                ByteBuffer data = ByteBuffer.allocate(length);
+                if (readFully(channel, data) != length) {
+                    throw new IOException("Invalid amount of data read for the body of a section");
+                }
+
+                SnapshotAbleMetadataManager metadataManager = this.metadataManagerTable.get(sectionType);
+                if (metadataManager != null) {
+                    if (!metadataManager.loadMetadata(data.array())) {
+                        return false;
+                    }
+                    successLoadCnt++;
+                    log.info("Load snapshot metadata for {} success!", MetadataManagerType.from(sectionType));
+                }
+            }
+            if (successLoadCnt != this.metadataManagerTable.size()) {
+                log.info("Failed to load snapshot metadata file totally, expected section nums:{}, success load nums:{}", this.metadataManagerTable.size(), successLoadCnt);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Writes all remaining bytes of {@code buffer}; a single {@code write} call is allowed to
+     * transfer fewer bytes than requested.
+     */
+    private static void writeFully(final FileChannel channel, final ByteBuffer buffer) throws IOException {
+        while (buffer.hasRemaining()) {
+            channel.write(buffer);
+        }
+    }
+
+    /**
+     * Reads until {@code buffer} is full or the channel reaches end-of-stream; a single
+     * {@code read} call is allowed to return fewer bytes than requested.
+     *
+     * @return the number of bytes read, which is smaller than the buffer's initial remaining size
+     *         only if end-of-stream was reached
+     */
+    private static int readFully(final ReadableByteChannel channel, final ByteBuffer buffer) throws IOException {
+        int total = 0;
+        while (buffer.hasRemaining()) {
+            int read = channel.read(buffer);
+            if (read < 0) {
+                break;
+            }
+            total += read;
+        }
+        return total;
+    }
+}
diff --git 
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/statemachine/StatemachineSnapshotFileGeneratorTest.java
 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/statemachine/StatemachineSnapshotFileGeneratorTest.java
new file mode 100644
index 000000000..1c4d7e478
--- /dev/null
+++ 
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/statemachine/StatemachineSnapshotFileGeneratorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.controller.impl.statemachine;
+
+import org.apache.rocketmq.controller.impl.manager.BrokerInfo;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import org.apache.rocketmq.controller.impl.manager.SyncStateInfo;
+import 
org.apache.rocketmq.controller.impl.statemachine.StatemachineSnapshotFileGenerator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Round-trip test: a snapshot generated from a populated {@link ReplicasInfoManager} must load
+ * into an empty manager so that both encode to the same bytes.
+ */
+public class StatemachineSnapshotFileGeneratorTest {
+
+    public String snapshotPath;
+    StatemachineSnapshotFileGenerator snapshotGenerator;
+    ReplicasInfoManager replicasInfoManager;
+
+    /**
+     * Fills {@link #replicasInfoManager} with one broker set ("broker1") that has two replicas.
+     */
+    public void mockMetadata() {
+        // Plain collections instead of double-brace initialization, which would create an
+        // anonymous subclass per literal.
+        HashMap<String, Long> brokerIdTable = new HashMap<>();
+        brokerIdTable.put("127.0.0.1:10000", 1L);
+        brokerIdTable.put("127.0.0.1:10001", 2L);
+        BrokerInfo broker1 = new BrokerInfo("broker1", "cluster1");
+        broker1.setBrokerIdTable(brokerIdTable);
+        broker1.setBrokerIdCount(2L);
+
+        HashSet<String> syncStateSet = new HashSet<>();
+        syncStateSet.add("127.0.0.1:10000");
+        syncStateSet.add("127.0.0.1:10001");
+        SyncStateInfo syncStateInfo1 = new SyncStateInfo("cluster1", "broker1", "127.0.0.1:10000");
+        syncStateInfo1.setSyncStateSet(syncStateSet);
+
+        HashMap<String, BrokerInfo> replicaInfoTable = new HashMap<>();
+        replicaInfoTable.put("broker1", broker1);
+        this.replicasInfoManager.setReplicaInfoTable(replicaInfoTable);
+
+        HashMap<String, SyncStateInfo> syncStateSetInfoTable = new HashMap<>();
+        syncStateSetInfoTable.put("broker1", syncStateInfo1);
+        this.replicasInfoManager.setSyncStateSetInfoTable(syncStateSetInfoTable);
+    }
+
+    @Before
+    public void init() {
+        // Use the JVM's temp directory and a per-run file name instead of a hard-coded
+        // "/tmp/ControllerSnapshot", so the test is portable and runs do not share a file.
+        this.snapshotPath = Paths.get(System.getProperty("java.io.tmpdir"),
+            "ControllerSnapshot-" + System.nanoTime()).toString();
+        File file = new File(snapshotPath);
+        File parentFile = file.getParentFile();
+        if (parentFile != null) {
+            parentFile.mkdirs();
+        }
+        // The snapshot file is created by the test itself; remove it when the JVM exits.
+        file.deleteOnExit();
+        this.replicasInfoManager = new ReplicasInfoManager(null);
+        mockMetadata();
+        this.snapshotGenerator = new StatemachineSnapshotFileGenerator(Collections.singletonList(this.replicasInfoManager));
+    }
+
+    @Test
+    public void testGenerateAndLoadSnapshot() throws IOException {
+        this.snapshotGenerator.generateSnapshot(this.snapshotPath);
+
+        ReplicasInfoManager emptyManager = new ReplicasInfoManager(null);
+        StatemachineSnapshotFileGenerator generator1 = new StatemachineSnapshotFileGenerator(Collections.singletonList(emptyManager));
+        assertTrue(generator1.loadSnapshot(this.snapshotPath));
+
+        // Expected value first: the manager the snapshot was generated from.
+        assertArrayEquals(this.replicasInfoManager.encodeMetadata(), emptyManager.encodeMetadata());
+    }
+}
\ No newline at end of file

Reply via email to