This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 5f8c997a0 [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010) 5f8c997a0 is described below commit 5f8c997a02a098a885ba4c408b0270e6401d60b1 Author: woofyzhao <490467...@qq.com> AuthorDate: Mon Sep 26 16:12:36 2022 +0800 [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010) * Change the transform source node name to be MQ node in standard mode * Reuse some params, and use an iterator to loop and modify the List Co-authored-by: healchow <healc...@gmail.com> --- .../resource/sort/DefaultSortConfigOperator.java | 91 ++++++++++++++++++---- 1 file changed, 74 insertions(+), 17 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 53ece64dc..6096777ac 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -17,23 +17,22 @@ package org.apache.inlong.manager.service.resource.sort; -import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils; +import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils; +import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils; +import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.transform.TransformResponse; import org.apache.inlong.manager.service.sink.StreamSinkService; -import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils; -import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils; -import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils; -import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils; import org.apache.inlong.manager.service.source.StreamSourceService; import org.apache.inlong.manager.service.transform.StreamTransformService; import org.apache.inlong.sort.protocol.GroupInfo; @@ -47,8 +46,11 @@ import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.ListIterator; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -99,8 +101,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator { // get sink info Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList); - List<TransformResponse> transformResponses = transformService.listTransform(groupInfo.getInlongGroupId(), null); - Map<String, List<TransformResponse>> transformMap = transformResponses.stream() + List<TransformResponse> transformList = transformService.listTransform(groupInfo.getInlongGroupId(), null); + Map<String, List<TransformResponse>> transformMap = transformList.stream() .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new))); @@ -111,35 +113,90 @@ public class DefaultSortConfigOperator implements SortConfigOperator { inlongStream.getSourceList().forEach( source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap)); - List<TransformResponse> transformResponseList = transformMap.get(streamId); - if (CollectionUtils.isNotEmpty(transformResponseList)) { - transformResponseList.forEach( + List<TransformResponse> transformResponses = transformMap.get(streamId); + if (CollectionUtils.isNotEmpty(transformResponses)) { + transformResponses.forEach( trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap)); } // build a stream info from the nodes and relations List<StreamSource> sources = sourceMap.get(streamId); List<StreamSink> sinks = sinkMap.get(streamId); - List<Node> nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap); List<NodeRelation> relations; - if (CollectionUtils.isEmpty(transformResponseList)) { + if (CollectionUtils.isEmpty(transformResponses)) { relations = NodeRelationUtils.createNodeRelations(sources, sinks); } else { relations = NodeRelationUtils.createNodeRelations(inlongStream); + // in standard mode, replace upstream source node and transform input fields node to mq node + if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) { + // mq node name, which is inlong stream id + String mqNodeName = sources.get(0).getSourceName(); + Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses); + adjustTransformField(transformResponses, nodeNameSet, mqNodeName); + adjustNodeRelations(relations, nodeNameSet, mqNodeName); + } } + + // create extract-transform-load nodes + List<Node> nodes = this.createNodes(sources, transformResponses, sinks, fieldMap); + StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations); sortStreamInfos.add(streamInfo); - // rebuild joinerNode relation if transformResponseList is not empty - NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList); + // rebuild joinerNode relation + NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses); } return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } + /** + * Deduplicate to get the node names of Source and Transform. + */ + private Set<String> getInputNodeNames(List<StreamSource> sources, List<TransformResponse> transforms) { + Set<String> result = new HashSet<>(); + if (CollectionUtils.isNotEmpty(sources)) { + result.addAll(sources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet())); + } + if (CollectionUtils.isNotEmpty(transforms)) { + result.addAll(transforms.stream().map(TransformResponse::getTransformName).collect(Collectors.toSet())); + } + return result; + } + + /** + * Set origin node to mq node for transform fields if necessary. + * + * In standard mode for InlongGroup, transform input node must either be + * mq source node or transform node, otherwise replace it with mq node name. + */ + private void adjustTransformField(List<TransformResponse> transforms, Set<String> nodeNameSet, String mqNodeName) { + for (TransformResponse transform : transforms) { + for (StreamField field : transform.getFieldList()) { + if (!nodeNameSet.contains(field.getOriginNodeName())) { + field.setOriginNodeName(mqNodeName); + } + } + } + } + + /** + * Set the input node to MQ node for NodeRelations + */ + private void adjustNodeRelations(List<NodeRelation> relations, Set<String> nodeNameSet, String mqNodeName) { + for (NodeRelation relation : relations) { + ListIterator<String> iterator = relation.getInputs().listIterator(); + while (iterator.hasNext()) { + if (!nodeNameSet.contains(iterator.next())) { + iterator.set(mqNodeName); + } + } + } + } + private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses, List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) { - List<Node> nodes = Lists.newArrayList(); + List<Node> nodes = new ArrayList<>(); nodes.addAll(ExtractNodeUtils.createExtractNodes(sources)); nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap)); nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap)); @@ -170,7 +227,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { */ private void addToGroupExt(InlongGroupInfo groupInfo, String value) { if (groupInfo.getExtList() == null) { - groupInfo.setExtList(Lists.newArrayList()); + groupInfo.setExtList(new ArrayList<>()); } InlongGroupExtInfo extInfo = new InlongGroupExtInfo(); @@ -188,7 +245,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) { streamInfos.forEach(streamInfo -> { if (streamInfo.getExtList() == null) { - streamInfo.setExtList(Lists.newArrayList()); + streamInfo.setExtList(new ArrayList<>()); } InlongStreamExtInfo extInfo = new InlongStreamExtInfo();