This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new fd454ad4e [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125) fd454ad4e is described below commit fd454ad4e3f2cd316e68bf4d1655ff99f1c08879 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Oct 10 17:43:01 2022 +0800 [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125) --- .../server/common/utils/WebParameterUtils.java | 23 ++++++++ .../master/metamanage/DefaultMetaDataService.java | 15 +++-- .../server/master/metamanage/MetaDataService.java | 9 +++ .../dao/entity/GroupConsumeCtrlEntity.java | 15 +++++ .../metastore/dao/entity/GroupResCtrlEntity.java | 22 ++++++++ .../metastore/dao/entity/TopicCtrlEntity.java | 64 ++++++++++++++++------ .../metastore/dao/mapper/MetaConfigMapper.java | 10 ++++ .../metastore/dao/mapper/TopicCtrlMapper.java | 10 ++++ .../metastore/impl/AbsMetaConfigMapperImpl.java | 58 +++++++------------- .../metastore/impl/AbsTopicCtrlMapperImpl.java | 27 +++++++++ .../impl/bdbimpl/BdbMetaConfigMapperImpl.java | 2 +- .../master/web/handler/WebOtherInfoHandler.java | 15 +++++ .../master/web/handler/WebTopicDeployHandler.java | 9 +-- 13 files changed, 213 insertions(+), 66 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java index dd3e3c948..005489780 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java @@ -18,6 +18,7 @@ package org.apache.inlong.tubemq.server.common.utils; import com.google.gson.Gson; +import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; @@ -725,6 +726,12 @@ public class WebParameterUtils { if (paramValue == null) { paramValue = req.getParameter(fieldDef.shortName); } + } else if (paramCntr instanceof JsonObject) { + JsonObject jsonObject = (JsonObject) paramCntr; + paramValue = jsonObject.get(fieldDef.name).getAsString(); + if (paramValue == null) { + paramValue = jsonObject.get(fieldDef.shortName).getAsString(); + } } else { throw new IllegalArgumentException("Unknown parameter type!"); } @@ -1562,6 +1569,22 @@ public class WebParameterUtils { return strManageStatus; } + public static int getBrokerManageStatusId(String strManageStatus) { + int manageStatus = TStatusConstants.STATUS_MANAGE_NOT_DEFINED; + if (strManageStatus.equals("draft")) { + manageStatus = TStatusConstants.STATUS_MANAGE_APPLY; + } else if (strManageStatus.equals("online")) { + manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE; + } else if (strManageStatus.equals("offline")) { + manageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE; + } else if (strManageStatus.equals("only-read")) { + manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE; + } else if (strManageStatus.equals("only-write")) { + manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ; + } + return manageStatus; + } + /** * translate broker manage status from int to tuple2 value * diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java index d26d32810..c26ac6118 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java @@ -834,10 +834,9 @@ public class DefaultMetaDataService implements MetaDataService { int maxMsgSize = defMsgSizeInB; TopicCtrlEntity topicCtrlEntity = metaConfigMapper.getTopicCtrlByTopicName(topicEntity.getTopicName()); - if (topicCtrlEntity != null) { - if (topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) { - maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB(); - } + if (topicCtrlEntity != null + && topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) { + maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB(); } if (maxMsgSize == defMsgSizeInB) { strBuff.append(TokenConstants.ATTR_SEP).append(" "); @@ -948,7 +947,8 @@ public class DefaultMetaDataService implements MetaDataService { ClusterSettingEntity clusterSettingEntity = metaConfigMapper.getClusterDefSetting(false); int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB(); TopicCtrlEntity topicCtrlEntity = metaConfigMapper.getTopicCtrlByTopicName(topicName); - if (topicCtrlEntity != null) { + if (topicCtrlEntity != null + && topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) { maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB(); } return maxMsgSizeInMB; @@ -960,6 +960,11 @@ public class DefaultMetaDataService implements MetaDataService { return metaConfigMapper.getTopicCtrlConf(topicNameSet, qryEntity); } + @Override + public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, + Set<String> topicNameSet) { + return metaConfigMapper.getMaxMsgSizeInBByTopics(defMaxMsgSizeInB, topicNameSet); + } // ////////////////////////////////////////////////////////////////////////////// @Override diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java index 6370cb98f..8f26f2507 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataService.java @@ -495,6 +495,15 @@ public interface MetaDataService extends Server { Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet, TopicCtrlEntity qryEntity); + /** + * get topic max message size configure info from store + * + * @param defMaxMsgSizeInB the default max message size in B + * @param topicNameSet need matched topic name set + * @return result, only read + */ + Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, Set<String> topicNameSet); + // ////////////////////////////////////////////////////////////////////////////// /** diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java index 983b276ac..5a73c57ec 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java @@ -216,6 +216,21 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable { return changed; } + /** + * fill empty fields with default value + * + * @return object + */ + public GroupConsumeCtrlEntity fillEmptyValues() { + if (this.filterEnable == EnableStatus.STATUS_UNDEFINE) { + this.filterEnable = EnableStatus.STATUS_DISABLE; + } + if (this.consumeEnable == EnableStatus.STATUS_UNDEFINE) { + this.consumeEnable = EnableStatus.STATUS_ENABLE; + } + return this; + } + /** * Check whether the specified query item value matches * Allowed query items: diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java index 61c75aed3..ac2bac075 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java @@ -107,6 +107,28 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable { return this; } + /** + * fill empty fields with default value + * + * @return object + */ + public GroupResCtrlEntity fillEmptyValues() { + if (TStringUtils.isBlank(this.flowCtrlInfo)) { + setFlowCtrlRule(0, TServerConstants.BLANK_FLOWCTRL_RULES); + } + if (this.resCheckStatus == EnableStatus.STATUS_UNDEFINE) { + this.resCheckStatus = EnableStatus.STATUS_DISABLE; + this.allowedBrokerClientRate = 0; + } + if (this.flowCtrlStatus == EnableStatus.STATUS_UNDEFINE) { + this.flowCtrlStatus = EnableStatus.STATUS_DISABLE; + } + if (this.qryPriorityId == TBaseConstants.META_VALUE_UNDEFINED) { + this.qryPriorityId = TServerConstants.QRY_PRIORITY_DEF_VALUE; + } + return this; + } + public GroupResCtrlEntity setGroupName(String groupName) { this.groupName = groupName; return this; diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java index 00702950a..2ad57a15b 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java @@ -54,9 +54,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable { this.topicName = topicName; this.topicNameId = topicNameId; this.authCtrlStatus = EnableStatus.STATUS_DISABLE; - this.maxMsgSizeInB = - SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(maxMsgSizeInMB); - this.maxMsgSizeInMB = maxMsgSizeInMB; + fillMaxMsgSizeInMB(maxMsgSizeInMB); } /** @@ -165,14 +163,7 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable { } // check and set modified field if (newMaxMsgSizeMB != TBaseConstants.META_VALUE_UNDEFINED) { - int tmpMaxMsgSizeInMB = - SettingValidUtils.validAndGetMsgSizeInMB(newMaxMsgSizeMB); - if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) { - changed = true; - this.maxMsgSizeInMB = tmpMaxMsgSizeInMB; - this.maxMsgSizeInB = - SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB); - } + changed = fillMaxMsgSizeInMB(newMaxMsgSizeMB); } // check and set authCtrlStatus info if (enableTopicAuth != null @@ -187,6 +178,27 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable { return changed; } + /** + * fill empty fields with default value + * + * @param defSetting current system default setting + * + * @return object + */ + public TopicCtrlEntity fillEmptyValues(ClusterSettingEntity defSetting) { + if (this.maxMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) { + int tmpMaxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB; + if (defSetting != null) { + tmpMaxMsgSizeInMB = defSetting.getMaxMsgSizeInMB(); + } + fillMaxMsgSizeInMB(tmpMaxMsgSizeInMB); + } + if (this.authCtrlStatus == EnableStatus.STATUS_UNDEFINE) { + this.authCtrlStatus = EnableStatus.STATUS_DISABLE; + } + return this; + } + /** * Check whether the specified query item value matches * Allowed query items: @@ -242,14 +254,30 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable { return sBuilder; } - private void fillMaxMsgSizeInB(int maxMsgSizeInB) { - int tmpMaxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB; - if (maxMsgSizeInB > TBaseConstants.META_MB_UNIT_SIZE) { - tmpMaxMsgSizeInMB = SettingValidUtils.validAndGetMsgSizeBtoMB(maxMsgSizeInB); + private boolean fillMaxMsgSizeInB(int maxMsgSizeInB) { + boolean changed = false; + int tmpMaxMsgSizeInMB = + SettingValidUtils.validAndGetMsgSizeBtoMB(maxMsgSizeInB); + if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) { + changed = true; + this.maxMsgSizeInMB = tmpMaxMsgSizeInMB; + this.maxMsgSizeInB = + SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB); + } + return changed; + } + + private boolean fillMaxMsgSizeInMB(int maxMsgSizeInMB) { + boolean changed = false; + int tmpMaxMsgSizeInMB = + SettingValidUtils.validAndGetMsgSizeInMB(maxMsgSizeInMB); + if (this.maxMsgSizeInMB != tmpMaxMsgSizeInMB) { + changed = true; + this.maxMsgSizeInMB = tmpMaxMsgSizeInMB; + this.maxMsgSizeInB = + SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(tmpMaxMsgSizeInMB); } - this.maxMsgSizeInMB = tmpMaxMsgSizeInMB; - this.maxMsgSizeInB = - SettingValidUtils.validAndXfeMaxMsgSizeFromMBtoB(this.maxMsgSizeInMB); + return changed; } /** diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java index ccfc05fc4..f3cd23e7c 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/MetaConfigMapper.java @@ -238,6 +238,16 @@ public interface MetaConfigMapper extends KeepAliveService { Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet, TopicCtrlEntity qryEntity); + /** + * get topic max message size configure info from store + * + * @param defMaxMsgSizeInB the default max message size in B + * @param topicNameSet need matched topic name set + * @return result, only read + */ + Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, + Set<String> topicNameSet); + // //////////////////////////////////////////////////////////// /** diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java index 1df1397dc..bb0882b2a 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/mapper/TopicCtrlMapper.java @@ -82,4 +82,14 @@ public interface TopicCtrlMapper extends AbstractMapper { */ Map<String, TopicCtrlEntity> getTopicCtrlConf(Set<String> topicNameSet, TopicCtrlEntity qryEntity); + + /** + * get topic max message size configure info from store + * + * @param defMaxMsgSizeInB the default max message size in B + * @param topicNameSet need matched topic name set + * @return result, only read + */ + Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, + Set<String> topicNameSet); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java index 965706770..f54ceebdf 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsMetaConfigMapperImpl.java @@ -451,10 +451,9 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { ClusterSettingEntity clusterSettingEntity = getClusterDefSetting(false); int maxMsgSizeInMB = clusterSettingEntity.getMaxMsgSizeInMB(); TopicCtrlEntity topicCtrlEntity = topicCtrlMapper.getTopicCtrlConf(topicName); - if (topicCtrlEntity != null) { - if (topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) { - maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB(); - } + if (topicCtrlEntity != null + && topicCtrlEntity.getMaxMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED) { + maxMsgSizeInMB = topicCtrlEntity.getMaxMsgSizeInMB(); } return maxMsgSizeInMB; } @@ -470,6 +469,12 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { return topicCtrlMapper.getTopicCtrlConf(topicNameSet, qryEntity); } + @Override + public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, + Set<String> topicNameSet) { + return topicCtrlMapper.getMaxMsgSizeInBByTopics(defMaxMsgSizeInB, topicNameSet); + } + /** * Add if absent topic control configure info * @@ -481,13 +486,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { */ private boolean addTopicCtrlConfIfAbsent(BaseEntity opEntity, String topicName, StringBuilder strBuff, ProcessResult result) { - int maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB; - ClusterSettingEntity defSetting = getClusterDefSetting(false); - if (defSetting != null) { - maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB(); - } TopicCtrlEntity entity = new TopicCtrlEntity(opEntity, topicName, - TBaseConstants.META_VALUE_UNDEFINED, maxMsgSizeInMB); + TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED); return innAddOrUpdTopicCtrlConf(false, true, entity, strBuff, result); } @@ -524,26 +524,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { strBuff.delete(0, strBuff.length()); return result.isSuccess(); } - if (entity.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED - || entity.getMaxMsgSizeInMB() == TBaseConstants.META_VALUE_UNDEFINED) { - int topicId = entity.getTopicId(); - int maxMsgSizeInMB = entity.getMaxMsgSizeInMB(); - if (topicId == TBaseConstants.META_VALUE_UNDEFINED) { - topicId = TServerConstants.TOPIC_ID_DEF; - } - if (maxMsgSizeInMB == TBaseConstants.META_VALUE_UNDEFINED) { - maxMsgSizeInMB = TBaseConstants.META_MIN_ALLOWED_MESSAGE_SIZE_MB; - ClusterSettingEntity defSetting = getClusterDefSetting(false); - if (defSetting != null) { - maxMsgSizeInMB = defSetting.getMaxMsgSizeInMB(); - } - } - newEntity = new TopicCtrlEntity(entity, entity.getTopicName(), topicId, maxMsgSizeInMB); - topicCtrlMapper.addTopicCtrlConf(newEntity, strBuff, result); - - } else { - topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result); - } + entity.fillEmptyValues(getClusterDefSetting(false)); + topicCtrlMapper.addTopicCtrlConf(entity, strBuff, result); } else { if (isAddOpOrOnlyAdd) { if (chkConsistent) { @@ -609,7 +591,8 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { Integer topicLockId = null; Integer brokerLockId = null; // add topic control configure - addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result); + BaseEntity opEntity = new BaseEntity("systemSelf", new Date()); + addTopicCtrlConfIfAbsent(opEntity, entity.getTopicName(), strBuff, result); // execute add or update operation try { // lock topicName meta-lock @@ -653,11 +636,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { newProps.updModifyInfo(brokerEntity.getTopicProps()); newEntity = new TopicDeployEntity(entity, entity.getBrokerId(), entity.getTopicName(), newProps); - int topicId = entity.getTopicId(); - if (entity.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED) { - topicId = TServerConstants.TOPIC_ID_DEF; - } - newEntity.updModifyInfo(entity.getDataVerId(), topicId, + newEntity.updModifyInfo(entity.getDataVerId(), entity.getTopicId(), brokerEntity.getBrokerPort(), brokerEntity.getBrokerIp(), entity.getTopicStatus(), entity.getTopicProps()); topicDeployMapper.addTopicDeployConf(newEntity, strBuff, result); @@ -973,6 +952,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { strBuff.delete(0, strBuff.length()); return result.isSuccess(); } + entity.fillEmptyValues(); groupResCtrlMapper.addGroupResCtrlConf(entity, strBuff, result); } else { if (isAddOpOrOnlyAdd) { @@ -1116,9 +1096,10 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { GroupConsumeCtrlEntity newEntity; String printPrefix = "[addConsumeCtrlConf], "; // append topic control configure - addTopicCtrlConfIfAbsent(entity, entity.getTopicName(), strBuff, result); + BaseEntity opEntity = new BaseEntity("systemSelf", new Date()); + addTopicCtrlConfIfAbsent(opEntity, entity.getTopicName(), strBuff, result); // append group control configure - addGroupCtrlConfIfAbsent(entity, entity.getGroupName(), strBuff, result); + addGroupCtrlConfIfAbsent(opEntity, entity.getGroupName(), strBuff, result); // execute add or update operation try { // lock topicName meta-lock @@ -1138,6 +1119,7 @@ public abstract class AbsMetaConfigMapperImpl implements MetaConfigMapper { strBuff.delete(0, strBuff.length()); return result.isSuccess(); } + entity.fillEmptyValues(); consumeCtrlMapper.addGroupConsumeCtrlConf(entity, strBuff, result); } else { if (isAddOpOrOnlyAdd) { diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java index e16440d8e..3457ee27d 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicCtrlMapperImpl.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.inlong.tubemq.corebase.TBaseConstants; import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode; import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicCtrlEntity; @@ -147,6 +148,32 @@ public abstract class AbsTopicCtrlMapperImpl implements TopicCtrlMapper { return retEntityMap; } + @Override + public Map<String, Integer> getMaxMsgSizeInBByTopics(int defMaxMsgSizeInB, + Set<String> topicNameSet) { + Map<String, Integer> resultMap = new HashMap<>(); + if (topicNameSet == null || topicNameSet.isEmpty()) { + return resultMap; + } + TopicCtrlEntity ctrlEntity; + for (String topic : topicNameSet) { + if (topic == null) { + continue; + } + ctrlEntity = topicCtrlCache.get(topic); + if (ctrlEntity == null) { + continue; + } + if (ctrlEntity.getMaxMsgSizeInB() == TBaseConstants.META_VALUE_UNDEFINED + || ctrlEntity.getMaxMsgSizeInB() == defMaxMsgSizeInB) { + resultMap.put(topic, null); + } else { + resultMap.put(topic, ctrlEntity.getMaxMsgSizeInB()); + } + } + return resultMap; + } + /** * Clear cached data */ diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java index f1442d76c..938b2537c 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbMetaConfigMapperImpl.java @@ -279,7 +279,7 @@ public class BdbMetaConfigMapperImpl extends AbsMetaConfigMapperImpl { return clusterGroupVO; } // translate replication group info to ClusterGroupVO structure - Tuple2<Boolean, List<ClusterNodeVO>> transResult = + Tuple2<Boolean, List<ClusterNodeVO>> transResult = transReplicateNodes(replicationGroup); clusterGroupVO.setNodeData(transResult.getF1()); clusterGroupVO.setPrimaryNodeActive(isPrimaryNodeActive()); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java index 4418fa68e..9fcc779d7 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebOtherInfoHandler.java @@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest; import org.apache.inlong.tubemq.corebase.cluster.Partition; import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils; +import org.apache.inlong.tubemq.corebase.utils.Tuple2; import org.apache.inlong.tubemq.server.broker.stats.BrokerStatsType; import org.apache.inlong.tubemq.server.common.TubeServerVersion; import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef; @@ -271,6 +272,17 @@ public class WebOtherInfoHandler extends AbstractWebHandler { .append(",\"reqSourceCount\":").append(reqSourceCount) .append(",\"curSourceCount\":").append(curSourceCount) .append(",\"rebalanceCheckTime\":").append(rebalanceCheckTime); + } else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) { + Tuple2<Long, List<String>> metaInfoTuple = consumeGroupInfo.getTopicMetaInfo(); + sBuffer.append(",\"topicMetaId\":").append(metaInfoTuple.getF0()) + .append(",\"metaDetails\":["); + for (String itemInfo : metaInfoTuple.getF1()) { + if (itemCnt++ > 0) { + sBuffer.append(","); + } + sBuffer.append("\"").append(itemInfo).append("\""); + } + sBuffer.append("]"); } sBuffer.append(",\"rebInfo\":{"); if (balanceStatus == -2) { @@ -515,6 +527,9 @@ public class WebOtherInfoHandler extends AbstractWebHandler { } strBuffer.append("]"); } + } else if (consumeType == ConsumeType.CONSUME_CLIENT_REB) { + strBuffer.append(",\"sourceCount\":").append(consumer.getSourceCount()) + .append(",\"nodeId\":").append(consumer.getNodeId()); } Map<String, Map<String, Partition>> topicSubMap = currentSubInfoMap.get(consumer.getConsumerId()); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java index b92a53324..344cb0f71 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java @@ -379,15 +379,16 @@ public class WebTopicDeployHandler extends AbstractWebHandler { defMetaDataService.getBrokerTopicConfigInfo(brokerIds); // build query result int dataCount = 0; + int topicCnt = 0; WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer); for (Map.Entry<Integer, Set<String>> entry : brokerTopicConfigMap.entrySet()) { if (dataCount++ > 0) { sBuffer.append(","); } - sBuffer.append("{\"brokerId\":").append(entry.getKey()).append(",\"topicName\":["); - int topicCnt = 0; - Set<String> topicSet = entry.getValue(); - for (String topic : topicSet) { + topicCnt = 0; + sBuffer.append("{\"brokerId\":").append(entry.getKey()) + .append(",\"topicName\":["); + for (String topic : entry.getValue()) { if (topicCnt++ > 0) { sBuffer.append(","); }