This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f31a9d6da85f0251005115c610c698d35d93fd71 Author: 卢春亮 <946240...@qq.com> AuthorDate: Thu Nov 10 10:00:09 2022 +0800 [INLONG-6491][Manager] Support getting backup info in getAllConfig (#6492) --- .../repository/DataProxyConfigRepository.java | 340 +++++++++++---------- 1 file changed, 171 insertions(+), 169 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java index d49eb9a34..55a5e7c85 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java @@ -19,10 +19,13 @@ package org.apache.inlong.manager.service.repository; import com.google.common.base.Splitter; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; + import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.common.constant.ClusterSwitch; import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject; import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject; import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; @@ -34,20 +37,23 @@ import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; -import org.apache.inlong.manager.pojo.dataproxy.CacheCluster; -import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId; -import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId; -import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster; -import org.apache.inlong.manager.pojo.sink.SinkPageRequest; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; +import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.mapper.ClusterSetMapper; import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; +import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; +import org.apache.inlong.manager.pojo.dataproxy.CacheCluster; +import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId; +import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId; +import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster; +import org.apache.inlong.manager.pojo.sink.SinkPageRequest; +import org.apache.inlong.manager.service.core.SortConfigLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -55,7 +61,6 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; -import javax.annotation.PostConstruct; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Date; @@ -63,10 +68,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PostConstruct; + /** * DataProxyConfigRepository */ @@ -76,6 +84,7 @@ public class DataProxyConfigRepository implements IRepository { public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class); + public static final String KEY_NAMESPACE = "namespace"; public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag"; public static final String KEY_BACKUP_TOPIC = "backup_topic"; public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName"; @@ -104,6 +113,8 @@ public class DataProxyConfigRepository implements IRepository { private InlongGroupEntityMapper inlongGroupMapper; @Autowired private StreamSinkEntityMapper streamSinkMapper; + @Autowired + private SortConfigLoader sortConfigLoader; @PostConstruct public void initialize() { @@ -195,47 +206,44 @@ public class DataProxyConfigRepository implements IRepository { @Override @Transactional(rollbackFor = Exception.class) public void reload() { - LOGGER.info("start to reload config."); - Map<String, ProxyClusterObject> proxyClusterMap = this.reloadProxyCluster(); + LOGGER.info("start to reload config:" + this.getClass().getSimpleName()); + // reload proxy cluster + Map<String, DataProxyCluster> proxyClusterMap = new HashMap<>(); + this.reloadProxyCluster(proxyClusterMap); if (proxyClusterMap.size() == 0) { return; } - Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = this.reloadCacheCluster(); - Map<String, List<InLongIdObject>> inlongIdMap = this.reloadInlongId(); - // mapping inlongIdMap - for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) { - String clusterTag = entry.getValue().getSetName(); - List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag); - if (inlongIds != null) { - entry.getValue().setInlongIds(inlongIds); - } - } + // reoload cache cluster + this.reloadCacheCluster(proxyClusterMap); + // reload inlong group id and inlong stream id + this.reloadInlongId(proxyClusterMap); // generateClusterJson - this.generateClusterJson(proxyClusterMap, cacheClusterMap); + this.generateClusterJson(proxyClusterMap); - LOGGER.info("end to reload config."); + LOGGER.info("end to reload config:" + this.getClass().getSimpleName()); } /** * reloadProxyCluster */ - private Map<String, ProxyClusterObject> reloadProxyCluster() { - Map<String, ProxyClusterObject> proxyClusterMap = new HashMap<>(); + private void reloadProxyCluster(Map<String, DataProxyCluster> proxyClusterMap) { for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) { ProxyClusterObject obj = new ProxyClusterObject(); obj.setName(proxyCluster.getClusterName()); obj.setSetName(proxyCluster.getClusterTag()); obj.setZone(proxyCluster.getExtTag()); - proxyClusterMap.put(obj.getName(), obj); + DataProxyCluster clusterObj = new DataProxyCluster(); + clusterObj.setProxyCluster(obj); + proxyClusterMap.put(obj.getName(), clusterObj); } - return proxyClusterMap; } /** * reloadCacheCluster */ - private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() { + private void reloadCacheCluster(Map<String, DataProxyCluster> proxyClusterMap) { + // reload cache cluster Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<>(); for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) { if (StringUtils.isEmpty(cacheCluster.getExtTag())) { @@ -248,126 +256,160 @@ public class DataProxyConfigRepository implements IRepository { .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster); } } - return cacheClusterMap; + // mark cache cluster to proxy cluster + Map<String, Map<String, String>> tagCache = new HashMap<>(); + for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) { + DataProxyCluster clusterObj = entry.getValue(); + ProxyClusterObject proxyObj = clusterObj.getProxyCluster(); + // cache + String clusterTag = proxyObj.getSetName(); + String extTag = proxyObj.getZone(); + Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag); + if (cacheClusterZoneMap != null) { + Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag)); + for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) { + if (cacheEntry.getValue().size() == 0) { + continue; + } + Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(), + k -> MAP_SPLITTER.split(cacheEntry.getKey())); + if (isSubTag(wholeTagMap, subTagMap)) { + CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet(); + cacheSet.setSetName(clusterTag); + List<CacheCluster> cacheClusterList = cacheEntry.getValue(); + cacheSet.setType(cacheClusterList.get(0).getType()); + List<CacheClusterObject> cacheClusters = cacheSet.getCacheClusters(); + for (CacheCluster cacheCluster : cacheClusterList) { + CacheClusterObject obj = new CacheClusterObject(); + obj.setName(cacheCluster.getClusterName()); + obj.setZone(cacheCluster.getExtTag()); + obj.setParams(fromJson(cacheCluster.getExtParams())); + cacheClusters.add(obj); + } + } + } + } + } } /** - * reloadInlongId + * fromJson */ - private Map<String, List<InLongIdObject>> reloadInlongId() { - // parse group - Map<String, InlongGroupId> groupIdMap = new HashMap<>(); - clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value)); - // parse stream - Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>(); - for (InlongStreamId streamIdObj : clusterSetMapper.selectInlongStreamId()) { - String groupId = streamIdObj.getInlongGroupId(); - InlongGroupId groupIdObj = groupIdMap.get(groupId); - if (groupId == null) { - continue; + private Map<String, String> fromJson(String jsonString) { + Map<String, String> mapObj = new HashMap<>(); + try { + JsonObject obj = gson.fromJson(jsonString, JsonObject.class); + for (String key : obj.keySet()) { + JsonElement child = obj.get(key); + if (child.isJsonPrimitive()) { + mapObj.put(key, child.getAsString()); + } else { + mapObj.put(key, child.toString()); + } } - Map<String, String> groupParams = this.getExtParams(groupIdObj.getExtParams()); - Map<String, String> streamParams = this.getExtParams(streamIdObj.getExtParams()); - this.parseMasterTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap); - this.parseBackupTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } - return inlongIdMap; + return mapObj; } /** - * getExtParams + * reloadInlongId */ - @SuppressWarnings("unchecked") - private Map<String, String> getExtParams(String extParams) { - // parse extparams - if (!StringUtils.isEmpty(extParams)) { - try { - Map<String, String> groupParams = gson.fromJson(extParams, HashMap.class); - return groupParams; - } catch (Exception e) { - LOGGER.error("Fail to parse ext error:{},params:{}", e.getMessage(), extParams, e); + private void reloadInlongId(Map<String, DataProxyCluster> proxyClusterMap) { + // reload inlong group id + Map<String, InlongGroupId> groupIdMap = new HashMap<>(); + clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value)); + // reload inlong group ext params + Map<String, Map<String, String>> groupParams = new HashMap<>(); + groupIdMap.forEach((k, v) -> groupParams.put(k, fromJson(v.getExtParams()))); + // reload inlong group ext + List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader + .loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG); + groupExtCursor.forEach(v -> groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>()) + .put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue())); + // reload inlong stream id + Map<String, InlongStreamId> streamIdMap = new HashMap<>(); + clusterSetMapper.selectInlongStreamId() + .forEach(v -> streamIdMap.put(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), v)); + // reload inlong stream ext params + Map<String, Map<String, String>> streamParams = new HashMap<>(); + streamIdMap.forEach((k, v) -> streamParams.put(k, fromJson(v.getExtParams()))); + // reload inlong stream ext + List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader + .loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE); + streamExtCursor.forEach(v -> streamParams + .computeIfAbsent(getInlongId(v.getInlongGroupId(), v.getInlongStreamId()), k -> new HashMap<>()) + .put(ClusterSwitch.BACKUP_MQ_RESOURCE, v.getKeyValue())); + + // build Map<clusterTag, List<InlongIdObject>> + Map<String, List<InLongIdObject>> inlongIdMap = this.parseInlongId(groupIdMap, groupParams, streamIdMap, + streamParams); + // mark inlong id to proxy cluster + for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) { + String clusterTag = entry.getValue().getProxyCluster().getSetName(); + List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag); + if (inlongIds != null) { + entry.getValue().getProxyCluster().getInlongIds().addAll(inlongIds); } } - return new HashMap<>(); } /** - * parseMasterTopic + * parseInlongId */ - private void parseMasterTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj, - Map<String, String> groupParams, Map<String, String> streamParams, - Map<String, List<InLongIdObject>> inlongIdMap) { - // choose topic - String groupTopic = groupIdObj.getTopic(); - String streamTopic = streamIdObj.getTopic(); - String finalTopic = null; - if (StringUtils.isEmpty(groupTopic)) { - // both empty then ignore - if (StringUtils.isEmpty(streamTopic)) { - return; - } else { - finalTopic = streamTopic; + private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> groupIdMap, + Map<String, Map<String, String>> groupParams, Map<String, InlongStreamId> streamIdMap, + Map<String, Map<String, String>> streamParams) { + Map<String, List<InLongIdObject>> inlongIdMap = new HashMap<>(); + for (Entry<String, InlongStreamId> entry : streamIdMap.entrySet()) { + InlongStreamId streamIdObj = entry.getValue(); + String groupId = streamIdObj.getInlongGroupId(); + InlongGroupId groupIdObj = groupIdMap.get(groupId); + if (groupId == null) { + continue; } - } else { - if (StringUtils.isEmpty(streamTopic)) { - finalTopic = groupTopic; + // master + InLongIdObject obj = new InLongIdObject(); + String inlongId = entry.getKey(); + obj.setInlongId(inlongId); + Optional.ofNullable(groupParams.get(groupId)).ifPresent(v -> obj.getParams().putAll(v)); + Optional.ofNullable(streamParams.get(inlongId)).ifPresent(v -> obj.getParams().putAll(v)); + if (StringUtils.isBlank(streamIdObj.getTopic())) { + obj.setTopic(groupIdObj.getTopic()); } else { - // Pulsar: namespace+topic - finalTopic = groupTopic + "/" + streamTopic; + obj.setTopic(streamIdObj.getTopic()); + obj.getParams().put(KEY_NAMESPACE, groupIdObj.getTopic()); + } + inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj); + // backup + InLongIdObject backupObj = new InLongIdObject(); + backupObj.setInlongId(inlongId); + backupObj.getParams().putAll(obj.getParams()); + Map<String, String> groupParam = groupParams.get(groupId); + if (groupParam != null && groupParam.containsKey(ClusterSwitch.BACKUP_CLUSTER_TAG) + && groupParam.containsKey(ClusterSwitch.BACKUP_MQ_RESOURCE)) { + String clusterTag = groupParam.get(ClusterSwitch.BACKUP_CLUSTER_TAG); + String groupMqResource = groupParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE); + + Map<String, String> streamParam = streamParams.get(inlongId); + if (streamParam != null && !StringUtils.isBlank(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE))) { + obj.setTopic(streamParam.get(ClusterSwitch.BACKUP_MQ_RESOURCE)); + backupObj.getParams().put(KEY_NAMESPACE, groupMqResource); + } else { + obj.setTopic(groupMqResource); + } + inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(obj); } } - // concat id - InLongIdObject obj = new InLongIdObject(); - String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId(); - obj.setInlongId(inlongId); - obj.setTopic(finalTopic); - Map<String, String> params = new HashMap<>(); - params.putAll(groupParams); - params.putAll(streamParams); - obj.setParams(params); - inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList<>()).add(obj); + return inlongIdMap; } /** - * parseBackupTopic + * getInlongId */ - private void parseBackupTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj, - Map<String, String> groupParams, Map<String, String> streamParams, - Map<String, List<InLongIdObject>> inlongIdMap) { - Map<String, String> params = new HashMap<>(); - params.putAll(groupParams); - params.putAll(streamParams); - // find backup cluster tag - String clusterTag = params.get(KEY_BACKUP_CLUSTER_TAG); - if (StringUtils.isEmpty(clusterTag)) { - return; - } - // find backup topic - String groupTopic = groupParams.get(KEY_BACKUP_TOPIC); - String streamTopic = streamParams.get(KEY_BACKUP_TOPIC); - String finalTopic = null; - if (StringUtils.isEmpty(groupTopic)) { - // both empty then ignore - if (StringUtils.isEmpty(streamTopic)) { - return; - } else { - finalTopic = streamTopic; - } - } else { - if (StringUtils.isEmpty(streamTopic)) { - finalTopic = groupTopic; - } else { - // Pulsar: namespace+topic - finalTopic = groupTopic + "/" + streamTopic; - } - } - // concat id - InLongIdObject obj = new InLongIdObject(); - String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId(); - obj.setInlongId(inlongId); - obj.setTopic(finalTopic); - obj.setParams(params); - inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList<>()).add(obj); + private String getInlongId(String inlongGroupId, String inlongStreamId) { + return inlongGroupId + "." + inlongStreamId; } /** @@ -382,62 +424,22 @@ public class DataProxyConfigRepository implements IRepository { /** * generateClusterJson */ - @SuppressWarnings("unchecked") - private void generateClusterJson(Map<String, ProxyClusterObject> proxyClusterMap, - Map<String, Map<String, List<CacheCluster>>> cacheClusterMap) { + private void generateClusterJson(Map<String, DataProxyCluster> proxyClusterMap) { Map<String, String> newProxyConfigJson = new ConcurrentHashMap<>(); Map<String, String> newProxyMd5Map = new ConcurrentHashMap<>(); - Map<String, Map<String, String>> tagCache = new HashMap<>(); - for (Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) { - ProxyClusterObject proxyObj = entry.getValue(); - // proxy - DataProxyCluster clusterObj = new DataProxyCluster(); - clusterObj.setProxyCluster(proxyObj); - // cache - String clusterTag = proxyObj.getSetName(); - String extTag = proxyObj.getZone(); - Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag); - if (cacheClusterZoneMap != null) { - Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag)); - for (Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) { - if (cacheEntry.getValue().size() == 0) { - continue; - } - Map<String, String> wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(), - k -> MAP_SPLITTER.split(cacheEntry.getKey())); - if (isSubTag(wholeTagMap, subTagMap)) { - CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet(); - cacheSet.setSetName(clusterTag); - List<CacheCluster> cacheClusterList = cacheEntry.getValue(); - cacheSet.setType(cacheClusterList.get(0).getType()); - List<CacheClusterObject> cacheClusters = new ArrayList<>(cacheClusterList.size()); - cacheSet.setCacheClusters(cacheClusters); - for (CacheCluster cacheCluster : cacheClusterList) { - CacheClusterObject obj = new CacheClusterObject(); - obj.setName(cacheCluster.getClusterName()); - obj.setZone(cacheCluster.getExtTag()); - try { - Map<String, String> params = gson.fromJson(cacheCluster.getExtParams(), Map.class); - obj.setParams(params); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - cacheClusters.add(obj); - } - } - } - } + for (Entry<String, DataProxyCluster> entry : proxyClusterMap.entrySet()) { + DataProxyCluster proxyObj = entry.getValue(); // json - String jsonDataProxyCluster = gson.toJson(clusterObj); + String jsonDataProxyCluster = gson.toJson(proxyObj); String md5 = DigestUtils.md5Hex(jsonDataProxyCluster); DataProxyConfigResponse response = new DataProxyConfigResponse(); response.setResult(true); response.setErrCode(DataProxyConfigResponse.SUCC); response.setMd5(md5); - response.setData(clusterObj); + response.setData(proxyObj); String jsonResponse = gson.toJson(response); - newProxyConfigJson.put(proxyObj.getName(), jsonResponse); - newProxyMd5Map.put(proxyObj.getName(), md5); + newProxyConfigJson.put(entry.getKey(), jsonResponse); + newProxyMd5Map.put(entry.getKey(), md5); } // replace