This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c960960f9 [INLONG-7106][TubeMQ] Add reset offset API by time (#7108)
c960960f9 is described below

commit c960960f923136a615b31e494965bb2aea847384
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Jan 3 10:14:34 2023 +0800

    [INLONG-7106][TubeMQ] Add reset offset API by time (#7108)
---
 .../tubemq/corebase/utils/ServiceStatusHolder.java |  24 +--
 .../server/broker/offset/OffsetHistoryInfo.java    |  67 ++++++
 .../server/broker/web/BrokerAdminServlet.java      | 226 +++++++++++++++++++++
 .../tubemq/server/common/fielddef/WebFieldDef.java |   8 +-
 .../nodemanage/nodebroker/BrokerAbnHolder.java     |  43 ++--
 5 files changed, 330 insertions(+), 38 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
index 378bf6de7..2c9ea1013 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
@@ -70,16 +70,10 @@ public class ServiceStatusHolder {
     }
 
     public static boolean addWriteIOErrCnt() {
-        long curTime = lastWriteStatsTime.get();
-        if (System.currentTimeMillis() - curTime > statsDurationMs) {
-            if (lastWriteStatsTime.compareAndSet(curTime, 
System.currentTimeMillis())) {
-                curWriteIOExcptCnt.getAndSet(0);
-                if (isPauseWrite.get()) {
-                    isPauseWrite.compareAndSet(true, false);
-                }
-            }
-        }
         if (curWriteIOExcptCnt.incrementAndGet() > allowedWriteIOExcptCnt) {
+            if (isPauseWrite.get()) {
+                return true;
+            }
             isPauseWrite.set(true);
             return true;
         }
@@ -95,16 +89,10 @@ public class ServiceStatusHolder {
     }
 
     public static boolean addReadIOErrCnt() {
-        long curTime = lastReadStatsTime.get();
-        if (System.currentTimeMillis() - curTime > statsDurationMs) {
-            if (lastReadStatsTime.compareAndSet(curTime, 
System.currentTimeMillis())) {
-                curReadIOExcptCnt.getAndSet(0);
-                if (isPauseRead.get()) {
-                    isPauseRead.compareAndSet(true, false);
-                }
-            }
-        }
         if (curReadIOExcptCnt.incrementAndGet() > allowedReadIOExcptCnt) {
+            if (isPauseRead.get()) {
+                return true;
+            }
             isPauseRead.set(true);
             return true;
         }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
index 3c6b90072..2d5f1e468 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
@@ -17,10 +17,17 @@
 
 package org.apache.inlong.tubemq.server.broker.offset;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
 import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple3;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 
 /**
@@ -159,4 +166,64 @@ public class OffsetHistoryInfo {
         }
         strBuff.append("]}");
     }
+
+    /**
+     * Parse history offset record info
+     *
+     * @param jsonData  string offset information
+     * @param result    process result
+     */
+    public static boolean parseRecordInfo(String jsonData, ProcessResult 
result) {
+        JsonObject jsonObject = null;
+        try {
+            jsonObject = JsonParser.parseString(jsonData).getAsJsonObject();
+        } catch (Throwable e1) {
+            result.setFailResult(String.format(
+                    "Parse history offset value failure, reason is %s", 
e1.getMessage()));
+            return result.isSuccess();
+        }
+        if (jsonObject == null) {
+            result.setFailResult("Parse error, history offset value must be 
valid json format!");
+            return result.isSuccess();
+        }
+        if (!jsonObject.has("ver")) {
+            result.setFailResult("FIELD ver is required in history offset 
value!");
+            return result.isSuccess();
+        }
+        int verValue = jsonObject.get("ver").getAsInt();
+        if (verValue < TServerConstants.OFFSET_HISTORY_RECORD_SHORT_VERSION) {
+            result.setFailResult("Only support v2 or next version in history 
offset value!");
+            return result.isSuccess();
+        }
+        if (!jsonObject.has("records")) {
+            result.setFailResult("FIELD records is required in history offset 
value!");
+            return result.isSuccess();
+        }
+        List<Tuple3<String, Integer, Long>> resetOffsets = new ArrayList<>();
+        JsonArray records = jsonObject.get("records").getAsJsonArray();
+        for (int i = 0; i < records.size(); i++) {
+            JsonObject itemInfo = records.get(i).getAsJsonObject();
+            if (itemInfo == null) {
+                continue;
+            }
+            String topicName = itemInfo.get("topic").getAsString();
+            JsonArray offsets = itemInfo.get("offsets").getAsJsonArray();
+            for (int j = 0; j < offsets.size(); j++) {
+                JsonObject storeInfo = offsets.get(j).getAsJsonObject();
+                if (storeInfo == null) {
+                    continue;
+                }
+                JsonArray partInfos = storeInfo.get("parts").getAsJsonArray();
+                for (int k = 0; k < partInfos.size(); k++) {
+                    JsonObject partItem = partInfos.get(k).getAsJsonObject();
+                    int partId = partItem.get("partId").getAsInt();
+                    long offsetVal = partItem.get("iCfm").getAsLong();
+                    resetOffsets.add(new Tuple3<>(topicName, partId, 
offsetVal));
+                }
+            }
+        }
+        result.setSuccResult(resetOffsets);
+        return true;
+    }
+
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
index 3e723c541..2a73b0e67 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -44,6 +44,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
 import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
 import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType;
 import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder;
@@ -118,6 +119,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         // set or update group's offset info
         innRegisterWebMethod("admin_set_offset",
                 "adminSetGroupOffSet", false);
+        // set or update group's offset info by history offset time
+        innRegisterWebMethod("admin_set_offset_by_time",
+                "adminSetGroupOffSetByTime", false);
         // remove group's offset info
         innRegisterWebMethod("admin_rmv_offset",
                 "adminRemoveGroupOffSet", false);
@@ -1001,6 +1005,228 @@ public class BrokerAdminServlet extends AbstractWebHandler {
         sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
     }
 
+    /**
+     * Add or Modify consumer group offset by group's history offset time.
+     *
+     * @param req      request
+     * @param sBuffer  process result
+     */
+    public void adminSetGroupOffSetByTime(HttpServletRequest req, 
StringBuilder sBuffer) {
+        ProcessResult result = new ProcessResult();
+        // get group name
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.GROUPNAME, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return;
+        }
+        final String groupName = (String) result.getRetData();
+        // get modify user
+        if (!WebParameterUtils.getStringParamValue(req,
+                WebFieldDef.MODIFYUSER, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return;
+        }
+        final String modifyUser = (String) result.getRetData();
+        // get the left timestamp to be set
+        if (!WebParameterUtils.getDateParameter(req,
+                WebFieldDef.RECORDTIME, true, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return;
+        }
+        final Date tmpDataTime1 = (Date) result.getRetData();
+        final long recStartTime = tmpDataTime1.getTime();
+        // get the right timestamp to be set
+        if (!WebParameterUtils.getDateParameter(req,
+                WebFieldDef.ENDTIME, false, null, sBuffer, result)) {
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return;
+        }
+        final Date tmpDataTime2 = (Date) result.getRetData();
+        long recEndTime = recStartTime + 5 * 60 * 1000L;
+        if (tmpDataTime2 != null) {
+            recEndTime = tmpDataTime2.getTime();
+            if (recEndTime < recStartTime) {
+                WebParameterUtils.buildFailResult(sBuffer,
+                        String.format("Parameter %s value must >= %s",
+                                WebFieldDef.ENDTIME.name, 
WebFieldDef.RECORDTIME.name));
+                return;
+            }
+        }
+        // check storage status
+        if (ServiceStatusHolder.isReadServiceStop()) {
+            WebParameterUtils.buildFailResult(sBuffer,
+                    "Read StoreService temporary unavailable!");
+            return;
+        }
+        // get offset history storage
+        MessageStore msgStore;
+        MessageStoreManager storeManager = broker.getStoreManager();
+        try {
+            msgStore = storeManager.getOrCreateMessageStore(
+                    TServerConstants.OFFSET_HISTORY_NAME, 0);
+        } catch (Throwable ex) {
+            WebParameterUtils.buildFailResult(sBuffer,
+                    String.format("Get offset history store fail, reason=%s", 
ex.getMessage()));
+            return;
+        }
+        // get the history offset in the time range
+        // read history data
+        int totalCnt = 0;
+        // locate start offset
+        int maxRetryCnt = 50;
+        long requestOffset = msgStore.getStartOffsetByTimeStamp(recStartTime);
+        if (!getStoredGroupHisOffsets(groupName, msgStore,
+                requestOffset, maxRetryCnt, recStartTime, recEndTime, result)) 
{
+            WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+            return;
+        }
+        List<Tuple3<String, Integer, Long>> resetOffsets =
+                (List<Tuple3<String, Integer, Long>>) result.getRetData();
+        if (resetOffsets.isEmpty()) {
+            WebParameterUtils.buildFailResult(sBuffer, "Not found history 
offset value!");
+            return;
+        }
+        Set<String> groupNameSet = new HashSet<>();
+        groupNameSet.add(groupName);
+        Set<String> topicSet = new HashSet<>();
+        // before
+        Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> 
befGroupOffsetMap =
+                getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, groupNameSet, 
topicSet);
+        Map<String, Map<Integer, GroupOffsetInfo>> befTopicPartMap =
+                befGroupOffsetMap.get(groupName);
+        // change
+        broker.getOffsetManager().modifyGroupOffset(groupNameSet, 
resetOffsets, modifyUser);
+        // after
+        Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> 
aftGroupOffsetMap =
+                getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, groupNameSet, 
topicSet);
+        Map<String, Map<Integer, GroupOffsetInfo>> aftTopicPartMap =
+                aftGroupOffsetMap.get(groupName);
+        // build result
+        WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+        int topicCnt = 0;
+        
sBuffer.append("{\"groupName\":\"").append(groupName).append("\",\"before\":[");
+        for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : 
befTopicPartMap.entrySet()) {
+            if (topicCnt++ > 0) {
+                sBuffer.append(",");
+            }
+            Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue();
+            sBuffer.append("{\"topicName\":\"").append(entry1.getKey())
+                    .append("\",\"offsets\":[");
+            int partCnt = 0;
+            for (Map.Entry<Integer, GroupOffsetInfo> entry2 : 
partOffMap.entrySet()) {
+                if (partCnt++ > 0) {
+                    sBuffer.append(",");
+                }
+                GroupOffsetInfo offsetInfo = entry2.getValue();
+                offsetInfo.buildOffsetInfo(sBuffer);
+            }
+            sBuffer.append("],\"partCount\":").append(partCnt).append("}");
+        }
+        sBuffer.append("],\"after\":[");
+        topicCnt = 0;
+        for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : 
aftTopicPartMap.entrySet()) {
+            if (topicCnt++ > 0) {
+                sBuffer.append(",");
+            }
+            Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue();
+            sBuffer.append("{\"topicName\":\"").append(entry1.getKey())
+                    .append("\",\"offsets\":[");
+            int partCnt = 0;
+            for (Map.Entry<Integer, GroupOffsetInfo> entry2 : 
partOffMap.entrySet()) {
+                if (partCnt++ > 0) {
+                    sBuffer.append(",");
+                }
+                GroupOffsetInfo offsetInfo = entry2.getValue();
+                offsetInfo.buildOffsetInfo(sBuffer);
+            }
+            sBuffer.append("],\"partCount\":").append(partCnt).append("}");
+        }
+        sBuffer.append("]}");
+        WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt);
+    }
+
+    /**
+     * Query group's offset records stored in broker.
+     *
+     * @param groupName      group name
+     * @param msgStore       history offset store
+     * @param requestOffset  request offset
+     * @param maxRetryCnt    max query turns
+     * @param recStartTime   record start timestamp
+     * @param recEndTime     record end timestamp
+     * @param result         query result
+     * @return  whether success
+     */
+    private boolean getStoredGroupHisOffsets(String groupName, MessageStore 
msgStore,
+            long requestOffset, int maxRetryCnt,
+            long recStartTime, long recEndTime,
+            ProcessResult result) {
+        int msgTypeCode;
+        int partitionId;
+        Throwable qryThrow;
+        GetMessageResult getMessageResult;
+        // locate partitionId and filter-item
+        msgTypeCode = groupName.hashCode();
+        partitionId = Math.abs(msgTypeCode) % 
TServerConstants.OFFSET_HISTORY_NUMPARTS;
+        Set<String> filterCodes = new HashSet<>();
+        filterCodes.add(groupName);
+        // build consumer node information
+        ConsumerNodeInfo consumerNodeInfo = new 
ConsumerNodeInfo(broker.getStoreManager(),
+                groupName, "offsetConsumer", filterCodes, "", 
System.currentTimeMillis(), "", "");
+        // query records from storage
+        int qryRetryCount = 0;
+        long itemInitOffset = requestOffset;
+        int maxTransferSize = broker.getStoreManager().getMaxMsgTransferSize();
+        do {
+            qryThrow = null;
+            try {
+                getMessageResult = msgStore.getMessages(303, itemInitOffset,
+                        partitionId, consumerNodeInfo, 
TServerConstants.OFFSET_HISTORY_NAME,
+                        maxTransferSize, recStartTime);
+            } catch (Throwable e2) {
+                qryThrow = e2;
+                continue;
+            }
+            // check query result
+            if (getMessageResult.transferedMessageList == null
+                    || getMessageResult.transferedMessageList.isEmpty()) {
+                itemInitOffset += getMessageResult.lastReadOffset;
+                continue;
+            }
+            // build record to return result
+            List<Message> messageList = DataConverterUtil.convertMessage(
+                    TServerConstants.OFFSET_HISTORY_NAME, 
getMessageResult.transferedMessageList);
+            for (Message message : messageList) {
+                if (message == null) {
+                    continue;
+                }
+                long recAppTime = DateTimeConvertUtils.yyyyMMddHHmm2ms(
+                        message.getAttrValue(TokenConstants.TOKEN_MSG_TIME));
+                if (recAppTime > recEndTime) {
+                    result.setFailResult(String.format(
+                            "Over required endTime range, current time is %s",
+                            
message.getAttrValue(TokenConstants.TOKEN_MSG_TIME)));
+                    return result.isSuccess();
+                }
+                if (!groupName.equals(message.getAttrValue(
+                        TServerConstants.TOKEN_OFFSET_GROUP))) {
+                    continue;
+                }
+                return OffsetHistoryInfo.parseRecordInfo(
+                        StringUtils.newStringUtf8(message.getData()), result);
+            }
+            itemInitOffset += getMessageResult.lastReadOffset;
+        } while (++qryRetryCount < maxRetryCnt);
+        // check query result
+        if (qryThrow == null) {
+            result.setFailResult("Not found record in required search range");
+        } else {
+            result.setFailResult(String.format(
+                    "Query record failure, reason is :%s", 
qryThrow.getMessage()));
+        }
+        return result.isSuccess();
+    }
+
     /**
      * Clone consume group offset, clone A group's offset to other group.
      *
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
index f40fc0e81..4a8d140de 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java
@@ -259,7 +259,13 @@ public enum WebFieldDef {
     MAXRETRYCOUNT(93, "maxRetryCnt", "mrc", WebFieldType.INT,
             "Max retry query turns", RegexDef.TMP_NUMBER),
     STATSTYPE(94, "statsType", "st", WebFieldType.STRING,
-            "Statistics type", TServerConstants.META_MAX_STATSTYPE_LENGTH);
+            "Statistics type", TServerConstants.META_MAX_STATSTYPE_LENGTH),
+
+    RESETVALUE(95, "resetValue", "rv", WebFieldType.BOOLEAN,
+            "Reset value, default is false"),
+    ENDTIME(96, "endTime", "et", WebFieldType.STRING,
+            "The end record time of the historical offset of the consume 
group",
+            DateTimeConvertUtils.LENGTH_YYYYMMDDHHMMSS);
 
     public final int id;
     public final String name;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
index 3a31afae0..2f9f0abfd 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java
@@ -69,7 +69,9 @@ public class BrokerAbnHolder {
             int reportReadStatus,
             int reportWriteStatus) {
         StringBuilder sBuffer = new StringBuilder(512);
-        if (reportReadStatus == 0 && reportWriteStatus == 0) {
+        ManageStatus reqMngStatus =
+                getManageStatus(reportWriteStatus, reportReadStatus);
+        if (reqMngStatus == ManageStatus.STATUS_MANAGE_ONLINE) {
             BrokerAbnInfo brokerAbnInfo = brokerAbnormalMap.get(brokerId);
             if (brokerAbnInfo != null) {
                 if (brokerForbiddenMap.get(brokerId) == null) {
@@ -89,13 +91,6 @@ public class BrokerAbnHolder {
         if (curEntry == null) {
             return;
         }
-        ManageStatus reqMngStatus =
-                getManageStatus(reportWriteStatus, reportReadStatus);
-        if ((curEntry.getManageStatus() == reqMngStatus)
-                || ((reqMngStatus == ManageStatus.STATUS_MANAGE_OFFLINE)
-                        && (curEntry.getManageStatus().getCode() < 
ManageStatus.STATUS_MANAGE_ONLINE.getCode()))) {
-            return;
-        }
         BrokerAbnInfo brokerAbnInfo = brokerAbnormalMap.get(brokerId);
         if (brokerAbnInfo == null) {
             if (brokerAbnormalMap.putIfAbsent(brokerId,
@@ -110,20 +105,30 @@ public class BrokerAbnHolder {
         } else {
             brokerAbnInfo.updateLastRepStatus(reportReadStatus, 
reportWriteStatus);
         }
+        ManageStatus curStatus = curEntry.getManageStatus();
+        if (curStatus == reqMngStatus
+                || curStatus.getCode() < 
ManageStatus.STATUS_MANAGE_ONLINE.getCode()
+                || curStatus.getCode() >= 
ManageStatus.STATUS_MANAGE_OFFLINE.getCode()) {
+            return;
+        }
+        ManageStatus newStatus = reqMngStatus;
+        if (reqMngStatus == ManageStatus.STATUS_MANAGE_ONLINE_NOT_WRITE
+                && curStatus == ManageStatus.STATUS_MANAGE_ONLINE_NOT_READ) {
+            newStatus = ManageStatus.STATUS_MANAGE_OFFLINE;
+        }
         BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.get(brokerId);
         if (brokerFbdInfo == null) {
-            BrokerFbdInfo tmpBrokerFbdInfo =
-                    new BrokerFbdInfo(brokerId, curEntry.getManageStatus(),
-                            reqMngStatus, System.currentTimeMillis());
+            BrokerFbdInfo tmpFbdInfo = new BrokerFbdInfo(brokerId,
+                    curStatus, newStatus, System.currentTimeMillis());
             if (reportReadStatus > 0 || reportWriteStatus > 0) {
-                if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
-                    if (brokerForbiddenMap.putIfAbsent(brokerId, 
tmpBrokerFbdInfo) == null) {
+                if (updateCurManageStatus(brokerId, newStatus, sBuffer)) {
+                    if (brokerForbiddenMap.putIfAbsent(brokerId, tmpFbdInfo) 
== null) {
                         brokerForbiddenCount.incrementAndGet();
                         MasterSrvStatsHolder.incBrokerForbiddenCnt();
                         logger.warn(sBuffer
                                 .append("[Broker AutoForbidden] master add 
missing forbidden broker, ")
                                 .append(brokerId).append("'s manage status to 
")
-                                
.append(reqMngStatus.getDescription()).toString());
+                                
.append(newStatus.getDescription()).toString());
                         sBuffer.delete(0, sBuffer.length());
                     }
                 }
@@ -132,8 +137,8 @@ public class BrokerAbnHolder {
                     brokerForbiddenCount.decrementAndGet();
                     return;
                 }
-                if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
-                    if (brokerForbiddenMap.putIfAbsent(brokerId, 
tmpBrokerFbdInfo) != null) {
+                if (updateCurManageStatus(brokerId, newStatus, sBuffer)) {
+                    if (brokerForbiddenMap.putIfAbsent(brokerId, tmpFbdInfo) 
!= null) {
                         brokerForbiddenCount.decrementAndGet();
                         return;
                     }
@@ -141,15 +146,15 @@ public class BrokerAbnHolder {
                     logger.warn(sBuffer
                             .append("[Broker AutoForbidden] master auto 
forbidden broker, ")
                             .append(brokerId).append("'s manage status to ")
-                            .append(reqMngStatus.getDescription()).toString());
+                            .append(newStatus.getDescription()).toString());
                     sBuffer.delete(0, sBuffer.length());
                 } else {
                     brokerForbiddenCount.decrementAndGet();
                 }
             }
         } else {
-            if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) {
-                brokerFbdInfo.updateInfo(curEntry.getManageStatus(), 
reqMngStatus);
+            if (updateCurManageStatus(brokerId, newStatus, sBuffer)) {
+                brokerFbdInfo.updateInfo(curStatus, newStatus);
             }
         }
     }

Reply via email to