This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d77048497a [INLONG-10213][SDK] SortSDK support unified sort configuration (#10219) d77048497a is described below commit d77048497a66a07bba7c7dd7d2b56450973bd841 Author: vernedeng <verned...@apache.org> AuthorDate: Wed May 15 16:40:17 2024 +0800 [INLONG-10213][SDK] SortSDK support unified sort configuration (#10219) * [INLONG-10213][SDK] SortSDK support unified sort configuration --- .../common/pojo/sort/dataflow/DataFlowConfig.java | 1 + .../common/pojo/sort/mq/PulsarClusterConfig.java | 3 + .../listener/queue/ClusterConfigListener.java | 1 + .../resource/sort/DefaultSortConfigOperator.java | 1 + .../config/holder/v2/SortConfigHolder.java | 37 ++++++++- .../loader/SortConfigQueryConsumeConfig.java | 90 ++++++++++++++++++++++ 6 files changed, 130 insertions(+), 3 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java index a7bb1c36a3..a089bdc3f5 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/DataFlowConfig.java @@ -35,6 +35,7 @@ public class DataFlowConfig implements Serializable { private String dataflowId; private Integer version; + private String auditTag; private String inlongGroupId; private String inlongStreamId; private SourceConfig sourceConfig; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java index 1bb503b7a4..071c93f9c1 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/PulsarClusterConfig.java @@ -30,4 +30,7 @@ public class 
PulsarClusterConfig extends MqClusterConfig { @JsonInclude(JsonInclude.Include.NON_NULL) private String serviceUrl; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private String token; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java index 3a09c4ecea..8c0aa037ae 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/ClusterConfigListener.java @@ -84,6 +84,7 @@ public class ClusterConfigListener implements ClusterOperateListener { CommonBeanUtils.copyProperties(pulsarCluster, PulsarClusterConfig::new); pulsarClusterConfig.setVersion(String.valueOf(pulsarCluster.getVersion())); pulsarClusterConfig.setClusterName(pulsarCluster.getName()); + pulsarClusterConfig.setServiceUrl(pulsarCluster.getUrl()); list.add(pulsarClusterConfig); } clusterConfigEntity.setConfigParams(objectMapper.writeValueAsString(list)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 5c51c1abcf..670201c357 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -160,6 +160,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { return DataFlowConfig.builder() .dataflowId(String.valueOf(sink.getId())) .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink)) + 
.auditTag(String.valueOf(sink.getId())) .sinkConfig(getSinkConfig(sink)) .inlongGroupId(groupInfo.getInlongGroupId()) .inlongStreamId(streamInfo.getInlongStreamId()) diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java index ef9106cc97..eb2bf9bf92 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java @@ -17,22 +17,29 @@ package org.apache.inlong.sort.standalone.config.holder.v2; +import org.apache.inlong.common.pojo.sort.SortClusterConfig; import org.apache.inlong.common.pojo.sort.SortConfig; import org.apache.inlong.common.pojo.sort.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType; import org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader; import org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader; import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader; +import org.apache.inlong.sort.standalone.config.pojo.InlongId; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flume.Context; +import java.util.Collection; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.stream.Collectors; import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL; @@ 
-45,9 +52,10 @@ public class SortConfigHolder { private Timer reloadTimer; private SortConfigLoader loader; private SortConfig config; + private Map<String, Map<String, String>> auditTagMap; private SortConfigHolder() { - + this.auditTagMap = new HashMap<>(); } private static SortConfigHolder get() { @@ -110,9 +118,21 @@ public class SortConfigHolder { private void reload() { try { SortConfig newConfig = this.loader.load(); - if (newConfig != null) { - this.config = newConfig; + if (newConfig == null) { + return; } + + // <SortTaskName, <InlongId, AuditTag>> + this.auditTagMap = newConfig.getTasks() + .stream() + .collect(Collectors.toMap(SortTaskConfig::getSortTaskName, + v -> v.getClusters() + .stream() + .map(SortClusterConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .collect(Collectors.toMap(flow -> InlongId.generateUid(flow.getInlongGroupId(), + flow.getInlongStreamId()), + DataFlowConfig::getAuditTag)))); } catch (Throwable e) { log.error("failed to reload sort config", e); } @@ -133,4 +153,15 @@ public class SortConfigHolder { } return null; } + + public static String getAuditTag( + String sortTaskName, + String inlongGroupId, + String inlongStreamId) { + Map<String, String> taskMap = get().auditTagMap.get(sortTaskName); + if (taskMap == null) { + return null; + } + return taskMap.get(InlongId.generateUid(inlongGroupId, inlongStreamId)); + } } diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java new file mode 100644 index 0000000000..3f893041a6 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.config.loader; + +import org.apache.inlong.common.pojo.sort.SortClusterConfig; +import org.apache.inlong.common.pojo.sort.SortTaskConfig; +import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; +import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig; +import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig; +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; +import org.apache.inlong.sdk.sort.entity.ConsumeConfig; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +public class SortConfigQueryConsumeConfig implements QueryConsumeConfig { + + private List<InLongTopic> subscribedTopic = new ArrayList<>(); + + public void reload() { + + } + + @Override + public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) { + 
SortTaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId); + List<InLongTopic> topics = taskConfig.getClusters() + .stream() + .map(this::parseTopics) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + return new ConsumeConfig(topics); + } + + public List<InLongTopic> parseTopics(SortClusterConfig clusterConfig) { + List<InLongTopic> topics = new ArrayList<>(); + List<MqClusterConfig> mqClusterConfigs = clusterConfig.getMqClusterConfigs(); + List<DataFlowConfig> dataFlowConfigs = clusterConfig.getDataFlowConfigs(); + for (MqClusterConfig mq : mqClusterConfigs) { + for (DataFlowConfig flow : dataFlowConfigs) { + InLongTopic topic = new InLongTopic(); + topic.setInLongCluster(this.parseCacheZone(mq)); + topic.setTopic(flow.getSourceConfig().getTopic()); + // only supports pulsar now + topic.setTopicType(InlongTopicTypeEnum.PULSAR.getName()); + topic.setProperties(flow.getProperties() != null ? flow.getProperties() : new HashMap<>()); + topics.add(topic); + } + } + return topics; + } + + public CacheZoneCluster parseCacheZone(MqClusterConfig mqClusterConfig) { + PulsarClusterConfig pulsarClusterConfig = (PulsarClusterConfig) mqClusterConfig; + return new CacheZoneCluster(pulsarClusterConfig.getClusterName(), + pulsarClusterConfig.getServiceUrl(), pulsarClusterConfig.getToken()); + } + + @Override + public void configure(ClientContext context) { + + } +}