XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507274806


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef1,
+                fileToCommitRef1,
+                getStagingDir(shareStagingDir),
+                Row.of("a1", 1, "x1"),
+                Row.of("a2", 2, "x2"));
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef2,
+                fileToCommitRef2,
+                getStagingDir(shareStagingDir),
+                Row.of("b1", 1, "y1"),
+                Row.of("b2", 2, "y2"));
+
+        assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir);
+    }
+
+    private void writeRowsToSink(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef,
+            AtomicReference<Map<File, String>> contentRef,
+            Path stagingDir,
+            Row... records)
+            throws Exception {
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) {
+            writeUnorderedRecords(testHarness, Arrays.asList(records));
+            contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath())));
+        }
+    }
+
+    private Path getStagingDir(boolean shareStagingDir) {
+        String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+        return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+    }
+
+    private void assertSinkBehavior(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef1,
+            AtomicReference<Map<File, String>> fileToCommitRef1,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef2,
+            AtomicReference<Map<File, String>> fileToCommitRef2,
+            boolean shareStagingDir)
+            throws Exception {
+        Map<File, String> fileToCommit1 = fileToCommitRef1.get();
+        Map<File, String> fileToCommit2 = fileToCommitRef2.get();
+        assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+        if (shareStagingDir) {
+            assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+        } else {
+            assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+        }
+        sinkRef1.get().finalizeGlobal(finalizationContext);
+        sinkRef2.get().finalizeGlobal(finalizationContext);
+        Map<File, String> committedFiles = getFileContentByPath(outputPath);
+        if (shareStagingDir) {

Review Comment:
   Your understanding is correct. It's just that this test actually exercises 
`FileSystemOutputFormat` in a scenario that only arises when some other class 
(in our case `FileSystemTableSink`) has a bug.
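If the responsibility stays with the sink, the sink has to guarantee that no two output format instances ever receive the same staging directory. The following is a minimal, hypothetical sketch of that guarantee, assuming a random UUID suffix per instance; `UniqueStagingDirs`, `uniqueStagingDir` and the `.staging_` prefix are illustrative names only and do not reflect how `FileSystemTableSink` actually derives its staging path.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

/**
 * Hypothetical sketch of the sink-side responsibility: every sink instance
 * derives its own staging directory, so two output formats never share one.
 * This is NOT FileSystemTableSink's actual implementation.
 */
public class UniqueStagingDirs {

    /** Returns a staging directory below base that is unique per call. */
    static Path uniqueStagingDir(Path base) {
        // A random UUID suffix makes collisions between sink instances
        // practically impossible, even across JVMs. The ".staging_" prefix
        // is an assumption made for this illustration.
        return base.resolve(".staging_" + UUID.randomUUID());
    }

    public static void main(String[] args) {
        Path base = Paths.get("/tmp/output");
        Path sink1 = uniqueStagingDir(base);
        Path sink2 = uniqueStagingDir(base);
        System.out.println(sink1);
        System.out.println(sink2);
        System.out.println("distinct: " + !sink1.equals(sink2));
    }
}
```

With this split, `FileSystemOutputFormat` could simply assume exclusive ownership of the directory it is given.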
   
   This test is fine if you think that `FileSystemOutputFormat` should be able 
to handle this case. But I'm wondering whether it would be more appropriate to 
fail when multiple instances access the same folder (though it's an open 
question how easily a `FileSystemOutputFormat` instance could detect something 
like that).
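On a local filesystem, one conceivable fail-fast check is for each instance to claim the staging folder by atomically creating a marker file. The sketch below is hypothetical: `StagingDirGuard`, `claim`, `release` and the `_STAGING_IN_USE` marker are illustrative names, not Flink APIs. It only relies on `java.nio.file.Files.createFile`, which throws `FileAlreadyExistsException` if the file already exists. Object stores and other distributed filesystems may not offer an equivalent atomic create, and a crashed instance would leave a stale marker behind, which is part of why such detection is not trivial.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical fail-fast guard: an output format instance claims its staging
 * directory by atomically creating a marker file. A second instance pointed at
 * the same directory fails immediately instead of silently clobbering the
 * first instance's staged files.
 */
public class StagingDirGuard {

    // Hypothetical marker name; not something Flink defines.
    static final String MARKER = "_STAGING_IN_USE";

    /** Claims stagingDir for the caller, or throws if another owner holds it. */
    static Path claim(Path stagingDir) throws IOException {
        Files.createDirectories(stagingDir);
        Path marker = stagingDir.resolve(MARKER);
        try {
            // The existence check and the creation are a single atomic
            // operation, so only one caller can succeed for a given path.
            return Files.createFile(marker);
        } catch (FileAlreadyExistsException e) {
            throw new IllegalStateException(
                    "Staging directory " + stagingDir + " is already used by another instance", e);
        }
    }

    /** Releases the claim, e.g. after the staged files were committed. */
    static void release(Path marker) throws IOException {
        Files.deleteIfExists(marker);
    }

    public static void main(String[] args) throws IOException {
        Path shared = Files.createTempDirectory("staging");
        Path marker = claim(shared); // first "sink" succeeds
        try {
            claim(shared); // second "sink" on the same directory
            System.out.println("unexpected: second claim succeeded");
        } catch (IllegalStateException e) {
            System.out.println("second claim rejected");
        }
        release(marker);
        claim(shared); // the directory can be claimed again after release
        System.out.println("claim after release succeeded");
    }
}
```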
   
   Either way, this test scenario indicates that the responsibilities of the 
different classes are not well-defined.


