[ https://issues.apache.org/jira/browse/FLINK-32680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17747409#comment-17747409 ]
Junrui Li commented on FLINK-32680:
-----------------------------------

This bug occurs because the global ChainedSources are used when generating the JobVertex name ([here|https://github.com/apache/flink/blob/c8ae39d4ac73f81873e1d8ac37e17c29ae330b23/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903]). Instead, the chained sources should be filtered by the id of the current node. For an example of such filtering, see [here|https://github.com/apache/flink/blob/c8ae39d4ac73f81873e1d8ac37e17c29ae330b23/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1052].

If the idea is correct, I will prepare a PR to fix this issue.

> Job vertex names get messed up once there is a source vertex chained with a
> MultipleInput vertex in job graph
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32680
>                 URL: https://issues.apache.org/jira/browse/FLINK-32680
>             Project: Flink
>          Issue Type: Bug
>   Affects Versions: 1.16.2, 1.18.0, 1.17.1
>            Reporter: Lijie Wang
>            Priority: Major
>         Attachments: image-2023-07-26-15-23-29-551.png,
> image-2023-07-26-15-24-24-077.png
>
>
> Take the following test (put it in {{MultipleInputITCase}}) as an example:
> {code:java}
> @Test
> public void testMultipleInputDoesNotChainedWithSource() throws Exception {
>     testJobVertexName(false);
> }
>
> @Test
> public void testMultipleInputChainedWithSource() throws Exception {
>     testJobVertexName(true);
> }
>
> public void testJobVertexName(boolean chain) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     TestListResultSink<Long> resultSink = new TestListResultSink<>();
>     DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
>     DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
>     DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");
>     KeyedMultipleInputTransformation<Long> transform =
>             new KeyedMultipleInputTransformation<>(
>                     "MultipleInput",
>                     new KeyedSumMultipleInputOperatorFactory(),
>                     BasicTypeInfo.LONG_TYPE_INFO,
>                     1,
>                     BasicTypeInfo.LONG_TYPE_INFO);
>     if (chain) {
>         transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
>     }
>     KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value -> value % 3;
>     env.addOperator(
>             transform
>                     .addInput(source1.getTransformation(), keySelector)
>                     .addInput(source2.getTransformation(), keySelector)
>                     .addInput(source3.getTransformation(), keySelector));
>     new MultipleConnectedStreams(env).transform(transform).rebalance().addSink(resultSink).name("sink");
>     env.execute();
> }
> {code}
>
> When we run {{testMultipleInputDoesNotChainedWithSource}}, all job vertex
> names are normal:
> !image-2023-07-26-15-24-24-077.png|width=494,height=246!
>
> When we run {{testMultipleInputChainedWithSource}} (the MultipleInput is chained
> with source1), job vertex names get messed up (all job vertex names contain
> {{Source: source1}}):
> !image-2023-07-26-15-23-29-551.png|width=515,height=182!
>
> I think it's a bug.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
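The filtering idea from the comment above can be illustrated with a small, self-contained sketch. It does not use Flink's classes or reproduce the code in StreamingJobGraphGenerator: the class {{VertexNameSketch}}, both methods, the node ids, the map layout (chained source name mapped to the id of the node it is chained into), and the " -> " name format are all hypothetical, chosen only to show why an unfiltered lookup puts {{Source: source1}} into every vertex name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink's API: chainedSources maps the display name of a
// chained source to the id of the node it is chained into.
public class VertexNameSketch {

    // Mimics the reported behavior: all chained sources are prepended,
    // regardless of which node is being named.
    static String unfilteredName(String operatorName, Map<String, Integer> chainedSources) {
        List<String> parts = new ArrayList<>(chainedSources.keySet());
        parts.add(operatorName);
        return String.join(" -> ", parts);
    }

    // Mimics the proposed behavior: only sources chained into the node
    // currently being named contribute to its name.
    static String filteredName(
            int nodeId, String operatorName, Map<String, Integer> chainedSources) {
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : chainedSources.entrySet()) {
            if (entry.getValue() == nodeId) {
                parts.add(entry.getKey());
            }
        }
        parts.add(operatorName);
        return String.join(" -> ", parts);
    }

    public static void main(String[] args) {
        // Assumed ids: node 4 is the MultipleInput operator, node 5 is the sink.
        Map<String, Integer> chainedSources = new LinkedHashMap<>();
        chainedSources.put("Source: source1", 4);

        System.out.println(unfilteredName("Sink: sink", chainedSources));
        System.out.println(filteredName(5, "Sink: sink", chainedSources));
        System.out.println(filteredName(4, "MultipleInput", chainedSources));
    }
}
```

In this toy model the unfiltered variant prefixes the sink's name with {{Source: source1}}, while the filtered variant adds the prefix only for the node that source1 is actually chained into.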