[ 
https://issues.apache.org/jira/browse/FLINK-32680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-32680:
-------------------------------
    Description: 
Take the following test (put it in {{MultipleInputITCase}}) as an example:
{code:java}
    @Test
    public void testMultipleInputDoesNotChainedWithSource() throws Exception {
        testJobVertexName(false);
    }
    
    @Test
    public void testMultipleInputChainedWithSource() throws Exception {
        testJobVertexName(true);
    }

    public void testJobVertexName(boolean chain) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        TestListResultSink<Long> resultSink = new TestListResultSink<>();

        DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
        DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
        DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");

        KeyedMultipleInputTransformation<Long> transform =
                new KeyedMultipleInputTransformation<>(
                        "MultipleInput",
                        new KeyedSumMultipleInputOperatorFactory(),
                        BasicTypeInfo.LONG_TYPE_INFO,
                        1,
                        BasicTypeInfo.LONG_TYPE_INFO);
        if (chain) {
            // HEAD_WITH_SOURCES lets the multiple-input operator chain its source inputs.
            transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        }
        KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value -> value % 3;

        env.addOperator(
                transform
                        .addInput(source1.getTransformation(), keySelector)
                        .addInput(source2.getTransformation(), keySelector)
                        .addInput(source3.getTransformation(), keySelector));

        new MultipleConnectedStreams(env)
                .transform(transform)
                .rebalance()
                .addSink(resultSink)
                .name("sink");

        env.execute();
    }{code}
 

When we run {{testMultipleInputDoesNotChainedWithSource}}, all job vertex
names look as expected:

!image-2023-07-26-15-24-24-077.png|width=494,height=246!

When we run {{testMultipleInputChainedWithSource}} (the MultipleInput operator
is chained with source1), the job vertex names get messed up: every job vertex
name contains {{Source: source1}}:

!image-2023-07-26-15-23-29-551.png|width=515,height=182!

 

I think it's a bug.


> Job vertex names get messed up once there is a source vertex chained with a 
> MultipleInput vertex in job graph
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32680
>                 URL: https://issues.apache.org/jira/browse/FLINK-32680
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.2, 1.18.0, 1.17.1
>            Reporter: Lijie Wang
>            Priority: Major
>         Attachments: image-2023-07-26-15-23-29-551.png, 
> image-2023-07-26-15-24-24-077.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
