LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505864009


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##########


Review Comment:
   > > [...] the issue is triggered when the FileSystemOutputFormat#finalizeGlobal method is called (triggered by the final block).
   > 
   > I cannot confirm that. The changed code segment isn't covered by the newly added tests. As said, reverting your fix in `FileSystemTableSink` doesn't make any of the tests fail.
   
   You can verify the root cause by the following steps.
   
   1. Temporarily make `toStagingPath` a static method and modify the test case as in the following snippet.
   ```java
       @Test
       void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception {
           // sink1
           AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
           AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
           AtomicReference<String> contentRef1 = new AtomicReference<>();
           try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
                   createSink(
                           false,
                           false,
                           false,
                           FileSystemTableSink.toStagingPath(Path.fromLocalFile(tmpPath.toFile())),
                           new LinkedHashMap<>(),
                           sinkRef1)) {
               writeUnorderedRecords(testHarness);
               Map<File, String> content = getFileContentByPath(tmpPath);
               assertThat(content).hasSize(1);
               fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
               contentRef1.set(new ArrayList<>(content.values()).get(0));
           }
   
           // sink2
           AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
           AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
           AtomicReference<String> contentRef2 = new AtomicReference<>();
           try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
                   createSink(
                           false,
                           false,
                           false,
                           FileSystemTableSink.toStagingPath(Path.fromLocalFile(tmpPath.toFile())),
                           new LinkedHashMap<>(),
                           sinkRef2)) {
               // write different content
               List<Row> records =
                       Arrays.asList(
                               Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3"));
               writeUnorderedRecords(testHarness, records);
               Map<File, String> content = getFileContentByPath(tmpPath);
               assertThat(content).hasSize(1);
               fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
               contentRef2.set(new ArrayList<>(content.values()).get(0));
           }
   
           File fileToCommit1 = fileToCommitRef1.get();
           File fileToCommit2 = fileToCommitRef2.get();
           // because sink1 and sink2 are writing to the same staging dir
           // the file generated by sink1 has been overwritten by sink2
           assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
           assertThat(fileToCommit1).doesNotExist();
   
           // lost records
           assertThat(contentRef1.get())
                   .isEqualTo("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
   
           assertThat(fileToCommit2).exists();
           assertThat(contentRef2.get()).isEqualTo("b1,1,x1\n" + "b2,2,x2\n" + "b3,3,x3\n");
   
           // let sink1 commit, and it actually commits the file generated by sink2
           sinkRef1.get().finalizeGlobal(finalizationContext);
           assertThat(getFileContentByPath(outputPath).values()).containsExactly(contentRef2.get());
   
           // sink2 will commit nothing
           sinkRef2.get().finalizeGlobal(finalizationContext);
           assertThat(getFileContentByPath(outputPath).values()).containsExactly(contentRef2.get());
       }
   ```
   If you run this test as is, it fails: `FileSystemTableSink.toStagingPath` never returns the same path twice, so the two sinks write to different staging dirs and no data is lost.
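   
   To illustrate why the parent-dir assertion cannot hold when every call yields a fresh staging dir, here is a minimal standalone sketch. `uniqueStagingPath` is a hypothetical helper, and the UUID suffix is an assumption made for illustration only; it is not meant to describe what `FileSystemTableSink.toStagingPath` actually does.
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.UUID;
   
   final class UniqueStagingPathSketch {
   
       // Hypothetical helper (not Flink code): derives a fresh staging dir
       // under the table path on every call by appending a random UUID.
       static Path uniqueStagingPath(Path tablePath) {
           return tablePath.resolve(".staging_" + UUID.randomUUID());
       }
   
       public static void main(String[] args) {
           Path table = Paths.get("table");
           Path staging1 = uniqueStagingPath(table);
           Path staging2 = uniqueStagingPath(table);
           // Both dirs live under the same table path but are distinct,
           // so files staged by two sinks can never share a parent dir.
           System.out.println(staging1);
           System.out.println(staging2);
           System.out.println("same staging dir: " + staging1.equals(staging2));
       }
   }
   ```
   
   With distinct staging dirs, an assertion like `isEqualTo(fileToCommit2.getParent())` has to fail, which matches the behavior described above.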
   
   
   2. Then change `toStagingPath` to return a constant path and rerun the same test. It now passes, which shows that one sink's file is overwritten by the other's.
   ```java
       @VisibleForTesting
       public static Path toStagingPath(Path path) {
           // return a constant path to reproduce the issue
           Path stagingDir = new Path(path, ".staging_foo");
          ...
       }
   ```
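   
   For readers who want to see the effect without the Flink test harness, the following is a simplified standalone model of two writers sharing one staging dir. Everything in it is hypothetical: `write`, `commit`, the file names, and in particular the assumption that a writer clears its staging dir before writing are illustrative stand-ins, not the behavior of `FileSystemOutputFormat`.
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.DirectoryStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.ArrayList;
   import java.util.List;
   
   final class SharedStagingDirSketch {
   
       // Lists the direct children of dir (empty if dir does not exist).
       static List<Path> list(Path dir) {
           List<Path> files = new ArrayList<>();
           if (!Files.isDirectory(dir)) {
               return files;
           }
           try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
               for (Path p : stream) {
                   files.add(p);
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
           return files;
       }
   
       // Hypothetical writer: clears its staging dir (assumption), then writes one file.
       static Path write(Path stagingDir, String fileName, String content) {
           try {
               Files.createDirectories(stagingDir);
               for (Path old : list(stagingDir)) {
                   Files.delete(old);
               }
               return Files.write(
                       stagingDir.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       // Hypothetical commit: moves everything in the staging dir to the output dir
       // and returns the contents of the committed files.
       static List<String> commit(Path stagingDir, Path outputDir) {
           List<String> committed = new ArrayList<>();
           try {
               Files.createDirectories(outputDir);
               for (Path staged : list(stagingDir)) {
                   Path target = Files.move(staged, outputDir.resolve(staged.getFileName()));
                   committed.add(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
               }
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
           return committed;
       }
   
       static Path tempDir(String prefix) {
           try {
               return Files.createTempDirectory(prefix);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       public static void main(String[] args) {
           Path staging = tempDir("table").resolve(".staging_foo"); // constant staging dir
           Path output = tempDir("output");
           Path file1 = write(staging, "part-sink1", "a1,1,p1\n");
           Path file2 = write(staging, "part-sink2", "b1,1,x1\n");
           System.out.println("sink1 file still staged: " + Files.exists(file1));
           System.out.println("sink2 file still staged: " + Files.exists(file2));
           System.out.println("sink1 commits: " + commit(staging, output));
           System.out.println("sink2 commits: " + commit(staging, output));
       }
   }
   ```
   
   In this model the first commit publishes only the second writer's data and the second commit finds nothing left to publish, which is the same shape of data loss the test asserts on.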
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
