rkhachatryan commented on code in PR #25028:
URL: https://github.com/apache/flink/pull/25028#discussion_r1668622658


##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java:
##########
@@ -41,6 +42,41 @@
 /** Base class for file system factories that create S3 file systems. */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 
+    public static final ConfigOption<String> ACCESS_KEY =
+            ConfigOptions.key("s3.access-key")

Review Comment:
   I couldn't find generated docs for these options (all of the s3 options).
   Should we start generating them in this hotfix commit?



##########
flink-core/src/main/java/org/apache/flink/util/FileUtils.java:
##########
@@ -138,17 +140,27 @@ public static String readFile(File file, String charsetName) throws IOException
         return new String(bytes, charsetName);
     }
 
+    public static String readFile(File file, Charset charset) throws IOException {
+        byte[] bytes = readAllBytes(file.toPath());
+        return new String(bytes, charset);
+    }
+
     public static String readFileUtf8(File file) throws IOException {
-        return readFile(file, "UTF-8");
+        return readFile(file, StandardCharsets.UTF_8);
     }
 
     public static void writeFile(File file, String contents, String encoding) throws IOException {
         byte[] bytes = contents.getBytes(encoding);
         Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
     }

Review Comment:
   This method seems to be unused now.
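   As an aside on the `readFile` change above, a minimal self-contained sketch of why the `Charset` overload is preferable to a `String` charset name (class and method names here are illustrative, not the PR's code): the string-based API forces callers to handle a checked exception and defers typos to runtime, while the `Charset`-typed overload is validated at compile time.

   ```java
   import java.io.IOException;
   import java.nio.charset.Charset;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class ReadFileDemo {
       // Charset-typed overload, mirroring the one added in the diff: the
       // charset constant cannot be misspelled, unlike a "UTF-8" string.
       static String readFile(Path file, Charset charset) throws IOException {
           byte[] bytes = Files.readAllBytes(file);
           return new String(bytes, charset);
       }

       public static void main(String[] args) throws IOException {
           Path tmp = Files.createTempFile("demo", ".txt");
           Files.write(tmp, "héllo".getBytes(StandardCharsets.UTF_8));
           System.out.println(readFile(tmp, StandardCharsets.UTF_8)); // prints "héllo"
           Files.delete(tmp);
       }
   }
   ```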



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java:
##########
@@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject {
     /** @return Content of this handle as bytes array if it is already in memory. */
     Optional<byte[]> asBytesIfInMemory();
 
+    /**
+     * @return Path to an underlying file represented by this {@link StreamStateHandle} or {@link
+     *     Optional#empty()} if there is no such file.
+     */
+    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return Optional.empty();
+    }

Review Comment:
   Should this also be implemented by `DirectoryStreamStateHandle` and `SegmentFileStateHandle`?
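   A minimal sketch of the suggestion, with simplified stand-ins (plain `String` instead of Flink's `Path`; the names below are hypothetical): a file-backed handle such as `SegmentFileStateHandle` would override the new default method instead of inheriting `Optional.empty()`.
   
   ```java
   import java.util.Optional;

   public class MaybeGetPathDemo {
       // Simplified stand-in for StreamStateHandle, mirroring the default
       // method added in the diff.
       interface StreamStateHandleLike {
           default Optional<String> maybeGetPath() {
               return Optional.empty();
           }
       }

       // A file-backed handle, as the reviewer suggests, overrides the default
       // and exposes its underlying file path.
       static class FileBackedHandle implements StreamStateHandleLike {
           private final String path;

           FileBackedHandle(String path) {
               this.path = path;
           }

           @Override
           public Optional<String> maybeGetPath() {
               return Optional.of(path);
           }
       }
   }
   ```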
   



##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########
@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
         this.s3AccessHelper = s3UploadHelper;
         this.uploadThreadPool = Executors.newCachedThreadPool();
 
-        Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
         this.s3uploadPartSize = s3uploadPartSize;
         this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", s5CmdConfiguration);
     }
 
     // ------------------------------------------------------------------------
 
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }

Review Comment:
   1. Should we also check that one path is remote and the other is local? (IIRC, this is an s5cmd requirement.)
   2. Use the public method everywhere and inline the private one?
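   A minimal sketch of point 1, under the assumption that s5cmd only copies between S3 and the local filesystem (class and method names below are hypothetical, not the PR's code):
   
   ```java
   import java.net.URI;

   public class CopyCheck {
       static boolean isS3(URI uri) {
           return "s3".equals(uri.getScheme());
       }

       // Exactly one side must be an s3:// URI: one remote, one local.
       static boolean canCopy(URI source, URI destination) {
           return isS3(source) ^ isS3(destination);
       }
   }
   ```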



##########
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java:
##########
@@ -113,16 +114,16 @@ private String getHttpEndpoint() {
      * relevant parameter to access the {@code Minio} instance.
      */
     public void setS3ConfigOptions(Configuration config) {
-        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());

Review Comment:
   nit: re-order commits so that the use of the options comes after their introduction?
   
   Currently, I see it as
   ```
   [hotfix] Use newly defined ConfigOptions in MinioTestContainer 
   [hotfix] Move CompressionUtils to flink-core
   [hotfix] Create ConfigOptions for s3 access/secret keys and endpoint 
   ```



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##########
@@ -94,46 +116,103 @@ public void transferAllStateDataToDirectory(
         }
     }
 
-    /** Asynchronously runs the specified download requests on executorService. */
-    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
-            Collection<StateHandleDownloadSpec> handleWithPaths,
+    private Collection<Runnable> createDownloadRunnables(
+            Collection<StateHandleDownloadSpec> downloadRequests,
+            CloseableRegistry closeableRegistry)
+            throws IOException {
+        // We need to support recovery from multiple FileSystems. At least one
+        // scenario where this can happen is:
+        // 1. A checkpoint/savepoint is created on FileSystem_1
+        // 2. Job terminates
+        // 3. Configuration is changed to use a checkpoint directory on FileSystem_2
+        // 4. Job is restarted from checkpoint (1.) using claim mode
+        // 5. A new incremental checkpoint is created that can refer to files from
+        // both FileSystem_1 and FileSystem_2.
+        Map<FileSystem.FSKey, List<CopyRequest>> filesSystemsFilesToDownload = new HashMap<>();
+        List<Runnable> runnables = new ArrayList<>();
+
+        for (StateHandleDownloadSpec downloadSpec : downloadRequests) {
+            for (HandleAndLocalPath handleAndLocalPath : getAllHandles(downloadSpec)) {
+                Path downloadDestination =
+                        downloadSpec
+                                .getDownloadDestination()
+                                .resolve(handleAndLocalPath.getLocalPath());
+                if (canCopyPaths(handleAndLocalPath)) {
+                    org.apache.flink.core.fs.Path remotePath =
+                            handleAndLocalPath.getHandle().maybeGetPath().get();
+                    FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
+                    List<CopyRequest> filesToDownload =
+                            filesSystemsFilesToDownload.computeIfAbsent(
+                                    newFSKey, fsKey -> new ArrayList<>());
+                    filesToDownload.add(
+                            CopyRequest.of(
+                                    remotePath,
+                                    new org.apache.flink.core.fs.Path(
+                                            downloadDestination.toUri())));

Review Comment:
   nit: inlining `filesToDownload` makes this code more readable for me
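   For illustration, the inlined `computeIfAbsent` pattern the nit suggests looks like this in self-contained form (names below are hypothetical, not the PR's code):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class GroupByScheme {
       // computeIfAbsent already returns the bucket, so the intermediate local
       // variable can be inlined into a single chained call.
       static Map<String, List<String>> group(List<String> uris) {
           Map<String, List<String>> byScheme = new HashMap<>();
           for (String uri : uris) {
               String scheme = uri.substring(0, uri.indexOf(':'));
               byScheme.computeIfAbsent(scheme, k -> new ArrayList<>()).add(uri);
           }
           return byScheme;
       }
   }
   ```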



##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########
@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
         this.s3AccessHelper = s3UploadHelper;
         this.uploadThreadPool = Executors.newCachedThreadPool();
 
-        Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
         this.s3uploadPartSize = s3uploadPartSize;
         this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", s5CmdConfiguration);
     }
 
     // ------------------------------------------------------------------------
 
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }
+
+    @Override
+    public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+            throws IOException {
+        checkState(canCopyPaths(), "#downloadFiles has been called illegally");
+        List<String> artefacts = new ArrayList<>();
+        artefacts.add(s5CmdConfiguration.path);
+        artefacts.addAll(s5CmdConfiguration.args);
+        artefacts.add("run");
+        castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));
+    }
+
+    private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
+        List<String> spells = new ArrayList<>();
+        for (CopyRequest request : requests) {
+            Files.createDirectories(Paths.get(request.getDestination().toUri()).getParent());
+            spells.add(
+                    String.format(
+                            "cp %s %s",
+                            request.getSource().toUri().toString(),
+                            request.getDestination().getPath()));
+        }
+        return spells;
+    }
+
+    private void castSpell(
+            List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
+            throws IOException {
+        LOG.info("Casting spell: {}", Arrays.toString(artefacts));
+        int exitCode = 0;
+        final AtomicReference<IOException> maybeCloseableRegistryException =
+                new AtomicReference<>();
+
+        // Setup temporary working directory for the process
+        File tmpWorkingDir = new File(localTmpDir, "s5cmd_" + UUID.randomUUID());
+        java.nio.file.Path tmpWorkingPath = Files.createDirectories(tmpWorkingDir.toPath());
+
+        try {
+            // Redirect the process input/output to files. Communicating directly through a
+            // stream can lead to blocking and undefined behavior if the underlying process is
+            // killed (known Java problem).
+            ProcessBuilder hogwart = new ProcessBuilder(artefacts).directory(tmpWorkingDir);
+            s5CmdConfiguration.configureEnvironment(hogwart.environment());
+            File inScrolls = new File(tmpWorkingDir, "s5cmd_input");
+            Preconditions.checkState(inScrolls.createNewFile());
+            File outScrolls = new File(tmpWorkingDir, "s5cmd_output");
+            Preconditions.checkState(outScrolls.createNewFile());
+
+            FileUtils.writeFileUtf8(inScrolls, String.join(System.lineSeparator(), spells));

Review Comment:
   I think a line separator after the last string is necessary, because the file content serves as input to a process: like input typed at a terminal, the last command needs a trailing newline to take effect.
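   A self-contained sketch of the suggested fix (names below are hypothetical; the PR itself uses `FileUtils.writeFileUtf8`): append the separator after every command, including the last, rather than only joining between commands.
   
   ```java
   import java.util.List;

   public class CommandFile {
       // Terminate every command with a line separator so the consuming process
       // "executes" the last one too, much like pressing Enter in a terminal.
       static String render(List<String> commands) {
           StringBuilder sb = new StringBuilder();
           for (String command : commands) {
               sb.append(command).append(System.lineSeparator());
           }
           return sb.toString();
       }
   }
   ```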



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
