yunqingmoswu commented on code in PR #6010: URL: https://github.com/apache/inlong/pull/6010#discussion_r978450125
########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java: ########## @@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } + private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources, + List<TransformResponse> transforms) { + Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet()); + Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName) + .collect(Collectors.toSet()); + Set<String> result = Sets.newHashSet(); + result.addAll(mqSourceNameSet); + result.addAll(transformNameSet); + return result; + } + + private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources, + List<TransformResponse> transforms) { + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + return; + } + + // set transform fields' origin node to mq node when necessary + String mqNodeName = sources.get(0).getSourceName(); + Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms); + for (TransformResponse transform : transforms) { + for (StreamField field : transform.getFieldList()) { + String originNodeName = field.getOriginNodeName(); + if (!(validNameSet.contains(originNodeName))) { + // in standard mode transform input node must either be mq source node or transform node, + // otherwise replace it with mq node name, which should be stream id + field.setOriginNodeName(mqNodeName); + } + } + } + } + + private void adjustNodeRelations(List<NodeRelation> relations, InlongGroupInfo groupInfo, + List<StreamSource> sources, List<TransformResponse> transforms) { + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + return; + } + + // set relations' input node to mq node when necessary + String mqNodeName = sources.get(0).getSourceName(); + Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms); + for (NodeRelation relation : relations) { + List<String> inputs = relation.getInputs(); + for (int index = 0; index < inputs.size(); ++index) { + String inputName = inputs.get(index); + if (!(validNameSet.contains(inputName))) { Review Comment: !(validNameSet.contains(inputName)) -> !validNameSet.contains(inputName) ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java: ########## @@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } + private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources, + List<TransformResponse> transforms) { + Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet()); + Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName) + .collect(Collectors.toSet()); + Set<String> result = Sets.newHashSet(); + result.addAll(mqSourceNameSet); + result.addAll(transformNameSet); + return result; + } + + private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources, + List<TransformResponse> transforms) { + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + return; + } + + // set transform fields' origin node to mq node when necessary + String mqNodeName = sources.get(0).getSourceName(); Review Comment: Why is it get(0)? ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java: ########## @@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } + private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources, + List<TransformResponse> transforms) { + Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet()); + Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName) + .collect(Collectors.toSet()); + Set<String> result = Sets.newHashSet(); + result.addAll(mqSourceNameSet); + result.addAll(transformNameSet); + return result; + } + + private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources, + List<TransformResponse> transforms) { + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + return; + } + + // set transform fields' origin node to mq node when necessary + String mqNodeName = sources.get(0).getSourceName(); + Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms); + for (TransformResponse transform : transforms) { + for (StreamField field : transform.getFieldList()) { + String originNodeName = field.getOriginNodeName(); + if (!(validNameSet.contains(originNodeName))) { + // in standard mode transform input node must either be mq source node or transform node, + // otherwise replace it with mq node name, which should be stream id + field.setOriginNodeName(mqNodeName); + } + } + } + } + + private void adjustNodeRelations(List<NodeRelation> relations, InlongGroupInfo groupInfo, + List<StreamSource> sources, List<TransformResponse> transforms) { + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + return; + } + + // set relations' input node to mq node when necessary + String mqNodeName = sources.get(0).getSourceName(); Review Comment: Why is it get(0)? ########## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java: ########## @@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } + private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources, + List<TransformResponse> transforms) { + Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet()); + Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName) + .collect(Collectors.toSet()); + Set<String> result = Sets.newHashSet(); + result.addAll(mqSourceNameSet); + result.addAll(transformNameSet); + return result; + } + + private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources, + List<TransformResponse> transforms) { + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + return; + } + + // set transform fields' origin node to mq node when necessary + String mqNodeName = sources.get(0).getSourceName(); + Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms); + for (TransformResponse transform : transforms) { + for (StreamField field : transform.getFieldList()) { + String originNodeName = field.getOriginNodeName(); + if (!(validNameSet.contains(originNodeName))) { Review Comment: !(validNameSet.contains(originNodeName)) -> !validNameSet.contains(originNodeName) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org