snuyanzin commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1505763661
########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -104,15 +110,139 @@ void testNonPartition() throws Exception { .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); } + @Test + void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception { + // sink1 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>(); + AtomicReference<File> fileToCommitRef1 = new AtomicReference<>(); + AtomicReference<String> contentRef1 = new AtomicReference<>(); + try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = + createSink(false, false, false, new LinkedHashMap<>(), sinkRef1)) { + writeUnorderedRecords(testHarness); + Map<File, String> content = getFileContentByPath(tmpPath); + assertThat(content).hasSize(1); + fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0)); + contentRef1.set(new ArrayList<>(content.values()).get(0)); + } + + // sink2 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>(); + AtomicReference<File> fileToCommitRef2 = new AtomicReference<>(); + AtomicReference<String> contentRef2 = new AtomicReference<>(); + try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = + createSink(false, false, false, new LinkedHashMap<>(), sinkRef2)) { + // write different content + List<Row> records = + Arrays.asList( + Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3")); + writeUnorderedRecords(testHarness, records); + Map<File, String> content = getFileContentByPath(tmpPath); + assertThat(content).hasSize(1); + fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0)); + contentRef2.set(new ArrayList<>(content.values()).get(0)); + } + + File fileToCommit1 = fileToCommitRef1.get(); + File fileToCommit2 = fileToCommitRef2.get(); + // because sink1 and sink2 are writing to the same staging dir + // the file generated by sink1 has been 
overwritten by sink2 + assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent()); + assertThat(fileToCommit1).doesNotExist(); Review Comment: ```suggestion assertThat(fileToCommit1).hasParent(fileToCommit2.getParent()).doesNotExist(); ``` could be merged and simplified a bit ########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -104,15 +110,139 @@ void testNonPartition() throws Exception { .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); } + @Test + void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception { + // sink1 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>(); + AtomicReference<File> fileToCommitRef1 = new AtomicReference<>(); + AtomicReference<String> contentRef1 = new AtomicReference<>(); + try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = + createSink(false, false, false, new LinkedHashMap<>(), sinkRef1)) { + writeUnorderedRecords(testHarness); + Map<File, String> content = getFileContentByPath(tmpPath); + assertThat(content).hasSize(1); + fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0)); + contentRef1.set(new ArrayList<>(content.values()).get(0)); + } + + // sink2 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>(); + AtomicReference<File> fileToCommitRef2 = new AtomicReference<>(); + AtomicReference<String> contentRef2 = new AtomicReference<>(); + try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = + createSink(false, false, false, new LinkedHashMap<>(), sinkRef2)) { + // write different content + List<Row> records = + Arrays.asList( + Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3")); + writeUnorderedRecords(testHarness, records); + Map<File, String> content = getFileContentByPath(tmpPath); + assertThat(content).hasSize(1); + fileToCommitRef2.set(new 
ArrayList<>(content.keySet()).get(0)); + contentRef2.set(new ArrayList<>(content.values()).get(0)); + } + + File fileToCommit1 = fileToCommitRef1.get(); + File fileToCommit2 = fileToCommitRef2.get(); + // because sink1 and sink2 are writing to the same staging dir + // the file generated by sink1 has been overwritten by sink2 + assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent()); + assertThat(fileToCommit1).doesNotExist(); Review Comment: ```suggestion assertThat(fileToCommit1).hasParent(fileToCommit2.getParent()).doesNotExist(); ``` could be merged and simplified a bit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org