This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d698b5cde6 [INLONG-10527][Sort] EsSink support switch metadata acquire mode (#10552) d698b5cde6 is described below commit d698b5cde64f9e591a8f8f92c66b35c1a62f966a Author: vernedeng <verned...@apache.org> AuthorDate: Tue Jul 2 14:19:31 2024 +0800 [INLONG-10527][Sort] EsSink support switch metadata acquire mode (#10552) * [INLONG-10527][Sort] EsSink support switch metadata acquire mode --- .../sink/elasticsearch/EsSinkContext.java | 267 +++++++++++++-------- .../src/test/java/common.properties | 1 + .../src/test/resources/common.properties | 2 +- 3 files changed, 171 insertions(+), 99 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java index 6357dc8330..66ad584427 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java @@ -20,16 +20,21 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import org.apache.inlong.common.pojo.sort.ClusterTagConfig; import org.apache.inlong.common.pojo.sort.TaskConfig; import org.apache.inlong.common.pojo.sort.node.EsNodeConfig; +import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; import org.apache.inlong.sort.standalone.config.pojo.InlongId; import org.apache.inlong.sort.standalone.metrics.SortMetricItem; import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils; import org.apache.inlong.sort.standalone.sink.SinkContext; import org.apache.inlong.sort.standalone.utils.BufferQueue; +import org.apache.inlong.sort.standalone.utils.Constants; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; @@ -49,7 +54,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** - * * EsSinkContext */ public class EsSinkContext extends SinkContext { @@ -114,14 +118,6 @@ public class EsSinkContext extends SinkContext { private String strHttpHosts; private HttpHost[] httpHosts; - /** - * Constructor - * - * @param sinkName - * @param context - * @param channel - * @param dispatchQueue - */ public EsSinkContext(String sinkName, Context context, Channel channel, BufferQueue<EsIndexRequest> dispatchQueue) { super(sinkName, context, channel); @@ -138,61 +134,30 @@ public class EsSinkContext extends SinkContext { LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}", taskName, dispatchQueue.size(), offerCounter.getAndSet(0), takeCounter.getAndSet(0), backCounter.getAndSet(0)); - TaskConfig newSortTaskConfig = SortConfigHolder.getTaskConfig(taskName); - if (this.taskConfig != null && this.taskConfig.equals(newSortTaskConfig)) { + TaskConfig newTaskConfig = SortConfigHolder.getTaskConfig(taskName); + SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + if ((newTaskConfig == null || newTaskConfig.equals(taskConfig)) + && (newSortTaskConfig == null || newSortTaskConfig.equals(sortTaskConfig))) { return; } - LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName, - objectMapper.writeValueAsString(newSortTaskConfig)); - this.taskConfig = newSortTaskConfig; - EsNodeConfig requestNodeConfig = (EsNodeConfig) taskConfig.getNodeConfig(); + LOG.info("get new SortTaskConfig:taskName:{}", taskName); + + EsNodeConfig requestNodeConfig = (EsNodeConfig) newTaskConfig.getNodeConfig(); if (esNodeConfig == null || requestNodeConfig.getVersion() > esNodeConfig.getVersion()) { this.esNodeConfig = requestNodeConfig; } - Map<String, String> properties = this.taskConfig.getNodeConfig().getProperties(); - this.sinkContext = new Context(properties != null ? properties : new HashMap<>()); + this.taskConfig = newTaskConfig; + this.sortTaskConfig = newSortTaskConfig; + // change current config - this.idConfigMap = this.taskConfig.getClusterTagConfigs() - .stream() - .map(ClusterTagConfig::getDataFlowConfigs) - .flatMap(Collection::stream) - .map(EsIdConfig::create) - .collect(Collectors.toMap( - config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), - v -> v, - (flow1, flow2) -> flow1)); - // rest client - this.username = esNodeConfig.getUsername(); - this.password = esNodeConfig.getPassword(); - this.bulkAction = esNodeConfig.getBulkAction(); - this.bulkSizeMb = esNodeConfig.getBulkSizeMb(); - this.flushInterval = esNodeConfig.getFlushInterval(); - this.concurrentRequests = esNodeConfig.getConcurrentRequests(); - this.maxConnect = esNodeConfig.getMaxConnect(); - this.keywordMaxLength = esNodeConfig.getKeywordMaxLength(); - this.isUseIndexId = esNodeConfig.getIsUseIndexId(); - - this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE); - this.connectionRequestTimeout = - sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT); - this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); - this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS); - this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH); - // http host - this.strHttpHosts = esNodeConfig.getHttpHosts(); - if (!StringUtils.isBlank(strHttpHosts)) { - String[] strHttpHostArray = strHttpHosts.split("\\s+"); - List<HttpHost> newHttpHosts = new ArrayList<>(strHttpHostArray.length); - for (String strHttpHost : strHttpHostArray) { - String[] ipPort = strHttpHost.split(":"); - if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) { - newHttpHosts.add(new HttpHost(ipPort[0], NumberUtils.toInt(ipPort[1]))); - } - } - if (newHttpHosts.size() > 0) { - HttpHost[] newHostHostArray = new HttpHost[newHttpHosts.size()]; - this.httpHosts = newHttpHosts.toArray(newHostHostArray); - } + Map<String, EsIdConfig> fromTaskConfig = reloadIdParamsFromTaskConfig(taskConfig); + Map<String, EsIdConfig> fromSortTaskConfig = reloadIdParamsFromSortTaskConfig(sortTaskConfig); + if (unifiedConfiguration) { + idConfigMap = fromTaskConfig; + reloadClientsFromNodeConfig(esNodeConfig); + } else { + idConfigMap = fromSortTaskConfig; + reloadClientsFromSortTaskConfig(sortTaskConfig); } // log LOG.info("end to get SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName, @@ -202,9 +167,115 @@ public class EsSinkContext extends SinkContext { } } + private Map<String, EsIdConfig> reloadIdParamsFromTaskConfig(TaskConfig taskConfig) { + if (taskConfig == null) { + return new HashMap<>(); + } + return taskConfig.getClusterTagConfigs() + .stream() + .map(ClusterTagConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .map(EsIdConfig::create) + .collect(Collectors.toMap( + config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), + v -> v, + (flow1, flow2) -> flow1)); + } + + private Map<String, EsIdConfig> reloadIdParamsFromSortTaskConfig(SortTaskConfig sortTaskConfig) + throws JsonProcessingException { + if (sortTaskConfig == null) { + return new HashMap<>(); + } + Map<String, EsIdConfig> newIdConfigMap = new ConcurrentHashMap<>(); + List<Map<String, String>> idList = this.sortTaskConfig.getIdParams(); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + for (Map<String, String> idParam : idList) { + String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID); + String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID); + String uid = InlongId.generateUid(inlongGroupId, inlongStreamId); + String jsonIdConfig = objectMapper.writeValueAsString(idParam); + EsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, EsIdConfig.class); + newIdConfigMap.put(uid, idConfig); + } + return newIdConfigMap; + } + + private void reloadClientsFromNodeConfig(EsNodeConfig esNodeConfig) { + Map<String, String> properties = esNodeConfig.getProperties(); + this.sinkContext = new Context(properties != null ? properties : new HashMap<>()); + this.username = esNodeConfig.getUsername(); + this.password = esNodeConfig.getPassword(); + this.bulkAction = esNodeConfig.getBulkAction(); + this.bulkSizeMb = esNodeConfig.getBulkSizeMb(); + this.flushInterval = esNodeConfig.getFlushInterval(); + this.concurrentRequests = esNodeConfig.getConcurrentRequests(); + this.maxConnect = esNodeConfig.getMaxConnect(); + this.keywordMaxLength = esNodeConfig.getKeywordMaxLength(); + this.isUseIndexId = esNodeConfig.getIsUseIndexId(); + + this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE); + this.connectionRequestTimeout = + sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT); + this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); + this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS); + this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH); + // http host + this.strHttpHosts = esNodeConfig.getHttpHosts(); + if (!StringUtils.isBlank(strHttpHosts)) { + String[] strHttpHostArray = strHttpHosts.split("\\s+"); + List<HttpHost> newHttpHosts = new ArrayList<>(strHttpHostArray.length); + for (String strHttpHost : strHttpHostArray) { + String[] ipPort = strHttpHost.split(":"); + if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) { + newHttpHosts.add(new HttpHost(ipPort[0], NumberUtils.toInt(ipPort[1]))); + } + } + if (newHttpHosts.size() > 0) { + HttpHost[] newHostHostArray = new HttpHost[newHttpHosts.size()]; + this.httpHosts = newHttpHosts.toArray(newHostHostArray); + } + } + } + + private void reloadClientsFromSortTaskConfig(SortTaskConfig sortTaskConfig) { + this.sinkContext = new Context(sortTaskConfig.getSinkParams()); + this.username = sinkContext.getString(KEY_USERNAME); + this.password = sinkContext.getString(KEY_PASSWORD); + this.bulkAction = sinkContext.getInteger(KEY_BULK_ACTION, DEFAULT_BULK_ACTION); + this.bulkSizeMb = sinkContext.getInteger(KEY_BULK_SIZE_MB, DEFAULT_BULK_SIZE_MB); + this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL, DEFAULT_FLUSH_INTERVAL); + this.concurrentRequests = sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS); + this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL, DEFAULT_MAX_CONNECT_TOTAL); + + this.maxConnectPerRoute = sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, DEFAULT_MAX_CONNECT_PER_ROUTE); + this.connectionRequestTimeout = + sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, DEFAULT_CONNECTION_REQUEST_TIMEOUT); + this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); + this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, DEFAULT_MAX_REDIRECTS); + this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, DEFAULT_LOG_MAX_LENGTH); + // http host + this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS); + if (!StringUtils.isBlank(strHttpHosts)) { + String[] strHttpHostArray = strHttpHosts.split("\\s+"); + List<HttpHost> newHttpHosts = new ArrayList<>(strHttpHostArray.length); + for (String strHttpHost : strHttpHostArray) { + String[] ipPort = strHttpHost.split(":"); + if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) { + newHttpHosts.add(new HttpHost(ipPort[0], NumberUtils.toInt(ipPort[1]))); + } + } + if (newHttpHosts.size() > 0) { + HttpHost[] newHostHostArray = new HttpHost[newHttpHosts.size()]; + this.httpHosts = newHttpHosts.toArray(newHostHostArray); + } + } + } + /** * addSendMetric - * + * * @param currentRecord * @param bid */ @@ -242,7 +313,7 @@ public class EsSinkContext extends SinkContext { /** * addSendResultMetric - * + * * @param currentRecord * @param bid * @param result @@ -281,8 +352,8 @@ public class EsSinkContext extends SinkContext { /** * getIdConfig - * - * @param uid + * + * @param uid * @return */ public EsIdConfig getIdConfig(String uid) { @@ -291,7 +362,7 @@ public class EsSinkContext extends SinkContext { /** * get nodeId - * + * * @return the nodeId */ public String getNodeId() { @@ -300,7 +371,7 @@ public class EsSinkContext extends SinkContext { /** * get idConfigMap - * + * * @return the idConfigMap */ public Map<String, EsIdConfig> getIdConfigMap() { @@ -309,7 +380,7 @@ public class EsSinkContext extends SinkContext { /** * get sinkContext - * + * * @return the sinkContext */ public Context getSinkContext() { @@ -318,7 +389,7 @@ public class EsSinkContext extends SinkContext { /** * set sinkContext - * + * * @param sinkContext the sinkContext to set */ public void setSinkContext(Context sinkContext) { @@ -327,8 +398,8 @@ public class EsSinkContext extends SinkContext { /** * offerDispatchQueue - * - * @param indexRequest + * + * @param indexRequest * @return */ public void offerDispatchQueue(EsIndexRequest indexRequest) { @@ -339,7 +410,7 @@ public class EsSinkContext extends SinkContext { /** * takeDispatchQueue - * + * * @return */ public EsIndexRequest takeDispatchQueue() { @@ -352,8 +423,8 @@ public class EsSinkContext extends SinkContext { /** * backDispatchQueue - * - * @param indexRequest + * + * @param indexRequest * @return */ public void backDispatchQueue(EsIndexRequest indexRequest) { @@ -363,8 +434,8 @@ public class EsSinkContext extends SinkContext { /** * releaseDispatchQueue - * - * @param indexRequest + * + * @param indexRequest * @return */ public void releaseDispatchQueue(EsIndexRequest indexRequest) { @@ -373,7 +444,7 @@ public class EsSinkContext extends SinkContext { /** * get bulkAction - * + * * @return the bulkAction */ public int getBulkAction() { @@ -382,7 +453,7 @@ public class EsSinkContext extends SinkContext { /** * set bulkAction - * + * * @param bulkAction the bulkAction to set */ public void setBulkAction(int bulkAction) { @@ -391,7 +462,7 @@ public class EsSinkContext extends SinkContext { /** * get bulkSizeMb - * + * * @return the bulkSizeMb */ public int getBulkSizeMb() { @@ -400,7 +471,7 @@ public class EsSinkContext extends SinkContext { /** * set bulkSizeMb - * + * * @param bulkSizeMb the bulkSizeMb to set */ public void setBulkSizeMb(int bulkSizeMb) { @@ -409,7 +480,7 @@ public class EsSinkContext extends SinkContext { /** * get flushInterval - * + * * @return the flushInterval */ public int getFlushInterval() { @@ -418,7 +489,7 @@ public class EsSinkContext extends SinkContext { /** * set flushInterval - * + * * @param flushInterval the flushInterval to set */ public void setFlushInterval(int flushInterval) { @@ -427,7 +498,7 @@ public class EsSinkContext extends SinkContext { /** * get concurrentRequests - * + * * @return the concurrentRequests */ public int getConcurrentRequests() { @@ -436,7 +507,7 @@ public class EsSinkContext extends SinkContext { /** * set concurrentRequests - * + * * @param concurrentRequests the concurrentRequests to set */ public void setConcurrentRequests(int concurrentRequests) { @@ -445,7 +516,7 @@ public class EsSinkContext extends SinkContext { /** * get maxConnect - * + * * @return the maxConnect */ public int getMaxConnect() { @@ -489,7 +560,7 @@ public class EsSinkContext extends SinkContext { /** * set maxConnect - * + * * @param maxConnect the maxConnect to set */ public void setMaxConnect(int maxConnect) { @@ -498,7 +569,7 @@ public class EsSinkContext extends SinkContext { /** * get strHttpHosts - * + * * @return the strHttpHosts */ public String getStrHttpHosts() { @@ -507,7 +578,7 @@ public class EsSinkContext extends SinkContext { /** * set strHttpHosts - * + * * @param strHttpHosts the strHttpHosts to set */ public void setStrHttpHosts(String strHttpHosts) { @@ -516,7 +587,7 @@ public class EsSinkContext extends SinkContext { /** * get httpHosts - * + * * @return the httpHosts */ public HttpHost[] getHttpHosts() { @@ -525,7 +596,7 @@ public class EsSinkContext extends SinkContext { /** * set httpHosts - * + * * @param httpHosts the httpHosts to set */ public void setHttpHosts(HttpHost[] httpHosts) { @@ -534,7 +605,7 @@ public class EsSinkContext extends SinkContext { /** * set nodeId - * + * * @param nodeId the nodeId to set */ public void setNodeId(String nodeId) { @@ -543,7 +614,7 @@ public class EsSinkContext extends SinkContext { /** * set idConfigMap - * + * * @param idConfigMap the idConfigMap to set */ public void setIdConfigMap(Map<String, EsIdConfig> idConfigMap) { @@ -552,7 +623,7 @@ public class EsSinkContext extends SinkContext { /** * get username - * + * * @return the username */ public String getUsername() { @@ -561,7 +632,7 @@ public class EsSinkContext extends SinkContext { /** * set username - * + * * @param username the username to set */ public void setUsername(String username) { @@ -570,7 +641,7 @@ public class EsSinkContext extends SinkContext { /** * get password - * + * * @return the password */ public String getPassword() { @@ -579,7 +650,7 @@ public class EsSinkContext extends SinkContext { /** * set password - * + * * @param password the password to set */ public void setPassword(String password) { @@ -588,7 +659,7 @@ public class EsSinkContext extends SinkContext { /** * get keywordMaxLength - * + * * @return the keywordMaxLength */ public int getKeywordMaxLength() { @@ -597,7 +668,7 @@ public class EsSinkContext extends SinkContext { /** * set keywordMaxLength - * + * * @param keywordMaxLength the keywordMaxLength to set */ public void setKeywordMaxLength(int keywordMaxLength) { @@ -606,7 +677,7 @@ public class EsSinkContext extends SinkContext { /** * get isUseIndexId - * + * * @return the isUseIndexId */ public boolean isUseIndexId() { @@ -615,7 +686,7 @@ public class EsSinkContext extends SinkContext { /** * set isUseIndexId - * + * * @param isUseIndexId the isUseIndexId to set */ public void setUseIndexId(boolean isUseIndexId) { @@ -624,7 +695,7 @@ public class EsSinkContext extends SinkContext { /** * create indexRequestHandler - * + * * @return the indexRequestHandler */ public IEvent2IndexRequestHandler createIndexRequestHandler() { diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties index 5f79465039..d7a0a2978e 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties @@ -28,6 +28,7 @@ sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassReso sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler +useUnifiedConfiguration=true maxThreads=10 reloadInterval=60000 diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties index 5f79465039..e448826938 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties +++ b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties @@ -28,7 +28,7 @@ sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassReso sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler - +useUnifiedConfiguration=true maxThreads=10 reloadInterval=60000 processInterval=100