1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1302400685

##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -156,9 +158,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
     private final boolean sharedYarnClient;
     /** Lazily initialized list of files to ship. */
-    private final List<File> shipFiles = new LinkedList<>();
+    private final List<Path> shipFiles = new LinkedList<>();
-    private final List<File> shipArchives = new LinkedList<>();
+    private final List<Path> shipArchives = new LinkedList<>();

Review Comment:
   How about adding a comment explaining that the configured path strings have already been converted to `Path` objects with a scheme and an absolute path? That would make the fields clearer to other developers and easier to use correctly.

##########
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##########
@@ -272,17 +272,25 @@ public class YarnConfigOptions {
                     .noDefaultValue()
                     .withDeprecatedKeys("yarn.ship-directories")
                     .withDescription(
-                            "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                            "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                    + "cluster. These files/directories can come from the local path of flink client "
+                                    + "or HDFS. For example, "

Review Comment:
   How about also updating `from the local client and/or HDFS` to `the local path of flink client or HDFS`?

##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
         return files.isEmpty() ? Optional.empty() : Optional.of(files);
     }
+    private Path enrichPathSchemaIfNeeded(Path path) {
+        if (isWithoutSchema(path)) {
+            return new Path(new File(path.toString()).toURI());

Review Comment:
   This class contains several occurrences of `new Path(new File(pathStr).toURI())` that convert a `localPathStr` into an HDFS `Path`. Could we extract a single method for this conversion?

##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##########
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
         return files.isEmpty() ? Optional.empty() : Optional.of(files);
     }
+    private Path enrichPathSchemaIfNeeded(Path path) {
+        if (isWithoutSchema(path)) {
+            return new Path(new File(path.toString()).toURI());
+        }
+        return path;
+    }
+
+    private boolean isWithoutSchema(Path path) {
+        return StringUtils.isNullOrWhitespaceOnly(path.toUri().getScheme());
+    }

Review Comment:
   How about changing `Path enrichPathSchemaIfNeeded(Path path)` to `Path createPathWithSchema(String path)`? Then the line `files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());` can be removed, and we can simply call `List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, this::createPathWithSchema);`.
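
For illustration only, the refactoring proposed in the last comment could look roughly like the sketch below. It is not the PR's code: to stay runnable without Hadoop or Flink on the classpath it uses `java.net.URI` as a stand-in for the Hadoop `Path`, and a plain stream in place of `ConfigUtils.decodeListFromConfig`. The class name `ShipPathSketch`, the `decode` helper, and the sample paths are hypothetical, and the sketch assumes POSIX-style path strings that are valid URIs.

```java
import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ShipPathSketch {

    /**
     * Hypothetical counterpart of the suggested createPathWithSchema(String):
     * keeps a path that already has a scheme, otherwise treats it as a local
     * file and returns an absolute file: URI. java.net.URI stands in for the
     * Hadoop Path here (assumption made for this sketch only).
     */
    static URI createPathWithSchema(String pathStr) {
        URI uri = URI.create(pathStr);
        if (uri.getScheme() == null) {
            // File#toURI resolves relative paths against the working directory.
            return new File(pathStr).toURI();
        }
        return uri;
    }

    /** Stand-in for decoding the configured list with a single mapper function. */
    static List<URI> decode(List<String> rawPaths) {
        return rawPaths.stream()
                .map(ShipPathSketch::createPathWithSchema)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> configured =
                Arrays.asList("hdfs://namenode:9000/libs/udf.jar", "conf/log4j.properties");
        for (URI uri : decode(configured)) {
            System.out.println(uri);
        }
    }
}
```

Because the conversion takes the raw string, the decoding step needs only one mapper and no second pass over the list, which is the simplification the comment is asking for.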