This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e68c69f986 [INLONG-10208][Sort] ClsSink support unified configuration 
(#10220)
e68c69f986 is described below

commit e68c69f986df57f04d0ef0ba067db07ae2912f4e
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed May 15 15:54:21 2024 +0800

    [INLONG-10208][Sort] ClsSink support unified configuration (#10220)
    
    * [INLONG-10208][Sort] ClsSink support unified configuration
---
 .../inlong/common/pojo/sort/SortClusterConfig.java |   4 +
 .../inlong/common/pojo/sort/SortTaskConfig.java    |   4 +
 .../common/pojo/sort/dataflow/DataFlowConfig.java  |   6 +
 .../resource/sort/DefaultSortConfigOperator.java   |   2 +
 .../inlong/sort/standalone/sink/SinkContext.java   |   1 +
 .../sort/standalone/sink/cls/ClsIdConfig.java      |  49 +++--
 .../inlong/sort/standalone/sink/cls/ClsSink.java   |  26 +--
 .../sort/standalone/sink/cls/ClsSinkContext.java   | 167 +++--------------
 .../sort/standalone/sink/{ => v2}/SinkContext.java | 108 +----------
 .../source/sortsdk/v2/SortSdkSource.java           | 205 +++++++++++++++++++++
 .../sort/standalone/sink/cls/TestClsIdConfig.java  |   4 +-
 .../sink/cls/TestDefaultEvent2LogItemHandler.java  |   3 +-
 12 files changed, 296 insertions(+), 283 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
index 13109d1871..ff3d337cb1 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
@@ -20,14 +20,18 @@ package org.apache.inlong.common.pojo.sort;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
 
 @Data
 @Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class SortClusterConfig implements Serializable {
 
     private String clusterTag;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
index e8795fb2be..b8f80ef26f 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
@@ -19,14 +19,18 @@ package org.apache.inlong.common.pojo.sort;
 
 import org.apache.inlong.common.pojo.sort.node.NodeConfig;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
 
 @Data
 @Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class SortTaskConfig implements Serializable {
 
     private String sortTaskName;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
index 7429a78be3..a7bb1c36a3 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java
@@ -19,18 +19,24 @@ package org.apache.inlong.common.pojo.sort.dataflow;
 
 import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.Map;
 
 @Data
 @Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class DataFlowConfig implements Serializable {
 
     private String dataflowId;
     private Integer version;
+    private String inlongGroupId;
+    private String inlongStreamId;
     private SourceConfig sourceConfig;
     private SinkConfig sinkConfig;
     private Map<String, String> properties;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index b712ff11ee..5c51c1abcf 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -161,6 +161,8 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
                 .dataflowId(String.valueOf(sink.getId()))
                 .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
                 .sinkConfig(getSinkConfig(sink))
+                .inlongGroupId(groupInfo.getInlongGroupId())
+                .inlongStreamId(streamInfo.getInlongStreamId())
                 .build();
     }
 
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 7e8b26e3ca..16b15ab8b9 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -41,6 +41,7 @@ import java.util.TimerTask;
  * 
  * SinkContext
  */
+@Deprecated
 public class SinkContext {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(SinkContext.class);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index faefaa6aa1..f0fd784acb 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -17,16 +17,26 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig;
+import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Cls config of each uid.
  */
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class ClsIdConfig {
 
     private String inlongGroupId;
@@ -36,23 +46,28 @@ public class ClsIdConfig {
     private String secretId;
     private String secretKey;
     private String topicId;
-    private String fieldNames;
+    private List<String> fieldList;
     private int fieldOffset = 2;
     private int contentOffset = 0;
-    private List<String> fieldList;
 
-    /**
-     * Parse fieldNames to list of fields.
-     * @return List of fields.
-     */
-    public List<String> getFieldList() {
-        if (fieldList == null) {
-            this.fieldList = new ArrayList<>();
-            if (fieldNames != null) {
-                String[] fieldNameArray = fieldNames.split("\\s+");
-                this.fieldList.addAll(Arrays.asList(fieldNameArray));
-            }
-        }
-        return fieldList;
+    public static ClsIdConfig create(DataFlowConfig dataFlowConfig, 
ClsNodeConfig nodeConfig) {
+        ClsSinkConfig sinkConfig = (ClsSinkConfig) 
dataFlowConfig.getSinkConfig();
+        List<String> fields = sinkConfig.getFieldConfigs()
+                .stream()
+                .map(FieldConfig::getName)
+                .collect(Collectors.toList());
+        return ClsIdConfig.builder()
+                .inlongGroupId(dataFlowConfig.getInlongGroupId())
+                .inlongStreamId(dataFlowConfig.getInlongStreamId())
+                .contentOffset(sinkConfig.getContentOffset())
+                .fieldOffset(sinkConfig.getFieldOffset())
+                .separator(sinkConfig.getSeparator())
+                .fieldList(fields)
+                .topicId(sinkConfig.getTopicId())
+                .endpoint(nodeConfig.getEndpoint())
+                .secretId(nodeConfig.getSendSecretId())
+                .secretKey(nodeConfig.getSendSecretKey())
+                .build();
     }
+
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
index 30289489e4..e959b249fe 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java
@@ -27,13 +27,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-/**
- * Cls Sink implementation.
- *
- * <p>
- *     Response for initialization of {@link ClsChannelWorker}.
- * </p>
- */
 public class ClsSink extends AbstractSink implements Configurable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class);
@@ -42,9 +35,6 @@ public class ClsSink extends AbstractSink implements 
Configurable {
     private ClsSinkContext context;
     private List<ClsChannelWorker> workers = new ArrayList<>();
 
-    /**
-     * Start {@link ClsChannelWorker}.
-     */
     @Override
     public void start() {
         super.start();
@@ -57,13 +47,10 @@ public class ClsSink extends AbstractSink implements 
Configurable {
                 worker.start();
             }
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to start cls sink, ex={}", e.getMessage(), e);
         }
     }
 
-    /**
-     * Stop {@link ClsChannelWorker}.
-     */
     @Override
     public void stop() {
         super.stop();
@@ -74,24 +61,15 @@ public class ClsSink extends AbstractSink implements 
Configurable {
             }
             this.workers.clear();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to stop cls sink, ex={}", e.getMessage(), e);
         }
     }
 
-    /**
-     * Process.
-     * @return Status
-     * @throws EventDeliveryException
-     */
     @Override
     public Status process() throws EventDeliveryException {
         return Status.BACKOFF;
     }
 
-    /**
-     * Config parent context.
-     * @param context Parent context.
-     */
     @Override
     public void configure(Context context) {
         LOG.info("start to configure:{}, context:{}.", this.getName(), 
context.toString());
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 6d05717243..fef43cadfc 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -17,18 +17,18 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
-import org.apache.inlong.sort.standalone.utils.Constants;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.tencentcloudapi.cls.producer.AsyncProducerClient;
@@ -41,16 +41,13 @@ import org.apache.flume.Context;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-/**
- * Cls sink context.
- */
 public class ClsSinkContext extends SinkContext {
 
     private static final Logger LOG = 
InlongLoggerFactory.getLogger(ClsSinkContext.class);
@@ -74,20 +71,16 @@ public class ClsSinkContext extends SinkContext {
 
     private final Map<String, AsyncProducerClient> clientMap;
     private List<AsyncProducerClient> deletingClients = new ArrayList<>();
-    private Context sinkContext;
     private Map<String, ClsIdConfig> idConfigMap = new ConcurrentHashMap<>();
     private IEvent2LogItemHandler event2LogItemHandler;
+    private ClsNodeConfig clsNodeConfig;
+    private ObjectMapper objectMapper;
 
-    /**
-     * Constructor
-     *
-     * @param sinkName Name of sink.
-     * @param context  Basic context.
-     * @param channel  Channel which worker acquire profile event from.
-     */
     public ClsSinkContext(String sinkName, Context context, Channel channel) {
         super(sinkName, context, channel);
         this.clientMap = new ConcurrentHashMap<>();
+        this.objectMapper = new ObjectMapper();
+        
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
     }
 
     @Override
@@ -104,26 +97,25 @@ public class ClsSinkContext extends SinkContext {
                 }
             });
 
-            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             if (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig)) {
                 return;
             }
             LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
-                    new ObjectMapper().writeValueAsString(newSortTaskConfig));
+                    objectMapper.writeValueAsString(newSortTaskConfig));
             this.sortTaskConfig = newSortTaskConfig;
-            this.sinkContext = new 
Context(this.sortTaskConfig.getSinkParams());
+            ClsNodeConfig requestNodeConfig = (ClsNodeConfig) sortTaskConfig.getNodeConfig();
+            // clsNodeConfig has no initial value, so on the first reload an unguarded
+            // clsNodeConfig.getVersion() throws an NPE. sortTaskConfig has already been
+            // replaced above, so while the task config stays unchanged every later reload
+            // returns early at the equals() check and id configs/clients are never loaded.
+            // Adopt the requested node config when there is nothing to compare against;
+            // otherwise keep the existing one unless the requested version is not older.
+            if (this.clsNodeConfig == null
+                    || requestNodeConfig.getVersion() >= this.clsNodeConfig.getVersion()) {
+                this.clsNodeConfig = requestNodeConfig;
+            }
+            this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
             this.reloadIdParams();
             this.reloadClients();
             this.reloadHandler();
-            this.keywordMaxLength = 
sinkContext.getInteger(KEY_MAX_KEYWORD_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
-    /**
-     * Reload LogItemHandler.
-     */
     private void reloadHandler() {
         String logItemHandlerClass = 
CommonPropertiesHolder.getString(KEY_EVENT_LOG_ITEM_HANDLER,
                 DefaultEvent2LogItemHandler.class.getName());
@@ -136,47 +128,22 @@ public class ClsSinkContext extends SinkContext {
                 LOG.error("{} is not the instance of IEvent2LogItemHandler", 
logItemHandlerClass);
             }
         } catch (Throwable t) {
-            LOG.error("Fail to init IEvent2LogItemHandler, handlerClass:{}, 
error:{}",
+            LOG.error("fail to init IEvent2LogItemHandler, handlerClass:{}, 
error:{}",
                     logItemHandlerClass, t.getMessage());
         }
     }
 
-    /**
-     * Reload id params.
-     *
-     * @throws JsonProcessingException
-     */
-    private void reloadIdParams() throws JsonProcessingException {
-        List<Map<String, String>> idList = this.sortTaskConfig.getIdParams();
-        Map<String, ClsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
-        ObjectMapper objectMapper = new ObjectMapper();
-        
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-        for (Map<String, String> idParam : idList) {
-            String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
-            String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
-            String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
-            String jsonIdConfig = objectMapper.writeValueAsString(idParam);
-            ClsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, 
ClsIdConfig.class);
-            idConfig.getFieldList();
-            newIdConfigMap.put(uid, idConfig);
-        }
-        this.idConfigMap = newIdConfigMap;
+    /**
+     * Rebuild the uid -> {@link ClsIdConfig} map from every data flow of the current task config.
+     *
+     * <p>
+     * If two data flows resolve to the same uid the later one wins, which matches the previous
+     * put-based implementation. Without a merge function {@code Collectors.toMap} throws
+     * {@link IllegalStateException} on a duplicate key and the whole reload would be aborted.
+     * </p>
+     */
+    private void reloadIdParams() {
+        this.idConfigMap = this.sortTaskConfig.getClusters()
+                .stream()
+                .map(SortClusterConfig::getDataFlowConfigs)
+                .flatMap(Collection::stream)
+                .map(dataFlowConfig -> ClsIdConfig.create(dataFlowConfig, clsNodeConfig))
+                .collect(Collectors.toMap(
+                        config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+                        config -> config,
+                        (previous, latest) -> latest));
+    }
 
-    /**
-     * Close expire clients and start new clients.
-     *
-     * <p>
-     * Each client response for data of one secretId.
-     * </p>
-     * <p>
-     * First, find all secretId that are in the active clientMap but not in 
the updated id config (or to say EXPIRE
-     * secretId), and put those clients into deletingClientsMap. The real 
close process will be done at the beginning of
-     * next period of reloading. Second, find all secretIds that in the 
updated id config but not in the active
-     * clientMap(or to say NEW secretId), and start new clients for these 
secretId and put them into the active
-     * clientMap.
-     * </p>
-     */
     private void reloadClients() {
         // get update secretIds
         Map<String, ClsIdConfig> updateConfigMap = idConfigMap.values()
@@ -196,76 +163,25 @@ public class ClsSinkContext extends SinkContext {
                 .forEach(this::startNewClient);
     }
 
-    /**
-     * Start new cls client and put it to the active clientMap.
-     *
-     * @param idConfig idConfig of new client.
-     */
     private void startNewClient(ClsIdConfig idConfig) {
         AsyncProducerConfig producerConfig = new AsyncProducerConfig(
                 idConfig.getEndpoint(),
                 idConfig.getSecretId(),
                 idConfig.getSecretKey(),
                 NetworkUtils.getLocalMachineIP());
-        this.setCommonClientConfig(producerConfig);
         AsyncProducerClient client = new AsyncProducerClient(producerConfig);
         clientMap.put(idConfig.getSecretId(), client);
     }
 
-    /**
-     * Get common client config from context and set them.
-     *
-     * @param config Config to be set.
-     */
-    private void setCommonClientConfig(AsyncProducerConfig config) {
-        Optional.ofNullable(sinkContext.getInteger(KEY_TOTAL_SIZE_IN_BYTES))
-                .ifPresent(config::setTotalSizeInBytes);
-        Optional.ofNullable(sinkContext.getInteger(KEY_MAX_SEND_THREAD_COUNT))
-                .ifPresent(config::setSendThreadCount);
-        Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BLOCK_SEC))
-                .ifPresent(config::setMaxBlockMs);
-        Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_SIZE))
-                .ifPresent(config::setBatchSizeThresholdInBytes);
-        Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_COUNT))
-                .ifPresent(config::setBatchCountThreshold);
-        Optional.ofNullable(sinkContext.getInteger(KEY_LINGER_MS))
-                .ifPresent(config::setLingerMs);
-        Optional.ofNullable(sinkContext.getInteger(KEY_RETRIES))
-                .ifPresent(config::setRetries);
-        Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RESERVED_ATTEMPTS))
-                .ifPresent(config::setMaxReservedAttempts);
-        Optional.ofNullable(sinkContext.getInteger(KEY_BASE_RETRY_BACKOFF_MS))
-                .ifPresent(config::setBaseRetryBackoffMs);
-        Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RETRY_BACKOFF_MS))
-                .ifPresent(config::setMaxRetryBackoffMs);
-    }
-
-    /**
-     * Remove expire client from active clientMap and into the deleting client 
list.
-     * <P>
-     * The reason why not close client when it remove from clientMap is to 
avoid <b>Race Condition</b>. Which will
-     * happen when worker thread get the client and ready to send msg, while 
the reload thread try to close it.
-     * </P>
-     *
-     * @param secretId SecretId of expire client.
-     */
     private void removeExpireClient(String secretId) {
         AsyncProducerClient client = clientMap.get(secretId);
         if (client == null) {
-            LOG.error("Remove client failed, there is not client of {}", 
secretId);
+            LOG.error("remove client failed, there is not client of {}", 
secretId);
             return;
         }
         deletingClients.add(clientMap.remove(secretId));
     }
 
-    /**
-     * Add send result.
-     *
-     * @param currentRecord Event to be sent.
-     * @param bid           Topic or dest ip of event.
-     * @param result        Result of send.
-     * @param sendTime      Time of sending.
-     */
     public void addSendResultMetric(ProfileEvent currentRecord, String bid, 
boolean result, long sendTime) {
         Map<String, String> dimensions = this.getDimensions(currentRecord, 
bid);
         SortMetricItem metricItem = 
this.getMetricItemSet().findMetricItem(dimensions);
@@ -288,13 +204,6 @@ public class ClsSinkContext extends SinkContext {
         }
     }
 
-    /**
-     * Get report dimensions.
-     *
-     * @param  currentRecord Event.
-     * @param  bid  Topic or dest ip.
-     * @return  Prepared dimensions map.
-     */
     private Map<String, String> getDimensions(ProfileEvent currentRecord, 
String bid) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
@@ -309,40 +218,18 @@ public class ClsSinkContext extends SinkContext {
         return dimensions;
     }
 
-    /**
-     * Get {@link ClsIdConfig} by uid.
-     *
-     * @param  uid Uid of event.
-     * @return  Corresponding cls id config.
-     */
     public ClsIdConfig getIdConfig(String uid) {
         return idConfigMap.get(uid);
     }
 
-    /**
-     * Get max length of single value.
-     *
-     * @return Max length of single value.
-     */
     public int getKeywordMaxLength() {
         return keywordMaxLength;
     }
 
-    /**
-     * Get LogItem handler.
-     *
-     * @return Handler.
-     */
     public IEvent2LogItemHandler getLogItemHandler() {
         return event2LogItemHandler;
     }
 
-    /**
-     * Get cls client.
-     *
-     * @param  secretId ID of client.
-     * @return  Client instance.
-     */
     public AsyncProducerClient getClient(String secretId) {
         return clientMap.get(secretId);
     }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
similarity index 75%
copy from 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
copy to 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
index 7e8b26e3ca..251a6d56af 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.sink;
+package org.apache.inlong.sort.standalone.sink.v2;
 
 import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
@@ -37,84 +37,57 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 
-/**
- * 
- * SinkContext
- */
 public class SinkContext {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(SinkContext.class);
-
     public static final String KEY_MAX_THREADS = "maxThreads";
     public static final String KEY_PROCESSINTERVAL = "processInterval";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
     public static final String KEY_TASK_NAME = "taskName";
     public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = 
"maxBufferQueueSizeKb";
     public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
-
     protected final String clusterId;
     protected final String taskName;
     protected final String sinkName;
     protected final Context sinkContext;
-
     protected SortTaskConfig sortTaskConfig;
-
     protected final Channel channel;
-    //
     protected final int maxThreads;
     protected final long processInterval;
     protected final long reloadInterval;
-    //
     protected final SortMetricItemSet metricItemSet;
     protected Timer reloadTimer;
 
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param context
-     * @param channel
-     */
     public SinkContext(String sinkName, Context context, Channel channel) {
         this.sinkName = sinkName;
         this.sinkContext = context;
         this.channel = channel;
-        this.clusterId = 
context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-        this.taskName = context.getString(KEY_TASK_NAME);
+        this.clusterId = 
sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+        this.taskName = sinkContext.getString(KEY_TASK_NAME);
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
         this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 
100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
-        //
         this.metricItemSet = new SortMetricItemSet(sinkName);
         MetricRegister.register(this.metricItemSet);
     }
 
-    /**
-     * start
-     */
     public void start() {
         try {
             this.reload();
             this.setReloadTimer();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to start sink context", e);
         }
     }
 
-    /**
-     * close
-     */
     public void close() {
         try {
             this.reloadTimer.cancel();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to close sink context", e);
         }
     }
 
-    /**
-     * setReloadTimer
-     */
     protected void setReloadTimer() {
         reloadTimer = new Timer(true);
         TimerTask task = new TimerTask() {
@@ -126,113 +99,54 @@ public class SinkContext {
         reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
     }
 
-    /**
-     * reload
-     */
     public void reload() {
         try {
-            this.sortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            this.sortTaskConfig = SortConfigHolder.getTaskConfig(taskName);
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            // a failed lookup leaves the previous sortTaskConfig in place; only log it
+            LOG.error("failed to reload sink context", e);
         }
     }
 
-    /**
-     * get clusterId
-     * 
-     * @return the clusterId
-     */
     public String getClusterId() {
         return clusterId;
     }
 
-    /**
-     * get taskName
-     * 
-     * @return the taskName
-     */
     public String getTaskName() {
         return taskName;
     }
 
-    /**
-     * get sinkName
-     * 
-     * @return the sinkName
-     */
     public String getSinkName() {
         return sinkName;
     }
 
-    /**
-     * get sinkContext
-     * 
-     * @return the sinkContext
-     */
     public Context getSinkContext() {
         return sinkContext;
     }
 
-    /**
-     * get sortTaskConfig
-     * 
-     * @return the sortTaskConfig
-     */
     public SortTaskConfig getSortTaskConfig() {
         return sortTaskConfig;
     }
 
-    /**
-     * get channel
-     * 
-     * @return the channel
-     */
     public Channel getChannel() {
         return channel;
     }
 
-    /**
-     * get maxThreads
-     * 
-     * @return the maxThreads
-     */
     public int getMaxThreads() {
         return maxThreads;
     }
 
-    /**
-     * get processInterval
-     * 
-     * @return the processInterval
-     */
     public long getProcessInterval() {
         return processInterval;
     }
 
-    /**
-     * get reloadInterval
-     * 
-     * @return the reloadInterval
-     */
     public long getReloadInterval() {
         return reloadInterval;
     }
 
-    /**
-     * get metricItemSet
-     * 
-     * @return the metricItemSet
-     */
     public SortMetricItemSet getMetricItemSet() {
         return metricItemSet;
     }
 
-    /**
-     * fillInlongId
-     *
-     * @param currentRecord
-     * @param dimensions
-     */
     public static void fillInlongId(ProfileEvent currentRecord, Map<String, 
String> dimensions) {
         String inlongGroupId = currentRecord.getInlongGroupId();
         inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : 
inlongGroupId;
@@ -242,10 +156,6 @@ public class SinkContext {
         dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
     }
 
-    /**
-     * createBufferQueue
-     * @return
-     */
     public static <U> BufferQueue<U> createBufferQueue() {
         int maxBufferQueueSizeKb = 
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
                 DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
new file mode 100644
index 0000000000..43245da9d0
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk.v2;
+
+import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
+import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import 
org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
+import 
org.apache.inlong.sort.standalone.source.sortsdk.DefaultTopicChangeListener;
+import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback;
+import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext;
+import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p>
+ * SortSdkSource acquires messages from different upstream data stores by registering a {@link SortClient} for each
+ * sort task. The only thing SortSdkSource needs to do is obtain a client by the sort task id, or remove a client
+ * when the task is finished or scheduled to another source instance.
+ * </p>
+ *
+ * <p>
+ * The Default Manager of InLong will schedule the partition and topic 
automatically.
+ * </p>
+ *
+ * <p>
+ * Because all sources should implement {@link Configurable}, the 
SortSdkSource should have default constructor
+ * <b>WITHOUT</b> any arguments, and parameters will be configured by {@link 
Configurable#configure(Context)}.
+ * </p>
+ */
+public final class SortSdkSource extends AbstractSource
+        implements
+            Configurable,
+            Runnable,
+            EventDrivenSource,
+            ConsumerServiceMBean {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+    /** Prefix of the common properties that are handed over to every sort SDK client. */
+    public static final String SORT_SDK_PREFIX = "sortsdk.";
+    /** Thread count of the scheduler that periodically invokes {@link #run()}. */
+    private static final int CORE_POOL_SIZE = 1;
+    private static final SortClientConfig.ConsumeStrategy defaultStrategy =
+            SortClientConfig.ConsumeStrategy.lastest;
+    /** Common property key holding the number of sort SDK clients created by {@link #start()}. */
+    private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+    private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
+    private String taskName;
+    private SortSdkSourceContext context;
+    private String sortClusterName;
+    private long reloadInterval;
+    private ScheduledExecutorService pool;
+
+    // NOTE(review): this list is filled by start(), iterated by the reload task (run()) and by
+    // stopConsumer()/recoverConsumer(); it is a plain ArrayList, so confirm these never overlap.
+    private final List<SortClient> sortClients = new ArrayList<>();
+
+    /**
+     * Creates the configured number of sort SDK clients for the task of this source.
+     *
+     * <p>
+     * Clients that fail to initialize are skipped, see {@link #newClient(String)}.
+     * </p>
+     */
+    @Override
+    public synchronized void start() {
+        int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM,
+                DEFAULT_SORT_SDK_CLIENT_NUM);
+        LOG.info("start SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum);
+        for (int i = 0; i < sortSdkClientNum; i++) {
+            SortClient client = this.newClient(taskName);
+            if (client != null) {
+                this.sortClients.add(client);
+            }
+        }
+    }
+
+    /**
+     * Stops the reload scheduler and closes every sort SDK client of this source.
+     *
+     * <p>
+     * A failure while closing one client is logged and does not prevent the remaining clients from being closed.
+     * The client list is emptied afterwards so that a later {@link #start()} does not keep closed clients.
+     * </p>
+     */
+    @Override
+    public synchronized void stop() {
+        // the scheduler only exists after configure(Context) has been called
+        if (pool != null) {
+            pool.shutdownNow();
+        }
+        LOG.info("close sort client {}.", taskName);
+        for (SortClient sortClient : sortClients) {
+            this.closeClient(sortClient);
+        }
+        sortClients.clear();
+    }
+
+    /**
+     * Reload task: refreshes the manager API url of every sort SDK client.
+     *
+     * <p>
+     * This method is executed by the scheduler created in {@link #initReloadExecutor()}. Nothing is allowed to
+     * escape from it, because a task scheduled at fixed rate is never executed again once one run has thrown.
+     * </p>
+     */
+    @Override
+    public void run() {
+        LOG.info("start to reload SortSdkSource:{}", taskName);
+        try {
+            for (SortClient sortClient : sortClients) {
+                sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
+            }
+        } catch (Throwable th) {
+            LOG.error("failed to reload SortSdkSource:{}", taskName, th);
+        }
+    }
+
+    /**
+     * Reads the task name, cluster name and reload interval, starts the reload scheduler and registers this
+     * source as a {@link ConsumerServiceMBean}.
+     *
+     * @param context the Flume context of this source
+     */
+    @Override
+    public void configure(Context context) {
+        this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.sortClusterName = SortConfigHolder.getSortConfig().getSortClusterName();
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+        // register
+        AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE, taskName, this);
+    }
+
+    /**
+     * Starts the scheduler that periodically invokes {@link #run()}.
+     *
+     * <p>
+     * A scheduler left over from a previous {@link #configure(Context)} call is shut down first, otherwise its
+     * thread would stay alive and keep reloading.
+     * </p>
+     */
+    private void initReloadExecutor() {
+        if (this.pool != null) {
+            this.pool.shutdownNow();
+        }
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        // NOTE(review): the interval is scheduled in SECONDS - confirm SortSdkSourceContext#getReloadInterval
+        // really returns seconds and not milliseconds.
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates and initializes one sort SDK client for the given task.
+     *
+     * <p>
+     * How the client obtains its consume configuration depends on the common property
+     * {@code SortSourceConfigType.KEY_TYPE}: from a class path resource, from the manager, or from a custom
+     * {@link QueryConsumeConfig} implementation whose class name is the property value.
+     * </p>
+     *
+     * @param sortTaskName name of the sort task the client consumes for
+     * @return the initialized client, or {@code null} if the client could not be created or initialized
+     */
+    private SortClient newClient(final String sortTaskName) {
+        LOG.info("start a new sort client for task: {}", sortTaskName);
+        // declared outside the try block so that a half-created client can be released on failure
+        SortClient client = null;
+        try {
+            final SortClientConfig clientConfig = new SortClientConfig(sortTaskName, this.sortClusterName,
+                    new DefaultTopicChangeListener(),
+                    SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress());
+            final FetchCallback callback =
+                    FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
+            clientConfig.setCallback(callback);
+            Map<String, String> sortSdkParams = this.getSortClientConfigParameters();
+            clientConfig.setParameters(sortSdkParams);
+
+            // create SortClient
+            // NOTE(review): the type is read with SortSourceConfigType but compared against
+            // SortClusterConfigType constants - confirm that mixing the two enums is intended.
+            String configType = CommonPropertiesHolder
+                    .getString(SortSourceConfigType.KEY_TYPE, SortSourceConfigType.MANAGER.name());
+            if (SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
+                LOG.info("create sort sdk client in file way:{}", configType);
+                ClassResourceQueryConsumeConfig queryConfig = new ClassResourceQueryConsumeConfig();
+                client = SortClientFactory.createSortClient(clientConfig, queryConfig);
+            } else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
+                LOG.info("create sort sdk client in manager way:{}", configType);
+                clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
+                client = SortClientFactory.createSortClient(clientConfig);
+            } else {
+                LOG.info("create sort sdk client in custom way:{}", configType);
+                Class<?> loaderClass = ClassUtils.getClass(configType);
+                Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+                if (loaderObject instanceof Configurable) {
+                    ((Configurable) loaderObject).configure(new Context(CommonPropertiesHolder.get()));
+                }
+                if (!(loaderObject instanceof QueryConsumeConfig)) {
+                    LOG.error("got exception when create QueryConsumeConfig instance, config key:{},config class:{}",
+                            SortSourceConfigType.KEY_TYPE, configType);
+                    return null;
+                }
+                // if it specifies the type of QueryConsumeConfig.
+                client = SortClientFactory.createSortClient(clientConfig, (QueryConsumeConfig) loaderObject);
+            }
+
+            client.init();
+            callback.setClient(client);
+            return client;
+        } catch (Throwable th) {
+            LOG.error("got one throwable when init client of id:{}", sortTaskName, th);
+            // the client is not handed to the caller, so release whatever it has already acquired
+            this.closeClient(client);
+        }
+        return null;
+    }
+
+    /**
+     * Stops consuming on the given client and closes it; any failure is logged and not propagated.
+     *
+     * @param client the client to close, may be {@code null}
+     */
+    private void closeClient(SortClient client) {
+        if (client == null) {
+            return;
+        }
+        try {
+            client.getConfig().setStopConsume(true);
+            client.close();
+        } catch (Throwable th) {
+            LOG.error("failed to close sort client of task:{}", taskName, th);
+        }
+    }
+
+    /**
+     * Collects the common properties that start with {@link #SORT_SDK_PREFIX}.
+     *
+     * @return a new mutable map holding a copy of those properties
+     */
+    private Map<String, String> getSortClientConfigParameters() {
+        Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+        return new HashMap<>(commonParams);
+    }
+
+    /**
+     * Sets the stop-consume flag on the configuration of every sort SDK client.
+     */
+    @Override
+    public void stopConsumer() {
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(true);
+        }
+    }
+
+    /**
+     * Clears the stop-consume flag on the configuration of every sort SDK client.
+     */
+    @Override
+    public void recoverConsumer() {
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(false);
+        }
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
index 270760f317..bfeb0adde6 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
@@ -23,6 +23,7 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.List;
 
 @RunWith(PowerMockRunner.class)
@@ -32,8 +33,7 @@ public class TestClsIdConfig {
     @Test
     public void testGetFieldList() {
         ClsIdConfig idConfig = new ClsIdConfig();
-        String testFieldName = "1 2 3 4 5 6 7";
-        idConfig.setFieldNames(testFieldName);
+        idConfig.setFieldList(Arrays.asList("1", "2", "3", "4", "5", "6", 
"7"));
         List<String> fieldList = idConfig.getFieldList();
         Assert.assertEquals(7, fieldList.size());
     }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
index 512a349190..efdf3cb52c 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -32,6 +32,7 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,7 +70,7 @@ public class TestDefaultEvent2LogItemHandler {
 
     private ClsIdConfig prepareIdConfig() {
         ClsIdConfig config = new ClsIdConfig();
-        config.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8");
+        config.setFieldList(Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6", 
"f7", "f8"));
         config.setInlongGroupId("testGroup");
         config.setInlongStreamId("testStream");
         config.setSecretId("testSecretId");


Reply via email to