XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507150180
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -386,6 +389,11 @@ private Path toStagingPath() {
         }
     }
 
+    @VisibleForTesting

Review Comment:
   [Looks like](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57965&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23032) the architecture tests don't allow this. :thinking:


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
     }
 
     private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));
         try {
             FileSystem fs = stagingDir.getFileSystem();
             Preconditions.checkState(

Review Comment:
   The precondition should be updated. Or did I miss something and you don't agree with [what we discussed in the other PR](https://github.com/LadyForest/flink/pull/1#discussion_r1505972482)?


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {

Review Comment:
   Generally speaking, unit tests should be as small as possible and focus on one specific aspect of the class contract (in this case, `FileSystemTableSink#toStagingPath` not returning the same folder twice). This test adds quite a bit of extra context, which might make it harder for code readers to grasp the intention. What's your opinion on that?


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -386,6 +389,11 @@ private Path toStagingPath() {
         }
     }
 
+    @VisibleForTesting
+    static String getStagingPathPostfix(boolean constant) {

Review Comment:
   You introduce the parameter `constant` to make the test parameterizable. But ideally, you don't want to shape production code "to please test code". I feel like this `constant` parameter is not necessary (the production code only ever passes `false`). The decision whether to return a fixed suffix (or, if you follow the comment above and return the actual path instead, a fixed path) or the random one should live in the test code. WDYT?
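A minimal sketch of the point above, for illustration only (this is not code from the PR): the production method always generates a random suffix, and a test that wants two sinks to end up in the same staging dir simply builds that fixed path itself. It reuses Flink's `org.apache.flink.core.fs.Path` as shown in the diffs; the names `StagingPathSketch`, `generateStagingPath`, and `sharedStagingPathForTest` are hypothetical.

```java
import org.apache.flink.core.fs.Path;

import java.util.UUID;

/** Illustrative sketch: the fixed-vs-random decision lives outside the production method. */
class StagingPathSketch {

    /** Production side: no test-only flag, every call yields a fresh staging dir. */
    static Path generateStagingPath(Path parent) {
        // A random UUID prevents two sinks writing to the same table from sharing a
        // staging dir (the FLINK-29114 problem referenced in the diff above).
        return new Path(parent, ".staging_" + UUID.randomUUID());
    }

    /** Test side: a test that wants a shared staging dir just constructs a fixed path. */
    static Path sharedStagingPathForTest(Path parent) {
        return new Path(parent, ".staging_shared");
    }
}
```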
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##########
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
     }
 
     private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));

Review Comment:
   Why did you decide on creating the `getStagingPathPostfix` method? This creates unnecessary code in the test because we still have to generate the Path there. Alternatively, you could move the entire content of `toStagingPath()` into a static method `generateStagingPath(Path parent, boolean constant)`. That way, you just need to call the static method in the test without any path resolution.


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef1,
+                fileToCommitRef1,
+                getStagingDir(shareStagingDir),
+                Row.of("a1", 1, "x1"),
+                Row.of("a2", 2, "x2"));
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef2,
+                fileToCommitRef2,
+                getStagingDir(shareStagingDir),
+                Row.of("b1", 1, "y1"),
+                Row.of("b2", 2, "y2"));
+
+        assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir);
+    }
+
+    private void writeRowsToSink(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef,
+            AtomicReference<Map<File, String>> contentRef,
+            Path stagingDir,
+            Row... records)
+            throws Exception {
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) {
+            writeUnorderedRecords(testHarness, Arrays.asList(records));
+            contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath())));
+        }
+    }
+
+    private Path getStagingDir(boolean shareStagingDir) {
+        String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+        return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+    }
+
+    private void assertSinkBehavior(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef1,
+            AtomicReference<Map<File, String>> fileToCommitRef1,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef2,
+            AtomicReference<Map<File, String>> fileToCommitRef2,
+            boolean shareStagingDir)
+            throws Exception {
+        Map<File, String> fileToCommit1 = fileToCommitRef1.get();
+        Map<File, String> fileToCommit2 = fileToCommitRef2.get();
+        assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+        if (shareStagingDir) {
+            assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+        } else {
+            assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+        }
+        sinkRef1.get().finalizeGlobal(finalizationContext);
+        sinkRef2.get().finalizeGlobal(finalizationContext);
+        Map<File, String> committedFiles = getFileContentByPath(outputPath);
+        if (shareStagingDir) {

Review Comment:
   I still don't get this test case. Are there scenarios where this behavior is actually desired? Isn't that exactly what we want to avoid? And essentially we're fixing it in this PR so that this scenario never appears? :thinking:
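In the spirit of the earlier comment about keeping unit tests small, here is a sketch of a narrower test that only checks the contract under discussion: two staging paths derived from the same parent never collide. It is illustrative only, assumes JUnit 5 and AssertJ (already used by `FileSystemOutputFormatTest`), and inlines a hypothetical `generateStagingPath` helper so the snippet is self-contained rather than calling the PR's actual method.

```java
import org.apache.flink.core.fs.Path;

import org.junit.jupiter.api.Test;

import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Illustrative sketch of a focused test for the "no shared staging dir" contract. */
class StagingPathUniquenessSketchTest {

    @Test
    void twoInvocationsNeverShareAStagingDir() {
        Path parent = new Path("/tmp/warehouse/t");

        // Two sinks created for the same table must never resolve to the same staging dir.
        assertThat(generateStagingPath(parent)).isNotEqualTo(generateStagingPath(parent));
    }

    /** Hypothetical stand-in for the production method discussed in this review. */
    private static Path generateStagingPath(Path parent) {
        return new Path(parent, ".staging_" + UUID.randomUUID());
    }
}
```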
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org