This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b5714f8e26 [INLONG-10081][DataProxy] Modify the data format of metadata saved in the metadata.json file (#10083) b5714f8e26 is described below commit b5714f8e26dcd1e8633c409d3e767f8dfeb18c0e Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Apr 26 17:06:07 2024 +0800 [INLONG-10081][DataProxy] Modify the data format of metadata saved in the metadata.json file (#10083) --- .../inlong/dataproxy/config/ConfigManager.java | 260 ++++++++++++-- .../dataproxy/config/holder/MetaConfigHolder.java | 372 ++++++++------------- .../dataproxy/config/pojo/CacheClusterConfig.java | 20 ++ .../inlong/dataproxy/config/pojo/CacheType.java | 11 +- .../dataproxy/config/pojo/IdTopicConfig.java | 54 ++- .../dataproxy/config/pojo/InLongMetaConfig.java | 69 ++++ .../{source => consts}/SourceConstants.java | 93 +----- .../apache/inlong/dataproxy/source/BaseSource.java | 1 + .../dataproxy/source/ServerMessageFactory.java | 2 + .../inlong/dataproxy/source/SimpleHttpSource.java | 1 + .../inlong/dataproxy/source/SimpleTcpSource.java | 1 + .../inlong/dataproxy/source/SimpleUdpSource.java | 1 + .../inlong/dataproxy/utils/AddressUtils.java | 2 +- .../src/test/resources/metadata.json | 2 +- 14 files changed, 535 insertions(+), 354 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java index 01ef883b44..68772a8c78 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java @@ -17,10 +17,17 @@ package org.apache.inlong.dataproxy.config; +import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.common.enums.InlongCompressType; +import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.heartbeat.AddressInfo; -import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; +import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject; +import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject; import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest; import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse; +import org.apache.inlong.common.pojo.dataproxy.InLongIdObject; +import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject; import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder; import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder; @@ -29,12 +36,15 @@ import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder; import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder; import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; +import org.apache.inlong.dataproxy.config.pojo.CacheType; import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig; +import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.utils.HttpUtils; import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -45,6 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.security.SecureRandom; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -141,8 +152,8 @@ public class ConfigManager { return metaConfigHolder.getConfigMd5(); } - public boolean updateMetaConfigInfo(String inDataMd5, String inDataJsonStr) { - return metaConfigHolder.updateConfigMap(inDataMd5, inDataJsonStr); + public boolean updateMetaConfigInfo(InLongMetaConfig metaConfig) { + return metaConfigHolder.updateConfigMap(metaConfig); } // register meta-config callback @@ -331,7 +342,7 @@ public class ConfigManager { } httpPost.setEntity(HttpUtils.getEntity(request)); if (LOG.isDebugEnabled()) { - LOG.debug("Start to request {} to get config info, with params: {}, headers: {}", + LOG.debug("Sync meta: start to get config, to:{}, params: {}, headers: {}", url, request, httpPost.getAllHeaders()); } // request with post @@ -340,52 +351,69 @@ public class ConfigManager { String returnStr = EntityUtils.toString(response.getEntity()); long dltTime = System.currentTimeMillis() - startTime; if (dltTime >= CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()) { - LOG.warn("End to request {} to get config info, WAIST {} ms, over alarm value {} ms", - url, dltTime, CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs()); + LOG.warn("Sync meta: end to get config, WAIST {} ms, over alarm: {} ms, from:{}", + dltTime, CommonConfigHolder.getInstance().getMetaConfigWastAlarmMs(), url); } else { if (LOG.isDebugEnabled()) { - LOG.debug("End to request {} to get config info:{}, WAIST {} ms", - url, returnStr, dltTime); + LOG.debug("Sync meta: end to get config, WAIST {} ms, from:{}, result:{}", + dltTime, url, returnStr); } } if (response.getStatusLine().getStatusCode() != 200) { - LOG.warn("Failed to request {}, with params: {}, headers: {}, the response is {}", - url, request, httpPost.getAllHeaders(), returnStr); + LOG.error( + "Sync meta: return failure, errCode {}, from:{}, params:{}, headers:{}, response:{}", + response.getStatusLine().getStatusCode(), url, request, httpPost.getAllHeaders(), + returnStr); return false; } // get groupId <-> topic and m value. - DataProxyConfigResponse proxyResponse = - gson.fromJson(returnStr, DataProxyConfigResponse.class); - if (!proxyResponse.isResult()) { - LOG.warn("Fail to get config from url {}, with params {}, error code is {}", - url, request, proxyResponse.getErrCode()); + DataProxyConfigResponse proxyResponse; + try { + proxyResponse = gson.fromJson(returnStr, DataProxyConfigResponse.class); + } catch (Throwable e) { + LOG.error("Sync meta: exception thrown while parsing config, from:{}, params:{}, response:{}", + url, request, returnStr, e); return false; } - if (proxyResponse.getErrCode() != DataProxyConfigResponse.SUCC) { + // check required fields + ImmutablePair<Boolean, String> validResult = validRequiredFields(proxyResponse); + if (!validResult.getLeft()) { if (proxyResponse.getErrCode() != DataProxyConfigResponse.NOUPDATE) { - LOG.warn("Get config failure from url:{}, with params {}, error code is {}", - url, request, proxyResponse.getErrCode()); + LOG.error("Sync meta: {}, from:{}, params:{}, return:{}", + validResult.getRight(), url, request, returnStr); } return true; } - DataProxyCluster dataProxyCluster = proxyResponse.getData(); - if (dataProxyCluster == null - || dataProxyCluster.getCacheClusterSet() == null - || dataProxyCluster.getCacheClusterSet().getCacheClusters().isEmpty()) { - LOG.warn("Get config empty from url:{}, with params {}, return:{}, cluster is empty!", + // get mq cluster info + ImmutablePair<CacheType, Map<String, CacheClusterConfig>> clusterInfo = + buildCacheClusterConfig(proxyResponse.getData().getCacheClusterSet()); + if (clusterInfo.getLeft() == CacheType.N) { + LOG.error("Sync meta: unsupported mq type {}, from:{}, params:{}, return:{}", + clusterInfo.getLeft(), url, request, returnStr); + return true; + } + if (clusterInfo.getRight().isEmpty()) { + LOG.error("Sync meta: cacheClusters is empty, from:{}, params:{}, return:{}", url, request, returnStr); return true; } + // get ID to Topic configure + Map<String, IdTopicConfig> idTopicConfigMap = buildCacheTopicConfig( + clusterInfo.getLeft(), proxyResponse.getData().getProxyCluster()); + InLongMetaConfig inLongMetaConfig = new InLongMetaConfig(proxyResponse.getMd5(), + clusterInfo.getLeft(), clusterInfo.getRight(), idTopicConfigMap); // update meta configure - if (configManager.updateMetaConfigInfo(proxyResponse.getMd5(), returnStr)) { - if (!ConfigManager.handshakeManagerOk.get()) { - ConfigManager.handshakeManagerOk.set(true); - LOG.info("Get config success from manager and updated, set handshake status is ok!"); - } + configManager.updateMetaConfigInfo(inLongMetaConfig); + // update handshake to manager status + if (ConfigManager.handshakeManagerOk.get()) { + LOG.info("Sync meta: sync config success, from:{}", url); + } else { + ConfigManager.handshakeManagerOk.set(true); + LOG.info("Sync meta: sync config success, handshake manager ok, from:{}", url); } return true; } catch (Throwable ex) { - LOG.error("Request manager {} failure, throw exception", url, ex); + LOG.error("Sync meta: process throw exception, from:{}", url, ex); return false; } finally { if (httpPost != null) { @@ -393,5 +421,179 @@ public class ConfigManager { } } } + + /** + * check required fields status + * + * @param response response from Manager + * + * @return check result + */ + public ImmutablePair<Boolean, String> validRequiredFields(DataProxyConfigResponse response) { + if (response == null) { + return ImmutablePair.of(false, "parse result is null"); + } else if (!response.isResult()) { + return ImmutablePair.of(false, "result is NOT true"); + } else if (response.getErrCode() != DataProxyConfigResponse.SUCC) { + return ImmutablePair.of(false, "errCode is " + + response.getErrCode() + ", NOT success"); + } else if (response.getMd5() == null) { + return ImmutablePair.of(false, "md5 field is null"); + } else if (response.getData() == null) { + return ImmutablePair.of(false, "data field is null"); + } else if (response.getData().getProxyCluster() == null) { + return ImmutablePair.of(false, "proxyCluster field is null"); + } else if (response.getData().getCacheClusterSet() == null) { + return ImmutablePair.of(false, "cacheClusterSet field is null"); + } else if (response.getData().getProxyCluster().getInlongIds() == null) { + return ImmutablePair.of(false, "inlongIds field is null"); + } else if (response.getData().getCacheClusterSet().getCacheClusters() == null) { + return ImmutablePair.of(false, "cacheClusters field is null"); + } + return ImmutablePair.of(true, "ok"); + } + + /** + * build cluster config based on cluster set object + * + * @param clusterSetObject mq cluster set obect + * + * @return mq type and cluster set configure + */ + private ImmutablePair<CacheType, Map<String, CacheClusterConfig>> buildCacheClusterConfig( + CacheClusterSetObject clusterSetObject) { + CacheType mqType = CacheType.convert(clusterSetObject.getType()); + Map<String, CacheClusterConfig> result = new HashMap<>(); + for (CacheClusterObject clusterObject : clusterSetObject.getCacheClusters()) { + if (clusterObject == null || StringUtils.isBlank(clusterObject.getName())) { + continue; + } + CacheClusterConfig config = new CacheClusterConfig(); + config.setClusterName(clusterObject.getName()); + config.setToken(clusterObject.getToken()); + config.getParams().putAll(clusterObject.getParams()); + result.put(config.getClusterName(), config); + } + return ImmutablePair.of(mqType, result); + } + + /** + * build id2field config based on id2Topic configure + * + * @param mqType mq cluster type + * @param proxyClusterObject cluster object info + * + * @return ID to Topic configures + */ + private Map<String, IdTopicConfig> buildCacheTopicConfig( + CacheType mqType, ProxyClusterObject proxyClusterObject) { + Map<String, IdTopicConfig> tmpTopicConfigMap = new HashMap<>(); + List<InLongIdObject> inLongIds = proxyClusterObject.getInlongIds(); + if (inLongIds.isEmpty()) { + return tmpTopicConfigMap; + } + int index; + String[] idItems; + String groupId; + String streamId; + String topicName; + String tenant; + String nameSpace; + for (InLongIdObject idObject : inLongIds) { + if (idObject == null + || StringUtils.isBlank(idObject.getInlongId()) + || StringUtils.isBlank(idObject.getTopic())) { + continue; + } + // parse inlong id + idItems = idObject.getInlongId().split("\\."); + if (idItems.length == 2) { + if (StringUtils.isBlank(idItems[0])) { + continue; + } + groupId = idItems[0].trim(); + streamId = idItems[1].trim(); + } else { + groupId = idObject.getInlongId().trim(); + streamId = ""; + } + topicName = idObject.getTopic().trim(); + // change full topic name "pulsar-xxx/test/base_topic_name" to + // base topic name "base_topic_name" + index = topicName.lastIndexOf('/'); + if (index >= 0) { + topicName = topicName.substring(index + 1).trim(); + } + tenant = idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, ""); + nameSpace = idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, ""); + if (StringUtils.isBlank(idObject.getTopic())) { + // namespace field must exist and value not be empty, + // otherwise it is an illegal configuration item. + continue; + } + if (mqType.equals(CacheType.TUBE)) { + topicName = nameSpace; + } else if (mqType.equals(CacheType.KAFKA)) { + if (topicName.equals(streamId)) { + topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName); + } + } + IdTopicConfig tmpConfig = new IdTopicConfig(); + tmpConfig.setInlongGroupIdAndStreamId(groupId, streamId); + tmpConfig.setTenantAndNameSpace(tenant, nameSpace); + tmpConfig.setTopicName(topicName); + tmpConfig.setParams(idObject.getParams()); + tmpConfig.setDataType(DataTypeEnum.convert( + idObject.getParams().getOrDefault("dataType", DataTypeEnum.TEXT.getType()))); + tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", "|")); + tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", "\n")); + tmpConfig.setUseExtendedFields(Boolean.valueOf( + idObject.getParams().getOrDefault("useExtendedFields", "false"))); + tmpConfig.setMsgWrapType(getPbWrapType(idObject)); + tmpConfig.setV1CompressType(getPbCompressType(idObject)); + tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig); + // add only groupId object for tube + if (mqType.equals(CacheType.TUBE) + && !tmpConfig.getUid().equals(tmpConfig.getInlongGroupId()) + && tmpTopicConfigMap.get(tmpConfig.getInlongGroupId()) == null) { + IdTopicConfig tmpConfig2 = new IdTopicConfig(); + tmpConfig2.setInlongGroupIdAndStreamId(groupId, ""); + tmpConfig2.setTenantAndNameSpace(tenant, nameSpace); + tmpConfig2.setTopicName(topicName); + tmpConfig2.setDataType(tmpConfig.getDataType()); + tmpConfig2.setFieldDelimiter(tmpConfig.getFieldDelimiter()); + tmpConfig2.setFileDelimiter(tmpConfig.getFileDelimiter()); + tmpConfig2.setParams(new HashMap<>(tmpConfig.getParams())); + tmpConfig2.setUseExtendedFields(tmpConfig.isUseExtendedFields()); + tmpConfig2.setMsgWrapType(tmpConfig.getMsgWrapType()); + tmpConfig2.setV1CompressType(tmpConfig.getV1CompressType()); + tmpTopicConfigMap.put(tmpConfig2.getUid(), tmpConfig2); + } + } + return tmpTopicConfigMap; + } + + private MessageWrapType getPbWrapType(InLongIdObject idObject) { + String strWrapType = idObject.getParams().get("wrapType"); + if (StringUtils.isBlank(strWrapType)) { + return MessageWrapType.UNKNOWN; + } else { + return MessageWrapType.forType(strWrapType); + } + } + + private InlongCompressType getPbCompressType(InLongIdObject idObject) { + String strCompressType = idObject.getParams().get("inlongCompressType"); + if (StringUtils.isBlank(strCompressType)) { + return CommonConfigHolder.getInstance().getDefV1MsgCompressType(); + } else { + InlongCompressType msgCompType = InlongCompressType.forType(strCompressType); + if (msgCompType == InlongCompressType.UNKNOWN) { + return CommonConfigHolder.getInstance().getDefV1MsgCompressType(); + } else { + return msgCompType; + } + } + } } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java index 96c5bd820a..555c55f9cc 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java @@ -17,26 +17,19 @@ package org.apache.inlong.dataproxy.config.holder; -import org.apache.inlong.common.constant.Constants; -import org.apache.inlong.common.enums.DataTypeEnum; -import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject; -import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject; -import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; -import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse; -import org.apache.inlong.common.pojo.dataproxy.InLongIdObject; -import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject; import org.apache.inlong.dataproxy.config.CommonConfigHolder; import org.apache.inlong.dataproxy.config.ConfigHolder; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; import org.apache.inlong.dataproxy.config.pojo.CacheType; import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig; -import org.apache.inlong.dataproxy.consts.ConfigConstants; +import org.apache.inlong.dataproxy.config.pojo.InLongMetaConfig; import org.apache.inlong.sdk.commons.protocol.InlongId; import com.google.gson.Gson; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +67,6 @@ public class MetaConfigHolder extends ConfigHolder { private String tmpDataMd5 = ""; private final AtomicLong lastSyncVersion = new AtomicLong(0); // cached data - private final List<String> defTopics = new ArrayList<>(); private final AtomicInteger clusterType = new AtomicInteger(CacheType.N.getId()); private final ConcurrentHashMap<String, CacheClusterConfig> mqClusterMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, IdTopicConfig> id2TopicSrcMap = new ConcurrentHashMap<>(); @@ -84,13 +76,6 @@ public class MetaConfigHolder extends ConfigHolder { super(metaConfigFileName); } - public void addDefTopic(String defTopic) { - if (StringUtils.isBlank(defTopic)) { - return; - } - defTopics.add(defTopic); - } - /** * get source topic by groupId and streamId */ @@ -153,28 +138,46 @@ public class MetaConfigHolder extends ConfigHolder { } } - public boolean updateConfigMap(String inDataMd5, String inDataJsonStr) { - if (StringUtils.isBlank(inDataMd5) - || StringUtils.isBlank(inDataJsonStr)) { - return false; - } + public boolean updateConfigMap(InLongMetaConfig metaConfig) { + String inDataJsonStr; + // check cache data synchronized (this.lastSyncVersion) { if (this.lastSyncVersion.get() > this.lastUpdVersion.get()) { - if (inDataJsonStr.equals(tmpDataMd5)) { + if (tmpDataMd5.equals(metaConfig.getMd5())) { return false; } - LOG.info("Load changed metadata {} , but reloading content, over {} ms", + LOG.info("Update metadata: NOT UPDATE, {} is loading, but wast over {} ms", getFileName(), System.currentTimeMillis() - this.lastSyncVersion.get()); return false; } else { - if (inDataMd5.equals(dataMd5)) { + if (dataMd5.equals(metaConfig.getMd5())) { return false; } } - return storeConfigToFile(inDataMd5, inDataJsonStr); + InLongMetaConfig newMetaConfig = buildMixedMetaConfig(metaConfig); + try { + inDataJsonStr = GSON.toJson(newMetaConfig); + } catch (Throwable e) { + LOG.error("Update metadata: failure to serial meta config to json", e); + return false; + } + return storeConfigToFile(inDataJsonStr, newMetaConfig); } } + private InLongMetaConfig buildMixedMetaConfig(InLongMetaConfig metaConfig) { + // process and check cluster info + Map<String, CacheClusterConfig> newClusterConfigMap = + new HashMap<>(metaConfig.getClusterConfigMap().size()); + newClusterConfigMap.putAll(metaConfig.getClusterConfigMap()); + // process id2topic info + Map<String, IdTopicConfig> newIdTopicConfigMap = + new HashMap<>(metaConfig.getIdTopicConfigMap().size()); + newIdTopicConfigMap.putAll(metaConfig.getIdTopicConfigMap()); + return new InLongMetaConfig(metaConfig.getMd5(), + metaConfig.getMqType(), newClusterConfigMap, newIdTopicConfigMap); + } + public List<CacheClusterConfig> forkCachedCLusterConfig() { List<CacheClusterConfig> result = new ArrayList<>(); if (mqClusterMap.isEmpty()) { @@ -214,264 +217,146 @@ public class MetaConfigHolder extends ConfigHolder { // check meta update setting if (!CommonConfigHolder.getInstance().isEnableStartupUsingLocalMetaFile() && !ConfigManager.handshakeManagerOk.get()) { - LOG.warn("Failed to load json config from {}, don't obtain metadata from the Manager," - + " and the startup via the cache file is false", getFileName()); + LOG.warn("Load metadata: StartupUsingLocalMetaFile is false, don't obtain metadata from {}" + + " before handshake with Manager", getFileName()); return false; } String jsonString = ""; + InLongMetaConfig metaConfig; readWriteLock.writeLock().lock(); try { jsonString = loadConfigFromFile(); if (StringUtils.isBlank(jsonString)) { - LOG.warn("Load changed json {}, but no records configured", getFileName()); + LOG.error("Load metadata: NOT LOADED, changed but empty records, file:{}", getFileName()); return true; } - DataProxyConfigResponse metaConfig = - GSON.fromJson(jsonString, DataProxyConfigResponse.class); - // check result tag - if (!metaConfig.isResult() || metaConfig.getErrCode() != DataProxyConfigResponse.SUCC) { - LOG.warn("Load failed json config from {}, error code is {}", - getFileName(), metaConfig.getErrCode()); + try { + metaConfig = GSON.fromJson(jsonString, InLongMetaConfig.class); + } catch (Throwable e) { + LOG.error("Load metadata: NOT LOADED, parse json config failure, file:{}", getFileName(), e); return true; } - // check cluster data - DataProxyCluster clusterObj = metaConfig.getData(); - if (clusterObj == null) { - LOG.warn("Load failed json config from {}, malformed content, data is null", getFileName()); + // check required fields + ImmutablePair<Boolean, String> paramChkResult = validRequiredFields(metaConfig); + if (!paramChkResult.getLeft()) { + LOG.error("Load metadata: NOT LOADED, {}, file:{}", + paramChkResult.getRight(), getFileName()); return true; } - // update cache data - if (updateCacheData(jsonString, metaConfig)) { - LOG.info("Load changed {} file success!", getFileName()); - } + // update cached data + replaceCacheConfig(metaConfig.getMqType(), + metaConfig.getClusterConfigMap(), metaConfig.getIdTopicConfigMap()); + this.dataMd5 = metaConfig.getMd5(); + this.dataStr = jsonString; + LOG.info("Load metadata: LOADED success, from {}!", getFileName()); return true; } catch (Throwable e) { - LOG.warn("Process json {} changed data {} failure", getFileName(), jsonString, e); + LOG.error("Load metadata: NOT LOADED, load from {} throw exception", getFileName(), e); return false; } finally { + if (this.lastSyncVersion.get() == 0) { + this.lastUpdVersion.set(System.currentTimeMillis()); + this.lastSyncVersion.compareAndSet(0, this.lastUpdVersion.get()); + } else { + this.lastUpdVersion.set(this.lastSyncVersion.get()); + } readWriteLock.writeLock().unlock(); } } - private boolean updateCacheData(String jsonString, DataProxyConfigResponse metaConfig) { - // get and valid inlongIds configure - ProxyClusterObject proxyClusterObject = metaConfig.getData().getProxyCluster(); - if (proxyClusterObject == null) { - LOG.warn("Load failed json config from {}, malformed content, proxyCluster field is null", - getFileName()); - return false; - } - CacheClusterSetObject clusterSetObject = metaConfig.getData().getCacheClusterSet(); - if (clusterSetObject == null) { - LOG.warn("Load failed json config from {}, malformed content, cacheClusterSet field is null", - getFileName()); - return false; - } - List<InLongIdObject> inLongIds = proxyClusterObject.getInlongIds(); - if (inLongIds == null) { - LOG.warn("Load failed json config from {}, malformed content, inlongIds field is null", - getFileName()); - return false; - } - // get mq type - CacheType mqType = CacheType.convert(clusterSetObject.getType()); - if (mqType == CacheType.N) { - LOG.warn("Load failed json config from {}, unsupported mq type {}", - getFileName(), clusterSetObject.getType()); - return false; + /** + * store meta config to file + * + * @param metaJsonStr meta info string + * @param metaConfig meta info object + * + * @return store result + */ + private boolean storeConfigToFile(String metaJsonStr, InLongMetaConfig metaConfig) { + boolean isSuccess = false; + String filePath = getFilePath(); + if (StringUtils.isBlank(filePath)) { + LOG.error("Store metadata: error in writing file {} as the file path is null.", getFileName()); + return isSuccess; } - // get mq cluster info - Map<String, CacheClusterConfig> tmpClusterConfigMap = new HashMap<>(); - for (CacheClusterObject clusterObject : clusterSetObject.getCacheClusters()) { - if (clusterObject == null || StringUtils.isBlank(clusterObject.getName())) { - continue; + readWriteLock.writeLock().lock(); + try { + File sourceFile = new File(filePath); + File targetFile = new File(getNextBackupFileName()); + File tmpNewFile = new File(getFileName() + ".tmp"); + + if (sourceFile.exists()) { + FileUtils.copyFile(sourceFile, targetFile); } - CacheClusterConfig config = new CacheClusterConfig(); - config.setClusterName(clusterObject.getName()); - config.setToken(clusterObject.getToken()); - config.getParams().putAll(clusterObject.getParams()); - tmpClusterConfigMap.put(config.getClusterName(), config); - } - if (tmpClusterConfigMap.isEmpty()) { - LOG.warn("Load failed json config from {}, no valid {} mq cluster", - getFileName(), clusterSetObject.getType()); - return false; - } - // get topic config info - Map<String, IdTopicConfig> tmpTopicConfigMap = buildCacheTopicConfig(mqType, inLongIds); - replaceCacheConfig(mqType, tmpClusterConfigMap, tmpTopicConfigMap); - // update cached data - this.dataMd5 = metaConfig.getMd5(); - this.dataStr = jsonString; - if (this.lastSyncVersion.get() == 0) { - this.lastUpdVersion.set(System.currentTimeMillis()); - this.lastSyncVersion.compareAndSet(0, this.lastUpdVersion.get()); - } else { - this.lastUpdVersion.set(this.lastSyncVersion.get()); + FileUtils.writeStringToFile(tmpNewFile, metaJsonStr, StandardCharsets.UTF_8); + FileUtils.copyFile(tmpNewFile, sourceFile); + tmpNewFile.delete(); + tmpDataMd5 = metaConfig.getMd5(); + lastSyncVersion.set(System.currentTimeMillis()); + isSuccess = true; + setFileChanged(); + } catch (Throwable ex) { + LOG.error("Store metadata: exception thrown while writing to file {}", getFileName(), ex); + } finally { + readWriteLock.writeLock().unlock(); } - return true; + return isSuccess; } + /** + * update locally cached configuration with input information + * + * @param cacheType mq cluster type + * @param clusterConfigMap mq cluster config + * @param idTopicConfigMap id to topic config + */ private void replaceCacheConfig(CacheType cacheType, Map<String, CacheClusterConfig> clusterConfigMap, - Map<String, IdTopicConfig> topicConfigMap) { + Map<String, IdTopicConfig> idTopicConfigMap) { this.clusterType.getAndSet(cacheType.getId()); // remove deleted id2topic config - Set<String> tmpKeys = new HashSet<>(); + Set<String> tmpRmvKeys = new HashSet<>(); for (Map.Entry<String, IdTopicConfig> entry : id2TopicSrcMap.entrySet()) { if (entry == null || entry.getKey() == null || entry.getValue() == null) { continue; } - if (!topicConfigMap.containsKey(entry.getKey())) { - tmpKeys.add(entry.getKey()); + if (!idTopicConfigMap.containsKey(entry.getKey())) { + tmpRmvKeys.add(entry.getKey()); } } - for (String key : tmpKeys) { + for (String key : tmpRmvKeys) { id2TopicSrcMap.remove(key); } // add new id2topic source config - id2TopicSrcMap.putAll(topicConfigMap); + id2TopicSrcMap.putAll(idTopicConfigMap); // add new id2topic sink config - id2TopicSinkMap.putAll(topicConfigMap); + id2TopicSinkMap.putAll(idTopicConfigMap); // remove deleted cluster config - tmpKeys.clear(); + tmpRmvKeys.clear(); for (Map.Entry<String, CacheClusterConfig> entry : mqClusterMap.entrySet()) { if (entry == null || entry.getKey() == null || entry.getValue() == null) { continue; } if (!clusterConfigMap.containsKey(entry.getKey())) { - tmpKeys.add(entry.getKey()); + tmpRmvKeys.add(entry.getKey()); } } - for (String key : tmpKeys) { + for (String key : tmpRmvKeys) { mqClusterMap.remove(key); } // add new mq cluster config mqClusterMap.putAll(clusterConfigMap); } - private Map<String, IdTopicConfig> buildCacheTopicConfig( - CacheType mqType, List<InLongIdObject> inLongIds) { - Map<String, IdTopicConfig> tmpTopicConfigMap = new HashMap<>(); - if (inLongIds.isEmpty()) { - return tmpTopicConfigMap; - } - int index; - String[] idItems; - String groupId; - String streamId; - String topicName; - String tenant; - String nameSpace; - for (InLongIdObject idObject : inLongIds) { - if (idObject == null - || StringUtils.isBlank(idObject.getInlongId()) - || StringUtils.isBlank(idObject.getTopic())) { - continue; - } - // parse inlong id - idItems = idObject.getInlongId().split("\\."); - if (idItems.length == 2) { - if (StringUtils.isBlank(idItems[0])) { - continue; - } - groupId = idItems[0].trim(); - streamId = idItems[1].trim(); - } else { - groupId = idObject.getInlongId().trim(); - streamId = ""; - } - topicName = idObject.getTopic().trim(); - // change full topic name "pulsar-xxx/test/base_topic_name" to - // base topic name "base_topic_name" - index = topicName.lastIndexOf('/'); - if (index >= 0) { - topicName = topicName.substring(index + 1).trim(); - } - tenant = idObject.getParams().getOrDefault(ConfigConstants.KEY_TENANT, ""); - nameSpace = idObject.getParams().getOrDefault(ConfigConstants.KEY_NAMESPACE, ""); - if (StringUtils.isBlank(idObject.getTopic())) { - // namespace field must exist and value not be empty, - // otherwise it is an illegal configuration item. - continue; - } - if (mqType.equals(CacheType.TUBE)) { - topicName = nameSpace; - } else if (mqType.equals(CacheType.KAFKA)) { - if (topicName.equals(streamId)) { - topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, nameSpace, topicName); - } - } - IdTopicConfig tmpConfig = new IdTopicConfig(); - tmpConfig.setInlongGroupIdAndStreamId(groupId, streamId); - tmpConfig.setTenantAndNameSpace(tenant, nameSpace); - tmpConfig.setTopicName(topicName); - tmpConfig.setParams(idObject.getParams()); - tmpConfig.setDataType(DataTypeEnum.convert( - idObject.getParams().getOrDefault("dataType", DataTypeEnum.TEXT.getType()))); - tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", "|")); - tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", "\n")); - tmpConfig.setUseExtendedFields(Boolean.valueOf( - idObject.getParams().getOrDefault("useExtendedFields", "false"))); - tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig); - if (mqType.equals(CacheType.TUBE) - && !tmpConfig.getUid().equals(tmpConfig.getInlongGroupId()) - && tmpTopicConfigMap.get(tmpConfig.getInlongGroupId()) == null) { - IdTopicConfig tmpConfig2 = new IdTopicConfig(); - tmpConfig2.setInlongGroupIdAndStreamId(groupId, ""); - tmpConfig.setTenantAndNameSpace(tenant, nameSpace); - tmpConfig2.setTopicName(topicName); - tmpConfig2.setDataType(tmpConfig.getDataType()); - tmpConfig2.setFieldDelimiter(tmpConfig.getFieldDelimiter()); - tmpConfig2.setFileDelimiter(tmpConfig.getFileDelimiter()); - tmpConfig2.setParams(tmpConfig.getParams()); - tmpTopicConfigMap.put(tmpConfig.getUid(), tmpConfig2); - } - } - return tmpTopicConfigMap; - } - /** - * store meta config to file - */ - private boolean storeConfigToFile(String inDataMd5, String metaJsonStr) { - boolean isSuccess = false; - String filePath = getFilePath(); - if (StringUtils.isBlank(filePath)) { - LOG.error("Error in writing file {} as the file path is null.", getFileName()); - return isSuccess; - } - readWriteLock.writeLock().lock(); - try { - File sourceFile = new File(filePath); - File targetFile = new File(getNextBackupFileName()); - File tmpNewFile = new File(getFileName() + ".tmp"); - - if (sourceFile.exists()) { - FileUtils.copyFile(sourceFile, targetFile); - } - FileUtils.writeStringToFile(tmpNewFile, metaJsonStr, StandardCharsets.UTF_8); - FileUtils.copyFile(tmpNewFile, sourceFile); - tmpNewFile.delete(); - tmpDataMd5 = inDataMd5; - lastSyncVersion.set(System.currentTimeMillis()); - isSuccess = true; - setFileChanged(); - } catch (Throwable ex) { - LOG.error("Error in writing file {}", getFileName(), ex); - } finally { - readWriteLock.writeLock().unlock(); - } - return isSuccess; - } - - /** - * load from holder + * load configure from holder + * + * @return the configure info */ private String loadConfigFromFile() { String result = ""; if (StringUtils.isBlank(getFileName())) { - LOG.error("Fail to load json {} as the file name is null.", getFileName()); + LOG.error("Load metadata: fail to load json {} as the file name is null.", getFileName()); return result; } InputStream inStream = null; @@ -479,12 +364,12 @@ public class MetaConfigHolder extends ConfigHolder { URL url = getClass().getClassLoader().getResource(getFileName()); inStream = url != null ? url.openStream() : null; if (inStream == null) { - LOG.error("Fail to load json {} as the input stream is null", getFileName()); + LOG.error("Load metadata: fail to load json {} as the input stream is null", getFileName()); return result; } int size = inStream.available(); if (size > MAX_ALLOWED_JSON_FILE_SIZE) { - LOG.error("Fail to load json {} as the content size({}) over max allowed size({})", + LOG.error("Load metadata: fail to load json {} as the content size({}) over max allowed size({})", getFileName(), size, MAX_ALLOWED_JSON_FILE_SIZE); return result; } @@ -492,16 +377,45 @@ public class MetaConfigHolder extends ConfigHolder { inStream.read(buffer); result = new String(buffer, StandardCharsets.UTF_8); } catch (Throwable e) { - LOG.error("Fail to load json {}", getFileName(), e); + LOG.error("Load metadata: exception thrown while load from file {}", getFileName(), e); } finally { if (null != inStream) { try { inStream.close(); } catch (IOException e) { - LOG.error("Fail in inStream.close for file {}", getFileName(), e); + LOG.error("Load metadata: fail in inStream.close for file {}", getFileName(), e); } } } return result; } + + /** + * check required fields status + * + * @param metaConfig response from Manager + * + * @return check result + */ + public ImmutablePair<Boolean, String> validRequiredFields(InLongMetaConfig metaConfig) { + if (metaConfig == null) { + return ImmutablePair.of(false, "metaConfig object is null"); + } else if (metaConfig.getMd5() == null) { + return ImmutablePair.of(false, "metaConfig.md5 field is null"); + } else if (metaConfig.getMqType() == null) { + return ImmutablePair.of(false, "metaConfig.mqType field is null"); + } else if (metaConfig.getMqType() == CacheType.N) { + return ImmutablePair.of(false, "metaConfig.mqType value is CacheType.N"); + } else if (metaConfig.getClusterConfigMap() == null) { + return ImmutablePair.of(false, "metaConfig.clusterConfigMap field is null"); + } else if (metaConfig.getClusterConfigMap().isEmpty()) { + return ImmutablePair.of(false, "metaConfig.clusterConfigMap field is empty"); + } else if (metaConfig.getIdTopicConfigMap() == null) { + return ImmutablePair.of(false, "metaConfig.idTopicConfigMap field is null"); + } else if (metaConfig.getIdTopicConfigMap().isEmpty()) { + return ImmutablePair.of(false, "metaConfig.idTopicConfigMap is empty"); + } + return ImmutablePair.of(true, "ok"); + } + } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java index 094aee59b3..6c48467c1e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java @@ -21,6 +21,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * @@ -94,4 +95,23 @@ public class CacheClusterConfig { .append("params", params) .toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CacheClusterConfig)) { + return false; + } + CacheClusterConfig that = (CacheClusterConfig) o; + return Objects.equals(clusterName, that.clusterName) + && Objects.equals(token, that.token) + && Objects.equals(params, that.params); + } + + @Override + public int hashCode() { + return Objects.hash(clusterName, token, params); + } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java index 0d8a5430d4..2ac72981a2 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java @@ -73,7 +73,16 @@ public enum CacheType { */ public static CacheType convert(String value) { for (CacheType v : values()) { - if (v.value().equals(value)) { + if (v.value().equalsIgnoreCase(value)) { + return v; + } + } + return N; + } + + public static CacheType valueOf(int idValue) { + for (CacheType v : values()) { + if (v.getId() == idValue) { return v; } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java index 48b19d446d..8d2a01915d 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java @@ -18,6 +18,8 @@ package org.apache.inlong.dataproxy.config.pojo; import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.common.enums.InlongCompressType; +import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.sdk.commons.protocol.InlongId; import org.apache.commons.lang.StringUtils; @@ -25,6 +27,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * IdTopicConfig @@ -42,6 +45,8 @@ public class IdTopicConfig { private String fieldDelimiter = "|"; private String fileDelimiter = "\n"; private Boolean useExtendedFields = false; + private MessageWrapType msgWrapType = MessageWrapType.UNKNOWN; + private InlongCompressType v1CompressType = InlongCompressType.INLONG_SNAPPY; private Map<String, String> params = new HashMap<>(); @@ -49,7 +54,7 @@ public class IdTopicConfig { } - public Boolean getUseExtendedFields() { + public boolean isUseExtendedFields() { return useExtendedFields; } @@ -57,6 +62,22 @@ public class IdTopicConfig { this.useExtendedFields = useExtendedFields; } + public MessageWrapType getMsgWrapType() { + return msgWrapType; + } + + public void setMsgWrapType(MessageWrapType msgWrapType) { + this.msgWrapType = msgWrapType; + } + + public InlongCompressType getV1CompressType() { + return v1CompressType; + } + + public void setV1CompressType(InlongCompressType v1CompressType) { + this.v1CompressType = v1CompressType; + } + /** * get uid * @return the uid @@ -221,11 +242,42 @@ public class IdTopicConfig { .append("inlongGroupId", inlongGroupId) .append("inlongStreamid", inlongStreamid) .append("topicName", topicName) + .append("tenant", tenant) .append("nameSpace", nameSpace) .append("dataType", dataType) .append("fieldDelimiter", fieldDelimiter) .append("fileDelimiter", fileDelimiter) + .append("useExtendedFields", useExtendedFields) + .append("msgWrapType", msgWrapType) + .append("pbCompressType", v1CompressType) .append("params", params) .toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IdTopicConfig)) { + return false; + } + IdTopicConfig that = (IdTopicConfig) o; + return uid.equals(that.uid) && Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(inlongStreamid, that.inlongStreamid) && topicName.equals(that.topicName) + && Objects.equals(tenant, that.tenant) && Objects.equals(nameSpace, that.nameSpace) + && dataType == that.dataType && Objects.equals(fieldDelimiter, that.fieldDelimiter) + && Objects.equals(fileDelimiter, that.fileDelimiter) + && Objects.equals(useExtendedFields, that.useExtendedFields) + && Objects.equals(msgWrapType, that.msgWrapType) + && v1CompressType == that.v1CompressType + && Objects.equals(params, that.params); + } + + @Override + public int hashCode() { + return Objects.hash(uid, inlongGroupId, inlongStreamid, topicName, tenant, nameSpace, + dataType, fieldDelimiter, fileDelimiter, useExtendedFields, msgWrapType, + v1CompressType, params); + } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java new file mode 100644 index 0000000000..4d80ee4d13 --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/InLongMetaConfig.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.config.pojo; + +import org.apache.commons.lang.builder.ToStringBuilder; + +import java.util.Map; + +public class InLongMetaConfig { + + private String md5; + private CacheType mqType; + private Map<String, CacheClusterConfig> clusterConfigMap; + private Map<String, IdTopicConfig> idTopicConfigMap; + + public InLongMetaConfig() { + + } + + public InLongMetaConfig(String md5, CacheType mqType, + Map<String, CacheClusterConfig> clusterConfigMap, + Map<String, IdTopicConfig> idTopicConfigMap) { + this.md5 = md5; + this.mqType = mqType; + this.clusterConfigMap = clusterConfigMap; + this.idTopicConfigMap = idTopicConfigMap; + } + + public String getMd5() { + return md5; + } + + public CacheType getMqType() { + return mqType; + } + + public Map<String, CacheClusterConfig> getClusterConfigMap() { + return clusterConfigMap; + } + + public Map<String, IdTopicConfig> getIdTopicConfigMap() { + return idTopicConfigMap; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("md5", md5) + .append("mqType", mqType) + .append("clusterConfigMap", clusterConfigMap) + .append("idTopicConfigMap", idTopicConfigMap) + .toString(); + } +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java similarity index 51% rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java index 3eb12d6a8c..9ead853540 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/SourceConstants.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.dataproxy.source; +package org.apache.inlong.dataproxy.consts; public class SourceConstants { @@ -105,95 +105,4 @@ public class SourceConstants { public static final String SRC_PROTOCOL_TYPE_TCP = "tcp"; public static final String SRC_PROTOCOL_TYPE_UDP = "udp"; public static final String SRC_PROTOCOL_TYPE_HTTP = "http"; - - public static final String SERVICE_PROCESSOR_NAME = "service-decoder-name"; - public static final String ENABLE_EXCEPTION_RETURN = "enableExceptionReturn"; - - public static final String TRAFFIC_CLASS = "trafficClass"; - - public static final String HEART_INTERVAL_SEC = "heart-interval-sec"; - - public static final String PACKAGE_TIMEOUT_SEC = "package-timeout-sec"; - - public static final String HEART_SERVERS = "heart-servers"; - - public static final String TOPIC_KEY = "topic"; - public static final String REMOTE_IP_KEY = "srcIp"; - public static final String DATAPROXY_IP_KEY = "dpIp"; - public static final String MSG_ENCODE_VER = "msgEnType"; - public static final String REMOTE_IDC_KEY = "idc"; - public static final String MSG_COUNTER_KEY = "msgcnt"; - public static final String PKG_COUNTER_KEY = "pkgcnt"; - public static final String PKG_TIME_KEY = "msg.pkg.time"; - public static final String TRANSFER_KEY = "transfer"; - public static final String DEST_IP_KEY = "dstIp"; - public static final String INTERFACE_KEY = "interface"; - public static final String SINK_MIN_METRIC_KEY = "sink-min-metric-topic"; - public static final String SINK_HOUR_METRIC_KEY = "sink-hour-metric-topic"; - public static final String SINK_TEN_METRIC_KEY = "sink-ten-metric-topic"; - public static final String SINK_QUA_METRIC_KEY = "sink-qua-metric-topic"; - public static final String L5_MIN_METRIC_KEY = "l5-min-metric-topic"; - public static final String L5_MIN_FAIL_METRIC_KEY = "l5-min-fail-metric-key"; - public static final String L5_HOUR_METRIC_KEY = "l5-hour-metric-topic"; - public static final String L5_ID_KEY = "l5id"; - public static final String SET_KEY = "set"; - public static final String CLUSTER_ID_KEY = "clusterId"; - - public static final String DECODER_BODY = "body"; - public static final String DECODER_TOPICKEY = "topic_key"; - public static final String DECODER_ATTRS = "attrs"; - public static final String MSG_TYPE = "msg_type"; - public static final String COMPRESS_TYPE = "compress_type"; - public static final String EXTRA_ATTR = "extra_attr"; - public static final String COMMON_ATTR_MAP = "common_attr_map"; - public static final String MSG_LIST = "msg_list"; - public static final String VERSION_TYPE = "version"; - public static final String FILE_CHECK_DATA = "file-check-data"; - public static final String MINUTE_CHECK_DATA = "minute-check-data"; - public static final String SLA_METRIC_DATA = "sla-metric-data"; - public static final String SLA_METRIC_GROUPID = "manager_sla_metric"; - - public static final String FILE_BODY = "file-body"; - public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024; - - public static final String SEQUENCE_ID = "sequencial_id"; - - public static final String TOTAL_LEN = "totalLen"; - - public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count"; - public static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count"; - public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count"; - public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark"; - public static final String RECOVER_THREAD_COUNT = "recover_thread_count"; - - public static final String MANAGER_PATH = "/inlong/manager/openapi"; - public static final String MANAGER_GET_CONFIG_PATH = "/dataproxy/getConfig"; - public static final String MANAGER_GET_ALL_CONFIG_PATH = "/dataproxy/getAllConfig"; - public static final String MANAGER_HEARTBEAT_REPORT = "/heartbeat/report"; - - public static final String MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; - public static final String MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; - // Pulsar config - public static final String KEY_TENANT = "tenant"; - public static final String KEY_NAMESPACE = "namespace"; - - public static final String KEY_SERVICE_URL = "serviceUrl"; - public static final String KEY_AUTHENTICATION = "authentication"; - public static final String KEY_STATS_INTERVAL_SECONDS = "statsIntervalSeconds"; - - public static final String KEY_ENABLEBATCHING = "enableBatching"; - public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes"; - public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages"; - public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay"; - public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages"; - public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions"; - public static final String KEY_SENDTIMEOUT = "sendTimeout"; - public static final String KEY_COMPRESSIONTYPE = "compressionType"; - public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull"; - public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter" - + "BatchingPartitionSwitchFrequency"; - - public static final String KEY_IOTHREADS = "ioThreads"; - public static final String KEY_MEMORYLIMIT = "memoryLimit"; - public static final String KEY_CONNECTIONSPERBROKER = "connectionsPerBroker"; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java index c317bba799..acf0e4d8a4 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java @@ -25,6 +25,7 @@ import org.apache.inlong.dataproxy.config.CommonConfigHolder; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; import org.apache.inlong.dataproxy.consts.AttrConstants; +import org.apache.inlong.dataproxy.consts.SourceConstants; import org.apache.inlong.dataproxy.consts.StatConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java index d68a8b4df2..517c11113b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java @@ -17,6 +17,8 @@ package org.apache.inlong.dataproxy.source; +import org.apache.inlong.dataproxy.consts.SourceConstants; + import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java index 64375733fd..3a90e9f10a 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleHttpSource.java @@ -18,6 +18,7 @@ package org.apache.inlong.dataproxy.source; import org.apache.inlong.dataproxy.config.ConfigManager; +import org.apache.inlong.dataproxy.consts.SourceConstants; import org.apache.inlong.dataproxy.utils.ConfStringUtils; import com.google.common.base.Preconditions; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java index 5455cdddb4..b343dc4dcb 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java @@ -18,6 +18,7 @@ package org.apache.inlong.dataproxy.source; import org.apache.inlong.dataproxy.config.ConfigManager; +import org.apache.inlong.dataproxy.consts.SourceConstants; import org.apache.inlong.dataproxy.utils.ConfStringUtils; import org.apache.inlong.dataproxy.utils.EventLoopUtil; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java index fef78bdd1b..380fc3dab9 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java @@ -18,6 +18,7 @@ package org.apache.inlong.dataproxy.source; import org.apache.inlong.dataproxy.config.ConfigManager; +import org.apache.inlong.dataproxy.consts.SourceConstants; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelOption; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java index 71516da004..04322fe339 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/AddressUtils.java @@ -17,7 +17,7 @@ package org.apache.inlong.dataproxy.utils; -import org.apache.inlong.dataproxy.source.SourceConstants; +import org.apache.inlong.dataproxy.consts.SourceConstants; import io.netty.channel.Channel; import org.apache.commons.lang3.StringUtils; diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json b/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json index 074be5958c..b9de51bafb 100644 --- a/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json +++ b/inlong-dataproxy/dataproxy-source/src/test/resources/metadata.json @@ -1 +1 @@ -{"result":true,"errCode":0,"md5":"5a3f5939bb7368f493bf41c1d785b8f3","data":{"proxyCluster":{"name":"test_dataproxy","setName":"test_set","zone":"default\u003dtrue","channels":[],"inlongIds":[{"inlongId":"test_group.stream1","topic":"stream1","params":{"namespace":"test_group","ignoreParseError":"true","appGroupName":"app_test_group","productId":"58","productName":"test_meta","wrapWithInlongMsg":"true"}}],"sources":[],"sinks":[]},"cacheClusterSet":{"setName":"test_set","type":"TUBEMQ","ca [...] +{"md5":"5a3f5939bb7368f493bf41c1d785b8f3","mqType":"TUBE","clusterConfigMap":{"test_tubemq":{"clusterName": "test_tubemq","token": "******","zone": "default=true","params": {"masterWebUrl": "http://127.0.0.1:8080","messageQueueHandler": "org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler","master-host-port-list": "127.0.0.1:8000"}}},"idTopicConfigMap":{"test_group.stream1":{"uid":"test_group.stream1","inlongGroupId":"test_group","inlongStreamid":"stream1","topicName":"test_group","data [...] \ No newline at end of file