LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1520696809


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -83,153 +103,175 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(
+                        false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
             testHarness.setup();
             testHarness.open();
         }
     }
 
     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonList(
+                                "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"));
     }
 
     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
     }
 
     @Test
     void testGroupedDynamicPartition() throws Exception {
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() {
+        final int expectedStagingDirectories = 100;
+        for (int i = 0; i < expectedStagingDirectories; i++) {
+            assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>()));
+        }
+        assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories);
+    }
+
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<List<String>> outputSupplier)
+            throws Exception {
+        List<String> expectedOutput = outputSupplier.get();
+        int expectedFileNum = expectedOutput.size();
         AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) {
             testHarness.setup();
             testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum);
         }
 
         ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+        assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist();
+
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {

Review Comment:
   For non-partitioned tables, the output file sits directly under the
   outputPath, e.g. `outputPath/part-xxx`. For partitioned tables, each
   partition gets its own subdirectory, e.g. `outputPath/year=2024/part-xxx`.
   As I understand it, this distinction exists so that when several partitions
   are written at the same time, the test can verify that the data landing in
   each partition is as expected.
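
   To make the distinction concrete, here is a minimal, self-contained sketch
   (not the PR's actual helper; `groupByPartitionDir` and the sample paths are
   made up for illustration) of how a partitioned check can key each output
   file's content by its partition directory name, which is what the
   `c=p1:...` style expectations above rely on:

   ```java
   import java.io.File;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;

   class PartitionedOutputCheckSketch {

       /**
        * Keys each output file's content by the name of its parent directory,
        * e.g. "c=p1". For non-partitioned tables the file sits directly under
        * outputPath, so no such keying is needed there.
        */
       static Map<String, String> groupByPartitionDir(Map<File, String> fileToContent) {
           Map<String, String> partitionToContent = new TreeMap<>();
           fileToContent.forEach(
                   (file, content) ->
                           partitionToContent.put(file.getParentFile().getName(), content));
           return partitionToContent;
       }

       public static void main(String[] args) {
           Map<File, String> fileToContent = new HashMap<>();
           fileToContent.put(new File("/out/c=p1/part-0"), "a1,1\na2,2\na3,3\n");
           fileToContent.put(new File("/out/c=p2/part-0"), "a2,2\n");

           Map<String, String> byPartition = groupByPartitionDir(fileToContent);
           // Each partition directory maps to exactly the rows written into it.
           System.out.println(byPartition.keySet()); // [c=p1, c=p2]
       }
   }
   ```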


