[ https://issues.apache.org/jira/browse/FLINK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lijie Wang updated FLINK-26576: ------------------------------- Description: In [StreamExecutionEnvironment#createFileInput |https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B], the {{env.getParallelism()}} was passed to {{ContinuousFileMonitoringFunction}} as the parallelism of downstream readers. This value is incorrect when the parallelism of the downstream readers is manually configured by the user. For example, in the test below, *1* will be passed as {{{}readerParallelism{}}}, but the actual parallelism of downstream readers is {*}5{*}. {code:java} @Test public void testContinuousFileMonitoringFunction() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n"; final File file = createTempFile(fileContent); env.readTextFile(file.getPath()).name("TextSource").setParallelism(5) .forward() .addSink(new PrintSinkFunction<>()).setParallelism(5); env.execute(); } private File createTempFile(String content) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8); wrt.write(content); wrt.close(); return tempFile; } {code} was: In [StreamExecutionEnvironment#createFileInput |https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B], the {{env.getParallelism()}} was passed to {{ContinuousFileMonitoringFunction}} as the parallelism of downstream readers. This value is incorrect when the parallelism of the downstream readers is manually configured by the user. For example, in the test below, *1* will be passed as {{{}readerParallelism{}}}, but the actual parallelism of readers is {*}5{*}. {code:java} @Test public void testContinuousFileMonitoringFunction() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n"; final File file = createTempFile(fileContent); env.readTextFile(file.getPath()).name("TextSource").setParallelism(5) .forward() .addSink(new PrintSinkFunction<>()).setParallelism(5); env.execute(); } private File createTempFile(String content) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8); wrt.write(content); wrt.close(); return tempFile; } {code} > The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction > is wrong > ------------------------------------------------------------------------------------ > > Key: FLINK-26576 > URL: https://issues.apache.org/jira/browse/FLINK-26576 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Reporter: Lijie Wang > Priority: Major > > In [StreamExecutionEnvironment#createFileInput > |https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B], > the {{env.getParallelism()}} was passed to > {{ContinuousFileMonitoringFunction}} as the parallelism of downstream > readers. This value is incorrect when the parallelism of the downstream > readers is manually configured by the user. > For example, in the test below, *1* will be passed as > {{{}readerParallelism{}}}, but the actual parallelism of downstream readers > is {*}5{*}. > {code:java} > @Test > public void testContinuousFileMonitoringFunction() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(1); > final String fileContent = "line1\n" + "line2\n" + "line3\n" + > "line4\n" + "line5\n"; > final File file = createTempFile(fileContent); > env.readTextFile(file.getPath()).name("TextSource").setParallelism(5) > .forward() > .addSink(new PrintSinkFunction<>()).setParallelism(5); > env.execute(); > } > private File createTempFile(String content) throws IOException { > File tempFile = File.createTempFile("test_contents", "tmp"); > tempFile.deleteOnExit(); > OutputStreamWriter wrt = > new OutputStreamWriter(new FileOutputStream(tempFile), > StandardCharsets.UTF_8); > wrt.write(content); > wrt.close(); > return tempFile; > } > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)