Hi Robert,

Thanks for your reply!
> I believe you should be able to load non-class files through the classloader
> as well.

Could you please clarify what you mean by this?

> Did you see any code that excludes non-class files?

No, I didn't, but I did see the following code here [1]:

    if (shipFile.isDirectory()) {
        // add directories to the classpath
        java.nio.file.Path shipPath = shipFile.toPath();
        final java.nio.file.Path parentPath = shipPath.getParent();

        Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
            @Override
            public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
                    throws IOException {
                java.nio.file.Path relativePath = parentPath.relativize(file);
                classPaths.add(relativePath.toString());
                return FileVisitResult.CONTINUE;
            }
        });
    } else {
        // add files to the classpath
        classPaths.add(shipFile.getName());
    }

When a ship file is a directory, this code traverses the contents of the folder I passed
via the --yarnship option and appends every file it finds, including non-class files, to
the classpath. That has no effect, because the JVM accepts only directories and .jar/.zip
archives as classpath entries; entries pointing at other kinds of files are silently
ignored.

I believe that if the code above did not traverse directory contents, everything would
work as expected: if I pass a single file, it is appended to the classpath as is, and if
I specify a folder, it goes onto the classpath as a directory, so the classloader can
resolve the resources inside it. (To make this concrete, I've put two small sketches at
the very end of this mail, below the quoted thread.)

At the moment it is also not possible to pass the --yarnship option more than once, so I
created another JIRA ticket [2] that proposes adding the ability to specify multiple
yarnship folders. What do you think about that?

[1] https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003
[2] https://issues.apache.org/jira/browse/FLINK-6950

Thanks in advance.

Kind Regards,
Mike Pryakhin

> On 27 Sep 2017, at 18:30, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Mike,
>
> For using the DistributedCache approach, you need to have HDFS or another
> distributed FS available to distribute the files.
>
> I would actually like to understand why you said "then this file is copied
> to the yarn cluster and added to JVM class [...] but is ignored by TM JVM as
> it is neither jar(zip) file nor directory..."
> I believe you should be able to load non-class files through the classloader
> as well.
> Did you see any code that excludes non-class files? Afaik the Taskmanagers
> have access to all files (of any type) that are passed using the --ship
> command (or in the lib/ folder).
>
> On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>
> Hi Nico,
>
> Thanks a lot for your help, but unfortunately, the workaround you suggested
> doesn't work for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile method
> but failed, because this instance is created when the application master has
> already been started, and by then the classpath used to run the application
> somewhere on the YARN cluster has already been built by
> org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to
> pass a local folder at the moment I submit the application so that it is
> included in the application's YARN classpath.
>
> The option you suggested works well if I need to cache a file that is
> available to me at the moment I want to register it (for example, a file
> on HDFS).
>
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to
> pass user-specified folders to the YARN application classpath?
>
> Kind Regards,
> Mike Pryakhin
>
>> On 21 Jun 2017, at 16:55, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>>
>> Hi Nico!
>> Sounds great, will give it a try and come back with results soon.
>>
>> Thank you so much for your help!!
>>
>> Kind Regards,
>> Mike Pryakhin
>>
>>> On 21 Jun 2017, at 16:36, Nico Kruber <n...@data-artisans.com> wrote:
>>>
>>> A workaround may be to use the DistributedCache. It apparently is not
>>> documented much, but the JavaDoc mentions roughly how to use it:
>>>
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>>>
>>> /**
>>>  * Registers a file at the distributed cache under the given name. The file
>>>  * will be accessible from any user-defined function in the (distributed)
>>>  * runtime under a local path. Files may be local files (as long as all
>>>  * relevant workers have access to it), or files in a distributed file
>>>  * system. The runtime will copy the files temporarily to a local cache,
>>>  * if needed.
>>>  * <p>
>>>  * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be
>>>  * obtained inside UDFs via
>>>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
>>>  * and provides access to {@link org.apache.flink.api.common.cache.DistributedCache}
>>>  * via {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>>>  *
>>>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
>>>  *                 or "hdfs://host:port/and/path")
>>>  * @param name The name under which the file is registered.
>>>  */
>>> public void registerCachedFile(String filePath, String name) {
>>>     registerCachedFile(filePath, name, false);
>>> }
>>>
>>> You could pass the actual file URL to use for each instance of your job
>>> that requires a different file via a simple job parameter:
>>>
>>> public static void main(String[] args) throws Exception {
>>>     ParameterTool params = ParameterTool.fromArgs(args);
>>>
>>>     ...
>>>
>>>     env.registerCachedFile(params.get("config_file", <default/path>), "extConfig");
>>>
>>>     ...
>>> }
>>>
>>> Flink's DistributedCache will then cache the file locally and you can use
>>> it in a RichFunction like in
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>>>
>>> public class MyFunction extends AbstractRichFunction {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public void open(Configuration conf) throws IOException {
>>>         File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
>>>         ...
>>>     }
>>> }
>>>
>>> Nico
>>>
>>> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>>>> Hi guys,
>>>>
>>>> any news?
>>>> I've created a jira-ticket:
>>>> https://issues.apache.org/jira/browse/FLINK-6949
>>>>
>>>> Kind Regards,
>>>> Mike Pryakhin
>>>>
>>>>> On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I run my flink job on a yarn cluster and need to supply job configuration
>>>>> parameters via a configuration file shipped alongside the job jar (the
>>>>> configuration file can't be packaged into the job's jar file). I tried to
>>>>> put the configuration file into the folder that is passed via the
>>>>> --yarnship option of the flink run command; the file is then copied to
>>>>> the yarn cluster and added to the JVM classpath as 'path/application.conf',
>>>>> but it is ignored by the TM JVM as it is neither a jar(zip) file nor a
>>>>> directory...
>>>>>
>>>>> I looked through the YarnClusterDescriptor class where the
>>>>> ENV_FLINK_CLASSPATH is built and haven't found any option to tell flink
>>>>> (YarnClusterDescriptor especially) to add my configuration file to the
>>>>> TM JVM classpath... Is there any way to do so? If not, would you consider
>>>>> adding such an ability to ship files? (In Spark I can simply pass any
>>>>> files via the --files option.)
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Kind Regards,
>>>>> Mike Pryakhin
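
P.S. Here are the two sketches I mentioned above. First, a minimal, untested sketch of
the change I have in mind in AbstractYarnClusterDescriptor (variable names are taken
from the snippet quoted in [1]; this is an illustration of the proposal, not a patch):

    if (shipFile.isDirectory()) {
        // proposed: add the directory itself to the classpath instead of
        // walking its contents; the JVM will then resolve any resource
        // underneath it (e.g. application.conf) through the classloader
        classPaths.add(shipFile.getName());
    } else {
        // single files are added to the classpath by name, as before
        classPaths.add(shipFile.getName());
    }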
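
Second, assuming the shipped folder itself ends up as a TM classpath entry, this is how
a job could then read the shipped file through the classloader. ShippedConfigReader is a
hypothetical helper of mine, not part of Flink:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public final class ShippedConfigReader {

        // Reads a classpath resource (e.g. a config file shipped via a
        // --yarnship directory) and returns its content as a UTF-8 string.
        public static String readResource(String name) throws IOException {
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            try (InputStream in = cl.getResourceAsStream(name)) {
                if (in == null) {
                    throw new IOException("resource not found on classpath: " + name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int read;
                while ((read = in.read(buf)) != -1) {
                    out.write(buf, 0, read);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        }
    }

For instance, a RichFunction's open() method could call
ShippedConfigReader.readResource("application.conf") to load the shipped configuration.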