1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1296881231


##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
                     .noDefaultValue()
                     .withDeprecatedKeys("yarn.ship-directories")
                     .withDescription(
-                            "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                            "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                    + "cluster. These files/directories can come from the local client and/or remote "

Review Comment:
   ```suggestion
                                       + "cluster. These files/directories can come from the local path of flink client or remote "
   ```



##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
                     .noDefaultValue()
                     .withDeprecatedKeys("yarn.ship-directories")
                     .withDescription(
-                            "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                            "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                    + "cluster. These files/directories can come from the local client and/or remote "
+                                    + "file system. For example, \"/path/to/local/file;/path/to/local/directory;"
+                                    + "hdfs://$namenode_address/path/of/file;"
+                                    + "hdfs://$namenode_address/path/of/directory\"");
 
     public static final ConfigOption<List<String>> SHIP_ARCHIVES =
             key("yarn.ship-archives")
                     .stringType()
                     .asList()
                     .noDefaultValue()
                     .withDescription(
-                            "A semicolon-separated list of archives to be shipped to the YARN cluster."
-                                    + " These archives will be un-packed when localizing and they can be any of the following types: "
-                                    + "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
+                            "A semicolon-separated list of archives to be shipped to the YARN cluster. "
+                                    + "These archives can come from the local client and/or remote file system. "
+                                    + "They will be un-packed when localizing and they can be any of the following "
+                                    + "types: \".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\". "
+                                    + "For example, \"/path/to/local/archive.jar;"
+                                    + "hdfs://$namenode_address/path/to/archive.jar\"");

Review Comment:
   Same 2 comments as the last option.



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -911,31 +929,21 @@ private ApplicationReport startAppMaster(
         final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
         final List<String> uploadedDependencies =
                 fileUploader.registerMultipleLocalResources(
-                        systemShipFiles.stream()
-                                .map(e -> new Path(e.toURI()))
-                                .collect(Collectors.toSet()),
-                        Path.CUR_DIR,
-                        LocalResourceType.FILE);
+                        systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);

Review Comment:
   `systemShipFiles` includes 3 types of files:
   
   - ShipFiles
   - logConfigFilePath
   - LibFolders
   
   After this PR, no scheme is added to the latter two types. Does this still work as expected?
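   A minimal sketch of how the concern could be checked: list the entries that still lack a URI scheme. It uses plain `java.net.URI` strings instead of Flink's `Path`, and the helper name and paths are hypothetical; the ship-file entry is qualified with `File#toURI` as `decodeFilesToShipToCluster` does, while the log config file and lib folder entries are passed through unchanged.

   ```java
   import java.io.File;
   import java.net.URI;
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical diagnostic: lists ship-file entries that carry no URI scheme. */
   public class SchemelessEntries {

       /** Returns the entries whose URI has no scheme, e.g. "/opt/flink/lib". */
       static List<String> findSchemeless(List<String> entries) {
           List<String> schemeless = new ArrayList<>();
           for (String entry : entries) {
               if (URI.create(entry).getScheme() == null) {
                   schemeless.add(entry);
               }
           }
           return schemeless;
       }

       public static void main(String[] args) {
           // Hypothetical paths: the ship file is qualified via File#toURI, the other
           // two entries (log config file, lib folder) are passed as-is.
           List<String> systemShipFiles =
                   List.of(
                           new File("/opt/flink/usrlib/udf.jar").toURI().toString(),
                           "/opt/flink/conf/log4j.properties",
                           "/opt/flink/lib");
           // Prints [/opt/flink/conf/log4j.properties, /opt/flink/lib]
           System.out.println(findSchemeless(systemShipFiles));
       }
   }
   ```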



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
 
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files =
+                files.stream()
+                        .map(
+                                path ->
+                                        isLocalPath(path)
+                                                ? new Path(new File(path.toString()).toURI())
+                                                : path)

Review Comment:
   This code can be extracted into a method:
   
   ```java
   private Path enrichPathSchemaIfNeeded(Path path) {
       // Paths without a scheme are treated as local files and qualified as "file:" URIs.
       if (isWithoutSchema(path)) {
           return new Path(new File(path.toString()).toURI());
       }
       return path;
   }
   ```
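   A self-contained sketch of the suggested helper, assuming `java.net.URI` stands in for Flink's `Path` so it runs without Flink on the classpath; the method names mirror the suggestion above and are not existing Flink APIs.

   ```java
   import java.io.File;
   import java.net.URI;

   /** Sketch of the suggested helper, with java.net.URI in place of Flink's Path. */
   public class EnrichSchemaSketch {

       /** True when the path carries no URI scheme, e.g. "/tmp/a" or "lib". */
       static boolean isWithoutSchema(URI uri) {
           return uri.getScheme() == null;
       }

       /** Qualifies scheme-less paths as local "file:" URIs; leaves others untouched. */
       static URI enrichPathSchemaIfNeeded(URI uri) {
           if (isWithoutSchema(uri)) {
               // File#toURI also resolves relative paths against the working directory.
               return new File(uri.getPath()).toURI();
           }
           return uri;
       }

       public static void main(String[] args) {
           System.out.println(enrichPathSchemaIfNeeded(URI.create("/path/to/local/file")));
           System.out.println(enrichPathSchemaIfNeeded(URI.create("hdfs://namenode:8020/path/of/file")));
       }
   }
   ```

   Keeping the scheme check in its own predicate also makes the naming question below easier to settle in one place.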



##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
                     .noDefaultValue()
                     .withDeprecatedKeys("yarn.ship-directories")
                     .withDescription(
-                            "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                            "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                    + "cluster. These files/directories can come from the local client and/or remote "
+                                    + "file system. For example, \"/path/to/local/file;/path/to/local/directory;"

Review Comment:
   > remote file system
   
   `YarnApplicationFileUploader#registerMultipleLocalResources` checks the path [1], and if it is a remote path, it checks the path with the HDFS `FileSystem`. So, if I understand correctly, the only supported remote paths are HDFS paths, right?
   
   If so, the doc should say `hdfs path` instead of `remote path`.
   
   
   [1] https://github.com/apache/flink/blob/56bbd1dd6f83652d2d2c29efef9f21dbcca7b6e3/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java#L243
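   If the docs end up distinguishing HDFS from other remote file systems, the entries could be classified by URI scheme. A minimal sketch with a hypothetical classifier; whether non-HDFS schemes such as `s3` actually fail at upload time is the open question above, not something this sketch establishes.

   ```java
   import java.net.URI;

   /** Hypothetical classifier for entries of yarn.ship-files / yarn.ship-archives. */
   public class ShipPathKind {

       enum Kind { LOCAL, HDFS, OTHER_REMOTE }

       /** No scheme or "file" counts as local; "hdfs" as HDFS; anything else as other. */
       static Kind classify(String entry) {
           String scheme = URI.create(entry).getScheme();
           if (scheme == null || "file".equals(scheme)) {
               return Kind.LOCAL;
           }
           return "hdfs".equals(scheme) ? Kind.HDFS : Kind.OTHER_REMOTE;
       }

       public static void main(String[] args) {
           String value = "/path/to/local/file;hdfs://namenode:8020/path/of/file;s3://bucket/key";
           for (String entry : value.split(";")) {
               System.out.println(classify(entry) + " " + entry);
           }
       }
   }
   ```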



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
 
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files =
+                files.stream()
+                        .map(
+                                path ->
+                                        isLocalPath(path)
+                                                ? new Path(new File(path.toString()).toURI())
+                                                : path)
+                        .collect(Collectors.toList());
         return files.isEmpty() ? Optional.empty() : Optional.of(files);
     }
 
+    private boolean isLocalPath(Path path) {

Review Comment:
   The method name and its implementation are inconsistent.
   
   The method is named `isLocalPath(Path path)`, but its implementation actually checks `isWithoutSchema()`.
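   A small sketch of why the two names are not interchangeable, using `java.net.URI` instead of Flink's `Path` (an assumption for a runnable example; both predicates are hypothetical): an explicit `file:` URI is local, yet it does have a scheme.

   ```java
   import java.net.URI;

   /** Illustrates that "without scheme" and "local" are different predicates. */
   public class LocalVsSchemeless {

       /** What the current isLocalPath checks, per this comment: no scheme at all. */
       static boolean isWithoutSchema(URI uri) {
           return uri.getScheme() == null;
       }

       /** A predicate that would match the name isLocalPath: no scheme, or "file". */
       static boolean isLocalPath(URI uri) {
           return uri.getScheme() == null || "file".equalsIgnoreCase(uri.getScheme());
       }

       public static void main(String[] args) {
           URI explicitLocal = URI.create("file:///tmp/app.jar");
           System.out.println(isWithoutSchema(explicitLocal)); // false
           System.out.println(isLocalPath(explicitLocal)); // true
       }
   }
   ```

   Either renaming the method or widening the check would remove the mismatch; which one fits depends on whether `file:` URIs should be re-qualified.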



-- 
This is an automated message from the Apache Git Service.