LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507187446


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef1,
+                fileToCommitRef1,
+                getStagingDir(shareStagingDir),
+                Row.of("a1", 1, "x1"),
+                Row.of("a2", 2, "x2"));
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef2,
+                fileToCommitRef2,
+                getStagingDir(shareStagingDir),
+                Row.of("b1", 1, "y1"),
+                Row.of("b2", 2, "y2"));
+
+        assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir);
+    }
+
+    private void writeRowsToSink(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef,
+            AtomicReference<Map<File, String>> contentRef,
+            Path stagingDir,
+            Row... records)
+            throws Exception {
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) {
+            writeUnorderedRecords(testHarness, Arrays.asList(records));
+            contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath())));
+        }
+    }
+
+    private Path getStagingDir(boolean shareStagingDir) {
+        String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+        return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+    }
+
+    private void assertSinkBehavior(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef1,
+            AtomicReference<Map<File, String>> fileToCommitRef1,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef2,
+            AtomicReference<Map<File, String>> fileToCommitRef2,
+            boolean shareStagingDir)
+            throws Exception {
+        Map<File, String> fileToCommit1 = fileToCommitRef1.get();
+        Map<File, String> fileToCommit2 = fileToCommitRef2.get();
+        assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+        if (shareStagingDir) {
+            assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+        } else {
+            assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+        }
+        sinkRef1.get().finalizeGlobal(finalizationContext);
+        sinkRef2.get().finalizeGlobal(finalizationContext);
+        Map<File, String> committedFiles = getFileContentByPath(outputPath);
+        if (shareStagingDir) {

Review Comment:
   I understand it this way; please correct me if I'm wrong. When verifying a
fix, it's common to first build test cases that reliably reproduce the issue,
then propose a solution, and finally rerun the tests to confirm that the fix
works and has not introduced new issues. In this case, the original method
relies on `System.currentTimeMillis()`, which is non-deterministic, so to
reproduce the issue stably it has to return a constant value. That is why an
additional boolean parameter was introduced. However, after reading your
comments in another pull request, I think your approach is more reasonable: it
surfaces the problem as a compile-time error instead of deferring it to
runtime, where it could cause unexpected consequences.
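For illustration only, here is a minimal sketch contrasting the two ideas, assuming the staging postfix is derived from `System.currentTimeMillis()` as described above. The class `StagingPathDemo`, its methods, and the `.staging_` prefix are hypothetical names, not Flink's `FileSystemTableSink` API, and the injected-clock variant is just one possible reading of the compile-time approach.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: names are illustrative, not Flink's actual API.
public class StagingPathDemo {

    // Boolean-flag idea: a test-only switch picks a constant postfix
    // (shared staging dir) or a timestamp-based one.
    static String postfixWithFlag(boolean shareStagingDir) {
        return shareStagingDir
                ? ".staging_shared"
                : ".staging_" + System.currentTimeMillis();
    }

    // Injected-clock idea: the time source is a required argument, so tests
    // can pass a fixed clock and production code passes
    // System::currentTimeMillis. Omitting it is a compile error.
    static String postfixWithClock(LongSupplier clock) {
        return ".staging_" + clock.getAsLong();
    }

    public static void main(String[] args) {
        LongSupplier fixedClock = () -> 42L;
        // Two sinks built with the same fixed clock resolve to the same
        // staging postfix, reproducing the collision deterministically.
        String p1 = postfixWithClock(fixedClock);
        String p2 = postfixWithClock(fixedClock);
        System.out.println(p1 + " " + p1.equals(p2)); // prints ".staging_42 true"
    }
}
```

With the injected clock, production code would call `postfixWithClock(System::currentTimeMillis)`, and no test-only boolean needs to reach the production signature.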


