This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e68c69f986 [INLONG-10208][Sort] ClsSink support unified configuration (#10220) e68c69f986 is described below commit e68c69f986df57f04d0ef0ba067db07ae2912f4e Author: vernedeng <verned...@apache.org> AuthorDate: Wed May 15 15:54:21 2024 +0800 [INLONG-10208][Sort] ClsSink support unified configuration (#10220) * [INLONG-10208][Sort] ClsSink support unified configuration --- .../inlong/common/pojo/sort/SortClusterConfig.java | 4 + .../inlong/common/pojo/sort/SortTaskConfig.java | 4 + .../common/pojo/sort/dataflow/DataFlowConfig.java | 6 + .../resource/sort/DefaultSortConfigOperator.java | 2 + .../inlong/sort/standalone/sink/SinkContext.java | 1 + .../sort/standalone/sink/cls/ClsIdConfig.java | 49 +++-- .../inlong/sort/standalone/sink/cls/ClsSink.java | 26 +-- .../sort/standalone/sink/cls/ClsSinkContext.java | 167 +++-------------- .../sort/standalone/sink/{ => v2}/SinkContext.java | 108 +---------- .../source/sortsdk/v2/SortSdkSource.java | 205 +++++++++++++++++++++ .../sort/standalone/sink/cls/TestClsIdConfig.java | 4 +- .../sink/cls/TestDefaultEvent2LogItemHandler.java | 3 +- 12 files changed, 296 insertions(+), 283 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java index 13109d1871..ff3d337cb1 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java @@ -20,14 +20,18 @@ package org.apache.inlong.common.pojo.sort; import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; @Data @Builder 
+@AllArgsConstructor +@NoArgsConstructor public class SortClusterConfig implements Serializable { private String clusterTag; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java index e8795fb2be..b8f80ef26f 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java @@ -19,14 +19,18 @@ package org.apache.inlong.common.pojo.sort; import org.apache.inlong.common.pojo.sort.node.NodeConfig; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; @Data @Builder +@AllArgsConstructor +@NoArgsConstructor public class SortTaskConfig implements Serializable { private String sortTaskName; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java index 7429a78be3..a7bb1c36a3 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java @@ -19,18 +19,24 @@ package org.apache.inlong.common.pojo.sort.dataflow; import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Map; @Data @Builder +@AllArgsConstructor +@NoArgsConstructor public class DataFlowConfig implements Serializable { private String dataflowId; private Integer version; + private String inlongGroupId; + private String inlongStreamId; private SourceConfig sourceConfig; private SinkConfig sinkConfig; private Map<String, 
String> properties; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index b712ff11ee..5c51c1abcf 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -161,6 +161,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator { .dataflowId(String.valueOf(sink.getId())) .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink)) .sinkConfig(getSinkConfig(sink)) + .inlongGroupId(groupInfo.getInlongGroupId()) + .inlongStreamId(streamInfo.getInlongStreamId()) .build(); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java index 7e8b26e3ca..16b15ab8b9 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java @@ -41,6 +41,7 @@ import java.util.TimerTask; * * SinkContext */ +@Deprecated public class SinkContext { public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java index faefaa6aa1..f0fd784acb 100644 --- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java @@ -17,16 +17,26 @@ package org.apache.inlong.sort.standalone.sink.cls; +import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig; +import org.apache.inlong.common.pojo.sort.dataflow.sink.ClsSinkConfig; +import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig; + +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Cls config of each uid. */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class ClsIdConfig { private String inlongGroupId; @@ -36,23 +46,28 @@ public class ClsIdConfig { private String secretId; private String secretKey; private String topicId; - private String fieldNames; + private List<String> fieldList; private int fieldOffset = 2; private int contentOffset = 0; - private List<String> fieldList; - /** - * Parse fieldNames to list of fields. - * @return List of fields. 
- */ - public List<String> getFieldList() { - if (fieldList == null) { - this.fieldList = new ArrayList<>(); - if (fieldNames != null) { - String[] fieldNameArray = fieldNames.split("\\s+"); - this.fieldList.addAll(Arrays.asList(fieldNameArray)); - } - } - return fieldList; + public static ClsIdConfig create(DataFlowConfig dataFlowConfig, ClsNodeConfig nodeConfig) { + ClsSinkConfig sinkConfig = (ClsSinkConfig) dataFlowConfig.getSinkConfig(); + List<String> fields = sinkConfig.getFieldConfigs() + .stream() + .map(FieldConfig::getName) + .collect(Collectors.toList()); + return ClsIdConfig.builder() + .inlongGroupId(dataFlowConfig.getInlongGroupId()) + .inlongStreamId(dataFlowConfig.getInlongStreamId()) + .contentOffset(sinkConfig.getContentOffset()) + .fieldOffset(sinkConfig.getFieldOffset()) + .separator(sinkConfig.getSeparator()) + .fieldList(fields) + .topicId(sinkConfig.getTopicId()) + .endpoint(nodeConfig.getEndpoint()) + .secretId(nodeConfig.getSendSecretId()) + .secretKey(nodeConfig.getSendSecretKey()) + .build(); } + } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java index 30289489e4..e959b249fe 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSink.java @@ -27,13 +27,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -/** - * Cls Sink implementation. - * - * <p> - * Response for initialization of {@link ClsChannelWorker}. 
- * </p> - */ public class ClsSink extends AbstractSink implements Configurable { private static final Logger LOG = LoggerFactory.getLogger(ClsSink.class); @@ -42,9 +35,6 @@ public class ClsSink extends AbstractSink implements Configurable { private ClsSinkContext context; private List<ClsChannelWorker> workers = new ArrayList<>(); - /** - * Start {@link ClsChannelWorker}. - */ @Override public void start() { super.start(); @@ -57,13 +47,10 @@ public class ClsSink extends AbstractSink implements Configurable { worker.start(); } } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to start cls sink, ex={}", e.getMessage(), e); } } - /** - * Stop {@link ClsChannelWorker}. - */ @Override public void stop() { super.stop(); @@ -74,24 +61,15 @@ public class ClsSink extends AbstractSink implements Configurable { } this.workers.clear(); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to stop cls sink, ex={}", e.getMessage(), e); } } - /** - * Process. - * @return Status - * @throws EventDeliveryException - */ @Override public Status process() throws EventDeliveryException { return Status.BACKOFF; } - /** - * Config parent context. - * @param context Parent context. 
- */ @Override public void configure(Context context) { LOG.info("start to configure:{}, context:{}.", this.getName(), context.toString()); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java index 6d05717243..fef43cadfc 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java @@ -17,18 +17,18 @@ package org.apache.inlong.sort.standalone.sink.cls; -import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.SortClusterConfig; +import org.apache.inlong.common.pojo.sort.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; -import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; import org.apache.inlong.sort.standalone.config.pojo.InlongId; import org.apache.inlong.sort.standalone.metrics.SortMetricItem; import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils; -import org.apache.inlong.sort.standalone.sink.SinkContext; -import org.apache.inlong.sort.standalone.utils.Constants; +import org.apache.inlong.sort.standalone.sink.v2.SinkContext; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.tencentcloudapi.cls.producer.AsyncProducerClient; 
@@ -41,16 +41,13 @@ import org.apache.flume.Context; import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -/** - * Cls sink context. - */ public class ClsSinkContext extends SinkContext { private static final Logger LOG = InlongLoggerFactory.getLogger(ClsSinkContext.class); @@ -74,20 +71,16 @@ public class ClsSinkContext extends SinkContext { private final Map<String, AsyncProducerClient> clientMap; private List<AsyncProducerClient> deletingClients = new ArrayList<>(); - private Context sinkContext; private Map<String, ClsIdConfig> idConfigMap = new ConcurrentHashMap<>(); private IEvent2LogItemHandler event2LogItemHandler; + private ClsNodeConfig clsNodeConfig; + private ObjectMapper objectMapper; - /** - * Constructor - * - * @param sinkName Name of sink. - * @param context Basic context. - * @param channel Channel which worker acquire profile event from. 
- */ public ClsSinkContext(String sinkName, Context context, Channel channel) { super(sinkName, context, channel); this.clientMap = new ConcurrentHashMap<>(); + this.objectMapper = new ObjectMapper(); + this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } @Override @@ -104,26 +97,25 @@ public class ClsSinkContext extends SinkContext { } }); - SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + SortTaskConfig newSortTaskConfig = SortConfigHolder.getTaskConfig(taskName); if (newSortTaskConfig == null || newSortTaskConfig.equals(sortTaskConfig)) { return; } LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName, - new ObjectMapper().writeValueAsString(newSortTaskConfig)); + objectMapper.writeValueAsString(newSortTaskConfig)); this.sortTaskConfig = newSortTaskConfig; - this.sinkContext = new Context(this.sortTaskConfig.getSinkParams()); + ClsNodeConfig requestNodeConfig = (ClsNodeConfig) sortTaskConfig.getNodeConfig(); + this.clsNodeConfig = + requestNodeConfig.getVersion() >= clsNodeConfig.getVersion() ? requestNodeConfig : clsNodeConfig; + this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH; this.reloadIdParams(); this.reloadClients(); this.reloadHandler(); - this.keywordMaxLength = sinkContext.getInteger(KEY_MAX_KEYWORD_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH); } catch (Exception e) { LOG.error(e.getMessage(), e); } } - /** - * Reload LogItemHandler. 
- */ private void reloadHandler() { String logItemHandlerClass = CommonPropertiesHolder.getString(KEY_EVENT_LOG_ITEM_HANDLER, DefaultEvent2LogItemHandler.class.getName()); @@ -136,47 +128,22 @@ public class ClsSinkContext extends SinkContext { LOG.error("{} is not the instance of IEvent2LogItemHandler", logItemHandlerClass); } } catch (Throwable t) { - LOG.error("Fail to init IEvent2LogItemHandler, handlerClass:{}, error:{}", + LOG.error("fail to init IEvent2LogItemHandler, handlerClass:{}, error:{}", logItemHandlerClass, t.getMessage()); } } - /** - * Reload id params. - * - * @throws JsonProcessingException - */ - private void reloadIdParams() throws JsonProcessingException { - List<Map<String, String>> idList = this.sortTaskConfig.getIdParams(); - Map<String, ClsIdConfig> newIdConfigMap = new ConcurrentHashMap<>(); - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - for (Map<String, String> idParam : idList) { - String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID); - String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID); - String uid = InlongId.generateUid(inlongGroupId, inlongStreamId); - String jsonIdConfig = objectMapper.writeValueAsString(idParam); - ClsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, ClsIdConfig.class); - idConfig.getFieldList(); - newIdConfigMap.put(uid, idConfig); - } - this.idConfigMap = newIdConfigMap; + private void reloadIdParams() { + this.idConfigMap = this.sortTaskConfig.getClusters() + .stream() + .map(SortClusterConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .map(dataFlowConfig -> ClsIdConfig.create(dataFlowConfig, clsNodeConfig)) + .collect(Collectors.toMap( + config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), + v -> v)); } - /** - * Close expire clients and start new clients. - * - * <p> - * Each client response for data of one secretId. 
- * </p> - * <p> - * First, find all secretId that are in the active clientMap but not in the updated id config (or to say EXPIRE - * secretId), and put those clients into deletingClientsMap. The real close process will be done at the beginning of - * next period of reloading. Second, find all secretIds that in the updated id config but not in the active - * clientMap(or to say NEW secretId), and start new clients for these secretId and put them into the active - * clientMap. - * </p> - */ private void reloadClients() { // get update secretIds Map<String, ClsIdConfig> updateConfigMap = idConfigMap.values() @@ -196,76 +163,25 @@ public class ClsSinkContext extends SinkContext { .forEach(this::startNewClient); } - /** - * Start new cls client and put it to the active clientMap. - * - * @param idConfig idConfig of new client. - */ private void startNewClient(ClsIdConfig idConfig) { AsyncProducerConfig producerConfig = new AsyncProducerConfig( idConfig.getEndpoint(), idConfig.getSecretId(), idConfig.getSecretKey(), NetworkUtils.getLocalMachineIP()); - this.setCommonClientConfig(producerConfig); AsyncProducerClient client = new AsyncProducerClient(producerConfig); clientMap.put(idConfig.getSecretId(), client); } - /** - * Get common client config from context and set them. - * - * @param config Config to be set. 
- */ - private void setCommonClientConfig(AsyncProducerConfig config) { - Optional.ofNullable(sinkContext.getInteger(KEY_TOTAL_SIZE_IN_BYTES)) - .ifPresent(config::setTotalSizeInBytes); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_SEND_THREAD_COUNT)) - .ifPresent(config::setSendThreadCount); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BLOCK_SEC)) - .ifPresent(config::setMaxBlockMs); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_SIZE)) - .ifPresent(config::setBatchSizeThresholdInBytes); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_BATCH_COUNT)) - .ifPresent(config::setBatchCountThreshold); - Optional.ofNullable(sinkContext.getInteger(KEY_LINGER_MS)) - .ifPresent(config::setLingerMs); - Optional.ofNullable(sinkContext.getInteger(KEY_RETRIES)) - .ifPresent(config::setRetries); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RESERVED_ATTEMPTS)) - .ifPresent(config::setMaxReservedAttempts); - Optional.ofNullable(sinkContext.getInteger(KEY_BASE_RETRY_BACKOFF_MS)) - .ifPresent(config::setBaseRetryBackoffMs); - Optional.ofNullable(sinkContext.getInteger(KEY_MAX_RETRY_BACKOFF_MS)) - .ifPresent(config::setMaxRetryBackoffMs); - } - - /** - * Remove expire client from active clientMap and into the deleting client list. - * <P> - * The reason why not close client when it remove from clientMap is to avoid <b>Race Condition</b>. Which will - * happen when worker thread get the client and ready to send msg, while the reload thread try to close it. - * </P> - * - * @param secretId SecretId of expire client. - */ private void removeExpireClient(String secretId) { AsyncProducerClient client = clientMap.get(secretId); if (client == null) { - LOG.error("Remove client failed, there is not client of {}", secretId); + LOG.error("remove client failed, there is not client of {}", secretId); return; } deletingClients.add(clientMap.remove(secretId)); } - /** - * Add send result. - * - * @param currentRecord Event to be sent. 
- * @param bid Topic or dest ip of event. - * @param result Result of send. - * @param sendTime Time of sending. - */ public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) { Map<String, String> dimensions = this.getDimensions(currentRecord, bid); SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); @@ -288,13 +204,6 @@ public class ClsSinkContext extends SinkContext { } } - /** - * Get report dimensions. - * - * @param currentRecord Event. - * @param bid Topic or dest ip. - * @return Prepared dimensions map. - */ private Map<String, String> getDimensions(ProfileEvent currentRecord, String bid) { Map<String, String> dimensions = new HashMap<>(); dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId()); @@ -309,40 +218,18 @@ public class ClsSinkContext extends SinkContext { return dimensions; } - /** - * Get {@link ClsIdConfig} by uid. - * - * @param uid Uid of event. - * @return Corresponding cls id config. - */ public ClsIdConfig getIdConfig(String uid) { return idConfigMap.get(uid); } - /** - * Get max length of single value. - * - * @return Max length of single value. - */ public int getKeywordMaxLength() { return keywordMaxLength; } - /** - * Get LogItem handler. - * - * @return Handler. - */ public IEvent2LogItemHandler getLogItemHandler() { return event2LogItemHandler; } - /** - * Get cls client. - * - * @param secretId ID of client. - * @return Client instance. 
- */ public AsyncProducerClient getClient(String secretId) { return clientMap.get(secretId); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java similarity index 75% copy from inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java copy to inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java index 7e8b26e3ca..251a6d56af 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.sort.standalone.sink; +package org.apache.inlong.sort.standalone.sink.v2; import org.apache.inlong.common.metric.MetricRegister; -import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.SortTaskConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; -import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; import org.apache.inlong.sort.standalone.metrics.SortMetricItem; import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet; import org.apache.inlong.sort.standalone.utils.BufferQueue; @@ -37,84 +37,57 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; -/** - * - * SinkContext - */ public class SinkContext { public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class); - public static final String 
KEY_MAX_THREADS = "maxThreads"; public static final String KEY_PROCESSINTERVAL = "processInterval"; public static final String KEY_RELOADINTERVAL = "reloadInterval"; public static final String KEY_TASK_NAME = "taskName"; public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; - protected final String clusterId; protected final String taskName; protected final String sinkName; protected final Context sinkContext; - protected SortTaskConfig sortTaskConfig; - protected final Channel channel; - // protected final int maxThreads; protected final long processInterval; protected final long reloadInterval; - // protected final SortMetricItemSet metricItemSet; protected Timer reloadTimer; - /** - * Constructor - * - * @param sinkName - * @param context - * @param channel - */ public SinkContext(String sinkName, Context context, Channel channel) { this.sinkName = sinkName; this.sinkContext = context; this.channel = channel; - this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID); - this.taskName = context.getString(KEY_TASK_NAME); + this.clusterId = sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID); + this.taskName = sinkContext.getString(KEY_TASK_NAME); this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10); this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100); this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L); - // this.metricItemSet = new SortMetricItemSet(sinkName); MetricRegister.register(this.metricItemSet); } - /** - * start - */ public void start() { try { this.reload(); this.setReloadTimer(); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to start sink context", e); } } - /** - * close - */ public void close() { try { this.reloadTimer.cancel(); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to close sink context", e); } } - /** - * 
setReloadTimer - */ protected void setReloadTimer() { reloadTimer = new Timer(true); TimerTask task = new TimerTask() { @@ -126,113 +99,54 @@ public class SinkContext { reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval); } - /** - * reload - */ public void reload() { try { - this.sortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + this.sortTaskConfig = SortConfigHolder.getTaskConfig(taskName); } catch (Throwable e) { - LOG.error(e.getMessage(), e); + LOG.error("failed to stop sink context", e); } } - /** - * get clusterId - * - * @return the clusterId - */ public String getClusterId() { return clusterId; } - /** - * get taskName - * - * @return the taskName - */ public String getTaskName() { return taskName; } - /** - * get sinkName - * - * @return the sinkName - */ public String getSinkName() { return sinkName; } - /** - * get sinkContext - * - * @return the sinkContext - */ public Context getSinkContext() { return sinkContext; } - /** - * get sortTaskConfig - * - * @return the sortTaskConfig - */ public SortTaskConfig getSortTaskConfig() { return sortTaskConfig; } - /** - * get channel - * - * @return the channel - */ public Channel getChannel() { return channel; } - /** - * get maxThreads - * - * @return the maxThreads - */ public int getMaxThreads() { return maxThreads; } - /** - * get processInterval - * - * @return the processInterval - */ public long getProcessInterval() { return processInterval; } - /** - * get reloadInterval - * - * @return the reloadInterval - */ public long getReloadInterval() { return reloadInterval; } - /** - * get metricItemSet - * - * @return the metricItemSet - */ public SortMetricItemSet getMetricItemSet() { return metricItemSet; } - /** - * fillInlongId - * - * @param currentRecord - * @param dimensions - */ public static void fillInlongId(ProfileEvent currentRecord, Map<String, String> dimensions) { String inlongGroupId = currentRecord.getInlongGroupId(); 
inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : inlongGroupId; @@ -242,10 +156,6 @@ public class SinkContext { dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId); } - /** - * createBufferQueue - * @return - */ public static <U> BufferQueue<U> createBufferQueue() { int maxBufferQueueSizeKb = CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, DEFAULT_MAX_BUFFERQUEUE_SIZE_KB); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java new file mode 100644 index 0000000000..43245da9d0 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/v2/SortSdkSource.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.standalone.source.sortsdk.v2; + +import org.apache.inlong.sdk.commons.admin.AdminServiceRegister; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.api.SortClient; +import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.SortClientFactory; +import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType; +import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; +import org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig; +import org.apache.inlong.sort.standalone.source.sortsdk.DefaultTopicChangeListener; +import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback; +import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext; +import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.flume.Context; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.conf.Configurable; +import org.apache.flume.source.AbstractSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Default Source implementation of InLong. + * + * <p> + * SortSdkSource acquired msg from different upstream data store by register {@link SortClient} for each sort task. 
The
+ * only things SortSdkSource should do are to create the sort clients of its sort task, and to close them when the
+ * task is finished or scheduled to another source instance.
+ * </p>
+ *
+ * <p>
+ * The Default Manager of InLong will schedule the partition and topic automatically.
+ * </p>
+ *
+ * <p>
+ * Because all sources should implement {@link Configurable}, the SortSdkSource must have a default constructor
+ * <b>WITHOUT</b> any arguments, and parameters will be configured by {@link Configurable#configure(Context)}.
+ * </p>
+ */
+public final class SortSdkSource extends AbstractSource
+        implements
+            Configurable,
+            Runnable,
+            EventDrivenSource,
+            ConsumerServiceMBean {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+    public static final String SORT_SDK_PREFIX = "sortsdk.";
+    private static final int CORE_POOL_SIZE = 1;
+    private static final SortClientConfig.ConsumeStrategy DEFAULT_STRATEGY = SortClientConfig.ConsumeStrategy.lastest;
+    private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+    private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
+    private String taskName;
+    private SortSdkSourceContext context;
+    private String sortClusterName;
+    private long reloadInterval;
+    private ScheduledExecutorService pool;
+
+    /**
+     * Clients owned by this source. The list is modified by {@link #start()} and {@link #stop()} while it is
+     * iterated by the reload thread ({@link #run()}) and by the admin MBean callbacks (presumably on an admin/JMX
+     * thread), so a copy-on-write list is used: iteration works on a snapshot and never throws
+     * {@link java.util.ConcurrentModificationException}.
+     */
+    private final List<SortClient> sortClients = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+    /**
+     * Creates {@code sortSdkClientNum} sort clients (common property, default 1) for this task. Clients that fail
+     * to initialize are skipped; {@link #newClient(String)} logs the failure.
+     */
+    @Override
+    public synchronized void start() {
+        int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM);
+        LOG.info("start SortSdkSource:{}, client num is {}", taskName, sortSdkClientNum);
+        List<SortClient> created = new ArrayList<>();
+        for (int i = 0; i < sortSdkClientNum; i++) {
+            SortClient client = this.newClient(taskName);
+            if (client != null) {
+                created.add(client);
+            }
+        }
+        // publish all new clients in a single copy-on-write step
+        this.sortClients.addAll(created);
+    }
+
+    /**
+     * Stops the reload executor, then stops consuming on and closes every client. A failure to close one client
+     * does not prevent the others from being closed. The list is cleared so that a later {@link #start()} does
+     * not keep closed clients around.
+     */
+    @Override
+    public synchronized void stop() {
+        // pool is null when configure() was never called or failed before initReloadExecutor()
+        if (pool != null) {
+            pool.shutdownNow();
+        }
+        LOG.info("close sort client {}.", taskName);
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(true);
+            closeQuietly(sortClient);
+        }
+        sortClients.clear();
+    }
+
+    /**
+     * Periodic reload task scheduled by {@link #initReloadExecutor()}; refreshes each client's manager API url.
+     * <p>
+     * Exceptions are caught and logged because {@link ScheduledExecutorService#scheduleAtFixedRate} suppresses
+     * all subsequent executions of a task once one execution throws, which would stop the reload for good.
+     * </p>
+     */
+    @Override
+    public void run() {
+        LOG.info("start to reload SortSdkSource:{}", taskName);
+        try {
+            for (SortClient sortClient : sortClients) {
+                sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
+            }
+        } catch (Exception e) {
+            LOG.error("failed to reload SortSdkSource:{}", taskName, e);
+        }
+    }
+
+    /**
+     * Reads the task name and source context, resolves the sort cluster name and reload interval, starts the
+     * periodic reload, and registers this source as a {@link ConsumerServiceMBean}.
+     *
+     * @param context Flume context of this source
+     */
+    @Override
+    public void configure(Context context) {
+        this.taskName = context.getString(FlumeConfigGenerator.KEY_TASK_NAME);
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.sortClusterName = SortConfigHolder.getSortConfig().getSortClusterName();
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+        // register
+        AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE, taskName, this);
+    }
+
+    /** Schedules {@link #run()} every {@code reloadInterval} seconds, with the same initial delay. */
+    private void initReloadExecutor() {
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates and initializes one sort client for the given task.
+     * <p>
+     * The query-config source is selected by the {@code SortSourceConfigType.KEY_TYPE} common property, compared
+     * case-insensitively: FILE uses {@code ClassResourceQueryConsumeConfig}, MANAGER (the default) queries the
+     * manager API, and any other value is loaded as the class name of a custom {@code QueryConsumeConfig}
+     * (configured first when it is {@link Configurable}).
+     * </p>
+     *
+     * @param sortTaskName sort task id the client consumes for
+     * @return the initialized client, or {@code null} if creation or initialization failed; the failure is logged
+     *         and a client that was created but failed to initialize is closed
+     */
+    private SortClient newClient(final String sortTaskName) {
+        LOG.info("start a new sort client for task: {}", sortTaskName);
+        SortClient client = null;
+        try {
+            final SortClientConfig clientConfig = new SortClientConfig(sortTaskName, this.sortClusterName,
+                    new DefaultTopicChangeListener(),
+                    SortSdkSource.DEFAULT_STRATEGY, InetAddress.getLocalHost().getHostAddress());
+            final FetchCallback callback = FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
+            clientConfig.setCallback(callback);
+            Map<String, String> sortSdkParams = this.getSortClientConfigParameters();
+            clientConfig.setParameters(sortSdkParams);
+
+            // create SortClient
+            // NOTE(review): the type is read with SortSourceConfigType (key and default) but compared against
+            // SortClusterConfigType names; this only works while both enums share constant names - confirm.
+            String configType = CommonPropertiesHolder
+                    .getString(SortSourceConfigType.KEY_TYPE, SortSourceConfigType.MANAGER.name());
+            if (SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
+                LOG.info("create sort sdk client in file way:{}", configType);
+                ClassResourceQueryConsumeConfig queryConfig = new ClassResourceQueryConsumeConfig();
+                client = SortClientFactory.createSortClient(clientConfig, queryConfig);
+            } else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
+                LOG.info("create sort sdk client in manager way:{}", configType);
+                clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
+                client = SortClientFactory.createSortClient(clientConfig);
+            } else {
+                LOG.info("create sort sdk client in custom way:{}", configType);
+                Class<?> loaderClass = ClassUtils.getClass(configType);
+                Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+                if (loaderObject instanceof Configurable) {
+                    ((Configurable) loaderObject).configure(new Context(CommonPropertiesHolder.get()));
+                }
+                if (!(loaderObject instanceof QueryConsumeConfig)) {
+                    LOG.error("got exception when create QueryConsumeConfig instance, config key:{},config class:{}",
+                            SortSourceConfigType.KEY_TYPE, configType);
+                    return null;
+                }
+                // if it specifies the type of QueryConsumeConfig.
+                client = SortClientFactory.createSortClient(clientConfig, (QueryConsumeConfig) loaderObject);
+            }
+
+            client.init();
+            callback.setClient(client);
+            return client;
+        } catch (Throwable th) {
+            LOG.error("got one throwable when init client of id:{}", sortTaskName, th);
+            // release whatever a client that failed during init() may already hold
+            closeQuietly(client);
+        }
+        return null;
+    }
+
+    /** Closes a client if non-null, logging instead of propagating any failure so callers can carry on. */
+    private void closeQuietly(SortClient client) {
+        if (client == null) {
+            return;
+        }
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOG.warn("failed to close sort client of task:{}", taskName, e);
+        }
+    }
+
+    /** Returns a mutable copy of the common sub-properties under {@link #SORT_SDK_PREFIX}. */
+    private Map<String, String> getSortClientConfigParameters() {
+        Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
+        return new HashMap<>(commonParams);
+    }
+
+    /** Pauses consumption on every client by setting its stop-consume flag. */
+    @Override
+    public void stopConsumer() {
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(true);
+        }
+    }
+
+    /** Resumes consumption on every client by clearing its stop-consume flag. */
+    @Override
+    public void recoverConsumer() {
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(false);
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
index 270760f317..bfeb0adde6
100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestClsIdConfig.java
@@ -23,6 +23,7 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.List;
 
 @RunWith(PowerMockRunner.class)
@@ -32,8 +33,7 @@ public class TestClsIdConfig {
     @Test
     public void testGetFieldList() {
         ClsIdConfig idConfig = new ClsIdConfig();
-        String testFieldName = "1 2 3 4 5 6 7";
-        idConfig.setFieldNames(testFieldName);
+        idConfig.setFieldList(Arrays.asList("1", "2", "3", "4", "5", "6", "7")); // replaces setFieldNames string
         List<String> fieldList = idConfig.getFieldList();
         Assert.assertEquals(7, fieldList.size());
     }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
index 512a349190..efdf3cb52c 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java
@@ -32,6 +32,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,7 +70,7 @@ public class TestDefaultEvent2LogItemHandler {
 
     private ClsIdConfig prepareIdConfig() {
         ClsIdConfig config = new ClsIdConfig();
-        config.setFieldNames("f1 f2 f3 f4 f5 f6 f7 f8");
+        config.setFieldList(Arrays.asList("f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8")); // was setFieldNames
         config.setInlongGroupId("testGroup");
         config.setInlongStreamId("testStream");
         config.setSecretId("testSecretId");