This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b5714f8e26 [INLONG-10081][DataProxy] Modify the data format of 
metadata saved in the metadata.json file (#10083)
b5714f8e26 is described below

commit b5714f8e26dcd1e8633c409d3e767f8dfeb18c0e
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Fri Apr 26 17:06:07 2024 +0800

    [INLONG-10081][DataProxy] Modify the data format of metadata saved in the 
metadata.json file (#10083)
---
 .../inlong/dataproxy/config/ConfigManager.java     | 260 ++++++++++++--
 .../dataproxy/config/holder/MetaConfigHolder.java  | 372 ++++++++-------------
 .../dataproxy/config/pojo/CacheClusterConfig.java  |  20 ++
 .../inlong/dataproxy/config/pojo/CacheType.java    |  11 +-
 .../dataproxy/config/pojo/IdTopicConfig.java       |  54 ++-
 .../dataproxy/config/pojo/InLongMetaConfig.java    |  69 ++++
 .../{source => consts}/SourceConstants.java        |  93 +-----
 .../apache/inlong/dataproxy/source/BaseSource.java |   1 +
 .../dataproxy/source/ServerMessageFactory.java     |   2 +
 .../inlong/dataproxy/source/SimpleHttpSource.java  |   1 +
 .../inlong/dataproxy/source/SimpleTcpSource.java   |   1 +
 .../inlong/dataproxy/source/SimpleUdpSource.java   |   1 +
 .../inlong/dataproxy/utils/AddressUtils.java       |   2 +-
 .../src/test/resources/metadata.json               |   2 +-
 14 files changed, 535 insertions(+), 354 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 01ef883b44..68772a8c78 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -17,10 +17,17 @@
 
 package org.apache.inlong.dataproxy.config;
 
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.InlongCompressType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.heartbeat.AddressInfo;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
+import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
+import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
+import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
 import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
@@ -29,12 +36,15 @@ import 
org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder;
 import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.config.pojo.CacheType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.utils.HttpUtils;
 
 import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -45,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.security.SecureRandom;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -141,8 +152,8 @@ public class ConfigManager {
         return metaConfigHolder.getConfigMd5();
     }
 
-    public boolean updateMetaConfigInfo(String inDataMd5, String 
inDataJsonStr) {
-        return metaConfigHolder.updateConfigMap(inDataMd5, inDataJsonStr);
+    public boolean updateMetaConfigInfo(InLongMetaConfig metaConfig) {
+        return metaConfigHolder.updateConfigMap(metaConfig);
     }
 
     // register meta-config callback
@@ -331,7 +342,7 @@ public class ConfigManager {
                 }
                 httpPost.setEntity(HttpUtils.getEntity(request));
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Start to request {} to get config info, with 
params: {}, headers: {}",
+                    LOG.debug("Sync meta: start to get config, to:{}, params: 
{}, headers: {}",
                             url, request, httpPost.getAllHeaders());
                 }
                 // request with post
@@ -340,52 +351,69 @@ public class ConfigManager {
                 String returnStr = EntityUtils.toString(response.getEntity());
                 long dltTime = System.currentTimeMillis() - startTime;
                 if (dltTime >= 
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) {
-                    LOG.warn("End to request {} to get config info, WAIST {} 
ms, over alarm value {} ms",
-                            url, dltTime, 
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs());
+                    LOG.warn("Sync meta: end to get config, WAIST {} ms, over 
alarm: {} ms, from:{}",
+                            dltTime, 
CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs(), url);
                 } else {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("End to request {} to get config info:{}, 
WAIST {} ms",
-                                url, returnStr, dltTime);
+                        LOG.debug("Sync meta: end to get config, WAIST {} ms, 
from:{}, result:{}",
+                                dltTime, url, returnStr);
                     }
                 }
                 if (response.getStatusLine().getStatusCode() != 200) {
-                    LOG.warn("Failed to request {}, with params: {}, headers: 
{}, the response is {}",
-                            url, request, httpPost.getAllHeaders(), returnStr);
+                    LOG.error(
+                            "Sync meta: return failure, errCode {}, from:{}, 
params:{}, headers:{}, response:{}",
+                            response.getStatusLine().getStatusCode(), url, 
request, httpPost.getAllHeaders(),
+                            returnStr);
                     return false;
                 }
                 // get groupId <-> topic and m value.
-                DataProxyConfigResponse proxyResponse =
-                        gson.fromJson(returnStr, 
DataProxyConfigResponse.class);
-                if (!proxyResponse.isResult()) {
-                    LOG.warn("Fail to get config from url {}, with params {}, 
error code is {}",
-                            url, request, proxyResponse.getErrCode());
+                DataProxyConfigResponse proxyResponse;
+                try {
+                    proxyResponse = gson.fromJson(returnStr, 
DataProxyConfigResponse.class);
+                } catch (Throwable e) {
+                    LOG.error("Sync meta: exception thrown while parsing 
config, from:{}, params:{}, response:{}",
+                            url, request, returnStr, e);
                     return false;
                 }
-                if (proxyResponse.getErrCode() != 
DataProxyConfigResponse.SUCC) {
+                // check required fields
+                ImmutablePair<Boolean, String> validResult = 
validRequiredFields(proxyResponse);
+                if (!validResult.getLeft()) {
                     if (proxyResponse.getErrCode() != 
DataProxyConfigResponse.NOUPDATE) {
-                        LOG.warn("Get config failure from url:{}, with params 
{}, error code is {}",
-                                url, request, proxyResponse.getErrCode());
+                        LOG.error("Sync meta: {}, from:{}, params:{}, 
return:{}",
+                                validResult.getRight(), url, request, 
returnStr);
                     }
                     return true;
                 }
-                DataProxyCluster dataProxyCluster = proxyResponse.getData();
-                if (dataProxyCluster == null
-                        || dataProxyCluster.getCacheClusterSet() == null
-                        || 
dataProxyCluster.getCacheClusterSet().getCacheClusters().isEmpty()) {
-                    LOG.warn("Get config empty from url:{}, with params {}, 
return:{}, cluster is empty!",
+                // get mq cluster info
+                ImmutablePair<CacheType, Map<String, CacheClusterConfig>> 
clusterInfo =
+                        
buildCacheClusterConfig(proxyResponse.getData().getCacheClusterSet());
+                if (clusterInfo.getLeft() == CacheType.N) {
+                    LOG.error("Sync meta: unsupported mq type {}, from:{}, 
params:{}, return:{}",
+                            clusterInfo.getLeft(), url, request, returnStr);
+                    return true;
+                }
+                if (clusterInfo.getRight().isEmpty()) {
+                    LOG.error("Sync meta: cacheClusters is empty, from:{}, 
params:{}, return:{}",
                             url, request, returnStr);
                     return true;
                 }
+                // get ID to Topic configure
+                Map<String, IdTopicConfig> idTopicConfigMap = 
buildCacheTopicConfig(
+                        clusterInfo.getLeft(), 
proxyResponse.getData().getProxyCluster());
+                InLongMetaConfig inLongMetaConfig = new 
InLongMetaConfig(proxyResponse.getMd5(),
+                        clusterInfo.getLeft(), clusterInfo.getRight(), 
idTopicConfigMap);
                 // update meta configure
-                if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(), 
returnStr)) {
-                    if (!ConfigManager.handshakeManagerOk.get()) {
-                        ConfigManager.handshakeManagerOk.set(true);
-                        LOG.info("Get config success from manager and updated, 
set handshake status is ok!");
-                    }
+                configManager.updateMetaConfigInfo(inLongMetaConfig);
+                // update handshake to manager status
+                if (ConfigManager.handshakeManagerOk.get()) {
+                    LOG.info("Sync meta: sync config success, from:{}", url);
+                } else {
+                    ConfigManager.handshakeManagerOk.set(true);
+                    LOG.info("Sync meta: sync config success, handshake 
manager ok, from:{}", url);
                 }
                 return true;
             } catch (Throwable ex) {
-                LOG.error("Request manager {} failure, throw exception", url, 
ex);
+                LOG.error("Sync meta: process throw exception, from:{}", url, 
ex);
                 return false;
             } finally {
                 if (httpPost != null) {
@@ -393,5 +421,179 @@ public class ConfigManager {
                 }
             }
         }
+
+        /**
+         * check required fields status
+         *
+         * @param response  response from Manager
+         *
+         * @return   check result
+         */
+        public ImmutablePair<Boolean, String> 
validRequiredFields(DataProxyConfigResponse response) {
+            if (response == null) {
+                return ImmutablePair.of(false, "parse result is null");
+            } else if (!response.isResult()) {
+                return ImmutablePair.of(false, "result is NOT true");
+            } else if (response.getErrCode() != DataProxyConfigResponse.SUCC) {
+                return ImmutablePair.of(false, "errCode is "
+                        + response.getErrCode() + ", NOT success");
+            } else if (response.getMd5() == null) {
+                return ImmutablePair.of(false, "md5 field is null");
+            } else if (response.getData() == null) {
+                return ImmutablePair.of(false, "data field is null");
+            } else if (response.getData().getProxyCluster() == null) {
+                return ImmutablePair.of(false, "proxyCluster field is null");
+            } else if (response.getData().getCacheClusterSet() == null) {
+                return ImmutablePair.of(false, "cacheClusterSet field is 
null");
+            } else if (response.getData().getProxyCluster().getInlongIds() == 
null) {
+                return ImmutablePair.of(false, "inlongIds field is null");
+            } else if 
(response.getData().getCacheClusterSet().getCacheClusters() == null) {
+                return ImmutablePair.of(false, "cacheClusters field is null");
+            }
+            return ImmutablePair.of(true, "ok");
+        }
+
+        /**
+         * build cluster config based on cluster set object
+         *
+         * @param clusterSetObject  mq cluster set obect
+         *
+         * @return   mq type and cluster set configure
+         */
+        private ImmutablePair<CacheType, Map<String, CacheClusterConfig>> 
buildCacheClusterConfig(
+                CacheClusterSetObject clusterSetObject) {
+            CacheType mqType = CacheType.convert(clusterSetObject.getType());
+            Map<String, CacheClusterConfig> result = new HashMap<>();
+            for (CacheClusterObject clusterObject : 
clusterSetObject.getCacheClusters()) {
+                if (clusterObject == null || 
StringUtils.isBlank(clusterObject.getName())) {
+                    continue;
+                }
+                CacheClusterConfig config = new CacheClusterConfig();
+                config.setClusterName(clusterObject.getName());
+                config.setToken(clusterObject.getToken());
+                config.getParams().putAll(clusterObject.getParams());
+                result.put(config.getClusterName(), config);
+            }
+            return ImmutablePair.of(mqType, result);
+        }
+
+        /**
+         * build id2field config based on id2Topic configure
+         *
+         * @param mqType  mq cluster type
+         * @param proxyClusterObject  cluster object info
+         *
+         * @return   ID to Topic configures
+         */
+        private Map<String, IdTopicConfig> buildCacheTopicConfig(
+                CacheType mqType, ProxyClusterObject proxyClusterObject) {
+            Map<String, IdTopicConfig> tmpTopicConfigMap = new HashMap<>();
+            List<InLongIdObject> inLongIds = proxyClusterObject.getInlongIds();
+            if (inLongIds.isEmpty()) {
+                return tmpTopicConfigMap;
+            }
+            int index;
+            String[] idItems;
+            String groupId;
+            String streamId;
+            String topicName;
+            String tenant;
+            String nameSpace;
+            for (InLongIdObject idObject : inLongIds) {
+                if (idObject == null
+                        || StringUtils.isBlank(idObject.getInlongId())
+                        || StringUtils.isBlank(idObject.getTopic())) {
+                    continue;
+                }
+                // parse inlong id
+                idItems = idObject.getInlongId().split("\\.");
+                if (idItems.length == 2) {
+                    if (StringUtils.isBlank(idItems[0])) {
+                        continue;
+                    }
+                    groupId = idItems[0].trim();
+                    streamId = idItems[1].trim();
+                } else {
+                    groupId = idObject.getInlongId().trim();
+                    streamId = "";
+                }
+                topicName = idObject.getTopic().trim();
+                // change full topic name "pulsar-xxx/test/base_topic_name" to
+                // base topic name "base_topic_name"
+                index = topicName.lastIndexOf('/');
+                if (index >= 0) {
+                    topicName = topicName.substring(index + 1).trim();
+                }
+                tenant = 
idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, "");
+                nameSpace = 
idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, "");
+                if (StringUtils.isBlank(idObject.getTopic())) {
+                    // namespace field must exist and value not be empty,
+                    // otherwise it is an illegal configuration item.
+                    continue;
+                }
+                if (mqType.equals(CacheType.TUBE)) {
+                    topicName = nameSpace;
+                } else if (mqType.equals(CacheType.KAFKA)) {
+                    if (topicName.equals(streamId)) {
+                        topicName = 
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName);
+                    }
+                }
+                IdTopicConfig tmpConfig = new IdTopicConfig();
+                tmpConfig.setInlongGroupIdAndStreamId(groupId, streamId);
+                tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
+                tmpConfig.setTopicName(topicName);
+                tmpConfig.setParams(idObject.getParams());
+                tmpConfig.setDataType(DataTypeEnum.convert(
+                        idObject.getParams().getOrDefault("dataType", 
DataTypeEnum.TEXT.getType())));
+                
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", 
"|"));
+                
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", 
"\n"));
+                tmpConfig.setUseExtendedFields(Boolean.valueOf(
+                        idObject.getParams().getOrDefault("useExtendedFields", 
"false")));
+                tmpConfig.setMsgWrapType(getPbWrapType(idObject));
+                tmpConfig.setV1CompressType(getPbCompressType(idObject));
+                tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig);
+                // add only groupId object for tube
+                if (mqType.equals(CacheType.TUBE)
+                        && 
!tmpConfig.getUid().equals(tmpConfig.getInlongGroupId())
+                        && tmpTopicConfigMap.get(tmpConfig.getInlongGroupId()) 
== null) {
+                    IdTopicConfig tmpConfig2 = new IdTopicConfig();
+                    tmpConfig2.setInlongGroupIdAndStreamId(groupId, "");
+                    tmpConfig2.setTenantAndNameSpace(tenant, nameSpace);
+                    tmpConfig2.setTopicName(topicName);
+                    tmpConfig2.setDataType(tmpConfig.getDataType());
+                    
tmpConfig2.setFieldDelimiter(tmpConfig.getFieldDelimiter());
+                    tmpConfig2.setFileDelimiter(tmpConfig.getFileDelimiter());
+                    tmpConfig2.setParams(new HashMap<>(tmpConfig.getParams()));
+                    
tmpConfig2.setUseExtendedFields(tmpConfig.isUseExtendedFields());
+                    tmpConfig2.setMsgWrapType(tmpConfig.getMsgWrapType());
+                    
tmpConfig2.setV1CompressType(tmpConfig.getV1CompressType());
+                    tmpTopicConfigMap.put(tmpConfig2.getUid(), tmpConfig2);
+                }
+            }
+            return tmpTopicConfigMap;
+        }
+
+        private MessageWrapType getPbWrapType(InLongIdObject idObject) {
+            String strWrapType = idObject.getParams().get("wrapType");
+            if (StringUtils.isBlank(strWrapType)) {
+                return MessageWrapType.UNKNOWN;
+            } else {
+                return MessageWrapType.forType(strWrapType);
+            }
+        }
+
+        private InlongCompressType getPbCompressType(InLongIdObject idObject) {
+            String strCompressType = 
idObject.getParams().get("inlongCompressType");
+            if (StringUtils.isBlank(strCompressType)) {
+                return 
CommonConfigHolder.getInstance().getDefV1MsgCompressType();
+            } else {
+                InlongCompressType msgCompType = 
InlongCompressType.forType(strCompressType);
+                if (msgCompType == InlongCompressType.UNKNOWN) {
+                    return 
CommonConfigHolder.getInstance().getDefV1MsgCompressType();
+                } else {
+                    return msgCompType;
+                }
+            }
+        }
     }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 96c5bd820a..555c55f9cc 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -17,26 +17,19 @@
 
 package org.apache.inlong.dataproxy.config.holder;
 
-import org.apache.inlong.common.constant.Constants;
-import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
-import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
-import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
-import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.CacheType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
-import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
 import com.google.gson.Gson;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +67,6 @@ public class MetaConfigHolder extends ConfigHolder {
     private String tmpDataMd5 = "";
     private final AtomicLong lastSyncVersion = new AtomicLong(0);
     // cached data
-    private final List<String> defTopics = new ArrayList<>();
     private final AtomicInteger clusterType = new 
AtomicInteger(CacheType.N.getId());
     private final ConcurrentHashMap<String, CacheClusterConfig> mqClusterMap = 
new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSrcMap = 
new ConcurrentHashMap<>();
@@ -84,13 +76,6 @@ public class MetaConfigHolder extends ConfigHolder {
         super(metaConfigFileName);
     }
 
-    public void addDefTopic(String defTopic) {
-        if (StringUtils.isBlank(defTopic)) {
-            return;
-        }
-        defTopics.add(defTopic);
-    }
-
     /**
      * get source topic by groupId and streamId
      */
@@ -153,28 +138,46 @@ public class MetaConfigHolder extends ConfigHolder {
         }
     }
 
-    public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) {
-        if (StringUtils.isBlank(inDataMd5)
-                || StringUtils.isBlank(inDataJsonStr)) {
-            return false;
-        }
+    public boolean updateConfigMap(InLongMetaConfig metaConfig) {
+        String inDataJsonStr;
+        // check cache data
         synchronized (this.lastSyncVersion) {
             if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) {
-                if (inDataJsonStr.equals(tmpDataMd5)) {
+                if (tmpDataMd5.equals(metaConfig.getMd5())) {
                     return false;
                 }
-                LOG.info("Load changed metadata {} , but reloading content, 
over {} ms",
+                LOG.info("Update metadata: NOT UPDATE, {} is loading, but wast 
over {} ms",
                         getFileName(), System.currentTimeMillis() - 
this.lastSyncVersion.get());
                 return false;
             } else {
-                if (inDataMd5.equals(dataMd5)) {
+                if (dataMd5.equals(metaConfig.getMd5())) {
                     return false;
                 }
             }
-            return storeConfigToFile(inDataMd5, inDataJsonStr);
+            InLongMetaConfig newMetaConfig = buildMixedMetaConfig(metaConfig);
+            try {
+                inDataJsonStr = GSON.toJson(newMetaConfig);
+            } catch (Throwable e) {
+                LOG.error("Update metadata: failure to serial meta config to 
json", e);
+                return false;
+            }
+            return storeConfigToFile(inDataJsonStr, newMetaConfig);
         }
     }
 
+    private InLongMetaConfig buildMixedMetaConfig(InLongMetaConfig metaConfig) 
{
+        // process and check cluster info
+        Map<String, CacheClusterConfig> newClusterConfigMap =
+                new HashMap<>(metaConfig.getClusterConfigMap().size());
+        newClusterConfigMap.putAll(metaConfig.getClusterConfigMap());
+        // process id2topic info
+        Map<String, IdTopicConfig> newIdTopicConfigMap =
+                new HashMap<>(metaConfig.getIdTopicConfigMap().size());
+        newIdTopicConfigMap.putAll(metaConfig.getIdTopicConfigMap());
+        return new InLongMetaConfig(metaConfig.getMd5(),
+                metaConfig.getMqType(), newClusterConfigMap, 
newIdTopicConfigMap);
+    }
+
     public List<CacheClusterConfig> forkCachedCLusterConfig() {
         List<CacheClusterConfig> result = new ArrayList<>();
         if (mqClusterMap.isEmpty()) {
@@ -214,264 +217,146 @@ public class MetaConfigHolder extends ConfigHolder {
         // check meta update setting
         if 
(!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile()
                 && !ConfigManager.handshakeManagerOk.get()) {
-            LOG.warn("Failed to load json config from {}, don't obtain 
metadata from the Manager,"
-                    + " and the startup via the cache file is false", 
getFileName());
+            LOG.warn("Load metadata: StartupUsingLocalMetaFile is false, don't 
obtain metadata from {}"
+                    + " before handshake with Manager", getFileName());
             return false;
         }
         String jsonString = "";
+        InLongMetaConfig metaConfig;
         readWriteLock.writeLock().lock();
         try {
             jsonString = loadConfigFromFile();
             if (StringUtils.isBlank(jsonString)) {
-                LOG.warn("Load changed json {}, but no records configured", 
getFileName());
+                LOG.error("Load metadata: NOT LOADED, changed but empty 
records, file:{}", getFileName());
                 return true;
             }
-            DataProxyConfigResponse metaConfig =
-                    GSON.fromJson(jsonString, DataProxyConfigResponse.class);
-            // check result tag
-            if (!metaConfig.isResult() || metaConfig.getErrCode() != 
DataProxyConfigResponse.SUCC) {
-                LOG.warn("Load failed json config from {}, error code is {}",
-                        getFileName(), metaConfig.getErrCode());
+            try {
+                metaConfig = GSON.fromJson(jsonString, InLongMetaConfig.class);
+            } catch (Throwable e) {
+                LOG.error("Load metadata: NOT LOADED, parse json config 
failure, file:{}", getFileName(), e);
                 return true;
             }
-            // check cluster data
-            DataProxyCluster clusterObj = metaConfig.getData();
-            if (clusterObj == null) {
-                LOG.warn("Load failed json config from {}, malformed content, 
data is null", getFileName());
+            // check required fields
+            ImmutablePair<Boolean, String> paramChkResult = 
validRequiredFields(metaConfig);
+            if (!paramChkResult.getLeft()) {
+                LOG.error("Load metadata: NOT LOADED, {}, file:{}",
+                        paramChkResult.getRight(), getFileName());
                 return true;
             }
-            // update cache data
-            if (updateCacheData(jsonString, metaConfig)) {
-                LOG.info("Load changed {} file success!", getFileName());
-            }
+            // update cached data
+            replaceCacheConfig(metaConfig.getMqType(),
+                    metaConfig.getClusterConfigMap(), 
metaConfig.getIdTopicConfigMap());
+            this.dataMd5 = metaConfig.getMd5();
+            this.dataStr = jsonString;
+            LOG.info("Load metadata: LOADED success, from {}!", getFileName());
             return true;
         } catch (Throwable e) {
-            LOG.warn("Process json {} changed data {} failure", getFileName(), 
jsonString, e);
+            LOG.error("Load metadata: NOT LOADED, load from {} throw 
exception", getFileName(), e);
             return false;
         } finally {
+            if (this.lastSyncVersion.get() == 0) {
+                this.lastUpdVersion.set(System.currentTimeMillis());
+                this.lastSyncVersion.compareAndSet(0, 
this.lastUpdVersion.get());
+            } else {
+                this.lastUpdVersion.set(this.lastSyncVersion.get());
+            }
             readWriteLock.writeLock().unlock();
         }
     }
 
-    private boolean updateCacheData(String jsonString, DataProxyConfigResponse 
metaConfig) {
-        // get and valid inlongIds configure
-        ProxyClusterObject proxyClusterObject = 
metaConfig.getData().getProxyCluster();
-        if (proxyClusterObject == null) {
-            LOG.warn("Load failed json config from {}, malformed content, 
proxyCluster field is null",
-                    getFileName());
-            return false;
-        }
-        CacheClusterSetObject clusterSetObject = 
metaConfig.getData().getCacheClusterSet();
-        if (clusterSetObject == null) {
-            LOG.warn("Load failed json config from {}, malformed content, 
cacheClusterSet field is null",
-                    getFileName());
-            return false;
-        }
-        List<InLongIdObject> inLongIds = proxyClusterObject.getInlongIds();
-        if (inLongIds == null) {
-            LOG.warn("Load failed json config from {}, malformed content, 
inlongIds field is null",
-                    getFileName());
-            return false;
-        }
-        // get mq type
-        CacheType mqType = CacheType.convert(clusterSetObject.getType());
-        if (mqType == CacheType.N) {
-            LOG.warn("Load failed json config from {}, unsupported mq type {}",
-                    getFileName(), clusterSetObject.getType());
-            return false;
+    /**
+     * store meta config to file
+     *
+     * @param metaJsonStr meta info string
+     * @param metaConfig  meta info object
+     *
+     * @return  store result
+     */
+    private boolean storeConfigToFile(String metaJsonStr, InLongMetaConfig 
metaConfig) {
+        boolean isSuccess = false;
+        String filePath = getFilePath();
+        if (StringUtils.isBlank(filePath)) {
+            LOG.error("Store metadata: error in writing file {} as the file 
path is null.", getFileName());
+            return isSuccess;
         }
-        // get mq cluster info
-        Map<String, CacheClusterConfig> tmpClusterConfigMap = new HashMap<>();
-        for (CacheClusterObject clusterObject : 
clusterSetObject.getCacheClusters()) {
-            if (clusterObject == null || 
StringUtils.isBlank(clusterObject.getName())) {
-                continue;
+        readWriteLock.writeLock().lock();
+        try {
+            File sourceFile = new File(filePath);
+            File targetFile = new File(getNextBackupFileName());
+            File tmpNewFile = new File(getFileName() + ".tmp");
+
+            if (sourceFile.exists()) {
+                FileUtils.copyFile(sourceFile, targetFile);
             }
-            CacheClusterConfig config = new CacheClusterConfig();
-            config.setClusterName(clusterObject.getName());
-            config.setToken(clusterObject.getToken());
-            config.getParams().putAll(clusterObject.getParams());
-            tmpClusterConfigMap.put(config.getClusterName(), config);
-        }
-        if (tmpClusterConfigMap.isEmpty()) {
-            LOG.warn("Load failed json config from {}, no valid {} mq cluster",
-                    getFileName(), clusterSetObject.getType());
-            return false;
-        }
-        // get topic config info
-        Map<String, IdTopicConfig> tmpTopicConfigMap = 
buildCacheTopicConfig(mqType, inLongIds);
-        replaceCacheConfig(mqType, tmpClusterConfigMap, tmpTopicConfigMap);
-        // update cached data
-        this.dataMd5 = metaConfig.getMd5();
-        this.dataStr = jsonString;
-        if (this.lastSyncVersion.get() == 0) {
-            this.lastUpdVersion.set(System.currentTimeMillis());
-            this.lastSyncVersion.compareAndSet(0, this.lastUpdVersion.get());
-        } else {
-            this.lastUpdVersion.set(this.lastSyncVersion.get());
+            FileUtils.writeStringToFile(tmpNewFile, metaJsonStr, 
StandardCharsets.UTF_8);
+            FileUtils.copyFile(tmpNewFile, sourceFile);
+            tmpNewFile.delete();
+            tmpDataMd5 = metaConfig.getMd5();
+            lastSyncVersion.set(System.currentTimeMillis());
+            isSuccess = true;
+            setFileChanged();
+        } catch (Throwable ex) {
+            LOG.error("Store metadata: exception thrown while writing to file 
{}", getFileName(), ex);
+        } finally {
+            readWriteLock.writeLock().unlock();
         }
-        return true;
+        return isSuccess;
     }
 
+    /**
+     * update locally cached configuration with input information
+     *
+     * @param cacheType  mq cluster type
+     * @param clusterConfigMap  mq cluster config
+     * @param idTopicConfigMap  id to topic config
+     */
     private void replaceCacheConfig(CacheType cacheType,
             Map<String, CacheClusterConfig> clusterConfigMap,
-            Map<String, IdTopicConfig> topicConfigMap) {
+            Map<String, IdTopicConfig> idTopicConfigMap) {
         this.clusterType.getAndSet(cacheType.getId());
         // remove deleted id2topic config
-        Set<String> tmpKeys = new HashSet<>();
+        Set<String> tmpRmvKeys = new HashSet<>();
         for (Map.Entry<String, IdTopicConfig> entry : 
id2TopicSrcMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
                 continue;
             }
-            if (!topicConfigMap.containsKey(entry.getKey())) {
-                tmpKeys.add(entry.getKey());
+            if (!idTopicConfigMap.containsKey(entry.getKey())) {
+                tmpRmvKeys.add(entry.getKey());
             }
         }
-        for (String key : tmpKeys) {
+        for (String key : tmpRmvKeys) {
             id2TopicSrcMap.remove(key);
         }
         // add new id2topic source config
-        id2TopicSrcMap.putAll(topicConfigMap);
+        id2TopicSrcMap.putAll(idTopicConfigMap);
         // add new id2topic sink config
-        id2TopicSinkMap.putAll(topicConfigMap);
+        id2TopicSinkMap.putAll(idTopicConfigMap);
         // remove deleted cluster config
-        tmpKeys.clear();
+        tmpRmvKeys.clear();
         for (Map.Entry<String, CacheClusterConfig> entry : 
mqClusterMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
                 continue;
             }
             if (!clusterConfigMap.containsKey(entry.getKey())) {
-                tmpKeys.add(entry.getKey());
+                tmpRmvKeys.add(entry.getKey());
             }
         }
-        for (String key : tmpKeys) {
+        for (String key : tmpRmvKeys) {
             mqClusterMap.remove(key);
         }
         // add new mq cluster config
         mqClusterMap.putAll(clusterConfigMap);
     }
 
-    private Map<String, IdTopicConfig> buildCacheTopicConfig(
-            CacheType mqType, List<InLongIdObject> inLongIds) {
-        Map<String, IdTopicConfig> tmpTopicConfigMap = new HashMap<>();
-        if (inLongIds.isEmpty()) {
-            return tmpTopicConfigMap;
-        }
-        int index;
-        String[] idItems;
-        String groupId;
-        String streamId;
-        String topicName;
-        String tenant;
-        String nameSpace;
-        for (InLongIdObject idObject : inLongIds) {
-            if (idObject == null
-                    || StringUtils.isBlank(idObject.getInlongId())
-                    || StringUtils.isBlank(idObject.getTopic())) {
-                continue;
-            }
-            // parse inlong id
-            idItems = idObject.getInlongId().split("\\.");
-            if (idItems.length == 2) {
-                if (StringUtils.isBlank(idItems[0])) {
-                    continue;
-                }
-                groupId = idItems[0].trim();
-                streamId = idItems[1].trim();
-            } else {
-                groupId = idObject.getInlongId().trim();
-                streamId = "";
-            }
-            topicName = idObject.getTopic().trim();
-            // change full topic name "pulsar-xxx/test/base_topic_name" to
-            // base topic name "base_topic_name"
-            index = topicName.lastIndexOf('/');
-            if (index >= 0) {
-                topicName = topicName.substring(index + 1).trim();
-            }
-            tenant = 
idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, "");
-            nameSpace = 
idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, "");
-            if (StringUtils.isBlank(idObject.getTopic())) {
-                // namespace field must exist and value not be empty,
-                // otherwise it is an illegal configuration item.
-                continue;
-            }
-            if (mqType.equals(CacheType.TUBE)) {
-                topicName = nameSpace;
-            } else if (mqType.equals(CacheType.KAFKA)) {
-                if (topicName.equals(streamId)) {
-                    topicName = 
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName);
-                }
-            }
-            IdTopicConfig tmpConfig = new IdTopicConfig();
-            tmpConfig.setInlongGroupIdAndStreamId(groupId, streamId);
-            tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
-            tmpConfig.setTopicName(topicName);
-            tmpConfig.setParams(idObject.getParams());
-            tmpConfig.setDataType(DataTypeEnum.convert(
-                    idObject.getParams().getOrDefault("dataType", 
DataTypeEnum.TEXT.getType())));
-            
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", 
"|"));
-            
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", 
"\n"));
-            tmpConfig.setUseExtendedFields(Boolean.valueOf(
-                    idObject.getParams().getOrDefault("useExtendedFields", 
"false")));
-            tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig);
-            if (mqType.equals(CacheType.TUBE)
-                    && !tmpConfig.getUid().equals(tmpConfig.getInlongGroupId())
-                    && tmpTopicConfigMap.get(tmpConfig.getInlongGroupId()) == 
null) {
-                IdTopicConfig tmpConfig2 = new IdTopicConfig();
-                tmpConfig2.setInlongGroupIdAndStreamId(groupId, "");
-                tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
-                tmpConfig2.setTopicName(topicName);
-                tmpConfig2.setDataType(tmpConfig.getDataType());
-                tmpConfig2.setFieldDelimiter(tmpConfig.getFieldDelimiter());
-                tmpConfig2.setFileDelimiter(tmpConfig.getFileDelimiter());
-                tmpConfig2.setParams(tmpConfig.getParams());
-                tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig2);
-            }
-        }
-        return tmpTopicConfigMap;
-    }
-
     /**
-     * store meta config to file
-     */
-    private boolean storeConfigToFile(String inDataMd5, String metaJsonStr) {
-        boolean isSuccess = false;
-        String filePath = getFilePath();
-        if (StringUtils.isBlank(filePath)) {
-            LOG.error("Error in writing file {} as the file path is null.", 
getFileName());
-            return isSuccess;
-        }
-        readWriteLock.writeLock().lock();
-        try {
-            File sourceFile = new File(filePath);
-            File targetFile = new File(getNextBackupFileName());
-            File tmpNewFile = new File(getFileName() + ".tmp");
-
-            if (sourceFile.exists()) {
-                FileUtils.copyFile(sourceFile, targetFile);
-            }
-            FileUtils.writeStringToFile(tmpNewFile, metaJsonStr, 
StandardCharsets.UTF_8);
-            FileUtils.copyFile(tmpNewFile, sourceFile);
-            tmpNewFile.delete();
-            tmpDataMd5 = inDataMd5;
-            lastSyncVersion.set(System.currentTimeMillis());
-            isSuccess = true;
-            setFileChanged();
-        } catch (Throwable ex) {
-            LOG.error("Error in writing file {}", getFileName(), ex);
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
-        return isSuccess;
-    }
-
-    /**
-     * load from holder
+     * load configure from holder
+     *
+     * @return the configure info
      */
     private String loadConfigFromFile() {
         String result = "";
         if (StringUtils.isBlank(getFileName())) {
-            LOG.error("Fail to load json {} as the file name is null.", 
getFileName());
+            LOG.error("Load metadata: fail to load json {} as the file name is 
null.", getFileName());
             return result;
         }
         InputStream inStream = null;
@@ -479,12 +364,12 @@ public class MetaConfigHolder extends ConfigHolder {
             URL url = getClass().getClassLoader().getResource(getFileName());
             inStream = url != null ? url.openStream() : null;
             if (inStream == null) {
-                LOG.error("Fail to load json {} as the input stream is null", 
getFileName());
+                LOG.error("Load metadata: fail to load json {} as the input 
stream is null", getFileName());
                 return result;
             }
             int size = inStream.available();
             if (size > MAX_ALLOWED_JSON_FILE_SIZE) {
-                LOG.error("Fail to load json {} as the content size({}) over 
max allowed size({})",
+                LOG.error("Load metadata: fail to load json {} as the content 
size({}) over max allowed size({})",
                         getFileName(), size, MAX_ALLOWED_JSON_FILE_SIZE);
                 return result;
             }
@@ -492,16 +377,45 @@ public class MetaConfigHolder extends ConfigHolder {
             inStream.read(buffer);
             result = new String(buffer, StandardCharsets.UTF_8);
         } catch (Throwable e) {
-            LOG.error("Fail to load json {}", getFileName(), e);
+            LOG.error("Load metadata: exception thrown while load from file 
{}", getFileName(), e);
         } finally {
             if (null != inStream) {
                 try {
                     inStream.close();
                 } catch (IOException e) {
-                    LOG.error("Fail in inStream.close for file {}", 
getFileName(), e);
+                    LOG.error("Load metadata: fail in inStream.close for file 
{}", getFileName(), e);
                 }
             }
         }
         return result;
     }
+
+    /**
+     * check required fields status
+     *
+     * @param metaConfig  response from Manager
+     *
+     * @return   check result
+     */
+    public ImmutablePair<Boolean, String> validRequiredFields(InLongMetaConfig 
metaConfig) {
+        if (metaConfig == null) {
+            return ImmutablePair.of(false, "metaConfig object is null");
+        } else if (metaConfig.getMd5() == null) {
+            return ImmutablePair.of(false, "metaConfig.md5 field is null");
+        } else if (metaConfig.getMqType() == null) {
+            return ImmutablePair.of(false, "metaConfig.mqType field is null");
+        } else if (metaConfig.getMqType() == CacheType.N) {
+            return ImmutablePair.of(false, "metaConfig.mqType value is 
CacheType.N");
+        } else if (metaConfig.getClusterConfigMap() == null) {
+            return ImmutablePair.of(false, "metaConfig.clusterConfigMap field 
is null");
+        } else if (metaConfig.getClusterConfigMap().isEmpty()) {
+            return ImmutablePair.of(false, "metaConfig.clusterConfigMap field 
is empty");
+        } else if (metaConfig.getIdTopicConfigMap() == null) {
+            return ImmutablePair.of(false, "metaConfig.idTopicConfigMap field 
is null");
+        } else if (metaConfig.getIdTopicConfigMap().isEmpty()) {
+            return ImmutablePair.of(false, "metaConfig.idTopicConfigMap is 
empty");
+        }
+        return ImmutablePair.of(true, "ok");
+    }
+
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
index 094aee59b3..6c48467c1e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * 
@@ -94,4 +95,23 @@ public class CacheClusterConfig {
                 .append("params", params)
                 .toString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof CacheClusterConfig)) {
+            return false;
+        }
+        CacheClusterConfig that = (CacheClusterConfig) o;
+        return Objects.equals(clusterName, that.clusterName)
+                && Objects.equals(token, that.token)
+                && Objects.equals(params, that.params);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusterName, token, params);
+    }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
index 0d8a5430d4..2ac72981a2 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
@@ -73,7 +73,16 @@ public enum CacheType {
      */
     public static CacheType convert(String value) {
         for (CacheType v : values()) {
-            if (v.value().equals(value)) {
+            if (v.value().equalsIgnoreCase(value)) {
+                return v;
+            }
+        }
+        return N;
+    }
+
+    public static CacheType valueOf(int idValue) {
+        for (CacheType v : values()) {
+            if (v.getId() == idValue) {
                 return v;
             }
         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index 48b19d446d..8d2a01915d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.dataproxy.config.pojo;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.InlongCompressType;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
 import org.apache.commons.lang.StringUtils;
@@ -25,6 +27,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * IdTopicConfig
@@ -42,6 +45,8 @@ public class IdTopicConfig {
     private String fieldDelimiter = "|";
     private String fileDelimiter = "\n";
     private Boolean useExtendedFields = false;
+    private MessageWrapType msgWrapType = MessageWrapType.UNKNOWN;
+    private InlongCompressType v1CompressType = 
InlongCompressType.INLONG_SNAPPY;
 
     private Map<String, String> params = new HashMap<>();
 
@@ -49,7 +54,7 @@ public class IdTopicConfig {
 
     }
 
-    public Boolean getUseExtendedFields() {
+    public boolean isUseExtendedFields() {
         return useExtendedFields;
     }
 
@@ -57,6 +62,22 @@ public class IdTopicConfig {
         this.useExtendedFields = useExtendedFields;
     }
 
+    public MessageWrapType getMsgWrapType() {
+        return msgWrapType;
+    }
+
+    public void setMsgWrapType(MessageWrapType msgWrapType) {
+        this.msgWrapType = msgWrapType;
+    }
+
+    public InlongCompressType getV1CompressType() {
+        return v1CompressType;
+    }
+
+    public void setV1CompressType(InlongCompressType v1CompressType) {
+        this.v1CompressType = v1CompressType;
+    }
+
     /**
      * get uid
      * @return the uid
@@ -221,11 +242,42 @@ public class IdTopicConfig {
                 .append("inlongGroupId", inlongGroupId)
                 .append("inlongStreamid", inlongStreamid)
                 .append("topicName", topicName)
+                .append("tenant", tenant)
                 .append("nameSpace", nameSpace)
                 .append("dataType", dataType)
                 .append("fieldDelimiter", fieldDelimiter)
                 .append("fileDelimiter", fileDelimiter)
+                .append("useExtendedFields", useExtendedFields)
+                .append("msgWrapType", msgWrapType)
+                .append("pbCompressType", v1CompressType)
                 .append("params", params)
                 .toString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IdTopicConfig)) {
+            return false;
+        }
+        IdTopicConfig that = (IdTopicConfig) o;
+        return uid.equals(that.uid) && Objects.equals(inlongGroupId, 
that.inlongGroupId)
+                && Objects.equals(inlongStreamid, that.inlongStreamid) && 
topicName.equals(that.topicName)
+                && Objects.equals(tenant, that.tenant) && 
Objects.equals(nameSpace, that.nameSpace)
+                && dataType == that.dataType && Objects.equals(fieldDelimiter, 
that.fieldDelimiter)
+                && Objects.equals(fileDelimiter, that.fileDelimiter)
+                && Objects.equals(useExtendedFields, that.useExtendedFields)
+                && Objects.equals(msgWrapType, that.msgWrapType)
+                && v1CompressType == that.v1CompressType
+                && Objects.equals(params, that.params);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(uid, inlongGroupId, inlongStreamid, topicName, 
tenant, nameSpace,
+                dataType, fieldDelimiter, fileDelimiter, useExtendedFields, 
msgWrapType,
+                v1CompressType, params);
+    }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java
new file mode 100644
index 0000000000..4d80ee4d13
--- /dev/null
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.pojo;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+import java.util.Map;
+
+public class InLongMetaConfig {
+
+    private String md5;
+    private CacheType mqType;
+    private Map<String, CacheClusterConfig> clusterConfigMap;
+    private Map<String, IdTopicConfig> idTopicConfigMap;
+
+    public InLongMetaConfig() {
+
+    }
+
+    public InLongMetaConfig(String md5, CacheType mqType,
+            Map<String, CacheClusterConfig> clusterConfigMap,
+            Map<String, IdTopicConfig> idTopicConfigMap) {
+        this.md5 = md5;
+        this.mqType = mqType;
+        this.clusterConfigMap = clusterConfigMap;
+        this.idTopicConfigMap = idTopicConfigMap;
+    }
+
+    public String getMd5() {
+        return md5;
+    }
+
+    public CacheType getMqType() {
+        return mqType;
+    }
+
+    public Map<String, CacheClusterConfig> getClusterConfigMap() {
+        return clusterConfigMap;
+    }
+
+    public Map<String, IdTopicConfig> getIdTopicConfigMap() {
+        return idTopicConfigMap;
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+                .append("md5", md5)
+                .append("mqType", mqType)
+                .append("clusterConfigMap", clusterConfigMap)
+                .append("idTopicConfigMap", idTopicConfigMap)
+                .toString();
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
similarity index 51%
rename from 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
rename to 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
index 3eb12d6a8c..9ead853540 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.dataproxy.source;
+package org.apache.inlong.dataproxy.consts;
 
 public class SourceConstants {
 
@@ -105,95 +105,4 @@ public class SourceConstants {
     public static final String SRC_PROTOCOL_TYPE_TCP = "tcp";
     public static final String SRC_PROTOCOL_TYPE_UDP = "udp";
     public static final String SRC_PROTOCOL_TYPE_HTTP = "http";
-
-    public static final String SERVICE_PROCESSOR_NAME = "service-decoder-name";
-    public static final String ENABLE_EXCEPTION_RETURN = 
"enableExceptionReturn";
-
-    public static final String TRAFFIC_CLASS = "trafficClass";
-
-    public static final String HEART_INTERVAL_SEC = "heart-interval-sec";
-
-    public static final String PACKAGE_TIMEOUT_SEC = "package-timeout-sec";
-
-    public static final String HEART_SERVERS = "heart-servers";
-
-    public static final String TOPIC_KEY = "topic";
-    public static final String REMOTE_IP_KEY = "srcIp";
-    public static final String DATAPROXY_IP_KEY = "dpIp";
-    public static final String MSG_ENCODE_VER = "msgEnType";
-    public static final String REMOTE_IDC_KEY = "idc";
-    public static final String MSG_COUNTER_KEY = "msgcnt";
-    public static final String PKG_COUNTER_KEY = "pkgcnt";
-    public static final String PKG_TIME_KEY = "msg.pkg.time";
-    public static final String TRANSFER_KEY = "transfer";
-    public static final String DEST_IP_KEY = "dstIp";
-    public static final String INTERFACE_KEY = "interface";
-    public static final String SINK_MIN_METRIC_KEY = "sink-min-metric-topic";
-    public static final String SINK_HOUR_METRIC_KEY = "sink-hour-metric-topic";
-    public static final String SINK_TEN_METRIC_KEY = "sink-ten-metric-topic";
-    public static final String SINK_QUA_METRIC_KEY = "sink-qua-metric-topic";
-    public static final String L5_MIN_METRIC_KEY = "l5-min-metric-topic";
-    public static final String L5_MIN_FAIL_METRIC_KEY = 
"l5-min-fail-metric-key";
-    public static final String L5_HOUR_METRIC_KEY = "l5-hour-metric-topic";
-    public static final String L5_ID_KEY = "l5id";
-    public static final String SET_KEY = "set";
-    public static final String CLUSTER_ID_KEY = "clusterId";
-
-    public static final String DECODER_BODY = "body";
-    public static final String DECODER_TOPICKEY = "topic_key";
-    public static final String DECODER_ATTRS = "attrs";
-    public static final String MSG_TYPE = "msg_type";
-    public static final String COMPRESS_TYPE = "compress_type";
-    public static final String EXTRA_ATTR = "extra_attr";
-    public static final String COMMON_ATTR_MAP = "common_attr_map";
-    public static final String MSG_LIST = "msg_list";
-    public static final String VERSION_TYPE = "version";
-    public static final String FILE_CHECK_DATA = "file-check-data";
-    public static final String MINUTE_CHECK_DATA = "minute-check-data";
-    public static final String SLA_METRIC_DATA = "sla-metric-data";
-    public static final String SLA_METRIC_GROUPID = "manager_sla_metric";
-
-    public static final String FILE_BODY = "file-body";
-    public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
-
-    public static final String SEQUENCE_ID = "sequencial_id";
-
-    public static final String TOTAL_LEN = "totalLen";
-
-    public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 
"link_max_allowed_delayed_msg_count";
-    public static final String SESSION_WARN_DELAYED_MSG_COUNT = 
"session_warn_delayed_msg_count";
-    public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 
"session_max_allowed_delayed_msg_count";
-    public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 
"netty_write_buffer_high_water_mark";
-    public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
-
-    public static final String MANAGER_PATH = "/inlong/manager/openapi";
-    public static final String MANAGER_GET_CONFIG_PATH = 
"/dataproxy/getConfig";
-    public static final String MANAGER_GET_ALL_CONFIG_PATH = 
"/dataproxy/getAllConfig";
-    public static final String MANAGER_HEARTBEAT_REPORT = "/heartbeat/report";
-
-    public static final String MANAGER_AUTH_SECRET_ID = 
"manager.auth.secretId";
-    public static final String MANAGER_AUTH_SECRET_KEY = 
"manager.auth.secretKey";
-    // Pulsar config
-    public static final String KEY_TENANT = "tenant";
-    public static final String KEY_NAMESPACE = "namespace";
-
-    public static final String KEY_SERVICE_URL = "serviceUrl";
-    public static final String KEY_AUTHENTICATION = "authentication";
-    public static final String KEY_STATS_INTERVAL_SECONDS = 
"statsIntervalSeconds";
-
-    public static final String KEY_ENABLEBATCHING = "enableBatching";
-    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
-    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
-    public static final String KEY_BATCHINGMAXPUBLISHDELAY = 
"batchingMaxPublishDelay";
-    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
-    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = 
"maxPendingMessagesAcrossPartitions";
-    public static final String KEY_SENDTIMEOUT = "sendTimeout";
-    public static final String KEY_COMPRESSIONTYPE = "compressionType";
-    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
-    public static final String 
KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
-            + "BatchingPartitionSwitchFrequency";
-
-    public static final String KEY_IOTHREADS = "ioThreads";
-    public static final String KEY_MEMORYLIMIT = "memoryLimit";
-    public static final String KEY_CONNECTIONSPERBROKER = 
"connectionsPerBroker";
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index c317bba799..acf0e4d8a4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -25,6 +25,7 @@ import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
 import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index d68a8b4df2..517c11113b 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.dataproxy.source;
 
+import org.apache.inlong.dataproxy.consts.SourceConstants;
+
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
index 64375733fd..3a90e9f10a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.source;
 
 import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
 
 import com.google.common.base.Preconditions;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 5455cdddb4..b343dc4dcb 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.source;
 
 import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
 import org.apache.inlong.dataproxy.utils.EventLoopUtil;
 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index fef78bdd1b..380fc3dab9 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.source;
 
 import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelOption;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
index 71516da004..04322fe339 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.dataproxy.utils;
 
-import org.apache.inlong.dataproxy.source.SourceConstants;
+import org.apache.inlong.dataproxy.consts.SourceConstants;
 
 import io.netty.channel.Channel;
 import org.apache.commons.lang3.StringUtils;
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json 
b/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
index 074be5958c..b9de51bafb 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json
@@ -1 +1 @@
-{"result":true,"errCode":0,"md5":"5a3f5939bb7368f493bf41c1d785b8f3","data":{"proxyCluster":{"name":"test_dataproxy","setName":"test_set","zone":"default\u003dtrue","channels":[],"inlongIds":[{"inlongId":"test_group.stream1","topic":"stream1","params":{"namespace":"test_group","ignoreParseError":"true","appGroupName":"app_test_group","productId":"58","productName":"test_meta","wrapWithInlongMsg":"true"}}],"sources":[],"sinks":[]},"cacheClusterSet":{"setName":"test_set","type":"TUBEMQ","ca
 [...]
+{"md5":"5a3f5939bb7368f493bf41c1d785b8f3","mqType":"TUBE","clusterConfigMap":{"test_tubemq":{"clusterName":
 "test_tubemq","token": "******","zone": "default=true","params": 
{"masterWebUrl": "http://127.0.0.1:8080","messageQueueHandler": 
"org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler","master-host-port-list": 
"127.0.0.1:8000"}}},"idTopicConfigMap":{"test_group.stream1":{"uid":"test_group.stream1","inlongGroupId":"test_group","inlongStreamid":"stream1","topicName":"test_group","data
 [...]
\ No newline at end of file

Reply via email to