LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1520696809
########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ##########
@@ -83,153 +103,175 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(
+                        false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
             testHarness.setup();
             testHarness.open();
         }
     }

     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }

     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }

     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonList(
+                                "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"));
     }

     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
     }

     @Test
     void testGroupedDynamicPartition() throws Exception {
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n"));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() {
+        final int expectedStagingDirectories = 100;
+        for (int i = 0; i < expectedStagingDirectories; i++) {
+            assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>()));
+        }
+        assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories);
+    }
+
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<List<String>> outputSupplier)
+            throws Exception {
+        List<String> expectedOutput = outputSupplier.get();
+        int expectedFileNum = expectedOutput.size();
         AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) {
             testHarness.setup();
             testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum);
         }

         ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+        assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist();
+
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {

Review Comment:
For non-partitioned tables, the output files are written directly under the outputPath, e.g. `outputPath/part-xxx`. For partitioned tables, each partition gets its own subdirectory, e.g. `outputPath/year=2024/part-xxx`. I understand the purpose of this distinction is to verify that, when several partitions are written at the same time, each partition ends up with exactly the data the test expects.
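As a rough sketch of what that per-partition check could look like (this is illustrative only: the helper name `assertPartitionedContent` is assumed, and the `"c=p1:" + content` encoding is taken from the expected-output strings in the tests above, not necessarily the PR's actual implementation):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionedOutputAssertion {

    /** Checks "<partitionDir>:<fileContent>" expectations, e.g. "c=p1:a1,1\na2,2\n". */
    static void assertPartitionedContent(Map<File, String> fileToContent, List<String> expected) {
        // Key each written file by its parent directory name, e.g. "c=p1" or "c=p2".
        Map<String, String> partitionToContent = new HashMap<>();
        fileToContent.forEach(
                (file, content) ->
                        partitionToContent.put(file.getParentFile().getName(), content));

        for (String entry : expected) {
            int splitIdx = entry.indexOf(':');
            String partition = entry.substring(0, splitIdx);
            String content = entry.substring(splitIdx + 1);
            assertThat(partitionToContent).containsEntry(partition, content);
        }
    }
}
```

For non-partitioned tables the expected strings can be compared against the file contents directly, since all files sit immediately under `outputPath`.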