LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1970685919

   What do you think about moving `toStagingPath` into `FileSystemOutputFormat`?
   Something like:
   ```java
       private FileSystemOutputFormat(
               FileSystemFactory fsFactory,
               TableMetaStoreFactory msFactory,
               boolean overwrite,
               boolean isToLocal,
   +          Path path,
   -          Path tmpPath,
               String[] partitionColumns,
               boolean dynamicGrouped,
               LinkedHashMap<String, String> staticPartitions,
               OutputFormatFactory<T> formatFactory,
               PartitionComputer<T> computer,
               OutputFileConfig outputFileConfig,
               ObjectIdentifier identifier,
               PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
           this.fsFactory = fsFactory;
           this.msFactory = msFactory;
           this.overwrite = overwrite;
           this.isToLocal = isToLocal;
   +      this.path = path;
   +      this.tmpPath = toStagingPath();
           this.partitionColumns = partitionColumns;
           this.dynamicGrouped = dynamicGrouped;
           this.staticPartitions = staticPartitions;
           this.formatFactory = formatFactory;
           this.computer = computer;
           this.outputFileConfig = outputFileConfig;
           this.identifier = identifier;
           this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
       }
   
       private Path toStagingPath() {
           // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
           // Please see FLINK-29114 for more details
           Path stagingDir =
                   new Path(
                           path,
                           String.join(
                                   "_",
                                   ".staging_",
                                   String.valueOf(System.currentTimeMillis()),
                                   UUID.randomUUID().toString()));
           try {
               FileSystem fs = stagingDir.getFileSystem();
               Preconditions.checkState(
                       fs.mkdirs(stagingDir), "Failed to create staging dir " + stagingDir);
               return stagingDir;
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }
   
       @VisibleForTesting
       public Path getTmpPath() {
           return tmpPath;
       }
   ```
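
To make the naming scheme concrete, here is a minimal stand-alone sketch of what `toStagingPath` would produce. It is only an illustration under a few assumptions: `StagingDirSketch`, `stagingDirName`, and `createStagingDir` are hypothetical names, and it uses `java.nio.file` in place of Flink's `Path` and `FileSystem` so that it runs without Flink on the classpath.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class StagingDirSketch {

    // Mirrors the String.join call in toStagingPath(), with the timestamp and
    // UUID passed in so that the result is deterministic for a given input.
    static String stagingDirName(long timestampMillis, UUID uuid) {
        return String.join(
                "_", ".staging_", String.valueOf(timestampMillis), uuid.toString());
    }

    // Creates the staging directory under basePath. Assumption: java.nio is
    // used here, whereas the snippet above calls Flink's FileSystem#mkdirs.
    static Path createStagingDir(Path basePath) throws IOException {
        Path stagingDir =
                basePath.resolve(
                        stagingDirName(System.currentTimeMillis(), UUID.randomUUID()));
        return Files.createDirectories(stagingDir);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("sink-base");
        // Two sinks sharing the same base path get distinct staging dirs.
        System.out.println(createStagingDir(base));
        System.out.println(createStagingDir(base));
    }
}
```

Note that because the first element `".staging_"` already ends with the delimiter, the joined name contains a double underscore, e.g. `.staging__<millis>_<uuid>`. The random UUID is what keeps two sinks created in the same millisecond from colliding.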

