XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507150180


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -386,6 +389,11 @@ private Path toStagingPath() {
         }
     }
 
+    @VisibleForTesting

Review Comment:
   [It looks like](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57965&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23032)
 the architecture tests don't allow this. :thinking: 



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
     }
 
     private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));
         try {
             FileSystem fs = stagingDir.getFileSystem();
             Preconditions.checkState(

Review Comment:
   The precondition should be updated. Or did I miss something and you don't 
agree with [what we discussed in the other 
PR](https://github.com/LadyForest/flink/pull/1#discussion_r1505972482)?



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {

Review Comment:
   Generally speaking, unit tests should be as small as possible and focus on 
one specific aspect of the class contract (in this case, 
`FileSystemTableSink#toStagingPath` not returning the same folder twice). This 
test adds quite a bit of extra context, which might make it harder for code 
readers to grasp the intention. What's your opinion on that?
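
   To illustrate, here is a minimal sketch of the kind of focused check I have 
in mind. It is not tied to the actual sink: `StagingPathContractSketch` and 
`returnsDistinctPaths` are hypothetical names, and it uses `java.nio.file.Path` 
instead of Flink's `Path` to stay self-contained.

```java
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink: it only captures the contract
// "the staging path generator never returns the same folder twice".
class StagingPathContractSketch {

    /** Returns true if {@code generator} produced {@code calls} distinct paths. */
    static boolean returnsDistinctPaths(Supplier<Path> generator, int calls) {
        Set<Path> seen = new HashSet<>();
        for (int i = 0; i < calls; i++) {
            // Set#add returns false if the path was already handed out before.
            if (!seen.add(generator.get())) {
                return false;
            }
        }
        return true;
    }
}
```

   A test could pass in whatever produces the staging path and assert on the 
result, without setting up a test harness or writing any rows.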



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -386,6 +389,11 @@ private Path toStagingPath() {
         }
     }
 
+    @VisibleForTesting
+    static String getStagingPathPostfix(boolean constant) {

Review Comment:
   You introduce the parameter `constant` to make the test parameterizable. But 
ideally, you don't want to change production code just to please test code. I 
feel like this `constant` parameter is not necessary (because the production 
code only uses `false`). The decision whether to use a fixed suffix (or, if you 
follow the other comment on `toStagingPath()` and return the actual path, a 
fixed path) or the random suffix should live in the test code. WDYT?
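
   A rough sketch of how that split could look. It assumes a no-argument 
`getStagingPathPostfix()` that returns a random UUID (my assumption about the 
final shape, not what the PR currently has). `stagingDirForTest` and the 
`shared` suffix are made up for illustration, and `java.nio.file.Path` stands 
in for Flink's `Path`.

```java
import java.nio.file.Path;
import java.util.UUID;

class StagingSuffixSketch {

    // Production side (assumed shape): always random, no test-only flag.
    static String getStagingPathPostfix() {
        return UUID.randomUUID().toString();
    }

    // Test side (hypothetical helper): the test itself decides whether the
    // two sinks share a staging dir or get a fresh one each.
    static Path stagingDirForTest(Path tmpPath, boolean shareStagingDir) {
        String postfix = shareStagingDir ? "shared" : getStagingPathPostfix();
        return tmpPath.resolve(".staging_" + postfix);
    }
}
```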



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
     }
 
     private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));

Review Comment:
   Why did you decide on creating the `getStagingPathPostfix` method? This 
creates unnecessary code in the test because we still have to generate the Path 
there. Alternatively, you could move the entire content of `toStagingPath()` 
into a static method `generateStagingPath(Path parent, boolean constant)`. That 
way you just need to call the static method in the test without any path 
resolution.
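
   Roughly something like the following sketch. It uses `java.nio.file` as a 
stand-in for Flink's `Path`/`FileSystem`, drops the `constant` flag (see my 
other comment), and assumes the method should fail if the directory already 
exists. All of that is an assumption on my side, not the actual implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

class StagingPathSketch {

    // Hypothetical static replacement for toStagingPath(): callers (and tests)
    // get the final directory back and need no further path resolution.
    static Path generateStagingPath(Path parent) {
        Path stagingDir = parent.resolve(".staging_" + UUID.randomUUID());
        // Assumed precondition: a staging dir must never be reused.
        if (Files.exists(stagingDir)) {
            throw new IllegalStateException("Staging dir " + stagingDir + " already exists.");
        }
        try {
            Files.createDirectories(stagingDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return stagingDir;
    }
}
```

   The test would then only call `generateStagingPath(tmpPath)` twice and 
compare the results.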



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef1,
+                fileToCommitRef1,
+                getStagingDir(shareStagingDir),
+                Row.of("a1", 1, "x1"),
+                Row.of("a2", 2, "x2"));
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef2,
+                fileToCommitRef2,
+                getStagingDir(shareStagingDir),
+                Row.of("b1", 1, "y1"),
+                Row.of("b2", 2, "y2"));
+
+        assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir);
+    }
+
+    private void writeRowsToSink(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef,
+            AtomicReference<Map<File, String>> contentRef,
+            Path stagingDir,
+            Row... records)
+            throws Exception {
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) {
+            writeUnorderedRecords(testHarness, Arrays.asList(records));
+            contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath())));
+        }
+    }
+
+    private Path getStagingDir(boolean shareStagingDir) {
+        String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+        return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+    }
+
+    private void assertSinkBehavior(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef1,
+            AtomicReference<Map<File, String>> fileToCommitRef1,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef2,
+            AtomicReference<Map<File, String>> fileToCommitRef2,
+            boolean shareStagingDir)
+            throws Exception {
+        Map<File, String> fileToCommit1 = fileToCommitRef1.get();
+        Map<File, String> fileToCommit2 = fileToCommitRef2.get();
+        assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+        if (shareStagingDir) {
+            assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+        } else {
+            assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+        }
+        sinkRef1.get().finalizeGlobal(finalizationContext);
+        sinkRef2.get().finalizeGlobal(finalizationContext);
+        Map<File, String> committedFiles = getFileContentByPath(outputPath);
+        if (shareStagingDir) {

Review Comment:
   I still don't get this test case. Are there scenarios where sharing the 
staging dir is actually desired? Isn't that exactly what we want to avoid, and 
aren't we fixing it in this PR so that this scenario never appears? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
