XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521036729


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -83,157 +110,193 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(createSinkFormat(false, false, false, new LinkedHashMap<>()))) {
             testHarness.setup();
             testHarness.open();
         }
     }
 
     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }
 
     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonMap(
+                                "c=p1", createFileContent("a1,1", "a2,2", 
"a2,2", "a3,3")));
     }
 
     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () ->
+                        ImmutableMap.of(
+                                "c=p1",
+                                createFileContent("a1,1", "a2,2", "a3,3"),
+                                "c=p2",
+                                createFileContent("a2,2")));
     }
 
     @Test
     void testGroupedDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () ->
+                        ImmutableMap.of(
+                                "c=p1",
+                                createFileContent("a1,1", "a2,2", "a3,3"),
+                                "c=p2",
+                                createFileContent("a2,2")));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() throws IOException {
+        final Path alreadyExistingStagingDir = new Path(outputPath.toFile().getAbsolutePath());
+        assertThat(alreadyExistingStagingDir.getFileSystem().exists(alreadyExistingStagingDir))
+                .as("The staging folder should already exist.")
+                .isTrue();
+
+        final FileSystemOutputFormat.Builder<Row> builder =
+                new FileSystemOutputFormat.Builder<Row>()
+                        .setPartitionColumns(new String[0])
+                        .setFormatFactory(TextOutputFormat::new)
+                        .setMetaStoreFactory(
+                                new FileSystemCommitterTest.TestMetaStoreFactory(
+                                        new Path(outputPath.toFile().getAbsolutePath())))
+                        .setPartitionComputer(
+                                new RowPartitionComputer("default", new String[0], new String[0]))
+                        .setStagingPath(alreadyExistingStagingDir);
+
+        assertThatThrownBy(builder::build)
+                .as("Reusing a folder should cause an error.")
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<?> outputSupplier)
+            throws Exception {
+        Object expectedOutput = outputSupplier.get();
+        int expectedFileNum =
+                (partitioned)
+                        ? ((Map<String, String>) expectedOutput).size()
+                        : ((List<String>) expectedOutput).size();
+        FileSystemOutputFormat<Row> outputFormat =
+                createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions);
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(outputFormat)) {
             testHarness.setup();
             testHarness.open();
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(outputFormat)).hasSize(expectedFileNum);
+        }
+
+        outputFormat.finalizeGlobal(finalizationContext);
+        assertThat(Paths.get(outputFormat.getStagingPath().getPath())).doesNotExist();
 
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {
+            assertPartitionedOutput((Map<String, String>) expectedOutput, fileToContent);
+        } else {
+            assertThat(fileToContent.values())
+                    .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput);
         }
+    }
 
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+    private void assertPartitionedOutput(

Review Comment:
   nit: that method could be removed and inlined in the `if (partitioned)` block of the calling method to improve readability, e.g. with something like the sketch below.
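   A minimal, untested sketch of the inlined version (assuming `expectedOutput` and `fileToContent` as defined in `checkWriteAndCommit`; imports from `java.util.stream` elided):
   ```java
   if (partitioned) {
       // group the committed files by their partition directory name (e.g. "c=p1")
       final Map<String, String> actualPartitionToContent =
               fileToContent.entrySet().stream()
                       .collect(
                               Collectors.toMap(
                                       entry -> entry.getKey().getParentFile().getName(),
                                       Map.Entry::getValue));
       assertThat(actualPartitionToContent)
               .containsExactlyInAnyOrderEntriesOf((Map<String, String>) expectedOutput);
   } else {
       assertThat(fileToContent.values())
               .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput);
   }
   ```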



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -70,6 +85,11 @@ private static Map<File, String> 
getFileContentByPath(java.nio.file.Path directo
         return contents;
     }
 
+    private static Map<File, String> getStagingFileContent(

Review Comment:
   hm, the existence and non-existence of files in the staging folder can be verified without revealing the staging directory. I guess the mistake we made is that we reuse the `outputPath` temporary folder for both the `TestMetaStoreFactory` and the `FileSystemOutputFormat`. If we used two separate folders (like it was done before our change), we could just check recursively that, indeed, the expected number of intermediate files is created by the `FileSystemOutputFormat` and that all content is deleted after the `finalizeGlobal` call in `checkWriteAndCommit` (see the sketch below).
   
   The reason I am stressing that point is that I'd like to remove the `FileSystemOutputFormat#getStagingPath` method. It seems to reveal internal state without a reason. WDYT?
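   A rough, untested sketch of what I have in mind (assuming a second temporary folder `stagingBaseDir`, separate from `outputPath`, is passed as the staging path; imports from `java.nio.file` and `java.util.stream` elided):
   ```java
   // count the intermediate files below the dedicated staging folder
   // instead of asking the format for its internal staging path
   try (Stream<java.nio.file.Path> files = Files.walk(stagingBaseDir)) {
       assertThat(files.filter(Files::isRegularFile)).hasSize(expectedFileNum);
   }

   outputFormat.finalizeGlobal(finalizationContext);

   // after the commit, no intermediate files should be left behind
   try (Stream<java.nio.file.Path> files = Files.walk(stagingBaseDir)) {
       assertThat(files.filter(Files::isRegularFile)).isEmpty();
   }
   ```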


