This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d698b5cde6 [INLONG-10527][Sort] EsSink support switch metadata acquire 
mode (#10552)
d698b5cde6 is described below

commit d698b5cde64f9e591a8f8f92c66b35c1a62f966a
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Jul 2 14:19:31 2024 +0800

    [INLONG-10527][Sort] EsSink support switch metadata acquire mode (#10552)
    
    * [INLONG-10527][Sort] EsSink support switch metadata acquire mode
---
 .../sink/elasticsearch/EsSinkContext.java          | 267 +++++++++++++--------
 .../src/test/java/common.properties                |   1 +
 .../src/test/resources/common.properties           |   2 +-
 3 files changed, 171 insertions(+), 99 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 6357dc8330..66ad584427 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -20,16 +20,21 @@ package 
org.apache.inlong.sort.standalone.sink.elasticsearch;
 import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
 import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -49,7 +54,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /**
- * 
  * EsSinkContext
  */
 public class EsSinkContext extends SinkContext {
@@ -114,14 +118,6 @@ public class EsSinkContext extends SinkContext {
     private String strHttpHosts;
     private HttpHost[] httpHosts;
 
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param context
-     * @param channel
-     * @param dispatchQueue
-     */
     public EsSinkContext(String sinkName, Context context, Channel channel,
             BufferQueue<EsIndexRequest> dispatchQueue) {
         super(sinkName, context, channel);
@@ -138,61 +134,30 @@ public class EsSinkContext extends SinkContext {
             LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
                     taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
                     takeCounter.getAndSet(0), backCounter.getAndSet(0));
-            TaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
-            if (this.taskConfig != null && 
this.taskConfig.equals(newSortTaskConfig)) {
+            TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
+                    && (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig))) {
                 return;
             }
-            LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
-                    objectMapper.writeValueAsString(newSortTaskConfig));
-            this.taskConfig = newSortTaskConfig;
-            EsNodeConfig requestNodeConfig = (EsNodeConfig) 
taskConfig.getNodeConfig();
+            LOG.info("get new SortTaskConfig:taskName:{}", taskName);
+
+            EsNodeConfig requestNodeConfig = (EsNodeConfig) 
newTaskConfig.getNodeConfig();
             if (esNodeConfig == null || requestNodeConfig.getVersion() > 
esNodeConfig.getVersion()) {
                 this.esNodeConfig = requestNodeConfig;
             }
-            Map<String, String> properties = 
this.taskConfig.getNodeConfig().getProperties();
-            this.sinkContext = new Context(properties != null ? properties : 
new HashMap<>());
+            this.taskConfig = newTaskConfig;
+            this.sortTaskConfig = newSortTaskConfig;
+
             // change current config
-            this.idConfigMap = this.taskConfig.getClusterTagConfigs()
-                    .stream()
-                    .map(ClusterTagConfig::getDataFlowConfigs)
-                    .flatMap(Collection::stream)
-                    .map(EsIdConfig::create)
-                    .collect(Collectors.toMap(
-                            config -> 
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
-                            v -> v,
-                            (flow1, flow2) -> flow1));
-            // rest client
-            this.username = esNodeConfig.getUsername();
-            this.password = esNodeConfig.getPassword();
-            this.bulkAction = esNodeConfig.getBulkAction();
-            this.bulkSizeMb = esNodeConfig.getBulkSizeMb();
-            this.flushInterval = esNodeConfig.getFlushInterval();
-            this.concurrentRequests = esNodeConfig.getConcurrentRequests();
-            this.maxConnect = esNodeConfig.getMaxConnect();
-            this.keywordMaxLength = esNodeConfig.getKeywordMaxLength();
-            this.isUseIndexId = esNodeConfig.getIsUseIndexId();
-
-            this.maxConnectPerRoute = 
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, 
DEFAULT_MAX_CONNECT_PER_ROUTE);
-            this.connectionRequestTimeout =
-                    sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, 
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
-            this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, 
DEFAULT_SOCKET_TIMEOUT);
-            this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, 
DEFAULT_MAX_REDIRECTS);
-            this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, 
DEFAULT_LOG_MAX_LENGTH);
-            // http host
-            this.strHttpHosts = esNodeConfig.getHttpHosts();
-            if (!StringUtils.isBlank(strHttpHosts)) {
-                String[] strHttpHostArray = strHttpHosts.split("\\s+");
-                List<HttpHost> newHttpHosts = new 
ArrayList<>(strHttpHostArray.length);
-                for (String strHttpHost : strHttpHostArray) {
-                    String[] ipPort = strHttpHost.split(":");
-                    if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) 
{
-                        newHttpHosts.add(new HttpHost(ipPort[0], 
NumberUtils.toInt(ipPort[1])));
-                    }
-                }
-                if (newHttpHosts.size() > 0) {
-                    HttpHost[] newHostHostArray = new 
HttpHost[newHttpHosts.size()];
-                    this.httpHosts = newHttpHosts.toArray(newHostHostArray);
-                }
+            Map<String, EsIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig);
+            Map<String, EsIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+            if (unifiedConfiguration) {
+                idConfigMap = fromTaskConfig;
+                reloadClientsFromNodeConfig(esNodeConfig);
+            } else {
+                idConfigMap = fromSortTaskConfig;
+                reloadClientsFromSortTaskConfig(sortTaskConfig);
             }
             // log
             LOG.info("end to get 
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
@@ -202,9 +167,115 @@ public class EsSinkContext extends SinkContext {
         }
     }
 
+    private Map<String, EsIdConfig> reloadIdParamsFromTaskConfig(TaskConfig 
taskConfig) {
+        if (taskConfig == null) {
+            return new HashMap<>();
+        }
+        return taskConfig.getClusterTagConfigs()
+                .stream()
+                .map(ClusterTagConfig::getDataFlowConfigs)
+                .flatMap(Collection::stream)
+                .map(EsIdConfig::create)
+                .collect(Collectors.toMap(
+                        config -> 
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+                        v -> v,
+                        (flow1, flow2) -> flow1));
+    }
+
+    private Map<String, EsIdConfig> 
reloadIdParamsFromSortTaskConfig(SortTaskConfig sortTaskConfig)
+            throws JsonProcessingException {
+        if (sortTaskConfig == null) {
+            return new HashMap<>();
+        }
+        Map<String, EsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+        List<Map<String, String>> idList = this.sortTaskConfig.getIdParams();
+        ObjectMapper objectMapper = new ObjectMapper();
+        
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+        for (Map<String, String> idParam : idList) {
+            String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+            String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+            String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+            String jsonIdConfig = objectMapper.writeValueAsString(idParam);
+            EsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, 
EsIdConfig.class);
+            newIdConfigMap.put(uid, idConfig);
+        }
+        return newIdConfigMap;
+    }
+
+    private void reloadClientsFromNodeConfig(EsNodeConfig esNodeConfig) {
+        Map<String, String> properties = esNodeConfig.getProperties();
+        this.sinkContext = new Context(properties != null ? properties : new 
HashMap<>());
+        this.username = esNodeConfig.getUsername();
+        this.password = esNodeConfig.getPassword();
+        this.bulkAction = esNodeConfig.getBulkAction();
+        this.bulkSizeMb = esNodeConfig.getBulkSizeMb();
+        this.flushInterval = esNodeConfig.getFlushInterval();
+        this.concurrentRequests = esNodeConfig.getConcurrentRequests();
+        this.maxConnect = esNodeConfig.getMaxConnect();
+        this.keywordMaxLength = esNodeConfig.getKeywordMaxLength();
+        this.isUseIndexId = esNodeConfig.getIsUseIndexId();
+
+        this.maxConnectPerRoute = 
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, 
DEFAULT_MAX_CONNECT_PER_ROUTE);
+        this.connectionRequestTimeout =
+                sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, 
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+        this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, 
DEFAULT_SOCKET_TIMEOUT);
+        this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, 
DEFAULT_MAX_REDIRECTS);
+        this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, 
DEFAULT_LOG_MAX_LENGTH);
+        // http host
+        this.strHttpHosts = esNodeConfig.getHttpHosts();
+        if (!StringUtils.isBlank(strHttpHosts)) {
+            String[] strHttpHostArray = strHttpHosts.split("\\s+");
+            List<HttpHost> newHttpHosts = new 
ArrayList<>(strHttpHostArray.length);
+            for (String strHttpHost : strHttpHostArray) {
+                String[] ipPort = strHttpHost.split(":");
+                if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) {
+                    newHttpHosts.add(new HttpHost(ipPort[0], 
NumberUtils.toInt(ipPort[1])));
+                }
+            }
+            if (newHttpHosts.size() > 0) {
+                HttpHost[] newHostHostArray = new 
HttpHost[newHttpHosts.size()];
+                this.httpHosts = newHttpHosts.toArray(newHostHostArray);
+            }
+        }
+    }
+
+    private void reloadClientsFromSortTaskConfig(SortTaskConfig 
sortTaskConfig) {
+        this.sinkContext = new Context(sortTaskConfig.getSinkParams());
+        this.username = sinkContext.getString(KEY_USERNAME);
+        this.password = sinkContext.getString(KEY_PASSWORD);
+        this.bulkAction = sinkContext.getInteger(KEY_BULK_ACTION, 
DEFAULT_BULK_ACTION);
+        this.bulkSizeMb = sinkContext.getInteger(KEY_BULK_SIZE_MB, 
DEFAULT_BULK_SIZE_MB);
+        this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL, 
DEFAULT_FLUSH_INTERVAL);
+        this.concurrentRequests = 
sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
+        this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL, 
DEFAULT_MAX_CONNECT_TOTAL);
+
+        this.maxConnectPerRoute = 
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, 
DEFAULT_MAX_CONNECT_PER_ROUTE);
+        this.connectionRequestTimeout =
+                sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, 
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+        this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, 
DEFAULT_SOCKET_TIMEOUT);
+        this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, 
DEFAULT_MAX_REDIRECTS);
+        this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, 
DEFAULT_LOG_MAX_LENGTH);
+        // http host
+        this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS);
+        if (!StringUtils.isBlank(strHttpHosts)) {
+            String[] strHttpHostArray = strHttpHosts.split("\\s+");
+            List<HttpHost> newHttpHosts = new 
ArrayList<>(strHttpHostArray.length);
+            for (String strHttpHost : strHttpHostArray) {
+                String[] ipPort = strHttpHost.split(":");
+                if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) {
+                    newHttpHosts.add(new HttpHost(ipPort[0], 
NumberUtils.toInt(ipPort[1])));
+                }
+            }
+            if (newHttpHosts.size() > 0) {
+                HttpHost[] newHostHostArray = new 
HttpHost[newHttpHosts.size()];
+                this.httpHosts = newHttpHosts.toArray(newHostHostArray);
+            }
+        }
+    }
+
     /**
      * addSendMetric
-     * 
+     *
      * @param currentRecord
      * @param bid
      */
@@ -242,7 +313,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * addSendResultMetric
-     * 
+     *
      * @param currentRecord
      * @param bid
      * @param result
@@ -281,8 +352,8 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * getIdConfig
-     * 
-     * @param  uid
+     *
+     * @param uid
      * @return
      */
     public EsIdConfig getIdConfig(String uid) {
@@ -291,7 +362,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get nodeId
-     * 
+     *
      * @return the nodeId
      */
     public String getNodeId() {
@@ -300,7 +371,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get idConfigMap
-     * 
+     *
      * @return the idConfigMap
      */
     public Map<String, EsIdConfig> getIdConfigMap() {
@@ -309,7 +380,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get sinkContext
-     * 
+     *
      * @return the sinkContext
      */
     public Context getSinkContext() {
@@ -318,7 +389,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set sinkContext
-     * 
+     *
      * @param sinkContext the sinkContext to set
      */
     public void setSinkContext(Context sinkContext) {
@@ -327,8 +398,8 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * offerDispatchQueue
-     * 
-     * @param  indexRequest
+     *
+     * @param indexRequest
      * @return
      */
     public void offerDispatchQueue(EsIndexRequest indexRequest) {
@@ -339,7 +410,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * takeDispatchQueue
-     * 
+     *
      * @return
      */
     public EsIndexRequest takeDispatchQueue() {
@@ -352,8 +423,8 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * backDispatchQueue
-     * 
-     * @param  indexRequest
+     *
+     * @param indexRequest
      * @return
      */
     public void backDispatchQueue(EsIndexRequest indexRequest) {
@@ -363,8 +434,8 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * releaseDispatchQueue
-     * 
-     * @param  indexRequest
+     *
+     * @param indexRequest
      * @return
      */
     public void releaseDispatchQueue(EsIndexRequest indexRequest) {
@@ -373,7 +444,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get bulkAction
-     * 
+     *
      * @return the bulkAction
      */
     public int getBulkAction() {
@@ -382,7 +453,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set bulkAction
-     * 
+     *
      * @param bulkAction the bulkAction to set
      */
     public void setBulkAction(int bulkAction) {
@@ -391,7 +462,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get bulkSizeMb
-     * 
+     *
      * @return the bulkSizeMb
      */
     public int getBulkSizeMb() {
@@ -400,7 +471,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set bulkSizeMb
-     * 
+     *
      * @param bulkSizeMb the bulkSizeMb to set
      */
     public void setBulkSizeMb(int bulkSizeMb) {
@@ -409,7 +480,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get flushInterval
-     * 
+     *
      * @return the flushInterval
      */
     public int getFlushInterval() {
@@ -418,7 +489,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set flushInterval
-     * 
+     *
      * @param flushInterval the flushInterval to set
      */
     public void setFlushInterval(int flushInterval) {
@@ -427,7 +498,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get concurrentRequests
-     * 
+     *
      * @return the concurrentRequests
      */
     public int getConcurrentRequests() {
@@ -436,7 +507,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set concurrentRequests
-     * 
+     *
      * @param concurrentRequests the concurrentRequests to set
      */
     public void setConcurrentRequests(int concurrentRequests) {
@@ -445,7 +516,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get maxConnect
-     * 
+     *
      * @return the maxConnect
      */
     public int getMaxConnect() {
@@ -489,7 +560,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set maxConnect
-     * 
+     *
      * @param maxConnect the maxConnect to set
      */
     public void setMaxConnect(int maxConnect) {
@@ -498,7 +569,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get strHttpHosts
-     * 
+     *
      * @return the strHttpHosts
      */
     public String getStrHttpHosts() {
@@ -507,7 +578,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set strHttpHosts
-     * 
+     *
      * @param strHttpHosts the strHttpHosts to set
      */
     public void setStrHttpHosts(String strHttpHosts) {
@@ -516,7 +587,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get httpHosts
-     * 
+     *
      * @return the httpHosts
      */
     public HttpHost[] getHttpHosts() {
@@ -525,7 +596,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set httpHosts
-     * 
+     *
      * @param httpHosts the httpHosts to set
      */
     public void setHttpHosts(HttpHost[] httpHosts) {
@@ -534,7 +605,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set nodeId
-     * 
+     *
      * @param nodeId the nodeId to set
      */
     public void setNodeId(String nodeId) {
@@ -543,7 +614,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set idConfigMap
-     * 
+     *
      * @param idConfigMap the idConfigMap to set
      */
     public void setIdConfigMap(Map<String, EsIdConfig> idConfigMap) {
@@ -552,7 +623,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get username
-     * 
+     *
      * @return the username
      */
     public String getUsername() {
@@ -561,7 +632,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set username
-     * 
+     *
      * @param username the username to set
      */
     public void setUsername(String username) {
@@ -570,7 +641,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get password
-     * 
+     *
      * @return the password
      */
     public String getPassword() {
@@ -579,7 +650,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set password
-     * 
+     *
      * @param password the password to set
      */
     public void setPassword(String password) {
@@ -588,7 +659,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get keywordMaxLength
-     * 
+     *
      * @return the keywordMaxLength
      */
     public int getKeywordMaxLength() {
@@ -597,7 +668,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set keywordMaxLength
-     * 
+     *
      * @param keywordMaxLength the keywordMaxLength to set
      */
     public void setKeywordMaxLength(int keywordMaxLength) {
@@ -606,7 +677,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * get isUseIndexId
-     * 
+     *
      * @return the isUseIndexId
      */
     public boolean isUseIndexId() {
@@ -615,7 +686,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * set isUseIndexId
-     * 
+     *
      * @param isUseIndexId the isUseIndexId to set
      */
     public void setUseIndexId(boolean isUseIndexId) {
@@ -624,7 +695,7 @@ public class EsSinkContext extends SinkContext {
 
     /**
      * create indexRequestHandler
-     * 
+     *
      * @return the indexRequestHandler
      */
     public IEvent2IndexRequestHandler createIndexRequestHandler() {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
index 5f79465039..d7a0a2978e 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
@@ -28,6 +28,7 @@ 
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassReso
 
sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader
 
 
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
+useUnifiedConfiguration=true
 
 maxThreads=10
 reloadInterval=60000
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
index 5f79465039..e448826938 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
@@ -28,7 +28,7 @@ 
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassReso
 
sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader
 
 
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
-
+useUnifiedConfiguration=true
 maxThreads=10
 reloadInterval=60000
 processInterval=100

Reply via email to