1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1302400685


##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -156,9 +158,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
     private final boolean sharedYarnClient;
 
     /** Lazily initialized list of files to ship. */
-    private final List<File> shipFiles = new LinkedList<>();
+    private final List<Path> shipFiles = new LinkedList<>();
 
-    private final List<File> shipArchives = new LinkedList<>();
+    private final List<Path> shipArchives = new LinkedList<>();

Review Comment:
   How about adding a comment explaining that the option path strings have already been converted to `Path`s with a scheme and an absolute path? That makes these fields clearer for other developers and easier to use correctly.
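A minimal sketch of what the suggested field comment could look like. It uses `java.net.URI` as a stand-in for Hadoop's `Path` so it compiles without Hadoop on the classpath. The `requireSchemeAndAbsolutePath` guard and the accessor methods are hypothetical additions that only illustrate the documented invariant; they are not part of the PR.

```java
import java.net.URI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Sketch only: java.net.URI stands in for org.apache.hadoop.fs.Path.
final class ShipFilesHolder {

    /**
     * Lazily initialized list of files to ship. The raw option strings have already been
     * converted to paths that carry a scheme (e.g. "file" or "hdfs") and an absolute path,
     * so callers can use the entries directly without resolving them again.
     */
    private final List<URI> shipFiles = new LinkedList<>();

    /** Same invariant as {@link #shipFiles}: every entry has a scheme and an absolute path. */
    private final List<URI> shipArchives = new LinkedList<>();

    void addShipFile(URI path) {
        shipFiles.add(requireSchemeAndAbsolutePath(path));
    }

    void addShipArchive(URI path) {
        shipArchives.add(requireSchemeAndAbsolutePath(path));
    }

    List<URI> getShipFiles() {
        return Collections.unmodifiableList(shipFiles);
    }

    List<URI> getShipArchives() {
        return Collections.unmodifiableList(shipArchives);
    }

    // Hypothetical guard: URI.isAbsolute() is true only when a scheme is present,
    // and the path component must start with "/" to be an absolute path.
    private static URI requireSchemeAndAbsolutePath(URI path) {
        if (!path.isAbsolute() || path.getPath() == null || !path.getPath().startsWith("/")) {
            throw new IllegalArgumentException(
                    "Expected a path with a scheme and an absolute path: " + path);
        }
        return path;
    }
}
```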



##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,25 @@ public class YarnConfigOptions {
                     .noDefaultValue()
                     .withDeprecatedKeys("yarn.ship-directories")
                     .withDescription(
-                            "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                            "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                    + "cluster. These files/directories can come from the local path of flink client "
+                                    + "or HDFS. For example, "

Review Comment:
   How about updating `from the local client and/or HDFS` to `the local path of flink client or HDFS` as well?
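For illustration, a sketch of how a value that mixes local-client and HDFS entries could be split and classified. The plain `split(";")` is an assumption standing in for Flink's own list-option parsing, the class and method names are hypothetical, and an entry counts as local here when it has no URI scheme or the `file` scheme.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not Flink's actual option parser.
final class ShipListExample {

    /** Splits a semicolon-separated value into trimmed, non-empty entries. */
    static List<String> split(String value) {
        List<String> entries = new ArrayList<>();
        for (String part : value.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    /** True when the entry would be read from the Flink client's local file system. */
    static boolean isLocal(String entry) {
        try {
            String scheme = new URI(entry).getScheme();
            return scheme == null || scheme.equals("file");
        } catch (URISyntaxException e) {
            // Assumption: strings that are not valid URIs (e.g. containing spaces) are local paths.
            return true;
        }
    }
}
```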



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
 
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
         return files.isEmpty() ? Optional.empty() : Optional.of(files);
     }
 
+    private Path enrichPathSchemaIfNeeded(Path path) {
+        if (isWithoutSchema(path)) {
+            return new Path(new File(path.toString()).toURI());

Review Comment:
   This class has several occurrences of `new Path(new File(pathStr).toURI())` that convert a local path string (`localPathStr`) into an HDFS `Path`. Could we extract a helper method for this conversion?
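A minimal sketch of the extracted helper, assuming a hypothetical name `fromLocalPathString`. It returns a `java.net.URI` as a stand-in for Hadoop's `Path`; in the real class the body would be the existing `new Path(new File(localPathStr).toURI())` expression.

```java
import java.io.File;
import java.net.URI;

// Sketch only: java.net.URI stands in for org.apache.hadoop.fs.Path.
final class LocalPaths {

    private LocalPaths() {}

    /**
     * Hypothetical helper replacing the repeated new Path(new File(localPathStr).toURI())
     * expressions: resolves the string against the working directory and returns an
     * absolute "file:" URI.
     */
    static URI fromLocalPathString(String localPathStr) {
        // With Hadoop on the classpath: return new Path(new File(localPathStr).toURI());
        return new File(localPathStr).toURI();
    }
}
```

Each existing call site would then become a single call to the helper, so the local-to-`file:` conversion lives in one place.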



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
 
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
         return files.isEmpty() ? Optional.empty() : Optional.of(files);
     }
 
+    private Path enrichPathSchemaIfNeeded(Path path) {
+        if (isWithoutSchema(path)) {
+            return new Path(new File(path.toString()).toURI());
+        }
+        return path;
+    }
+
+    private boolean isWithoutSchema(Path path) {
+        return StringUtils.isNullOrWhitespaceOnly(path.toUri().getScheme());
+    }

Review Comment:
   How about changing `Path enrichPathSchemaIfNeeded(Path path)` to `Path createPathWithSchema(String path)`?
   
   If so, the line `files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());` can be removed, and we can simply call `List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, this::createPathWithSchema);`.
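A sketch of the proposed shape, assuming `java.net.URI` in place of Hadoop's `Path` and a hypothetical `decodeList` in place of `ConfigUtils.decodeListFromConfig`. `createPathWithSchema` is the name proposed above; treating strings that are not valid URIs as local paths is an assumption of this sketch.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Sketch only: URI stands in for Hadoop's Path, decodeList for ConfigUtils.decodeListFromConfig.
final class ShipFilesDecoder {

    /** Hypothetical stand-in for the config decoder: applies the mapper to every raw entry. */
    static <T> List<T> decodeList(List<String> rawValues, Function<String, T> mapper) {
        List<T> result = new ArrayList<>(rawValues.size());
        for (String raw : rawValues) {
            result.add(mapper.apply(raw));
        }
        return result;
    }

    /** Keeps an explicit scheme; otherwise resolves the string as an absolute local "file:" path. */
    static URI createPathWithSchema(String pathStr) {
        try {
            URI uri = new URI(pathStr);
            if (uri.getScheme() != null) {
                return uri; // already qualified, e.g. "hdfs:///flink/libs"
            }
        } catch (URISyntaxException e) {
            // Assumption: strings that are not valid URIs are treated as local paths.
        }
        return new File(pathStr).toURI();
    }

    /** The mapper is passed straight to the decoder, so no second stream().map(...) pass is needed. */
    static Optional<List<URI>> decodeFilesToShipToCluster(List<String> rawValues) {
        List<URI> files = decodeList(rawValues, ShipFilesDecoder::createPathWithSchema);
        return files.isEmpty() ? Optional.empty() : Optional.of(files);
    }
}
```

Because the scheme handling happens inside the mapper, each raw string is converted exactly once during decoding.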



-- 
This is an automated message from the Apache Git Service.