XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1519417563


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -83,153 +103,175 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(
+                        false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
             testHarness.setup();
             testHarness.open();
         }
     }
 
     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonList(
+                                "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"));
     }
 
     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
     }
 
     @Test
     void testGroupedDynamicPartition() throws Exception {
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() {
+        final int expectedStagingDirectories = 100;
+        for (int i = 0; i < expectedStagingDirectories; i++) {
+            assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>()));
+        }
+        assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories);
+    }
+
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<List<String>> outputSupplier)
+            throws Exception {
+        List<String> expectedOutput = outputSupplier.get();
+        int expectedFileNum = expectedOutput.size();
         AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) {
             testHarness.setup();
             testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum);
         }
 
         ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+        assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist();
+
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {

Review Comment:
   Why do we handle partitioned output separately? Isn't the non-partitioned output just a special case of the partitioned output (with a single partition being present)?
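   A minimal sketch of how the two branches could collapse, treating the non-partitioned case as a single unnamed partition (the helper name and the empty-string key are hypothetical, not part of this PR):
   ```java
   // Assumption: non-partitioned files sit directly under the output root, so
   // their parent directory name carries no "key=value" partition spec; map
   // them all onto one "" partition (these tests produce a single such file).
   private static void assertOutputContent(
           Map<String, String> expectedPartitionToContent, Map<File, String> fileToContent) {
       Map<String, String> actual =
               fileToContent.entrySet().stream()
                       .collect(
                               Collectors.toMap(
                                       e -> {
                                           String dir = e.getKey().getParentFile().getName();
                                           return dir.contains("=") ? dir : "";
                                       },
                                       Map.Entry::getValue));
       assertThat(actual).containsExactlyInAnyOrderEntriesOf(expectedPartitionToContent);
   }
   ```
   Non-partitioned tests would then pass `Collections.singletonMap("", expectedContent)` as the expected map.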



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##########
@@ -197,13 +200,37 @@ public void close() throws IOException {
         }
     }
 
+    private Path toStagingPath(Path path) {
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details

Review Comment:
   ```suggestion
   ```
   nit: I'm not sure whether this comment is still necessary. In `FileSystemOutputFormat`, we have a dedicated folder now that is used for this specific instance and is, therefore, an implementation detail that's hidden from the "outside".



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -37,24 +37,39 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Test for {@link FileSystemOutputFormat}. */
 class FileSystemOutputFormatTest {
 
-    @TempDir private java.nio.file.Path tmpPath;
     @TempDir private java.nio.file.Path outputPath;
 
     private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext();
 
+    private static final Supplier<List<StreamRecord<Row>>> DEFAULT_INPUT_SUPPLIER =
+            () ->
+                    Arrays.asList(
+                            new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                            new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                            new StreamRecord<>(Row.of("a2", 2, "p2"), 1L),
+                            new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+
+    private static final Supplier<List<String>> DEFAULT_OUTPUT_SUPPLIER =
+            () -> Collections.singletonList("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");

Review Comment:
   ```suggestion
                        () ->
                       Collections.singletonList(
                               createFileContent("a1,1,p1", "a2,2,p1", "a2,2,p2", "a3,3,p1"));
   
       private static String createFileContent(String... rows) {
           return Arrays.stream(rows).collect(Collectors.joining("\n", "", "\n"));
       }
   ```
   Looks like we could move the file content generation into its own utility 
method to hide the newline format since it's used in numerous places within 
this test. WDYT?
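   For illustration (not from the PR): `Collectors.joining("\n", "", "\n")` joins the rows with newlines and appends one trailing newline, so `createFileContent("a1,1,p1", "a2,2,p1")` would yield `"a1,1,p1\na2,2,p1\n"`, matching the literals used throughout this test.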



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##########
@@ -197,13 +200,37 @@ public void close() throws IOException {
         }
     }
 
+    private Path toStagingPath(Path path) {

Review Comment:
   ```suggestion
       private Path createStagingPath(Path path) {
   ```
   nit: more descriptive name



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -70,6 +85,11 @@ private static Map<File, String> getFileContentByPath(java.nio.file.Path directo
         return contents;
     }
 
+    private static Map<File, String> getStagingFileContent(

Review Comment:
   I'm wondering whether we actually need to check the staging content here. 
It's an implementation detail how the data is stored in the folder. What we 
want to make sure is that the class actually creates its own staging folder 
(rather than relying on the folder that is passed in through the constructor).
   
   That would make most of the refactoring of this test class obsolete, I 
guess. :thinking: 
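   A sketch of that narrower check, reusing helpers this test class already has (`createSinkFormat`, `outputPath`) together with the PR's `getStagingPath()` accessor:
   ```java
   // Only verify that the format provisions its own staging directory distinct
   // from the output path handed to the builder; ignore the staged file layout.
   FileSystemOutputFormat<Row> format =
           createSinkFormat(false, false, false, new LinkedHashMap<>());
   assertThat(format.getStagingPath().getPath()).isNotEqualTo(outputPath.toString());
   assertThat(Paths.get(format.getStagingPath().getPath())).isDirectory();
   ```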



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -83,153 +103,175 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(
+                        false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
             testHarness.setup();
             testHarness.open();
         }
     }
 
     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonList(
+                                "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"));
     }
 
     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
     }
 
     @Test
     void testGroupedDynamicPartition() throws Exception {
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() {
+        final int expectedStagingDirectories = 100;
+        for (int i = 0; i < expectedStagingDirectories; i++) {
+            assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>()));
+        }
+        assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories);
+    }
+
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<List<String>> outputSupplier)
+            throws Exception {
+        List<String> expectedOutput = outputSupplier.get();
+        int expectedFileNum = expectedOutput.size();
         AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) {
             testHarness.setup();
             testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum);
         }
 
         ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+        assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist();
+
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {
+            assertPartitionedOutput(expectedOutput, fileToContent);
+        } else {
+            assertThat(fileToContent.values()).containsExactlyInAnyOrderElementsOf(expectedOutput);
+        }
+    }
 
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + "a3,3\n");
-        assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+    private void assertPartitionedOutput(
+            List<String> expectedOutput, Map<File, String> fileToContent) {
Review Comment:
   ```suggestion
               Map<String, String> expectedOutput, Map<File, String> fileToContent) {
   ```
   The colon-encoded expected output could be refactored into a `Map<String, String>` as far as I can see. That would improve the readability of the test code. The sole purpose of the colon is to store the partition info and the content together.
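   A sketch of the adjusted call site (illustrative; `HashMap` is already imported in this test):
   ```java
   // Keys are partition directory names, values the expected file content.
   Map<String, String> expected = new HashMap<>();
   expected.put("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n");
   expected.put("c=p2", "a2,2\n");
   assertPartitionedOutput(expected, fileToContent);
   ```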



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -83,153 +103,175 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(
+                        false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
             testHarness.setup();
             testHarness.open();
         }
     }
 
     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonList(
+                                "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"));
     }
 
     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
     }
 
     @Test
     void testGroupedDynamicPartition() throws Exception {
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() {
+        final int expectedStagingDirectories = 100;
+        for (int i = 0; i < expectedStagingDirectories; i++) {
+            assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>()));
+        }
+        assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories);
+    }
+
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<List<String>> outputSupplier)
+            throws Exception {
+        List<String> expectedOutput = outputSupplier.get();
+        int expectedFileNum = expectedOutput.size();
         AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) {
             testHarness.setup();
             testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum);
         }
 
         ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+        assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist();
+
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {
+            assertPartitionedOutput(expectedOutput, fileToContent);
+        } else {
+            assertThat(fileToContent.values()).containsExactlyInAnyOrderElementsOf(expectedOutput);
+        }
+    }
 
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + "a3,3\n");
-        assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+    private void assertPartitionedOutput(
+            List<String> expectedOutput, Map<File, String> fileToContent) {
+        final String delimiter = ":";
+        Map<String, String> partitionToContent =
+                fileToContent.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        e -> e.getKey().getParentFile().getName(),
+                                        Map.Entry::getValue));
+
+        Map<String, String> expectedPartitionContent =
+                expectedOutput.stream()
+                        .map(r -> r.split(delimiter))
+                        .collect(Collectors.toMap(splits -> splits[0], splits -> splits[1]));
+
+        assertThat(partitionToContent).containsExactlyInAnyOrderEntriesOf(expectedPartitionContent);
     }
 
-    private OneInputStreamOperatorTestHarness<Row, Object> createSink(
+    private FileSystemOutputFormat<Row> createSinkFormat(
             boolean override,
             boolean partition,
             boolean dynamicGrouped,
-            LinkedHashMap<String, String> staticPartitions,
-            AtomicReference<FileSystemOutputFormat<Row>> sinkRef)
-            throws Exception {
+            LinkedHashMap<String, String> staticPartitions) {
         String[] columnNames = new String[] {"a", "b", "c"};
         String[] partitionColumns = partition ? new String[] {"c"} : new String[0];
+        Path path = new Path(outputPath.toString());
+        TableMetaStoreFactory msFactory = new FileSystemCommitterTest.TestMetaStoreFactory(path);
+        return new FileSystemOutputFormat.Builder<Row>()
+                .setMetaStoreFactory(msFactory)
+                .setPath(path)
+                .setOverwrite(override)
+                .setPartitionColumns(partitionColumns)
+                .setPartitionComputer(
+                        new RowPartitionComputer("default", columnNames, partitionColumns))
+                .setFormatFactory(TextOutputFormat::new)
+                .setDynamicGrouped(dynamicGrouped)
+                .setStaticPartitions(staticPartitions)
+                .build();
+    }
 
-        TableMetaStoreFactory msFactory =
-                new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString()));
+    private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness(
+            boolean override,
+            boolean partition,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef)
+            throws Exception {

Review Comment:
   ```suggestion
       private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness(
               OutputFormat<Row> outputFormat)
               throws Exception {
   ```
   The `AtomicReference` is actually not needed here. You're forwarding all the parameters of this method to `createSinkFormat` and passing the `AtomicReference` to capture the created instance. You could just create the `FileSystemOutputFormat` instance outside of this method and pass it in, achieving the same outcome.
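   A sketch of the resulting call site in `checkWriteAndCommit` under that refactoring (illustrative only):
   ```java
   // Build the format up front and keep a plain local reference; the harness
   // factory then only needs the finished OutputFormat.
   FileSystemOutputFormat<Row> format =
           createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions);
   try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
           createTestHarness(format)) {
       testHarness.setup();
       testHarness.open();
       // ... process the input records and assert via `format` directly ...
   }
   ```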



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

