snuyanzin commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505763661


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +110,139 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
     }
 
+    @Test
+    void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws 
Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new 
AtomicReference<>();
+        AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
+        AtomicReference<String> contentRef1 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), 
sinkRef1)) {
+            writeUnorderedRecords(testHarness);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef1.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new 
AtomicReference<>();
+        AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
+        AtomicReference<String> contentRef2 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), 
sinkRef2)) {
+            // write different content
+            List<Row> records =
+                    Arrays.asList(
+                            Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), 
Row.of("b3", 3, "x3"));
+            writeUnorderedRecords(testHarness, records);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef2.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        File fileToCommit1 = fileToCommitRef1.get();
+        File fileToCommit2 = fileToCommitRef2.get();
+        // because sink1 and sink2 are writing to the same staging dir
+        // the file generated by sink1 has been overwritten by sink2
+        
assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
+        assertThat(fileToCommit1).doesNotExist();

Review Comment:
   ```suggestion
           
assertThat(fileToCommit1).hasParent(fileToCommit2.getParent()).doesNotExist();
   ```
   could be merged and simplified a bit



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +110,139 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
     }
 
+    @Test
+    void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws 
Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new 
AtomicReference<>();
+        AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
+        AtomicReference<String> contentRef1 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), 
sinkRef1)) {
+            writeUnorderedRecords(testHarness);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef1.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new 
AtomicReference<>();
+        AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
+        AtomicReference<String> contentRef2 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), 
sinkRef2)) {
+            // write different content
+            List<Row> records =
+                    Arrays.asList(
+                            Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), 
Row.of("b3", 3, "x3"));
+            writeUnorderedRecords(testHarness, records);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef2.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        File fileToCommit1 = fileToCommitRef1.get();
+        File fileToCommit2 = fileToCommitRef2.get();
+        // because sink1 and sink2 are writing to the same staging dir
+        // the file generated by sink1 has been overwritten by sink2
+        
assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
+        assertThat(fileToCommit1).doesNotExist();

Review Comment:
   ```suggestion
           
assertThat(fileToCommit1).hasParent(fileToCommit2.getParent()).doesNotExist();
   ```
   could be merged and simplified a bit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to