Hi Nico,

Thanks a lot for your help, but unfortunately the workaround you suggested doesn't work for me. I tried to leverage the StreamExecutionEnvironment#registerCachedFile method but failed, because by the time this instance is created the application master has already been started, and therefore the classpath used to run the application on the YARN cluster has already been assembled by org.apache.flink.yarn.YarnClusterClient. In my case I need to be able to pass a local folder at the moment I submit the application, so that it is included in the application's YARN classpath. The option you suggested works well when the file I want to cache is already accessible at the moment I register it (for example, a file on HDFS).
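For context, the underlying constraint is that a JVM only honors classpath entries that are directories or jar/zip archives, so a plain file shipped to the container is silently dropped from the effective classpath. The following stand-alone sketch illustrates that filtering rule; the class and helper are hypothetical illustrations, not Flink's actual implementation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch (NOT Flink code): a JVM classpath entry is only
// usable if it is a directory or a jar/zip archive, so a shipped plain
// file such as conf/application.conf is effectively ignored.
public class ClasspathSketch {

    // Crude stand-in check: archives recognized by extension, directories
    // approximated as entries whose last path segment has no extension.
    static boolean usableOnClasspath(String entry) {
        String name = entry.substring(entry.lastIndexOf('/') + 1);
        return entry.endsWith(".jar") || entry.endsWith(".zip") || !name.contains(".");
    }

    public static void main(String[] args) {
        List<String> shipped = Arrays.asList("lib/job.jar", "conf", "conf/application.conf");
        String classpath = shipped.stream()
                .filter(ClasspathSketch::usableOnClasspath)
                .collect(Collectors.joining(":"));
        System.out.println(classpath); // prints "lib/job.jar:conf"
    }
}
```

This is why shipping the containing folder (rather than the file itself) would work: the directory survives the filter and the file inside it becomes loadable as a resource.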
Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to pass user-specified folders to the YARN application classpath?

Kind Regards,
Mike Pryakhin

> On 21 Jun 2017, at 16:55, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>
> Hi Nico!
> Sounds great, will give it a try and get back with results soon.
>
> Thank you so much for your help!!
>
> Kind Regards,
> Mike Pryakhin
>
>> On 21 Jun 2017, at 16:36, Nico Kruber <n...@data-artisans.com> wrote:
>>
>> A workaround may be to use the DistributedCache. It apparently is not
>> documented much, but the JavaDoc mentions roughly how to use it:
>>
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>>
>> /**
>>  * Registers a file at the distributed cache under the given name. The file
>>  * will be accessible from any user-defined function in the (distributed)
>>  * runtime under a local path. Files may be local files (as long as all
>>  * relevant workers have access to it), or files in a distributed file system.
>>  * The runtime will copy the files temporarily to a local cache, if needed.
>>  * <p>
>>  * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be
>>  * obtained inside UDFs via
>>  * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()}
>>  * and provides access to
>>  * {@link org.apache.flink.api.common.cache.DistributedCache} via
>>  * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>>  *
>>  * @param filePath The path of the file, as a URI (e.g. "file:///some/path"
>>  *                 or "hdfs://host:port/and/path")
>>  * @param name The name under which the file is registered.
>>  */
>> public void registerCachedFile(String filePath, String name) {
>>     registerCachedFile(filePath, name, false);
>> }
>>
>> You could pass the actual file URL to use for each instance of your job that
>> requires a different file via a simple job parameter:
>>
>> public static void main(String[] args) throws Exception {
>>     ParameterTool params = ParameterTool.fromArgs(args);
>>
>>     ...
>>
>>     env.registerCachedFile(params.get("config_file", <default/path>), "extConfig");
>>
>>     ...
>> }
>>
>> Flink's DistributedCache will then cache the file locally and you can use it
>> in a RichFunction like in
>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>>
>> public class MyFunction extends AbstractRichFunction {
>>     private static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public void open(Configuration conf) throws IOException {
>>         File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
>>         ...
>>     }
>> }
>>
>> Nico
>>
>> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>>> Hi guys,
>>>
>>> any news?
>>> I've created a jira ticket: https://issues.apache.org/jira/browse/FLINK-6949
>>>
>>> Kind Regards,
>>> Mike Pryakhin
>>>
>>>> On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I run my Flink job on a YARN cluster and need to supply job configuration
>>>> parameters via a configuration file alongside the job jar
>>>> (the configuration file can't be packaged into the job's jar file). I tried
>>>> to put the configuration file into the folder that is passed via the
>>>> --yarnship option of the flink run command; the file is then copied to the
>>>> YARN cluster and added to the JVM classpath as 'path/application.conf', but
>>>> it is ignored by the TM JVM as it is neither a jar (zip) file nor a
>>>> directory...
>>>>
>>>> I looked through the YarnClusterDescriptor class, where the
>>>> ENV_FLINK_CLASSPATH is built, and haven't found any option to tell
>>>> Flink (YarnClusterDescriptor in particular) to add my configuration file
>>>> to the TM JVM classpath... Is there any way to do so? If not, would you
>>>> consider adding such an ability to ship files? (In Spark I can simply pass
>>>> arbitrary files via the --files option.)
>>>>
>>>> Thanks in advance.
>>>>
>>>> Kind Regards,
>>>> Mike Pryakhin