RongtongJin commented on code in PR #7301:
URL: https://github.com/apache/rocketmq/pull/7301#discussion_r1396960356


##########
common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java:
##########
@@ -67,6 +68,13 @@ public class ControllerConfig {
      */
     private long scanInactiveMasterInterval = 5 * 1000;
 
+    private int jRaftElectionTimeoutMs = 1000;
+    private int jRaftSnapshotIntervalSecs = 3600;
+    private String jRaftGroupId = "jRaft-Controller";
+    private String jRaftServerId = "localhost:9880";
+    private String jRaftInitConf = 
"localhost:9880,localhost:9881,localhost:9882";
+    private String jRaftControllerRPCAddr = 
"localhost:9770,localhost:9771,localhost:9772";

Review Comment:
   Moving these configurations to a jraftConfig class will make it clearer.



##########
controller/src/main/java/org/apache/rocketmq/controller/impl/JRaftController.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.RaftGroupService;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.entity.NodeId;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.entity.Task;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
+import org.apache.rocketmq.controller.impl.closure.ControllerClosure;
+import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
+import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
+import org.apache.rocketmq.controller.impl.task.GetSyncStateDataRequest;
+import 
org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class JRaftController implements Controller {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final RaftGroupService raftGroupService;
+    private final Node node;
+    private final JRaftControllerStateMachine stateMachine;
+    private final ControllerConfig controllerConfig;
+    private final List<BrokerLifecycleListener> brokerLifecycleListeners;
+    private final Map<PeerId/* jRaft peerId */, String/* Controller RPC Server 
Addr */> peerIdToAddr;
+    private final NettyRemotingServer remotingServer;
+
+    public JRaftController(ControllerConfig controllerConfig,
+        final ChannelEventListener channelEventListener) throws IOException {
+        this.controllerConfig = controllerConfig;
+        this.brokerLifecycleListeners = new ArrayList<>();
+
+        final NodeOptions nodeOptions = new NodeOptions();
+        
nodeOptions.setElectionTimeoutMs(controllerConfig.getjRaftElectionTimeoutMs());
+        
nodeOptions.setSnapshotIntervalSecs(controllerConfig.getjRaftSnapshotIntervalSecs());
+        final PeerId serverId = new PeerId();
+        if (!serverId.parse(controllerConfig.getjRaftServerId())) {
+            throw new IllegalArgumentException("Fail to parse serverId:" + 
controllerConfig.getjRaftServerId());
+        }
+        final Configuration initConf = new Configuration();
+        if (!initConf.parse(controllerConfig.getjRaftInitConf())) {
+            throw new IllegalArgumentException("Fail to parse initConf:" + 
controllerConfig.getjRaftInitConf());
+        }
+        nodeOptions.setInitialConf(initConf);
+
+        FileUtils.forceMkdir(new 
File(controllerConfig.getControllerStorePath()));
+        nodeOptions.setLogUri(controllerConfig.getControllerStorePath() + 
File.separator + "log");
+        nodeOptions.setRaftMetaUri(controllerConfig.getControllerStorePath() + 
File.separator + "raft_meta");
+        nodeOptions.setSnapshotUri(controllerConfig.getControllerStorePath() + 
File.separator + "snapshot");
+
+        this.stateMachine = new JRaftControllerStateMachine(controllerConfig, 
new NodeId(controllerConfig.getjRaftGroupId(), serverId));
+        this.stateMachine.registerOnLeaderStart(this::onLeaderStart);
+        this.stateMachine.registerOnLeaderStop(this::onLeaderStop);
+        nodeOptions.setFsm(this.stateMachine);
+
+        this.raftGroupService = new 
RaftGroupService(controllerConfig.getjRaftGroupId(), serverId, nodeOptions);
+
+        this.peerIdToAddr = new HashMap<>();
+        initPeerIdMap();
+
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        
nettyServerConfig.setListenPort(Integer.parseInt(this.peerIdToAddr.get(serverId).split(":")[1]));
+        remotingServer = new NettyRemotingServer(nettyServerConfig, 
channelEventListener);
+
+        this.node = this.raftGroupService.start();

Review Comment:
   Placing this line in the startup function would be better. Is there any 
reason why it must be placed in the initialization (constructor) instead?



##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/RaftReplicasInfoManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.manager;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.heartbeat.BrokerIdentityInfo;
+import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
+import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
+import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelResponse;
+import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
+import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerResponse;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoResponse;
+import 
org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
+import 
org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventResponse;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class RaftReplicasInfoManager extends ReplicasInfoManager {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final Map<BrokerIdentityInfo/* brokerIdentity*/, BrokerLiveInfo> 
brokerLiveTable = new ConcurrentHashMap<>(256);
+
+    public RaftReplicasInfoManager(ControllerConfig controllerConfig) {
+        super(controllerConfig);
+    }
+
+    public ControllerResult<GetBrokerLiveInfoResponse> getBrokerLiveInfo(final 
GetBrokerLiveInfoRequest request) {
+        BrokerIdentityInfo brokerIdentityInfo = request.getBrokerIdentity();
+        ControllerResult<GetBrokerLiveInfoResponse> result = new 
ControllerResult<>(new GetBrokerLiveInfoResponse());
+        Map<BrokerIdentityInfo/* brokerIdentity*/, BrokerLiveInfo> 
resBrokerLiveTable = new HashMap<>();
+        if (brokerIdentityInfo == null || brokerIdentityInfo.isEmpty()) {
+            resBrokerLiveTable.putAll(this.brokerLiveTable);
+        } else {
+            if (brokerLiveTable.containsKey(brokerIdentityInfo)) {
+                resBrokerLiveTable.put(brokerIdentityInfo, 
brokerLiveTable.get(brokerIdentityInfo));
+            } else {
+                log.warn("GetBrokerLiveInfo failed, brokerIdentityInfo: {} not 
exist", brokerIdentityInfo);
+                
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_LIVE_INFO_NOT_EXISTS, 
"brokerIdentityInfo not exist");
+            }
+        }
+        try {
+            result.setBody(JSON.toJSONBytes(resBrokerLiveTable));
+        } catch (Throwable e) {
+            log.error("json serialize resBrokerLiveTable {} error", 
resBrokerLiveTable, e);
+            result.setCodeAndRemark(ResponseCode.SYSTEM_ERROR, "serialize 
error");
+        }
+
+        return result;
+    }
+
+    public ControllerResult<RaftBrokerHeartBeatEventResponse> 
onBrokerHeartBeat(
+        RaftBrokerHeartBeatEventRequest request) {
+        BrokerIdentityInfo brokerIdentityInfo = 
request.getBrokerIdentityInfo();
+        BrokerLiveInfo brokerLiveInfo = request.getBrokerLiveInfo();
+        ControllerResult<RaftBrokerHeartBeatEventResponse> result = new 
ControllerResult<>(new RaftBrokerHeartBeatEventResponse());
+        BrokerLiveInfo prev = 
brokerLiveTable.computeIfAbsent(brokerIdentityInfo, identityInfo -> {
+            log.info("new broker registered, brokerIdentityInfo: {}", 
identityInfo);
+            return brokerLiveInfo;
+        });
+        prev.setLastUpdateTimestamp(brokerLiveInfo.getLastUpdateTimestamp());
+        
prev.setHeartbeatTimeoutMillis(brokerLiveInfo.getHeartbeatTimeoutMillis());
+        prev.setElectionPriority(brokerLiveInfo.getElectionPriority());
+        if (brokerLiveInfo.getEpoch() > prev.getEpoch() || 
brokerLiveInfo.getEpoch() == prev.getEpoch() && brokerLiveInfo.getMaxOffset() > 
prev.getMaxOffset()) {
+            prev.setEpoch(brokerLiveInfo.getEpoch());
+            prev.setMaxOffset(brokerLiveInfo.getMaxOffset());
+            prev.setConfirmOffset(brokerLiveInfo.getConfirmOffset());
+        }
+        return result;
+    }
+
+    public ControllerResult<BrokerCloseChannelResponse> 
onBrokerCloseChannel(BrokerCloseChannelRequest request) {
+        BrokerIdentityInfo brokerIdentityInfo = 
request.getBrokerIdentityInfo();
+        ControllerResult<BrokerCloseChannelResponse> result = new 
ControllerResult<>(new BrokerCloseChannelResponse());
+        if (brokerIdentityInfo == null || brokerIdentityInfo.isEmpty()) {
+            log.warn("onBrokerCloseChannel failed, brokerIdentityInfo is 
null");
+        } else {
+            brokerLiveTable.remove(brokerIdentityInfo);
+            log.info("onBrokerCloseChannel success, brokerIdentityInfo: {}", 
brokerIdentityInfo);
+        }
+        return result;
+    }
+
+    public ControllerResult<CheckNotActiveBrokerResponse> 
checkNotActiveBroker(CheckNotActiveBrokerRequest request) {
+        List<BrokerIdentityInfo> notActiveBrokerIdentityInfoList = new 
ArrayList<>();
+        long checkTime = request.getCheckTimeMillis();
+        final Iterator<Map.Entry<BrokerIdentityInfo, BrokerLiveInfo>> iterator 
= this.brokerLiveTable.entrySet().iterator();
+        while (iterator.hasNext()) {
+            final Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> next = 
iterator.next();
+            long last = next.getValue().getLastUpdateTimestamp();
+            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
+            if (checkTime - last > timeoutMillis) {
+                notActiveBrokerIdentityInfoList.add(next.getKey());
+                iterator.remove();
+                log.warn("Broker expired, brokerInfo {}, expired {}ms", 
next.getKey(), timeoutMillis);
+            }
+        }
+        List<String> needReElectBrokerNames = scanNeedReelectBrokerSets(new 
BrokerValidPredicate() {
+            @Override
+            public boolean check(String clusterName, String brokerName, Long 
brokerId) {
+                return !isBrokerActive(clusterName, brokerName, brokerId, 
checkTime);
+            }
+        });
+        Set<String> alreadyReportedBrokerName = 
notActiveBrokerIdentityInfoList.stream()
+            .map(BrokerIdentityInfo::getBrokerName)
+            .collect(Collectors.toSet());
+        notActiveBrokerIdentityInfoList.addAll(needReElectBrokerNames.stream()
+            .filter(brokerName -> 
!alreadyReportedBrokerName.contains(brokerName))
+            .map(brokerName -> new BrokerIdentityInfo(null, brokerName, null))
+            .collect(Collectors.toList()));

Review Comment:
   这里有两个问题,为了方便我用中文描述:
   1. 
checkNotActiveBroker和electMaster是两种日志,一般先执行checkNotActiveBroker,再执行electMaster,如果checkNotActiveBroker执行成功了,brokerLiveTable就移除了这个broker,但electMaster日志没有应用成功(假设该日志controller切换后被截断了)没有选新主,那么下次checkNotActiveBroker就不会再去扫描到这个broker不活跃,electMaster行为就丢失了,也就是broker无法正常切换。
   2. 
还有一个是如果controller全部下线,再重新上线,预期我们应该不影响到broker正常运行,但当前的设计可能在心跳前先调用checkNotActiveBroker触发broker
 重新选举,这个需要避免。



##########
controller/src/main/resources/rmq.controller.logback.xml:
##########
@@ -41,7 +44,9 @@
         
<file>${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}dledger.log</file>
         <append>true</append>
         <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-            
<fileNamePattern>${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}otherdays${file.separator}dledger.%i.log.gz</fileNamePattern>
+            <fileNamePattern>
+                
${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}otherdays${file.separator}dledger.%i.log.gz
+            </fileNamePattern>

Review Comment:
   
能不能像DLedgerController那样(dledger内部日志和controller业务日志分离),jraft内部日志放在jraft.log,controller的业务日志放在controller.log,现在开启jraftController后日志都混在一起(jraft日志既在controller.log,也有在controller_default.log),不是很方便进行排查。



##########
controller/src/main/java/org/apache/rocketmq/controller/impl/manager/RaftReplicasInfoManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.manager;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.heartbeat.BrokerIdentityInfo;
+import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
+import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
+import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelResponse;
+import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
+import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerResponse;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoResponse;
+import 
org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
+import 
org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventResponse;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class RaftReplicasInfoManager extends ReplicasInfoManager {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final Map<BrokerIdentityInfo/* brokerIdentity*/, BrokerLiveInfo> 
brokerLiveTable = new ConcurrentHashMap<>(256);
+
+    public RaftReplicasInfoManager(ControllerConfig controllerConfig) {
+        super(controllerConfig);
+    }
+
+    public ControllerResult<GetBrokerLiveInfoResponse> getBrokerLiveInfo(final 
GetBrokerLiveInfoRequest request) {
+        BrokerIdentityInfo brokerIdentityInfo = request.getBrokerIdentity();
+        ControllerResult<GetBrokerLiveInfoResponse> result = new 
ControllerResult<>(new GetBrokerLiveInfoResponse());
+        Map<BrokerIdentityInfo/* brokerIdentity*/, BrokerLiveInfo> 
resBrokerLiveTable = new HashMap<>();
+        if (brokerIdentityInfo == null || brokerIdentityInfo.isEmpty()) {
+            resBrokerLiveTable.putAll(this.brokerLiveTable);
+        } else {
+            if (brokerLiveTable.containsKey(brokerIdentityInfo)) {
+                resBrokerLiveTable.put(brokerIdentityInfo, 
brokerLiveTable.get(brokerIdentityInfo));
+            } else {
+                log.warn("GetBrokerLiveInfo failed, brokerIdentityInfo: {} not 
exist", brokerIdentityInfo);
+                
result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_LIVE_INFO_NOT_EXISTS, 
"brokerIdentityInfo not exist");
+            }
+        }
+        try {
+            result.setBody(JSON.toJSONBytes(resBrokerLiveTable));
+        } catch (Throwable e) {
+            log.error("json serialize resBrokerLiveTable {} error", 
resBrokerLiveTable, e);
+            result.setCodeAndRemark(ResponseCode.SYSTEM_ERROR, "serialize 
error");
+        }
+
+        return result;
+    }
+
+    public ControllerResult<RaftBrokerHeartBeatEventResponse> 
onBrokerHeartBeat(
+        RaftBrokerHeartBeatEventRequest request) {
+        BrokerIdentityInfo brokerIdentityInfo = 
request.getBrokerIdentityInfo();
+        BrokerLiveInfo brokerLiveInfo = request.getBrokerLiveInfo();
+        ControllerResult<RaftBrokerHeartBeatEventResponse> result = new 
ControllerResult<>(new RaftBrokerHeartBeatEventResponse());
+        BrokerLiveInfo prev = 
brokerLiveTable.computeIfAbsent(brokerIdentityInfo, identityInfo -> {
+            log.info("new broker registered, brokerIdentityInfo: {}", 
identityInfo);
+            return brokerLiveInfo;
+        });
+        prev.setLastUpdateTimestamp(brokerLiveInfo.getLastUpdateTimestamp());
+        
prev.setHeartbeatTimeoutMillis(brokerLiveInfo.getHeartbeatTimeoutMillis());
+        prev.setElectionPriority(brokerLiveInfo.getElectionPriority());
+        if (brokerLiveInfo.getEpoch() > prev.getEpoch() || 
brokerLiveInfo.getEpoch() == prev.getEpoch() && brokerLiveInfo.getMaxOffset() > 
prev.getMaxOffset()) {
+            prev.setEpoch(brokerLiveInfo.getEpoch());
+            prev.setMaxOffset(brokerLiveInfo.getMaxOffset());
+            prev.setConfirmOffset(brokerLiveInfo.getConfirmOffset());
+        }
+        return result;
+    }
+
+    public ControllerResult<BrokerCloseChannelResponse> 
onBrokerCloseChannel(BrokerCloseChannelRequest request) {
+        BrokerIdentityInfo brokerIdentityInfo = 
request.getBrokerIdentityInfo();
+        ControllerResult<BrokerCloseChannelResponse> result = new 
ControllerResult<>(new BrokerCloseChannelResponse());
+        if (brokerIdentityInfo == null || brokerIdentityInfo.isEmpty()) {
+            log.warn("onBrokerCloseChannel failed, brokerIdentityInfo is 
null");
+        } else {
+            brokerLiveTable.remove(brokerIdentityInfo);
+            log.info("onBrokerCloseChannel success, brokerIdentityInfo: {}", 
brokerIdentityInfo);
+        }
+        return result;
+    }
+
+    public ControllerResult<CheckNotActiveBrokerResponse> 
checkNotActiveBroker(CheckNotActiveBrokerRequest request) {
+        List<BrokerIdentityInfo> notActiveBrokerIdentityInfoList = new 
ArrayList<>();
+        long checkTime = request.getCheckTimeMillis();
+        final Iterator<Map.Entry<BrokerIdentityInfo, BrokerLiveInfo>> iterator 
= this.brokerLiveTable.entrySet().iterator();
+        while (iterator.hasNext()) {
+            final Map.Entry<BrokerIdentityInfo, BrokerLiveInfo> next = 
iterator.next();
+            long last = next.getValue().getLastUpdateTimestamp();
+            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
+            if (checkTime - last > timeoutMillis) {
+                notActiveBrokerIdentityInfoList.add(next.getKey());
+                iterator.remove();
+                log.warn("Broker expired, brokerInfo {}, expired {}ms", 
next.getKey(), timeoutMillis);
+            }
+        }
+        List<String> needReElectBrokerNames = scanNeedReelectBrokerSets(new 
BrokerValidPredicate() {
+            @Override
+            public boolean check(String clusterName, String brokerName, Long 
brokerId) {
+                return !isBrokerActive(clusterName, brokerName, brokerId, 
checkTime);
+            }
+        });
+        Set<String> alreadyReportedBrokerName = 
notActiveBrokerIdentityInfoList.stream()
+            .map(BrokerIdentityInfo::getBrokerName)
+            .collect(Collectors.toSet());
+        notActiveBrokerIdentityInfoList.addAll(needReElectBrokerNames.stream()
+            .filter(brokerName -> 
!alreadyReportedBrokerName.contains(brokerName))
+            .map(brokerName -> new BrokerIdentityInfo(null, brokerName, null))
+            .collect(Collectors.toList()));

Review Comment:
   这里有两个问题,为了方便我用中文描述:
   1. 
checkNotActiveBroker和electMaster是两种日志,一般先执行checkNotActiveBroker,再执行electMaster,如果checkNotActiveBroker执行成功了,brokerLiveTable就移除了这个broker,但electMaster日志没有应用成功(假设该日志controller切换后被截断了)没有选新主,那么下次checkNotActiveBroker就不会再去扫描到这个broker不活跃,electMaster行为就丢失了,也就是broker无法正常切换。
   2. 
还有一个是如果controller全部下线,再重新上线,预期我们应该不影响到broker正常运行,但当前的设计可能在心跳前先调用checkNotActiveBroker触发broker
 重新选举,这个需要避免。



##########
controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.heartbeat;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import io.netty.channel.Channel;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
+import org.apache.rocketmq.controller.impl.JRaftController;
+import org.apache.rocketmq.controller.impl.task.BrokerCloseChannelRequest;
+import org.apache.rocketmq.controller.impl.task.CheckNotActiveBrokerRequest;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoRequest;
+import org.apache.rocketmq.controller.impl.task.GetBrokerLiveInfoResponse;
+import 
org.apache.rocketmq.controller.impl.task.RaftBrokerHeartBeatEventRequest;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class RaftBrokerHeartBeatManager implements BrokerHeartbeatManager {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private JRaftController controller;
+    private final List<BrokerLifecycleListener> brokerLifecycleListeners = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledService;
+    private final ExecutorService executor;
+    private final ControllerConfig controllerConfig;
+
+    private final Map<Channel, BrokerIdentityInfo> 
brokerChannelIdentityInfoMap = new HashMap<>();
+
+    public RaftBrokerHeartBeatManager(ControllerConfig controllerConfig) {
+        this.scheduledService = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RaftBrokerHeartbeatManager_scheduledService_"));
+        this.executor = Executors.newFixedThreadPool(2, new 
ThreadFactoryImpl("RaftBrokerHeartbeatManager_executorService_"));
+        this.controllerConfig = controllerConfig;
+    }
+
+    public void setController(JRaftController controller) {
+        this.controller = controller;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void start() {
+        this.scheduledService.scheduleAtFixedRate(this::scanNotActiveBroker, 
2000, this.controllerConfig.getScanNotActiveBrokerInterval(), 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void shutdown() {
+        this.scheduledService.shutdown();
+        this.executor.shutdown();
+    }
+
+    @Override
+    public void registerBrokerLifecycleListener(BrokerLifecycleListener 
listener) {
+        brokerLifecycleListeners.add(listener);
+    }
+
+    @Override
+    public void onBrokerHeartbeat(String clusterName, String brokerName, 
String brokerAddr, Long brokerId,
+        Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, 
Long confirmOffset,
+        Integer electionPriority) {
+        BrokerIdentityInfo brokerIdentityInfo = new 
BrokerIdentityInfo(clusterName, brokerName, brokerId);
+        int realEpoch = Optional.ofNullable(epoch).orElse(-1);
+        long realBrokerId = Optional.ofNullable(brokerId).orElse(-1L);
+        long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L);
+        long realConfirmOffset = 
Optional.ofNullable(confirmOffset).orElse(-1L);
+        long realTimeoutMillis = 
Optional.ofNullable(timeoutMillis).orElse(DEFAULT_BROKER_CHANNEL_EXPIRED_TIME);
+        int realElectionPriority = 
Optional.ofNullable(electionPriority).orElse(Integer.MAX_VALUE);
+        BrokerLiveInfo liveInfo = new BrokerLiveInfo(brokerName,
+            brokerAddr,
+            realBrokerId,
+            System.currentTimeMillis(),
+            realTimeoutMillis,
+            null,
+            realEpoch,
+            realMaxOffset,
+            realElectionPriority,
+            realConfirmOffset);
+        log.info("broker {} heart beat", brokerIdentityInfo);
+        RaftBrokerHeartBeatEventRequest requestHeader = new 
RaftBrokerHeartBeatEventRequest(brokerIdentityInfo, liveInfo);
+        CompletableFuture<RemotingCommand> future = 
controller.onBrokerHeartBeat(requestHeader);
+        try {
+            RemotingCommand remotingCommand = future.get(5, 
java.util.concurrent.TimeUnit.SECONDS);
+            if (remotingCommand.getCode() != ResponseCode.SUCCESS) {
+                throw new RuntimeException("on broker heartbeat return invalid 
code, code: " + remotingCommand.getCode());
+            }

Review Comment:
   It would be better to handle the return code "CONTROLLER_NOT_LEADER" here. 
The broker sends heartbeats to all controllers, and follower controllers will 
return "CONTROLLER_NOT_LEADER". 



##########
controller/src/main/resources/rmq.controller.logback.xml:
##########
@@ -41,7 +44,9 @@
         
<file>${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}dledger.log</file>
         <append>true</append>
         <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-            
<fileNamePattern>${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}otherdays${file.separator}dledger.%i.log.gz</fileNamePattern>
+            <fileNamePattern>
+                
${user.home}${file.separator}logs${file.separator}rocketmqlogs${file.separator}otherdays${file.separator}dledger.%i.log.gz
+            </fileNamePattern>

Review Comment:
   
能不能像DLedgerController那样(dledger内部日志和controller业务日志分离),jraft内部日志放在jraft.log,controller的业务日志放在controller.log,现在开启jraftController后日志都混在一起(jraft日志既在controller.log,也有在controller_default.log),不是很方便进行排查。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to