XComp commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1519417563
########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -83,153 +103,175 @@ void after() { @Test void testClosingWithoutInput() throws Exception { try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { + createTestHarness( + false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { testHarness.setup(); testHarness.open(); } } @Test void testNonPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - } - - private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness) - throws Exception { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); + checkWriteAndCommit( + false, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testOverrideNonPartition() throws Exception { testNonPartition(); - - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(true, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + true, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testStaticPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); LinkedHashMap<String, String> staticParts = new LinkedHashMap<>(); staticParts.put("c", "p1"); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, staticParts, ref)) { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1"); - 
assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + staticParts, + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a3", 3), 1L)), + () -> + Collections.singletonList( + "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n")); } @Test void testDynamicPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); - - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent) - .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n")); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n")); } @Test void testGroupedDynamicPartition() throws Exception { + checkWriteAndCommit( + false, + true, + true, + new LinkedHashMap<>(), + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p1"), 1L), + new StreamRecord<>(Row.of("a3", 3, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)), + () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n")); + } + + @Test + void testGetUniqueStagingDirectory() { + final int expectedStagingDirectories = 100; + for (int i = 0; i < expectedStagingDirectories; i++) { + assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>())); + } + assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories); + } + + private void checkWriteAndCommit( + boolean override, + boolean partitioned, + boolean dynamicGrouped, + LinkedHashMap<String, String> staticPartitions, + Supplier<List<StreamRecord<Row>>> inputSupplier, + Supplier<List<String>> outputSupplier) + throws Exception { + List<String> expectedOutput = outputSupplier.get(); + int expectedFileNum = expectedOutput.size(); AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, true, new LinkedHashMap<>(), ref)) { + createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) { testHarness.setup(); testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); + for (StreamRecord<Row> record : inputSupplier.get()) { + testHarness.processElement(record); + } + assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum); } 
ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); + assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist(); + + Map<File, String> fileToContent = getFileContentByPath(outputPath); + assertThat(fileToContent).hasSize(expectedFileNum); + if (partitioned) {

Review Comment:
Why do we handle partitioned output separately? Isn't the non-partitioned output just a special case of the partitioned output (with a single partition being present)?

########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java: ########## @@ -197,13 +200,37 @@ public void close() throws IOException { } } + private Path toStagingPath(Path path) { + // Add a random UUID to prevent multiple sinks from sharing the same staging dir. + // Please see FLINK-29114 for more details

Review Comment:
```suggestion
```
nit: I'm not sure whether this comment is still necessary. In `FileSystemOutputFormat`, we have a dedicated folder now that is used for this specific instance and is, therefore, an implementation detail that's hidden from the "outside".

########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -37,24 +37,39 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; /** Test for {@link FileSystemOutputFormat}. */ class FileSystemOutputFormatTest { - @TempDir private java.nio.file.Path tmpPath; @TempDir private java.nio.file.Path outputPath; private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext(); + private static final Supplier<List<StreamRecord<Row>>> DEFAULT_INPUT_SUPPLIER = + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p2"), 1L), + new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); + + private static final Supplier<List<String>> DEFAULT_OUTPUT_SUPPLIER = + () -> Collections.singletonList("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");

Review Comment:
```suggestion
            () ->
                    Collections.singletonList(
                            createFileContent("a1,1,p1", "a2,2,p1", "a2,2,p2", "a3,3,p1"));

    private static String createFileContent(String... rows) {
        return Arrays.stream(rows).collect(Collectors.joining("\n", "", "\n"));
    }
```
Looks like we could move the file content generation into its own utility method to hide the newline format, since it's used in numerous places within this test. WDYT?
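For illustration, the partitioned expectations elsewhere in this test could reuse such a helper as well. This is a sketch only; `createFileContent` is the hypothetical utility from the suggestion above, not code from this PR:

```java
// Sketch: testDynamicPartition()'s expectation rewritten against the
// hypothetical createFileContent(String...) helper suggested above.
// createFileContent("a1,1", "a2,2", "a3,3") yields "a1,1\na2,2\na3,3\n".
() ->
        Arrays.asList(
                "c=p1:" + createFileContent("a1,1", "a2,2", "a3,3"),
                "c=p2:" + createFileContent("a2,2"));
```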
########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java: ########## @@ -197,13 +200,37 @@ public void close() throws IOException { } } + private Path toStagingPath(Path path) { Review Comment: ```suggestion private Path createStagingPath(Path path) { ``` nit: more descriptive name ########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -70,6 +85,11 @@ private static Map<File, String> getFileContentByPath(java.nio.file.Path directo return contents; } + private static Map<File, String> getStagingFileContent( Review Comment: I'm wondering whether we actually need to check the staging content here. It's an implementation detail how the data is stored in the folder. What we want to make sure is that the class actually creates its own staging folder (rather than relying on the folder that is passed in through the constructor). That would make most of the refactoring of this test class obsolete, I guess. :thinking: ########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -83,153 +103,175 @@ void after() { @Test void testClosingWithoutInput() throws Exception { try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { + createTestHarness( + false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { testHarness.setup(); testHarness.open(); } } @Test void testNonPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - } - - private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness) - throws Exception { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); + checkWriteAndCommit( + false, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testOverrideNonPartition() throws Exception { testNonPartition(); - - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(true, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + true, + false, + 
false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testStaticPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); LinkedHashMap<String, String> staticParts = new LinkedHashMap<>(); staticParts.put("c", "p1"); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, staticParts, ref)) { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1"); - assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + staticParts, + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a3", 3), 1L)), + () -> + Collections.singletonList( + "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n")); } @Test void testDynamicPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); - - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent) - .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n")); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n")); } @Test void testGroupedDynamicPartition() throws Exception { + checkWriteAndCommit( + false, + true, + true, + new LinkedHashMap<>(), + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p1"), 1L), + new StreamRecord<>(Row.of("a3", 3, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)), + () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n")); + } + + @Test + void testGetUniqueStagingDirectory() { + final int expectedStagingDirectories = 100; + for (int i = 0; i < expectedStagingDirectories; i++) { + assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>())); + } + assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories); + } + + private void checkWriteAndCommit( + boolean override, + boolean partitioned, + boolean dynamicGrouped, + LinkedHashMap<String, String> staticPartitions, + 
Supplier<List<StreamRecord<Row>>> inputSupplier, + Supplier<List<String>> outputSupplier) + throws Exception { + List<String> expectedOutput = outputSupplier.get(); + int expectedFileNum = expectedOutput.size(); AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, true, new LinkedHashMap<>(), ref)) { + createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) { testHarness.setup(); testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); + for (StreamRecord<Row> record : inputSupplier.get()) { + testHarness.processElement(record); + } + assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum); } ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); + assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist(); + + Map<File, String> fileToContent = getFileContentByPath(outputPath); + assertThat(fileToContent).hasSize(expectedFileNum); + if (partitioned) { + assertPartitionedOutput(expectedOutput, fileToContent); + } else { + assertThat(fileToContent.values()).containsExactlyInAnyOrderElementsOf(expectedOutput); + } + } - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + "a3,3\n"); - assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + private void assertPartitionedOutput( + List<String> expectedOutput, Map<File, String> fileToContent) {

Review Comment:
```suggestion
            Map<String, String> expectedOutput, Map<File, String> fileToContent) {
```
As far as I can see, the sole purpose of the colon encoding is to keep the partition info and its content together; the expected output could therefore be refactored into a `Map<String, String>`, which would improve the readability of the test code.
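For illustration, a rough sketch of how the expectation and the assertion could look with a `Map`-based signature (variable names are placeholders, not code from this PR):

```java
// Sketch: expected output keyed by partition directory name, so no
// colon encoding and no split(delimiter) step is needed anymore.
Map<String, String> expectedOutput = new LinkedHashMap<>();
expectedOutput.put("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n");
expectedOutput.put("c=p2", "a2,2\n");

// The actual output, keyed the same way, can then be compared directly.
Map<String, String> partitionToContent =
        fileToContent.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                e -> e.getKey().getParentFile().getName(),
                                Map.Entry::getValue));
assertThat(partitionToContent).containsExactlyInAnyOrderEntriesOf(expectedOutput);
```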
########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -83,153 +103,175 @@ void after() { @Test void testClosingWithoutInput() throws Exception { try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { + createTestHarness( + false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) { testHarness.setup(); testHarness.open(); } } @Test void testNonPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - } - - private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness) - throws Exception { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); + checkWriteAndCommit( + false, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testOverrideNonPartition() throws Exception { testNonPartition(); - - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(true, false, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.values()) - .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + true, + false, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + DEFAULT_OUTPUT_SUPPLIER); } @Test void testStaticPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); LinkedHashMap<String, String> staticParts = new LinkedHashMap<>(); staticParts.put("c", "p1"); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, staticParts, ref)) { - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(1); - } - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - assertThat(content).hasSize(1); - assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1"); - 
assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + staticParts, + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a2", 2), 1L), + new StreamRecord<>(Row.of("a3", 3), 1L)), + () -> + Collections.singletonList( + "c=p1:" + "a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n")); } @Test void testDynamicPartition() throws Exception { - AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); - try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, false, new LinkedHashMap<>(), ref)) { - writeUnorderedRecords(testHarness); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); - } - - ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); - - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent) - .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n")); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + checkWriteAndCommit( + false, + true, + false, + new LinkedHashMap<>(), + DEFAULT_INPUT_SUPPLIER, + () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n")); } @Test void testGroupedDynamicPartition() throws Exception { + checkWriteAndCommit( + false, + true, + true, + new LinkedHashMap<>(), + () -> + Arrays.asList( + new StreamRecord<>(Row.of("a1", 1, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p1"), 1L), + new StreamRecord<>(Row.of("a3", 3, "p1"), 1L), + new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)), + () -> Arrays.asList("c=p1:" + "a1,1\n" + "a2,2\n" + "a3,3\n", "c=p2:" + "a2,2\n")); + } + + @Test + void testGetUniqueStagingDirectory() { + final int expectedStagingDirectories = 100; + for (int i = 0; i < expectedStagingDirectories; i++) { + assertDoesNotThrow(() -> createSinkFormat(false, false, false, new LinkedHashMap<>())); + } + assertThat(outputPath.toFile().listFiles()).hasSize(expectedStagingDirectories); + } + + private void checkWriteAndCommit( + boolean override, + boolean partitioned, + boolean dynamicGrouped, + LinkedHashMap<String, String> staticPartitions, + Supplier<List<StreamRecord<Row>>> inputSupplier, + Supplier<List<String>> outputSupplier) + throws Exception { + List<String> expectedOutput = outputSupplier.get(); + int expectedFileNum = expectedOutput.size(); AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>(); try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = - createSink(false, true, true, new LinkedHashMap<>(), ref)) { + createTestHarness(override, partitioned, dynamicGrouped, staticPartitions, ref)) { testHarness.setup(); testHarness.open(); - - testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L)); - testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)); - assertThat(getFileContentByPath(tmpPath)).hasSize(2); + for (StreamRecord<Row> record : inputSupplier.get()) { + testHarness.processElement(record); + } + assertThat(getStagingFileContent(ref)).hasSize(expectedFileNum); } 
ref.get().finalizeGlobal(finalizationContext); - Map<File, String> content = getFileContentByPath(outputPath); - Map<String, String> sortedContent = new TreeMap<>(); - content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s)); + assertThat(Paths.get(ref.get().getStagingPath().getPath())).doesNotExist(); + + Map<File, String> fileToContent = getFileContentByPath(outputPath); + assertThat(fileToContent).hasSize(expectedFileNum); + if (partitioned) { + assertPartitionedOutput(expectedOutput, fileToContent); + } else { + assertThat(fileToContent.values()).containsExactlyInAnyOrderElementsOf(expectedOutput); + } + } - assertThat(sortedContent).hasSize(2); - assertThat(sortedContent.get("c=p1")).isEqualTo("a1,1\n" + "a2,2\n" + "a3,3\n"); - assertThat(sortedContent.get("c=p2")).isEqualTo("a2,2\n"); - assertThat(new File(tmpPath.toUri())).doesNotExist(); + private void assertPartitionedOutput( + List<String> expectedOutput, Map<File, String> fileToContent) { + final String delimiter = ":"; + Map<String, String> partitionToContent = + fileToContent.entrySet().stream() + .collect( + Collectors.toMap( + e -> e.getKey().getParentFile().getName(), + Map.Entry::getValue)); + + Map<String, String> expectedPartitionContent = + expectedOutput.stream() + .map(r -> r.split(delimiter)) + .collect(Collectors.toMap(splits -> splits[0], splits -> splits[1])); + + assertThat(partitionToContent).containsExactlyInAnyOrderEntriesOf(expectedPartitionContent); } - private OneInputStreamOperatorTestHarness<Row, Object> createSink( + private FileSystemOutputFormat<Row> createSinkFormat( boolean override, boolean partition, boolean dynamicGrouped, - LinkedHashMap<String, String> staticPartitions, - AtomicReference<FileSystemOutputFormat<Row>> sinkRef) - throws Exception { String[] columnNames = new String[] {"a", "b", "c"}; String[] partitionColumns = partition ? new String[] {"c"} : new String[0]; + Path path = new Path(outputPath.toString()); + TableMetaStoreFactory msFactory = new FileSystemCommitterTest.TestMetaStoreFactory(path); + return new FileSystemOutputFormat.Builder<Row>() + .setMetaStoreFactory(msFactory) + .setPath(path) + .setOverwrite(override) + .setPartitionColumns(partitionColumns) + .setPartitionComputer( + new RowPartitionComputer("default", columnNames, partitionColumns)) + .setFormatFactory(TextOutputFormat::new) + .setDynamicGrouped(dynamicGrouped) + .setStaticPartitions(staticPartitions) + .build(); + } - TableMetaStoreFactory msFactory = - new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString())); + private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness( + boolean override, + boolean partition, + boolean dynamicGrouped, + LinkedHashMap<String, String> staticPartitions, + AtomicReference<FileSystemOutputFormat<Row>> sinkRef) + throws Exception {

Review Comment:
```suggestion
    private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness(
            OutputFormat<Row> outputFormat) throws Exception {
```
The `AtomicReference` is actually not needed here. You're forwarding all the parameters of this method to `createSinkFormat` and passing the `AtomicReference` to capture the created instance. You could just create the `FileSystemOutputFormat` instance outside of this method and pass it in, achieving the same outcome.
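For illustration, the call site could then look roughly like this (a sketch assuming the `createTestHarness(OutputFormat<Row>)` signature from the suggestion above):

```java
// Sketch: build the format up front; no AtomicReference is needed to
// capture the instance, since the test already holds a direct reference.
FileSystemOutputFormat<Row> outputFormat =
        createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions);
try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
        createTestHarness(outputFormat)) {
    testHarness.setup();
    testHarness.open();
    for (StreamRecord<Row> record : inputSupplier.get()) {
        testHarness.processElement(record);
    }
}
// The captured instance is no longer needed; call finalizeGlobal directly.
outputFormat.finalizeGlobal(finalizationContext);
```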
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org