[ 
https://issues.apache.org/jira/browse/FLINK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-26576:
-------------------------------
    Description: 
In [StreamExecutionEnvironment#createFileInput 
|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B],
{{env.getParallelism()}} is passed to {{ContinuousFileMonitoringFunction}} as 
the parallelism of the downstream readers. This value is incorrect when the 
user manually configures the parallelism of the downstream readers.

For example, in the test below, *1* is passed as {{readerParallelism}}, but 
the actual parallelism of the readers is *5*.
{code:java}
    @Test
    public void testContinuousFileMonitoringFunction() throws Exception {
        // The environment's default parallelism is 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";

        final File file = createTempFile(fileContent);

        // The readers are explicitly configured with parallelism 5.
        env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
                .forward()
                .addSink(new PrintSinkFunction<>()).setParallelism(5);

        env.execute();
    }

    private File createTempFile(String content) throws IOException {
        File tempFile = File.createTempFile("test_contents", "tmp");
        tempFile.deleteOnExit();

        // try-with-resources closes the writer even if write() throws.
        try (OutputStreamWriter wrt =
                new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)) {
            wrt.write(content);
        }

        return tempFile;
    }
{code}
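
To illustrate why the mismatch can matter, here is a minimal, self-contained sketch. It assumes that {{readerParallelism}} ends up being used as the minimum-split hint when the monitored files are cut into input splits, and that splits are handed to the readers round-robin. Both are simplifications for illustration. The class and method names ({{ReaderParallelismSketch}}, {{createSplits}}, {{busyReaders}}) are hypothetical and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified model only; none of these names are Flink APIs. */
public class ReaderParallelismSketch {

    /** Cuts [0, fileLength) into roughly equal byte ranges, using the hint as the split count. */
    public static List<long[]> createSplits(long fileLength, int minNumSplits) {
        // Never create fewer than one split or more splits than there are bytes.
        int numSplits = (int) Math.max(1, Math.min(minNumSplits, fileLength));
        List<long[]> splits = new ArrayList<>();
        long base = fileLength / numSplits;
        long remainder = fileLength % numSplits;
        long start = 0;
        for (int i = 0; i < numSplits; i++) {
            // The first 'remainder' splits get one extra byte each.
            long length = base + (i < remainder ? 1 : 0);
            splits.add(new long[] {start, length});
            start += length;
        }
        return splits;
    }

    /** Hands splits to readers round-robin and counts the readers that received at least one. */
    public static int busyReaders(long fileLength, int readerParallelismHint, int actualReaders) {
        List<long[]> splits = createSplits(fileLength, readerParallelismHint);
        Set<Integer> busy = new HashSet<>();
        for (int i = 0; i < splits.size(); i++) {
            busy.add(i % actualReaders);
        }
        return busy.size();
    }

    public static void main(String[] args) {
        // "line1\n" ... "line5\n" is 5 * 6 = 30 bytes in UTF-8.
        long fileLength = 30;
        System.out.println("hint=1, readers=5 -> busy readers: " + busyReaders(fileLength, 1, 5));
        System.out.println("hint=5, readers=5 -> busy readers: " + busyReaders(fileLength, 5, 5));
    }
}
```

Under these assumptions, a hint of 1 yields a single split, so only one of the five readers receives work, while a hint equal to the actual reader parallelism spreads the file across all five.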

  was:
In [StreamExecutionEnvironment#createFileInput 
|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B],
{{env.getParallelism()}} is passed to {{ContinuousFileMonitoringFunction}} as 
the parallelism of the downstream readers. This value is incorrect when the 
user manually configures the parallelism of the downstream readers.

For example, in the test below, *1* will be passed as {{readerParallelism}}, 
but the actual parallelism of readers is *5*.

{code:java}
// Some comments here
    @Test
    public void testContinuousFileMonitoringFunction() throws Exception {
        // The environment's default parallelism is 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";

        final File file = createTempFile(fileContent);

        // The readers are explicitly configured with parallelism 5.
        env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
                .forward()
                .addSink(new PrintSinkFunction<>()).setParallelism(5);

        env.execute();
    }

    private File createTempFile(String content) throws IOException {
        File tempFile = File.createTempFile("test_contents", "tmp");
        tempFile.deleteOnExit();

        // try-with-resources closes the writer even if write() throws.
        try (OutputStreamWriter wrt =
                new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)) {
            wrt.write(content);
        }

        return tempFile;
    }
{code}


> The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction 
> is wrong
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-26576
>                 URL: https://issues.apache.org/jira/browse/FLINK-26576
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: Lijie Wang
>            Priority: Major
>
> In [StreamExecutionEnvironment#createFileInput 
> |https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B],
> {{env.getParallelism()}} is passed to {{ContinuousFileMonitoringFunction}} 
> as the parallelism of the downstream readers. This value is incorrect when 
> the user manually configures the parallelism of the downstream readers.
>
> For example, in the test below, *1* is passed as {{readerParallelism}}, but 
> the actual parallelism of the readers is *5*.
> {code:java}
>     @Test
>     public void testContinuousFileMonitoringFunction() throws Exception {
>         // The environment's default parallelism is 1.
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";
>         final File file = createTempFile(fileContent);
>         // The readers are explicitly configured with parallelism 5.
>         env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
>                 .forward()
>                 .addSink(new PrintSinkFunction<>()).setParallelism(5);
>         env.execute();
>     }
>     private File createTempFile(String content) throws IOException {
>         File tempFile = File.createTempFile("test_contents", "tmp");
>         tempFile.deleteOnExit();
>         // try-with-resources closes the writer even if write() throws.
>         try (OutputStreamWriter wrt =
>                 new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)) {
>             wrt.write(content);
>         }
>         return tempFile;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
