LadyForest commented on code in PR #24390: URL: https://github.com/apache/flink/pull/24390#discussion_r1507187446
########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java: ########## @@ -104,15 +113,101 @@ void testNonPartition() throws Exception { .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n"); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStagingDirBehavior(boolean shareStagingDir) throws Exception { + // sink1 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>(); + AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>(); + writeRowsToSink( + sinkRef1, + fileToCommitRef1, + getStagingDir(shareStagingDir), + Row.of("a1", 1, "x1"), + Row.of("a2", 2, "x2")); + + // sink2 + AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>(); + AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>(); + writeRowsToSink( + sinkRef2, + fileToCommitRef2, + getStagingDir(shareStagingDir), + Row.of("b1", 1, "y1"), + Row.of("b2", 2, "y2")); + + assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir); + } + + private void writeRowsToSink( + AtomicReference<FileSystemOutputFormat<Row>> sinkRef, + AtomicReference<Map<File, String>> contentRef, + Path stagingDir, + Row... 
records) + throws Exception { + try (OneInputStreamOperatorTestHarness<Row, Object> testHarness = + createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) { + writeUnorderedRecords(testHarness, Arrays.asList(records)); + contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath()))); + } + } + + private Path getStagingDir(boolean shareStagingDir) { + String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir); + return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile()); + } + + private void assertSinkBehavior( + AtomicReference<FileSystemOutputFormat<Row>> sinkRef1, + AtomicReference<Map<File, String>> fileToCommitRef1, + AtomicReference<FileSystemOutputFormat<Row>> sinkRef2, + AtomicReference<Map<File, String>> fileToCommitRef2, + boolean shareStagingDir) + throws Exception { + Map<File, String> fileToCommit1 = fileToCommitRef1.get(); + Map<File, String> fileToCommit2 = fileToCommitRef2.get(); + assertThat(fileToCommit2.keySet()).allMatch(File::exists); + if (shareStagingDir) { + assertThat(fileToCommit1.keySet()).noneMatch(File::exists); + } else { + assertThat(fileToCommit1.keySet()).allMatch(File::exists); + } + sinkRef1.get().finalizeGlobal(finalizationContext); + sinkRef2.get().finalizeGlobal(finalizationContext); + Map<File, String> committedFiles = getFileContentByPath(outputPath); + if (shareStagingDir) { Review Comment: I understand it this way, and please correct me if I'm wrong: When verifying the correctness of a fix, it's common to build some test cases to reliably reproduce the issue, then propose a solution to address that problem, and rerun the tests to ensure that the fix is appropriate and has not introduced any new issues. Coming back to this case, since `System.currentTimeMillis()` is a non-deterministic function in the original method, in order to reproduce the issue stably, it is necessary to make it return a constant value, which is why an additional boolean variable was introduced. 
However, after noticing your comments in another pull request, I think your approach is more reasonable, as it would surface the problem as a compile-time error rather than delaying it to runtime, where it could cause unexpected consequences. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org