This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c960960f9 [INLONG-7106][TubeMQ] Add reset offset API by time (#7108) c960960f9 is described below commit c960960f923136a615b31e494965bb2aea847384 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Jan 3 10:14:34 2023 +0800 [INLONG-7106][TubeMQ] Add reset offset API by time (#7108) --- .../tubemq/corebase/utils/ServiceStatusHolder.java | 24 +-- .../server/broker/offset/OffsetHistoryInfo.java | 67 ++++++ .../server/broker/web/BrokerAdminServlet.java | 226 +++++++++++++++++++++ .../tubemq/server/common/fielddef/WebFieldDef.java | 8 +- .../nodemanage/nodebroker/BrokerAbnHolder.java | 43 ++-- 5 files changed, 330 insertions(+), 38 deletions(-) diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java index 378bf6de7..2c9ea1013 100644 --- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java +++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java @@ -70,16 +70,10 @@ public class ServiceStatusHolder { } public static boolean addWriteIOErrCnt() { - long curTime = lastWriteStatsTime.get(); - if (System.currentTimeMillis() - curTime > statsDurationMs) { - if (lastWriteStatsTime.compareAndSet(curTime, System.currentTimeMillis())) { - curWriteIOExcptCnt.getAndSet(0); - if (isPauseWrite.get()) { - isPauseWrite.compareAndSet(true, false); - } - } - } if (curWriteIOExcptCnt.incrementAndGet() > allowedWriteIOExcptCnt) { + if (isPauseWrite.get()) { + return true; + } isPauseWrite.set(true); return true; } @@ -95,16 +89,10 @@ public class ServiceStatusHolder { } public static boolean addReadIOErrCnt() { - long curTime = lastReadStatsTime.get(); - if (System.currentTimeMillis() - curTime > statsDurationMs) { - if (lastReadStatsTime.compareAndSet(curTime, System.currentTimeMillis())) { - curReadIOExcptCnt.getAndSet(0); - if (isPauseRead.get()) { - isPauseRead.compareAndSet(true, false); - } - } - } if (curReadIOExcptCnt.incrementAndGet() > allowedReadIOExcptCnt) { + if (isPauseRead.get()) { + return true; + } isPauseRead.set(true); return true; } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java index 3c6b90072..2d5f1e468 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java @@ -17,10 +17,17 @@ package org.apache.inlong.tubemq.server.broker.offset; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.inlong.tubemq.corebase.TBaseConstants; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils; +import org.apache.inlong.tubemq.corebase.utils.Tuple3; import org.apache.inlong.tubemq.server.common.TServerConstants; /** @@ -159,4 +166,64 @@ public class OffsetHistoryInfo { } strBuff.append("]}"); } + + /** + * Parse history offset record info + * + * @param jsonData string offset information + * @param result process result + */ + public static boolean parseRecordInfo(String jsonData, ProcessResult result) { + JsonObject jsonObject = null; + try { + jsonObject = JsonParser.parseString(jsonData).getAsJsonObject(); + } catch (Throwable e1) { + result.setFailResult(String.format( + "Parse history offset value failure, reason is %s", e1.getMessage())); + return result.isSuccess(); + } + if (jsonObject == null) { + result.setFailResult("Parse error, history offset value must be valid json format!"); + return result.isSuccess(); + } + if (!jsonObject.has("ver")) { + result.setFailResult("FIELD ver is required in history offset value!"); + return result.isSuccess(); + } + int verValue = jsonObject.get("ver").getAsInt(); + if (verValue < TServerConstants.OFFSET_HISTORY_RECORD_SHORT_VERSION) { + result.setFailResult("Only support v2 or next version in history offset value!"); + return result.isSuccess(); + } + if (!jsonObject.has("records")) { + result.setFailResult("FIELD records is required in history offset value!"); + return result.isSuccess(); + } + List<Tuple3<String, Integer, Long>> resetOffsets = new ArrayList<>(); + JsonArray records = jsonObject.get("records").getAsJsonArray(); + for (int i = 0; i < records.size(); i++) { + JsonObject itemInfo = records.get(i).getAsJsonObject(); + if (itemInfo == null) { + continue; + } + String topicName = itemInfo.get("topic").getAsString(); + JsonArray offsets = itemInfo.get("offsets").getAsJsonArray(); + for (int j = 0; j < offsets.size(); j++) { + JsonObject storeInfo = offsets.get(j).getAsJsonObject(); + if (storeInfo == null) { + continue; + } + JsonArray partInfos = storeInfo.get("parts").getAsJsonArray(); + for (int k = 0; k < partInfos.size(); k++) { + JsonObject partItem = partInfos.get(k).getAsJsonObject(); + int partId = partItem.get("partId").getAsInt(); + long offsetVal = partItem.get("iCfm").getAsLong(); + resetOffsets.add(new Tuple3<>(topicName, partId, offsetVal)); + } + } + } + result.setSuccResult(resetOffsets); + return true; + } + } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java index 3e723c541..2a73b0e67 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/web/BrokerAdminServlet.java @@ -44,6 +44,7 @@ import org.apache.inlong.tubemq.server.broker.msgstore.MessageStore; import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager; import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult; import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo; +import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo; import org.apache.inlong.tubemq.server.broker.offset.OffsetService; import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType; import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder; @@ -118,6 +119,9 @@ public class BrokerAdminServlet extends AbstractWebHandler { // set or update group's offset info innRegisterWebMethod("admin_set_offset", "adminSetGroupOffSet", false); + // set or update group's offset info by history offset time + innRegisterWebMethod("admin_set_offset_by_time", + "adminSetGroupOffSetByTime", false); // remove group's offset info innRegisterWebMethod("admin_rmv_offset", "adminRemoveGroupOffSet", false); @@ -1001,6 +1005,228 @@ public class BrokerAdminServlet extends AbstractWebHandler { sBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); } + /** + * Add or Modify consumer group offset by group's history offset time. + * + * @param req request + * @param sBuffer process result + */ + public void adminSetGroupOffSetByTime(HttpServletRequest req, StringBuilder sBuffer) { + ProcessResult result = new ProcessResult(); + // get group name + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.GROUPNAME, true, null, sBuffer, result)) { + WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); + return; + } + final String groupName = (String) result.getRetData(); + // get modify user + if (!WebParameterUtils.getStringParamValue(req, + WebFieldDef.MODIFYUSER, true, null, sBuffer, result)) { + WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); + return; + } + final String modifyUser = (String) result.getRetData(); + // get the left timestamp to be set + if (!WebParameterUtils.getDateParameter(req, + WebFieldDef.RECORDTIME, true, null, sBuffer, result)) { + WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); + return; + } + final Date tmpDataTime1 = (Date) result.getRetData(); + final long recStartTime = tmpDataTime1.getTime(); + // get the right timestamp to be set + if (!WebParameterUtils.getDateParameter(req, + WebFieldDef.ENDTIME, false, null, sBuffer, result)) { + WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); + return; + } + final Date tmpDataTime2 = (Date) result.getRetData(); + long recEndTime = recStartTime + 5 * 60 * 1000L; + if (tmpDataTime2 != null) { + recEndTime = tmpDataTime2.getTime(); + if (recEndTime < recStartTime) { + WebParameterUtils.buildFailResult(sBuffer, + String.format("Parameter %s value must >= %s", + WebFieldDef.ENDTIME.name, WebFieldDef.RECORDTIME.name)); + return; + } + } + // check storage status + if (ServiceStatusHolder.isReadServiceStop()) { + WebParameterUtils.buildFailResult(sBuffer, + "Read StoreService temporary unavailable!"); + return; + } + // get offset history storage + MessageStore msgStore; + MessageStoreManager storeManager = broker.getStoreManager(); + try { + msgStore = storeManager.getOrCreateMessageStore( + TServerConstants.OFFSET_HISTORY_NAME, 0); + } catch (Throwable ex) { + WebParameterUtils.buildFailResult(sBuffer, + String.format("Get offset history store fail, reason=%s", ex.getMessage())); + return; + } + // get the history offset in the time range + // read history data + int totalCnt = 0; + // locate start offset + int maxRetryCnt = 50; + long requestOffset = msgStore.getStartOffsetByTimeStamp(recStartTime); + if (!getStoredGroupHisOffsets(groupName, msgStore, + requestOffset, maxRetryCnt, recStartTime, recEndTime, result)) { + WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); + return; + } + List<Tuple3<String, Integer, Long>> resetOffsets = + (List<Tuple3<String, Integer, Long>>) result.getRetData(); + if (resetOffsets.isEmpty()) { + WebParameterUtils.buildFailResult(sBuffer, "Not found history offset value!"); + return; + } + Set<String> groupNameSet = new HashSet<>(); + groupNameSet.add(groupName); + Set<String> topicSet = new HashSet<>(); + // before + Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> befGroupOffsetMap = + getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, groupNameSet, topicSet); + Map<String, Map<Integer, GroupOffsetInfo>> befTopicPartMap = + befGroupOffsetMap.get(groupName); + // change + broker.getOffsetManager().modifyGroupOffset(groupNameSet, resetOffsets, modifyUser); + // after + Map<String, Map<String, Map<Integer, GroupOffsetInfo>>> aftGroupOffsetMap = + getGroupOffsetInfo(WebFieldDef.COMPSGROUPNAME, groupNameSet, topicSet); + Map<String, Map<Integer, GroupOffsetInfo>> aftTopicPartMap = + aftGroupOffsetMap.get(groupName); + // build result + WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer); + int topicCnt = 0; + sBuffer.append("{\"groupName\":\"").append(groupName).append("\",\"before\":["); + for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : befTopicPartMap.entrySet()) { + if (topicCnt++ > 0) { + sBuffer.append(","); + } + Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue(); + sBuffer.append("{\"topicName\":\"").append(entry1.getKey()) + .append("\",\"offsets\":["); + int partCnt = 0; + for (Map.Entry<Integer, GroupOffsetInfo> entry2 : partOffMap.entrySet()) { + if (partCnt++ > 0) { + sBuffer.append(","); + } + GroupOffsetInfo offsetInfo = entry2.getValue(); + offsetInfo.buildOffsetInfo(sBuffer); + } + sBuffer.append("],\"partCount\":").append(partCnt).append("}"); + } + sBuffer.append("],\"after\":["); + topicCnt = 0; + for (Map.Entry<String, Map<Integer, GroupOffsetInfo>> entry1 : aftTopicPartMap.entrySet()) { + if (topicCnt++ > 0) { + sBuffer.append(","); + } + Map<Integer, GroupOffsetInfo> partOffMap = entry1.getValue(); + sBuffer.append("{\"topicName\":\"").append(entry1.getKey()) + .append("\",\"offsets\":["); + int partCnt = 0; + for (Map.Entry<Integer, GroupOffsetInfo> entry2 : partOffMap.entrySet()) { + if (partCnt++ > 0) { + sBuffer.append(","); + } + GroupOffsetInfo offsetInfo = entry2.getValue(); + offsetInfo.buildOffsetInfo(sBuffer); + } + sBuffer.append("],\"partCount\":").append(partCnt).append("}"); + } + sBuffer.append("]}"); + WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, totalCnt); + } + + /** + * Query group's offset records stored in broker. + * + * @param groupName group name + * @param msgStore history offset store + * @param requestOffset request offset + * @param maxRetryCnt max query turns + * @param recStartTime record start timestamp + * @param recEndTime record end timestamp + * @param result query result + * @return whether success + */ + private boolean getStoredGroupHisOffsets(String groupName, MessageStore msgStore, + long requestOffset, int maxRetryCnt, + long recStartTime, long recEndTime, + ProcessResult result) { + int msgTypeCode; + int partitionId; + Throwable qryThrow; + GetMessageResult getMessageResult; + // locate partitionId and filter-item + msgTypeCode = groupName.hashCode(); + partitionId = Math.abs(msgTypeCode) % TServerConstants.OFFSET_HISTORY_NUMPARTS; + Set<String> filterCodes = new HashSet<>(); + filterCodes.add(groupName); + // build consumer node information + ConsumerNodeInfo consumerNodeInfo = new ConsumerNodeInfo(broker.getStoreManager(), + groupName, "offsetConsumer", filterCodes, "", System.currentTimeMillis(), "", ""); + // query records from storage + int qryRetryCount = 0; + long itemInitOffset = requestOffset; + int maxTransferSize = broker.getStoreManager().getMaxMsgTransferSize(); + do { + qryThrow = null; + try { + getMessageResult = msgStore.getMessages(303, itemInitOffset, + partitionId, consumerNodeInfo, TServerConstants.OFFSET_HISTORY_NAME, + maxTransferSize, recStartTime); + } catch (Throwable e2) { + qryThrow = e2; + continue; + } + // check query result + if (getMessageResult.transferedMessageList == null + || getMessageResult.transferedMessageList.isEmpty()) { + itemInitOffset += getMessageResult.lastReadOffset; + continue; + } + // build record to return result + List<Message> messageList = DataConverterUtil.convertMessage( + TServerConstants.OFFSET_HISTORY_NAME, getMessageResult.transferedMessageList); + for (Message message : messageList) { + if (message == null) { + continue; + } + long recAppTime = DateTimeConvertUtils.yyyyMMddHHmm2ms( + message.getAttrValue(TokenConstants.TOKEN_MSG_TIME)); + if (recAppTime > recEndTime) { + result.setFailResult(String.format( + "Over required endTime range, current time is %s", + message.getAttrValue(TokenConstants.TOKEN_MSG_TIME))); + return result.isSuccess(); + } + if (!groupName.equals(message.getAttrValue( + TServerConstants.TOKEN_OFFSET_GROUP))) { + continue; + } + return OffsetHistoryInfo.parseRecordInfo( + StringUtils.newStringUtf8(message.getData()), result); + } + itemInitOffset += getMessageResult.lastReadOffset; + } while (++qryRetryCount < maxRetryCnt); + // check query result + if (qryThrow == null) { + result.setFailResult("Not found record in required search range"); + } else { + result.setFailResult(String.format( + "Query record failure, reason is :%s", qryThrow.getMessage())); + } + return result.isSuccess(); + } + /** * Clone consume group offset, clone A group's offset to other group. * diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java index f40fc0e81..4a8d140de 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/fielddef/WebFieldDef.java @@ -259,7 +259,13 @@ public enum WebFieldDef { MAXRETRYCOUNT(93, "maxRetryCnt", "mrc", WebFieldType.INT, "Max retry query turns", RegexDef.TMP_NUMBER), STATSTYPE(94, "statsType", "st", WebFieldType.STRING, - "Statistics type", TServerConstants.META_MAX_STATSTYPE_LENGTH); + "Statistics type", TServerConstants.META_MAX_STATSTYPE_LENGTH), + + RESETVALUE(95, "resetValue", "rv", WebFieldType.BOOLEAN, + "Reset value, default is false"), + ENDTIME(96, "endTime", "et", WebFieldType.STRING, + "The end record time of the historical offset of the consume group", + DateTimeConvertUtils.LENGTH_YYYYMMDDHHMMSS); public final int id; public final String name; diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java index 3a31afae0..2f9f0abfd 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/BrokerAbnHolder.java @@ -69,7 +69,9 @@ public class BrokerAbnHolder { int reportReadStatus, int reportWriteStatus) { StringBuilder sBuffer = new StringBuilder(512); - if (reportReadStatus == 0 && reportWriteStatus == 0) { + ManageStatus reqMngStatus = + getManageStatus(reportWriteStatus, reportReadStatus); + if (reqMngStatus == ManageStatus.STATUS_MANAGE_ONLINE) { BrokerAbnInfo brokerAbnInfo = brokerAbnormalMap.get(brokerId); if (brokerAbnInfo != null) { if (brokerForbiddenMap.get(brokerId) == null) { @@ -89,13 +91,6 @@ public class BrokerAbnHolder { if (curEntry == null) { return; } - ManageStatus reqMngStatus = - getManageStatus(reportWriteStatus, reportReadStatus); - if ((curEntry.getManageStatus() == reqMngStatus) - || ((reqMngStatus == ManageStatus.STATUS_MANAGE_OFFLINE) - && (curEntry.getManageStatus().getCode() < ManageStatus.STATUS_MANAGE_ONLINE.getCode()))) { - return; - } BrokerAbnInfo brokerAbnInfo = brokerAbnormalMap.get(brokerId); if (brokerAbnInfo == null) { if (brokerAbnormalMap.putIfAbsent(brokerId, @@ -110,20 +105,30 @@ public class BrokerAbnHolder { } else { brokerAbnInfo.updateLastRepStatus(reportReadStatus, reportWriteStatus); } + ManageStatus curStatus = curEntry.getManageStatus(); + if (curStatus == reqMngStatus + || curStatus.getCode() < ManageStatus.STATUS_MANAGE_ONLINE.getCode() + || curStatus.getCode() >= ManageStatus.STATUS_MANAGE_OFFLINE.getCode()) { + return; + } + ManageStatus newStatus = reqMngStatus; + if (reqMngStatus == ManageStatus.STATUS_MANAGE_ONLINE_NOT_WRITE + && curStatus == ManageStatus.STATUS_MANAGE_ONLINE_NOT_READ) { + newStatus = ManageStatus.STATUS_MANAGE_OFFLINE; + } BrokerFbdInfo brokerFbdInfo = brokerForbiddenMap.get(brokerId); if (brokerFbdInfo == null) { - BrokerFbdInfo tmpBrokerFbdInfo = - new BrokerFbdInfo(brokerId, curEntry.getManageStatus(), - reqMngStatus, System.currentTimeMillis()); + BrokerFbdInfo tmpFbdInfo = new BrokerFbdInfo(brokerId, + curStatus, newStatus, System.currentTimeMillis()); if (reportReadStatus > 0 || reportWriteStatus > 0) { - if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) { - if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) == null) { + if (updateCurManageStatus(brokerId, newStatus, sBuffer)) { + if (brokerForbiddenMap.putIfAbsent(brokerId, tmpFbdInfo) == null) { brokerForbiddenCount.incrementAndGet(); MasterSrvStatsHolder.incBrokerForbiddenCnt(); logger.warn(sBuffer .append("[Broker AutoForbidden] master add missing forbidden broker, ") .append(brokerId).append("'s manage status to ") - .append(reqMngStatus.getDescription()).toString()); + .append(newStatus.getDescription()).toString()); sBuffer.delete(0, sBuffer.length()); } } @@ -132,8 +137,8 @@ public class BrokerAbnHolder { brokerForbiddenCount.decrementAndGet(); return; } - if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) { - if (brokerForbiddenMap.putIfAbsent(brokerId, tmpBrokerFbdInfo) != null) { + if (updateCurManageStatus(brokerId, newStatus, sBuffer)) { + if (brokerForbiddenMap.putIfAbsent(brokerId, tmpFbdInfo) != null) { brokerForbiddenCount.decrementAndGet(); return; } @@ -141,15 +146,15 @@ public class BrokerAbnHolder { logger.warn(sBuffer .append("[Broker AutoForbidden] master auto forbidden broker, ") .append(brokerId).append("'s manage status to ") - .append(reqMngStatus.getDescription()).toString()); + .append(newStatus.getDescription()).toString()); sBuffer.delete(0, sBuffer.length()); } else { brokerForbiddenCount.decrementAndGet(); } } } else { - if (updateCurManageStatus(brokerId, reqMngStatus, sBuffer)) { - brokerFbdInfo.updateInfo(curEntry.getManageStatus(), reqMngStatus); + if (updateCurManageStatus(brokerId, newStatus, sBuffer)) { + brokerFbdInfo.updateInfo(curStatus, newStatus); } } }