XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521036729
##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -83,157 +110,193 @@ void after() {
     @Test
     void testClosingWithoutInput() throws Exception {
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), new AtomicReference<>())) {
+                createTestHarness(createSinkFormat(false, false, false, new LinkedHashMap<>()))) {
             testHarness.setup();
             testHarness.open();
         }
     }

     @Test
     void testNonPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-    }
-
-    private void writeUnorderedRecords(OneInputStreamOperatorTestHarness<Row, Object> testHarness)
-            throws Exception {
-        testHarness.setup();
-        testHarness.open();
-
-        testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-        testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
+        checkWriteAndCommit(
+                false,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }

     @Test
     void testOverrideNonPartition() throws Exception {
         testNonPartition();
-
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(true, false, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.values())
-                .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                true,
+                false,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                DEFAULT_OUTPUT_SUPPLIER);
     }

     @Test
     void testStaticPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
         LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
         staticParts.put("c", "p1");
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, staticParts, ref)) {
-            testHarness.setup();
-            testHarness.open();
-
-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(1);
-        }
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        assertThat(content).hasSize(1);
-        assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
-        assertThat(content.values()).containsExactly("a1,1\n" + "a2,2\n" + "a2,2\n" + "a3,3\n");
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                staticParts,
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a2", 2), 1L),
+                                new StreamRecord<>(Row.of("a3", 3), 1L)),
+                () ->
+                        Collections.singletonMap(
+                                "c=p1", createFileContent("a1,1", "a2,2", "a2,2", "a3,3")));
     }

     @Test
     void testDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
-        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, false, new LinkedHashMap<>(), ref)) {
-            writeUnorderedRecords(testHarness);
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
-        }
-
-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
-
-        assertThat(sortedContent).hasSize(2);
-        assertThat(sortedContent)
-                .contains(entry("c=p1", "a1,1\n" + "a2,2\n" + "a3,3\n"), entry("c=p2", "a2,2\n"));
-        assertThat(new File(tmpPath.toUri())).doesNotExist();
+        checkWriteAndCommit(
+                false,
+                true,
+                false,
+                new LinkedHashMap<>(),
+                DEFAULT_INPUT_SUPPLIER,
+                () ->
+                        ImmutableMap.of(
+                                "c=p1",
+                                createFileContent("a1,1", "a2,2", "a3,3"),
+                                "c=p2",
+                                createFileContent("a2,2")));
     }

     @Test
     void testGroupedDynamicPartition() throws Exception {
-        AtomicReference<FileSystemOutputFormat<Row>> ref = new AtomicReference<>();
+        checkWriteAndCommit(
+                false,
+                true,
+                true,
+                new LinkedHashMap<>(),
+                () ->
+                        Arrays.asList(
+                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
+                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
+                () ->
+                        ImmutableMap.of(
+                                "c=p1",
+                                createFileContent("a1,1", "a2,2", "a3,3"),
+                                "c=p2",
+                                createFileContent("a2,2")));
+    }
+
+    @Test
+    void testGetUniqueStagingDirectory() throws IOException {
+        final Path alreadyExistingStagingDir = new Path(outputPath.toFile().getAbsolutePath());
+        assertThat(alreadyExistingStagingDir.getFileSystem().exists(alreadyExistingStagingDir))
+                .as("The staging folder should already exist.")
+                .isTrue();
+
+        final FileSystemOutputFormat.Builder<Row> builder =
+                new FileSystemOutputFormat.Builder<Row>()
+                        .setPartitionColumns(new String[0])
+                        .setFormatFactory(TextOutputFormat::new)
+                        .setMetaStoreFactory(
+                                new FileSystemCommitterTest.TestMetaStoreFactory(
+                                        new Path(outputPath.toFile().getAbsolutePath())))
+                        .setPartitionComputer(
+                                new RowPartitionComputer("default", new String[0], new String[0]))
+                        .setStagingPath(alreadyExistingStagingDir);
+
+        assertThatThrownBy(builder::build)
+                .as("Reusing a folder should cause an error.")
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkWriteAndCommit(
+            boolean override,
+            boolean partitioned,
+            boolean dynamicGrouped,
+            LinkedHashMap<String, String> staticPartitions,
+            Supplier<List<StreamRecord<Row>>> inputSupplier,
+            Supplier<?> outputSupplier)
+            throws Exception {
+        Object expectedOutput = outputSupplier.get();
+        int expectedFileNum =
+                (partitioned)
+                        ? ((Map<String, String>) expectedOutput).size()
+                        : ((List<String>) expectedOutput).size();
+        FileSystemOutputFormat<Row> outputFormat =
+                createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions);
         try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
-                createSink(false, true, true, new LinkedHashMap<>(), ref)) {
+                createTestHarness(outputFormat)) {
             testHarness.setup();
             testHarness.open();
+            for (StreamRecord<Row> record : inputSupplier.get()) {
+                testHarness.processElement(record);
+            }
+            assertThat(getStagingFileContent(outputFormat)).hasSize(expectedFileNum);
+        }
+
+        outputFormat.finalizeGlobal(finalizationContext);
+        assertThat(Paths.get(outputFormat.getStagingPath().getPath())).doesNotExist();

-            testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));
-            testHarness.processElement(new StreamRecord<>(Row.of("a2", 2, "p2"), 1L));
-            assertThat(getFileContentByPath(tmpPath)).hasSize(2);
+        Map<File, String> fileToContent = getFileContentByPath(outputPath);
+        assertThat(fileToContent).hasSize(expectedFileNum);
+        if (partitioned) {
+            assertPartitionedOutput((Map<String, String>) expectedOutput, fileToContent);
+        } else {
+            assertThat(fileToContent.values())
+                    .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput);
         }
+    }

-        ref.get().finalizeGlobal(finalizationContext);
-        Map<File, String> content = getFileContentByPath(outputPath);
-        Map<String, String> sortedContent = new TreeMap<>();
-        content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
+    private void assertPartitionedOutput(

Review Comment:
   nit: that method could be removed and its body inlined into the `if (partitioned)` block of the calling method to improve readability.

##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -70,6 +85,11 @@ private static Map<File, String> getFileContentByPath(java.nio.file.Path directo
         return contents;
     }

+    private static Map<File, String> getStagingFileContent(

Review Comment:
   hm, the existence and non-existence of files in the staging folder can be verified without revealing the staging directory. I guess the mistake we made is that we reuse the `outputPath` temporary folder for both the `TestMetaStoreFactory` and the `FileSystemOutputFormat`. If we used two separate folders (as was done before our change), we could simply check recursively that the expected number of intermediate files is created by the `FileSystemOutputFormat` and that all of that content is deleted after the `finalizeGlobal` call in `checkWriteAndCommit`.

   The reason I am stressing this point is that I'd like to remove the `FileSystemOutputFormat#getStagingPath` method; it reveals internal state without a good reason. WDYT?
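To make the suggestion concrete, here is a minimal sketch of how `checkWriteAndCommit` could verify the staging files against a folder the test itself owns, so that `FileSystemOutputFormat#getStagingPath` would no longer be needed, and with `assertPartitionedOutput` inlined as suggested in the first comment. This is not part of the PR: the `stagingBaseDir` field and the extra staging-path parameter on `createSinkFormat` are assumptions made for the sketch, while `createTestHarness`, `getFileContentByPath`, `finalizationContext`, and the supplier parameters are the helpers already used in the test.

```java
// Sketch only: assumed to live inside FileSystemOutputFormatTest and to reuse its
// existing imports (JUnit 5 @TempDir, AssertJ, java.util.*, Flink's Path, etc.).

// Hypothetical second temporary folder, separate from the one backing outputPath.
@TempDir private java.nio.file.Path stagingBaseDir;

@SuppressWarnings("unchecked")
private void checkWriteAndCommit(
        boolean override,
        boolean partitioned,
        boolean dynamicGrouped,
        LinkedHashMap<String, String> staticPartitions,
        Supplier<List<StreamRecord<Row>>> inputSupplier,
        Supplier<?> outputSupplier)
        throws Exception {
    Object expectedOutput = outputSupplier.get();
    int expectedFileNum =
            partitioned
                    ? ((Map<String, String>) expectedOutput).size()
                    : ((List<String>) expectedOutput).size();

    // The test chooses the staging location itself, so no accessor on the format is
    // required. Assumes a createSinkFormat overload that forwards the path to
    // FileSystemOutputFormat.Builder#setStagingPath.
    Path stagingPath = new Path(stagingBaseDir.resolve("staging").toString());
    FileSystemOutputFormat<Row> outputFormat =
            createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions, stagingPath);

    try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
            createTestHarness(outputFormat)) {
        testHarness.setup();
        testHarness.open();
        for (StreamRecord<Row> record : inputSupplier.get()) {
            testHarness.processElement(record);
        }
        // Count the intermediate files by scanning the known staging folder recursively.
        assertThat(getFileContentByPath(stagingBaseDir)).hasSize(expectedFileNum);
    }

    outputFormat.finalizeGlobal(finalizationContext);
    // Committing must move the data to outputPath and wipe the staging folder.
    assertThat(stagingBaseDir.resolve("staging")).doesNotExist();

    Map<File, String> fileToContent = getFileContentByPath(outputPath);
    assertThat(fileToContent).hasSize(expectedFileNum);
    if (partitioned) {
        // assertPartitionedOutput inlined: compare "partition directory -> content"
        // against the expected map.
        Map<String, String> actualPartitionToContent = new HashMap<>();
        fileToContent.forEach(
                (file, content) ->
                        actualPartitionToContent.put(file.getParentFile().getName(), content));
        assertThat(actualPartitionToContent)
                .containsExactlyInAnyOrderEntriesOf((Map<String, String>) expectedOutput);
    } else {
        assertThat(fileToContent.values())
                .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput);
    }
}
```

With that shape, the existing tests keep calling `checkWriteAndCommit` unchanged, and both the intermediate-file count and the post-commit cleanup are asserted purely through the filesystem.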