[ https://issues.apache.org/jira/browse/FLINK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lijie Wang updated FLINK-26576:
-------------------------------
    Affects Version/s: 1.14.3
                       1.13.6
                       1.15.0

> The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction is wrong
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-26576
>                 URL: https://issues.apache.org/jira/browse/FLINK-26576
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0, 1.13.6, 1.14.3
>            Reporter: Lijie Wang
>            Priority: Major
>
> In [StreamExecutionEnvironment#createFileInput|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B],
> the {{env.getParallelism()}} was passed to
> {{ContinuousFileMonitoringFunction}} as the parallelism of downstream
> readers. This value is incorrect when the parallelism of the downstream
> readers is manually configured by the user.
> For example, in the test below, *1* will be passed as
> {{readerParallelism}}, but the actual parallelism of downstream readers
> is *5*. This will result in only one split being generated, even though
> there are 5 downstream readers.
> {code:java}
> @Test
> public void testContinuousFileMonitoringFunction() throws Exception {
>     // The environment's default parallelism is 1.
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>     final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";
>     final File file = createTempFile(fileContent);
>     // The readers are explicitly set to parallelism 5, yet createFileInput
>     // passes env.getParallelism() (1) as readerParallelism.
>     env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
>             .forward()
>             .addSink(new PrintSinkFunction<>()).setParallelism(5);
>     env.execute();
> }
>
> private File createTempFile(String content) throws IOException {
>     File tempFile = File.createTempFile("test_contents", "tmp");
>     tempFile.deleteOnExit();
>     OutputStreamWriter wrt =
>             new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
>     wrt.write(content);
>     wrt.close();
>     return tempFile;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
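
For readers of this thread, the effect described in the report can be illustrated with a toy model. This is only a sketch and not Flink code: it assumes that the monitoring function uses {{readerParallelism}} as a hint for how many splits to cut a file into, and it ignores block sizes and any minimum split size. The class {{SplitHintSketch}}, the type {{Split}}, and the method {{createSplits}} are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, NOT Flink code: shows how a "number of splits" hint changes
 * the number of byte ranges a file is cut into.
 */
public class SplitHintSketch {

    /** A byte range [start, start + length) of the file (hypothetical type). */
    public static final class Split {
        public final long start;
        public final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }
    }

    /** Cuts a file of fileLength bytes into up to splitHint contiguous ranges. */
    public static List<Split> createSplits(long fileLength, int splitHint) {
        if (fileLength <= 0 || splitHint < 1) {
            throw new IllegalArgumentException("fileLength and splitHint must be positive");
        }
        // Ceiling division: the range size that yields at most splitHint ranges.
        long splitSize = Math.max(1, (fileLength + splitHint - 1) / splitHint);
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new Split(start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    public static void main(String[] args) {
        long fileLength = 30;      // five 6-byte lines: "line1\n" ... "line5\n"
        int envParallelism = 1;    // value the report says is passed today
        int readerParallelism = 5; // parallelism actually set on the readers

        System.out.println(createSplits(fileLength, envParallelism).size());    // prints 1
        System.out.println(createSplits(fileLength, readerParallelism).size()); // prints 5
    }
}
```

Under these assumptions, a hint of 1 yields a single range covering the whole file, so four of the five readers would have nothing to read, while a hint of 5 yields one range per reader.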