woofyzhao commented on code in PR #6010:
URL: https://github.com/apache/inlong/pull/6010#discussion_r978460623


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {
+                    // in standard mode transform input node must either be mq source node or transform node,
+                    // otherwise replace it with mq node name, which should be stream id
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    private void adjustNodeRelations(List<NodeRelation> relations, InlongGroupInfo groupInfo,
+            List<StreamSource> sources, List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set relations' input node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();

Review Comment:
   In standard mode there is currently exactly one MQ source per stream, and its
   source name is the corresponding stream id, so `sources.get(0)` here is that
   MQ source.
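
The assumption behind `sources.get(0)` could also be checked explicitly instead of being relied on silently. The following is only a minimal sketch under stated assumptions: `SingleMqSourceCheck` and `requireSingleMqSourceName` are hypothetical names that do not exist in the PR, and plain strings stand in for the `StreamSource` names and the stream id.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the PR: illustrates failing fast when the
// "exactly one MQ source per stream, named after the stream id" assumption breaks.
class SingleMqSourceCheck {

    // sourceNames stands in for the names returned by StreamSource::getSourceName.
    static String requireSingleMqSourceName(List<String> sourceNames, String streamId) {
        int count = (sourceNames == null) ? 0 : sourceNames.size();
        if (count != 1) {
            throw new IllegalStateException(
                    "expected exactly one mq source for stream " + streamId + ", but got " + count);
        }
        String name = sourceNames.get(0);
        if (!name.equals(streamId)) {
            throw new IllegalStateException(
                    "mq source name " + name + " does not match stream id " + streamId);
        }
        return name;
    }

    public static void main(String[] args) {
        // A single source named after the stream passes the check.
        String mqNodeName = requireSingleMqSourceName(Collections.singletonList("stream_1"), "stream_1");
        System.out.println(mqNodeName);
    }
}
```

Whether such a guard is worth adding depends on how long the one-source-per-stream restriction is expected to hold; if it may be lifted soon, a code comment stating the assumption might be enough.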


