yunqingmoswu commented on code in PR #6010:
URL: https://github.com/apache/inlong/pull/6010#discussion_r978450125


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> 
getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = 
mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = 
transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, 
List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = 
getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {
+                    // in standard mode transform input node must either be mq 
source node or transform node,
+                    // otherwise replace it with mq node name, which should be 
stream id
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    private void adjustNodeRelations(List<NodeRelation> relations, 
InlongGroupInfo groupInfo,
+            List<StreamSource> sources, List<TransformResponse> transforms) {
+        if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set relations' input node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = 
getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (NodeRelation relation : relations) {
+            List<String> inputs = relation.getInputs();
+            for (int index = 0; index < inputs.size(); ++index) {
+                String inputName = inputs.get(index);
+                if (!(validNameSet.contains(inputName))) {

Review Comment:
   !(validNameSet.contains(inputName)) -> !validNameSet.contains(inputName)



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> 
getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = 
mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = 
transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, 
List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();

Review Comment:
   Why is it get(0)?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> 
getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = 
mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = 
transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, 
List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = 
getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {
+                    // in standard mode transform input node must either be mq 
source node or transform node,
+                    // otherwise replace it with mq node name, which should be 
stream id
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    private void adjustNodeRelations(List<NodeRelation> relations, 
InlongGroupInfo groupInfo,
+            List<StreamSource> sources, List<TransformResponse> transforms) {
+        if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set relations' input node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();

Review Comment:
   Why is it get(0)?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> 
getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = 
mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = 
transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, 
List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = 
getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {

Review Comment:
   !(validNameSet.contains(originNodeName)) -> 
!validNameSet.contains(originNodeName)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to