1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1296881231

##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
 .noDefaultValue()
 .withDeprecatedKeys("yarn.ship-directories")
 .withDescription(
- "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+ "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+ "cluster. These files/directories can come from the local client and/or remote "

Review Comment:
```suggestion
+ "cluster. These files/directories can come from the local path of flink client or remote "
```

##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
 .noDefaultValue()
 .withDeprecatedKeys("yarn.ship-directories")
 .withDescription(
- "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+ "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+ "cluster. These files/directories can come from the local client and/or remote "
+ "file system. For example, \"/path/to/local/file;/path/to/local/directory;"
+ "hdfs://$namenode_address/path/of/file;"
+ "hdfs://$namenode_address/path/of/directory\"");
 public static final ConfigOption<List<String>> SHIP_ARCHIVES =
 key("yarn.ship-archives")
 .stringType()
 .asList()
 .noDefaultValue()
 .withDescription(
- "A semicolon-separated list of archives to be shipped to the YARN cluster."
- + " These archives will be un-packed when localizing and they can be any of the following types: "
- + "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
+ "A semicolon-separated list of archives to be shipped to the YARN cluster. "
+ "These archives can come from the local client and/or remote file system. "
+ "They will be un-packed when localizing and they can be any of the following "
+ "types: \".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\". "
+ "For example, \"/path/to/local/archive.jar;"
+ "hdfs://$namenode_address/path/to/archive.jar\"");

Review Comment:
Same two comments as for the previous option.

##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -911,31 +929,21 @@ private ApplicationReport startAppMaster(
 final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
 final List<String> uploadedDependencies =
 fileUploader.registerMultipleLocalResources(
- systemShipFiles.stream()
- .map(e -> new Path(e.toURI()))
- .collect(Collectors.toSet()),
- Path.CUR_DIR,
- LocalResourceType.FILE);
+ systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);

Review Comment:
`systemShipFiles` includes three types of files:
- ShipFiles
- logConfigFilePath
- LibFolders

After this PR, no scheme is added for the latter two types any more. Does this still work as expected?

##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
 this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
 }
- private Optional<List<File>> decodeFilesToShipToCluster(
+ private Optional<List<Path>> decodeFilesToShipToCluster(
 final Configuration configuration, final ConfigOption<List<String>> configOption) {
 checkNotNull(configuration);
 checkNotNull(configOption);
- final List<File> files =
- ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+ List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+ files =
+ files.stream()
+ .map(
+ path ->
+ isLocalPath(path)
+ ? new Path(new File(path.toString()).toURI())
+ : path)

Review Comment:
This code can be extracted into a method:
```
Path enrichPathSchemaIfNeeded(Path path) {
    if (isWithoutSchema(path)) {
        return new Path(new File(path.toString()).toURI());
    }
    return path;
}
```

##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
 .noDefaultValue()
 .withDeprecatedKeys("yarn.ship-directories")
 .withDescription(
- "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+ "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+ "cluster. These files/directories can come from the local client and/or remote "
+ "file system. For example, \"/path/to/local/file;/path/to/local/directory;"

Review Comment:
> remote file system

`YarnApplicationFileUploader#registerMultipleLocalResources` checks the path [1], and if the path is remote, it checks it using the HDFS `FileSystem`. So, if I understand correctly, the only supported remote paths are HDFS paths, right? If so, the doc should say `hdfs path` instead of `remote path`.
[1] https://github.com/apache/flink/blob/56bbd1dd6f83652d2d2c29efef9f21dbcca7b6e3/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java#L243

##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
 this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
 }
- private Optional<List<File>> decodeFilesToShipToCluster(
+ private Optional<List<Path>> decodeFilesToShipToCluster(
 final Configuration configuration, final ConfigOption<List<String>> configOption) {
 checkNotNull(configuration);
 checkNotNull(configOption);
- final List<File> files =
- ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+ List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+ files =
+ files.stream()
+ .map(
+ path ->
+ isLocalPath(path)
+ ? new Path(new File(path.toString()).toURI())
+ : path)
+ .collect(Collectors.toList());
 return files.isEmpty() ? Optional.empty() : Optional.of(files);
 }
+ private boolean isLocalPath(Path path) {

Review Comment:
The method name and the implementation are inconsistent: the method is named `isLocalPath(Path path)`, but what the implementation actually checks is `isWithoutSchema()`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
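
The `enrichPathSchemaIfNeeded` suggestion and the final `isLocalPath` naming comment both hinge on the difference between "has no scheme" and "is local". Below is a minimal, stdlib-only sketch of that distinction. It is not Flink code: `java.net.URI` stands in for `org.apache.flink.core.fs.Path`, the class and method names (`ShipPathSchemes`, `isWithoutScheme`, `isLocalPath`, `enrichSchemeIfNeeded`, `decode`) are hypothetical, and it assumes POSIX-style entries that are valid URI strings (`URI.create` throws `IllegalArgumentException` on, for example, spaces or Windows backslashes).

```java
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: java.net.URI stands in for org.apache.flink.core.fs.Path,
// and none of these method names are Flink API.
final class ShipPathSchemes {

    private ShipPathSchemes() {}

    // True when the entry has no URI scheme at all, e.g. "/path/to/local/file".
    static boolean isWithoutScheme(String entry) {
        return URI.create(entry).getScheme() == null;
    }

    // True when the entry lives on the local file system: either it has no
    // scheme (a plain client-side path) or an explicit "file" scheme.
    static boolean isLocalPath(String entry) {
        String scheme = URI.create(entry).getScheme();
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }

    // Same idea as the suggested enrichPathSchemaIfNeeded: turn scheme-less
    // entries into absolute file: URIs and leave entries with a scheme alone.
    static URI enrichSchemeIfNeeded(String entry) {
        return isWithoutScheme(entry) ? new File(entry).toURI() : URI.create(entry);
    }

    // Splits a semicolon-separated option value, skipping empty entries.
    static List<URI> decode(String optionValue) {
        List<URI> uris = new ArrayList<>();
        for (String entry : optionValue.split(";")) {
            if (!entry.isEmpty()) {
                uris.add(enrichSchemeIfNeeded(entry));
            }
        }
        return uris;
    }

    public static void main(String[] args) {
        String shipFiles =
                "/path/to/local/file;file:///path/to/local/directory;hdfs://namenode:8020/path/of/file";
        for (URI uri : decode(shipFiles)) {
            // After enrichment no entry is scheme-less, yet the first two are still local.
            System.out.println(
                    uri + " withoutScheme=" + isWithoutScheme(uri.toString())
                            + " local=" + isLocalPath(uri.toString()));
        }
    }
}
```

Running `main` illustrates the naming point: once a plain path has been enriched to a `file:` URI, `isWithoutScheme` returns `false` while `isLocalPath` still returns `true`, so a method that only checks for a missing scheme reads more accurately when it is named after that check.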