LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505864009
##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
Review Comment:

> > [...] the issue is triggered when the `FileSystemOutputFormat#finalizeGlobal` method is called (triggered by the final block).
>
> I cannot confirm that. The changed code segment isn't covered by the newly added tests. As said, reverting your fix in `FileSystemTableSink` doesn't make any of the tests fail.

You can verify the root cause with the following steps.

1. Temporarily make `toStagingPath` a static method that takes the output `Path` as a parameter, and modify the test case as in the following snippet:

```java
@Test
void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception {
    // sink1
    AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
    AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
    AtomicReference<String> contentRef1 = new AtomicReference<>();
    try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
            createSink(
                    false,
                    false,
                    false,
                    FileSystemTableSink.toStagingPath(Path.fromLocalFile(tmpPath.toFile())),
                    new LinkedHashMap<>(),
                    sinkRef1)) {
        writeUnorderedRecords(testHarness);
        Map<File, String> content = getFileContentByPath(tmpPath);
        assertThat(content).hasSize(1);
        fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
        contentRef1.set(new ArrayList<>(content.values()).get(0));
    }

    // sink2
    AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
    AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
    AtomicReference<String> contentRef2 = new AtomicReference<>();
    try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
            createSink(
                    false,
                    false,
                    false,
                    FileSystemTableSink.toStagingPath(Path.fromLocalFile(tmpPath.toFile())),
                    new LinkedHashMap<>(),
                    sinkRef2)) {
        // write different content
        List<Row> records =
                Arrays.asList(
                        Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3"));
        writeUnorderedRecords(testHarness, records);
        Map<File, String> content = getFileContentByPath(tmpPath);
        assertThat(content).hasSize(1);
        fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
        contentRef2.set(new ArrayList<>(content.values()).get(0));
    }

    File fileToCommit1 = fileToCommitRef1.get();
    File fileToCommit2 = fileToCommitRef2.get();
    // because sink1 and sink2 are writing to the same staging dir,
    // the file generated by sink1 has been overwritten by sink2
    assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
    assertThat(fileToCommit1).doesNotExist();
    // lost records
    assertThat(contentRef1.get())
            .isEqualTo("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");

    assertThat(fileToCommit2).exists();
    assertThat(contentRef2.get()).isEqualTo("b1,1,x1\n" + "b2,2,x2\n" + "b3,3,x3\n");

    // let sink1 commit; it actually commits the file generated by sink2
    sinkRef1.get().finalizeGlobal(finalizationContext);
    assertThat(getFileContentByPath(outputPath).values()).containsExactly(contentRef2.get());

    // sink2 will commit nothing
    sinkRef2.get().finalizeGlobal(finalizationContext);
    assertThat(getFileContentByPath(outputPath).values()).containsExactly(contentRef2.get());
}
```

If you run this case, it fails: `FileSystemTableSink.toStagingPath` never returns the same path twice, so the two sinks never share a staging dir and no data is lost.

2. Then change `toStagingPath` to return a constant path and rerun the same case. It now passes, which shows that one sink's file is overwritten by the other's.

```java
@VisibleForTesting
public static Path toStagingPath(Path path) {
    // return a constant path to reproduce the issue
    Path stagingDir = new Path(path, ".staging_foo");
    ...
}
```
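To see the same failure mode without Flink, here is a minimal sketch in plain Java (11+, `java.nio.file` only). It is not Flink code: the class `StagingDirCollisionDemo`, its methods, the `part-0` file name, and the UUID-based staging name are all hypothetical. It models the collision as a same-named part file being overwritten, which is a simplification of what happens inside the real sink. With a constant staging dir (the `.staging_foo` trick from step 2), "committing" the staging dir publishes only sink2's record; with distinct staging dirs, both records survive.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, Flink-free simulation; not FileSystemTableSink or FileSystemOutputFormat.
class StagingDirCollisionDemo {

    // Constant staging dir, mirroring the ".staging_foo" change from step 2.
    static Path constantStagingPath(Path outputPath) {
        return outputPath.resolve(".staging_foo");
    }

    // Assumed unique scheme (random UUID suffix); the real toStagingPath may differ.
    static Path uniqueStagingPath(Path outputPath) {
        return outputPath.resolve(".staging_" + UUID.randomUUID());
    }

    // Each simulated sink writes one part file with the same (hypothetical) name.
    static void writePartFile(Path stagingDir, String content) throws IOException {
        Files.createDirectories(stagingDir);
        // writeString truncates and overwrites an existing file by default.
        Files.writeString(stagingDir.resolve("part-0"), content, StandardCharsets.UTF_8);
    }

    // Collects what a commit would publish: every file in each distinct staging dir.
    static List<String> readStaged(List<Path> stagingDirs) throws IOException {
        List<String> contents = new ArrayList<>();
        // Two sinks sharing one staging dir leave only one dir to commit.
        for (Path dir : new LinkedHashSet<>(stagingDirs)) {
            List<Path> files;
            try (Stream<Path> listing = Files.list(dir)) {
                files = listing.sorted().collect(Collectors.toList());
            }
            for (Path file : files) {
                contents.add(Files.readString(file, StandardCharsets.UTF_8));
            }
        }
        return contents;
    }

    // Runs sink1 then sink2 against the same output path; returns the committed contents.
    static List<String> runTwoSinks(Path outputPath, boolean uniqueStaging) throws IOException {
        Path staging1 = uniqueStaging ? uniqueStagingPath(outputPath) : constantStagingPath(outputPath);
        writePartFile(staging1, "a1,1,p1");
        Path staging2 = uniqueStaging ? uniqueStagingPath(outputPath) : constantStagingPath(outputPath);
        writePartFile(staging2, "b1,1,x1");
        return readStaged(List.of(staging1, staging2));
    }

    public static void main(String[] args) throws IOException {
        // Constant staging dir: sink2 overwrites sink1's file, so sink1's record is lost.
        System.out.println(runTwoSinks(Files.createTempDirectory("constant"), false)); // [b1,1,x1]
        // Distinct staging dirs: both records survive.
        System.out.println(runTwoSinks(Files.createTempDirectory("unique"), true)); // [a1,1,p1, b1,1,x1]
    }
}
```

The constant-path run corresponds to step 2: only the second sink's data reaches the commit, which is the data loss the test case asserts.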