This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8eec67bc72 [INLONG-10529][Sort] PulsarSink support switch metadata acquire mode (#10554) 8eec67bc72 is described below commit 8eec67bc7218bd1a5303de9c6eee967363a4927d Author: vernedeng <verned...@apache.org> AuthorDate: Tue Jul 2 19:02:09 2024 +0800 [INLONG-10529][Sort] PulsarSink support switch metadata acquire mode (#10554) --- .../sink/pulsar/PulsarFederationSinkContext.java | 54 ++++++++++++++++------ .../standalone/sink/pulsar/PulsarIdConfig.java | 13 ++++++ 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java index 770f028da0..a31d9f90ba 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java @@ -20,8 +20,10 @@ package org.apache.inlong.sort.standalone.sink.pulsar; import org.apache.inlong.common.pojo.sort.ClusterTagConfig; import org.apache.inlong.common.pojo.sort.TaskConfig; import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig; +import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder; import org.apache.inlong.sort.standalone.config.pojo.InlongId; import org.apache.inlong.sort.standalone.metrics.SortMetricItem; @@ -36,6 +38,7 @@ import 
org.slf4j.Logger; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -53,36 +56,59 @@ public class PulsarFederationSinkContext extends SinkContext { public void reload() { try { - TaskConfig newSortTaskConfig = SortConfigHolder.getTaskConfig(taskName); - if (newSortTaskConfig == null) { + TaskConfig newTaskConfig = SortConfigHolder.getTaskConfig(taskName); + SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + if (newTaskConfig == null && newSortTaskConfig == null) { LOG.error("newSortTaskConfig is null."); return; } - if (this.taskConfig != null && this.taskConfig.equals(newSortTaskConfig)) { + if ((this.taskConfig != null && this.taskConfig.equals(newTaskConfig)) + && (this.sortTaskConfig != null && this.sortTaskConfig.equals(newSortTaskConfig))) { LOG.info("Same sortTaskConfig, do nothing."); return; } - this.taskConfig = newSortTaskConfig; - PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newSortTaskConfig.getNodeConfig(); + PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) newTaskConfig.getNodeConfig(); if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) { this.pulsarNodeConfig = requestNodeConfig; } + this.taskConfig = newTaskConfig; + this.sortTaskConfig = newSortTaskConfig; - this.idConfigMap = this.taskConfig.getClusterTagConfigs() - .stream() - .map(ClusterTagConfig::getDataFlowConfigs) - .flatMap(Collection::stream) - .map(PulsarIdConfig::create) - .collect(Collectors.toMap( - config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), - v -> v, - (v1, v2) -> v1)); + Map<String, PulsarIdConfig> fromTaskConfig = fromTaskConfig(taskConfig); + Map<String, PulsarIdConfig> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig); + idConfigMap = unifiedConfiguration ? 
fromTaskConfig : fromSortTaskConfig; } catch (Throwable e) { LOG.error(e.getMessage(), e); } } + public Map<String, PulsarIdConfig> fromTaskConfig(TaskConfig taskConfig) { + return taskConfig.getClusterTagConfigs() + .stream() + .map(ClusterTagConfig::getDataFlowConfigs) + .flatMap(Collection::stream) + .map(PulsarIdConfig::create) + .collect(Collectors.toMap( + config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()), + v -> v, + (v1, v2) -> v1)); + } + + public Map<String, PulsarIdConfig> fromSortTaskConfig(SortTaskConfig sortTaskConfig) { + Map<String, PulsarIdConfig> newIdConfigMap = new ConcurrentHashMap<>(); + List<Map<String, String>> idList = sortTaskConfig.getIdParams(); + for (Map<String, String> idParam : idList) { + try { + PulsarIdConfig idConfig = new PulsarIdConfig(idParam); + newIdConfigMap.put(idConfig.getUid(), idConfig); + } catch (Exception e) { + LOG.error("fail to parse pulsar id config", e); + } + } + return newIdConfigMap; + } + public String getTopic(String uid) { PulsarIdConfig idConfig = this.idConfigMap.get(uid); if (idConfig == null) { diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java index cb6c651982..69b042506d 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java @@ -21,12 +21,15 @@ import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig; import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig; import org.apache.inlong.sort.standalone.config.pojo.InlongId; +import 
org.apache.inlong.sort.standalone.utils.Constants; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.Map; + @Data @Builder @NoArgsConstructor @@ -46,6 +49,16 @@ public class PulsarIdConfig { private String topic; private DataTypeEnum dataType = DataTypeEnum.TEXT; + public PulsarIdConfig(Map<String, String> idParam) { + this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID); + this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID, DEFAULT_INLONG_STREAM); + this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); + this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, PulsarIdConfig.DEFAULT_SEPARATOR); + this.topic = idParam.getOrDefault(Constants.TOPIC, uid); + this.dataType = DataTypeEnum + .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType())); + } + public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) { PulsarSinkConfig sinkConfig = (PulsarSinkConfig) dataFlowConfig.getSinkConfig();