rkhachatryan commented on code in PR #25028: URL: https://github.com/apache/flink/pull/25028#discussion_r1668622658
##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java:
##########

@@ -41,6 +42,41 @@
 /** Base class for file system factories that create S3 file systems. */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
+    public static final ConfigOption<String> ACCESS_KEY =
+            ConfigOptions.key("s3.access-key")

Review Comment:
I couldn't find generated docs for these options (all s3 options). Should we start generating them in this hotfix commit?

##########
flink-core/src/main/java/org/apache/flink/util/FileUtils.java:
##########

@@ -138,17 +140,27 @@ public static String readFile(File file, String charsetName) throws IOException
         return new String(bytes, charsetName);
     }

+    public static String readFile(File file, Charset charset) throws IOException {
+        byte[] bytes = readAllBytes(file.toPath());
+        return new String(bytes, charset);
+    }
+
     public static String readFileUtf8(File file) throws IOException {
-        return readFile(file, "UTF-8");
+        return readFile(file, StandardCharsets.UTF_8);
     }

     public static void writeFile(File file, String contents, String encoding) throws IOException {
         byte[] bytes = contents.getBytes(encoding);
         Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
     }

Review Comment:
This method seems to be unused now.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java:
##########

@@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject {
     /** @return Content of this handle as bytes array if it is already in memory. */
     Optional<byte[]> asBytesIfInMemory();

+    /**
+     * @return Path to an underlying file represented by this {@link StreamStateHandle} or {@link
+     *     Optional#empty()} if there is no such file.
+     */
+    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return Optional.empty();
+    }

Review Comment:
Should this also be implemented by `DirectoryStreamStateHandle` and `SegmentFileStateHandle`?
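The question above boils down to the standard default-method pattern: file-backed handles override `maybeGetPath()`, purely in-memory ones inherit the empty default. A standalone sketch of that pattern follows; the class names are hypothetical stand-ins, not Flink's actual handle classes, and `String` stands in for Flink's `Path`:

```java
import java.util.Optional;

public class MaybeGetPathSketch {

    /** Stand-in for the StreamStateHandle interface: empty unless a file backs the handle. */
    public interface Handle {
        default Optional<String> maybeGetPath() {
            return Optional.empty();
        }
    }

    /** A file-backed handle overrides the default and exposes its path. */
    public static final class FileBackedHandle implements Handle {
        private final String path;

        public FileBackedHandle(String path) {
            this.path = path;
        }

        @Override
        public Optional<String> maybeGetPath() {
            return Optional.of(path);
        }
    }

    /** A purely in-memory handle inherits the empty default. */
    public static final class InMemoryHandle implements Handle {}

    public static void main(String[] args) {
        System.out.println(new FileBackedHandle("/tmp/chk-1/f1").maybeGetPath()); // Optional[/tmp/chk-1/f1]
        System.out.println(new InMemoryHandle().maybeGetPath()); // Optional.empty
    }
}
```

Under this pattern, any handle type that is ultimately backed by a file on disk (as the review suggests for `DirectoryStreamStateHandle` and `SegmentFileStateHandle`) would follow the override branch.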
##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########

@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
         this.s3AccessHelper = s3UploadHelper;
         this.uploadThreadPool = Executors.newCachedThreadPool();

-        Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
         this.s3uploadPartSize = s3uploadPartSize;
         this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", s5CmdConfiguration);
     }

     // ------------------------------------------------------------------------

+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }

Review Comment:
1. Should we also check that one path is remote and one is local? (IIRC, this is an s5cmd requirement.)
2. Use the public method everywhere and inline the private one?

##########
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java:
##########

@@ -113,16 +114,16 @@ private String getHttpEndpoint() {
      * relevant parameter to access the {@code Minio} instance.
      */
     public void setS3ConfigOptions(Configuration config) {
-        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());

Review Comment:
nit: re-order the commits so that the use of the options comes after their introduction?
Currently, I see it as:
```
[hotfix] Use newly defined ConfigOptions in MinioTestContainer
[hotfix] Move CompressionUtils to flink-core
[hotfix] Create ConfigOptions for s3 access/secret keys and endpoint
```

##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##########

@@ -94,46 +116,103 @@ public void transferAllStateDataToDirectory(
         }
     }

-    /** Asynchronously runs the specified download requests on executorService. */
-    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
-            Collection<StateHandleDownloadSpec> handleWithPaths,
+    private Collection<Runnable> createDownloadRunnables(
+            Collection<StateHandleDownloadSpec> downloadRequests,
+            CloseableRegistry closeableRegistry)
+            throws IOException {
+        // We need to support recovery from multiple FileSystems. At least one scenario in which
+        // this can happen is:
+        // 1. A checkpoint/savepoint is created on FileSystem_1.
+        // 2. The job terminates.
+        // 3. The configuration is changed to use a checkpoint directory on FileSystem_2.
+        // 4. The job is restarted from the checkpoint (1.) in claim mode.
+        // 5. A new incremental checkpoint is created, which can refer to files from both
+        //    FileSystem_1 and FileSystem_2.
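The multi-filesystem scenario described in the comment above can be sketched independently of Flink: copy requests are bucketed by the filesystem they live on, keyed by URI scheme and authority. Flink's actual key type is `FileSystem.FSKey`; the string key and method names below are simplified stand-ins:

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByFileSystemSketch {

    /** Two paths belong to the same filesystem if scheme and authority match (simplified FSKey). */
    static String fsKey(URI uri) {
        return uri.getScheme() + "://" + (uri.getAuthority() == null ? "" : uri.getAuthority());
    }

    /** Buckets remote paths per filesystem, so each batch can be downloaded separately. */
    static Map<String, List<URI>> groupByFileSystem(List<URI> remotePaths) {
        Map<String, List<URI>> grouped = new HashMap<>();
        for (URI path : remotePaths) {
            // computeIfAbsent chained with add, i.e. the "inlined" form the nit below asks for.
            grouped.computeIfAbsent(fsKey(path), k -> new ArrayList<>()).add(path);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<URI> paths = Arrays.asList(
                URI.create("s3://bucket-a/chk-1/f1"),
                URI.create("s3://bucket-a/chk-1/f2"),
                URI.create("hdfs://nn:8020/chk-0/f3"));
        // Files referenced from the old and the new checkpoint location land in separate batches.
        System.out.println(groupByFileSystem(paths).size()); // 2
    }
}
```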
+        Map<FileSystem.FSKey, List<CopyRequest>> filesSystemsFilesToDownload = new HashMap<>();
+        List<Runnable> runnables = new ArrayList<>();
+
+        for (StateHandleDownloadSpec downloadSpec : downloadRequests) {
+            for (HandleAndLocalPath handleAndLocalPath : getAllHandles(downloadSpec)) {
+                Path downloadDestination =
+                        downloadSpec
+                                .getDownloadDestination()
+                                .resolve(handleAndLocalPath.getLocalPath());
+                if (canCopyPaths(handleAndLocalPath)) {
+                    org.apache.flink.core.fs.Path remotePath =
+                            handleAndLocalPath.getHandle().maybeGetPath().get();
+                    FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
+                    List<CopyRequest> filesToDownload =
+                            filesSystemsFilesToDownload.computeIfAbsent(
+                                    newFSKey, fsKey -> new ArrayList<>());
+                    filesToDownload.add(
+                            CopyRequest.of(
+                                    remotePath,
+                                    new org.apache.flink.core.fs.Path(
+                                            downloadDestination.toUri())));

Review Comment:
nit: inlining `filesToDownload` makes this code more readable to me

##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########

@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
         this.s3AccessHelper = s3UploadHelper;
         this.uploadThreadPool = Executors.newCachedThreadPool();

-        Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
         this.s3uploadPartSize = s3uploadPartSize;
         this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", s5CmdConfiguration);
     }

     // ------------------------------------------------------------------------

+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }
+
+    @Override
+    public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+            throws IOException {
+        checkState(canCopyPaths(), "#downloadFiles has been called illegally");
+        List<String> artefacts = new ArrayList<>();
+        artefacts.add(s5CmdConfiguration.path);
+        artefacts.addAll(s5CmdConfiguration.args);
+        artefacts.add("run");
+        castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));
+    }
+
+    private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
+        List<String> spells = new ArrayList<>();
+        for (CopyRequest request : requests) {
+            Files.createDirectories(Paths.get(request.getDestination().toUri()).getParent());
+            spells.add(
+                    String.format(
+                            "cp %s %s",
+                            request.getSource().toUri().toString(),
+                            request.getDestination().getPath()));
+        }
+        return spells;
+    }
+
+    private void castSpell(
+            List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
+            throws IOException {
+        LOG.info("Casting spell: {}", Arrays.toString(artefacts));
+        int exitCode = 0;
+        final AtomicReference<IOException> maybeCloseableRegistryException =
+                new AtomicReference<>();
+
+        // Set up a temporary working directory for the process.
+        File tmpWorkingDir = new File(localTmpDir, "s5cmd_" + UUID.randomUUID());
+        java.nio.file.Path tmpWorkingPath = Files.createDirectories(tmpWorkingDir.toPath());
+
+        try {
+            // Redirect the process input/output to files. Communicating directly through a
+            // stream can lead to blocking and undefined behavior if the underlying process is
+            // killed (a known Java problem).
+            ProcessBuilder hogwart = new ProcessBuilder(artefacts).directory(tmpWorkingDir);
+            s5CmdConfiguration.configureEnvironment(hogwart.environment());
+            File inScrolls = new File(tmpWorkingDir, "s5cmd_input");
+            Preconditions.checkState(inScrolls.createNewFile());
+            File outScrolls = new File(tmpWorkingDir, "s5cmd_output");
+            Preconditions.checkState(outScrolls.createNewFile());
+
+            FileUtils.writeFileUtf8(inScrolls, String.join(System.lineSeparator(), spells));

Review Comment:
I think a line separator after the last string is necessary: the file content serves as input to a process and, like input typed at a terminal, it needs a trailing newline to take effect.
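The trailing-newline point can be checked standalone: `String.join` places separators only between elements, so a command file built this way never ends with a newline unless one is appended explicitly. The `cp` lines below are illustrative s5cmd-style commands:

```java
import java.util.Arrays;
import java.util.List;

public class TrailingNewlineSketch {

    /** Joins commands with the platform line separator; note: no trailing separator is added. */
    static String joinCommands(List<String> commands) {
        return String.join(System.lineSeparator(), commands);
    }

    public static void main(String[] args) {
        List<String> commands =
                Arrays.asList("cp s3://bucket/a /tmp/a", "cp s3://bucket/b /tmp/b");

        String joined = joinCommands(commands);
        // The last command is NOT newline-terminated, which a consuming process may ignore,
        // just as a terminal does not act on a line until Enter is pressed.
        System.out.println(joined.endsWith(System.lineSeparator())); // false

        // The fix suggested in the review: append one more separator after the last command.
        String fixed = joined + System.lineSeparator();
        System.out.println(fixed.endsWith(System.lineSeparator())); // true
    }
}
```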