noorall commented on code in PR #25366: URL: https://github.com/apache/flink/pull/25366#discussion_r1774908113
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -593,15 +616,22 @@ private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
                 if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
                         && isChainableInput(sourceOutEdge, streamGraph)) {
+                    // we cache the non-chainable outputs here, and set the non-chained config later
+                    OperatorInfo operatorInfo = new OperatorInfo();
+                    operatorInfo.setChainableOutputs(Collections.emptyList());

Review Comment:
   > maybe nonChainable?
   >
   > ```
   > OperatorInfo operatorInfo = new OperatorInfo();
   > jobVertexBuildContext.addOperatorInfo(sourceNodeId, operatorInfo);
   >
   > final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
   > final StreamConfig.SourceInputConfig inputConfig =
   >         new StreamConfig.SourceInputConfig(sourceOutEdge);
   > final StreamConfig operatorConfig = new StreamConfig(new Configuration());
   >
   > setOperatorConfig(
   >         sourceNodeId,
   >         operatorConfig,
   >         Collections.emptyMap(),
   >         jobVertexBuildContext);
   > setOperatorChainedOutputsConfig(
   >         operatorConfig, Collections.emptyList(), jobVertexBuildContext);
   >
   > // we cache the non-chainable outputs here, and set the non-chained config later
   > operatorInfo.setNonChainableOutputs(Collections.emptyList());
   > ```
   >
   > And a unit test case is needed

   Thank you for the reminder and I have corrected this part. However, there is no need for an additional unit test because the property in OperatorInfo is an empty list by default. Therefore, it doesn't matter whether it is set or not.
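For readers following the argument about the default value, here is a minimal sketch of why an explicit `setNonChainableOutputs(Collections.emptyList())` call would be observably redundant. It assumes the field is initialized to an empty list at construction time, as the reply states. `OperatorInfoSketch` and `DefaultEmptyListCheck` are hypothetical stand-ins written for illustration only, not Flink's actual `OperatorInfo`, and the `String` element type is likewise a placeholder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for illustration only; NOT Flink's actual OperatorInfo.
final class OperatorInfoSketch {
    // Assumption (taken from the reply): the property defaults to an empty list.
    private List<String> nonChainableOutputs = new ArrayList<>();

    List<String> getNonChainableOutputs() {
        return nonChainableOutputs;
    }

    void setNonChainableOutputs(List<String> outputs) {
        this.nonChainableOutputs = outputs;
    }
}

// Hypothetical helper that compares an untouched instance with one that was
// explicitly set to an empty list.
final class DefaultEmptyListCheck {
    static boolean defaultEqualsExplicitEmpty() {
        OperatorInfoSketch untouched = new OperatorInfoSketch();

        OperatorInfoSketch explicit = new OperatorInfoSketch();
        explicit.setNonChainableOutputs(Collections.emptyList());

        // Both lists are empty, so List.equals reports them as equal.
        return untouched.getNonChainableOutputs().isEmpty()
                && untouched.getNonChainableOutputs()
                        .equals(explicit.getNonChainableOutputs());
    }
}
```

Under that assumption, callers that only read the list cannot tell the two instances apart, which is the basis for the claim that the setter call does not change behavior. Whether a regression test is still worthwhile is a separate judgment for the reviewers.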