Lijie Wang created FLINK-26576: ---------------------------------- Summary: The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction is wrong Key: FLINK-26576 URL: https://issues.apache.org/jira/browse/FLINK-26576 Project: Flink Issue Type: Bug Components: API / DataStream Reporter: Lijie Wang
In [StreamExecutionEnvironment#createFileInput |https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B], the {{env.getParallelism()}} was passed to {{ContinuousFileMonitoringFunction}} as the parallelism of downstream readers. This value is incorrect when the parallelism of the downstream readers is manually configured by the user. For example, in the test below, *1* will be passed as {{readerParallelism}}, but the actual parallelism of readers is *5*. {code:java} // Some comments here @Test public void testContinuousFileMonitoringFunction() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n"; final File file = createTempFile(fileContent); env.readTextFile(file.getPath()).name("TextSource").setParallelism(5) .forward() .addSink(new PrintSinkFunction<>()).setParallelism(5); env.execute(); } private File createTempFile(String content) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8); wrt.write(content); wrt.close(); return tempFile; } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)