XComp commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1507274806
########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -104,15 +113,101 @@ void testNonPartition() throws Exception { .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStagingDirBehavior(boolean shareStagingDir) throws Exception { + // sink1 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>(); + AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>(); + writeRowsToSink( + sinkRef1, + fileToCommitRef1, + getStagingDir(shareStagingDir), + Row.of("a1", 1, "x1"), + Row.of("a2", 2, "x2")); + + // sink2 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>(); + AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>(); + writeRowsToSink( + sinkRef2, + fileToCommitRef2, + getStagingDir(shareStagingDir), + Row.of("b1", 1, "y1"), + Row.of("b2", 2, "y2")); + + assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir); + } + + private void writeRowsToSink( + AtomicReference<FileSystemOutputFormat<Row>> sinkRef, + AtomicReference<Map<File, String>> contentRef, + Path stagingDir, + Row... 
records) + throws Exception { + try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = + createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) { + writeUnorderedRecords(testHarness, Arrays.asList(records)); + contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath()))); + } + } + + private Path getStagingDir(boolean shareStagingDir) { + String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir); + return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile()); + } + + private void assertSinkBehavior( + AtomicReference<FileSystemOutputFormat<Row>> sinkRef1, + AtomicReference<Map<File, String>> fileToCommitRef1, + AtomicReference<FileSystemOutputFormat<Row>> sinkRef2, + AtomicReference<Map<File, String>> fileToCommitRef2, + boolean shareStagingDir) + throws Exception { + Map<File, String> fileToCommit1 = fileToCommitRef1.get(); + Map<File, String> fileToCommit2 = fileToCommitRef2.get(); + assertThat(fileToCommit2.keySet()).allMatch(File::exists); + if (shareStagingDir) { + assertThat(fileToCommit1.keySet()).noneMatch(File::exists); + } else { + assertThat(fileToCommit1.keySet()).allMatch(File::exists); + } + sinkRef1.get().finalizeGlobal(finalizationContext); + sinkRef2.get().finalizeGlobal(finalizationContext); + Map<File, String> committedFiles = getFileContentByPath(outputPath); + if (shareStagingDir) { Review Comment: Your understanding is correct. It's just that this test is actually testing `FileSystemOutputFormat` for a scenario where some other class (in our case `FileSystemTableSink`) has a bug. This test is fine if you think that `FileSystemOutputFormat` should be able to handle this case. But I'm wondering whether it would be more appropriate to fail in such a case where multiple instances are accessing the same folder (but there's the question of how easily a `FileSystemOutputFormat` instance can detect something like that).
Anyway, resolving this test scenario easily indicates that the scope of the different classes is not well-defined. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org