Hi Robert,

Thanks for your reply!

>I believe you should be able to load non-class files through the classloader 
>as well.
Could you please clarify what you mean by this?

>Did you see any code that excludes non-class files? 
No, I didn't, but I did see the following code here [1]:

if (shipFile.isDirectory()) {
        // add directories to the classpath
        java.nio.file.Path shipPath = shipFile.toPath();
        final java.nio.file.Path parentPath = shipPath.getParent();

        Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
                @Override
                public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
                                throws IOException {
                        java.nio.file.Path relativePath = parentPath.relativize(file);

                        classPaths.add(relativePath.toString());

                        return FileVisitResult.CONTINUE;
                }
        });
} else {
        // add files to the classpath
        classPaths.add(shipFile.getName());
}

The code above traverses the contents of the folder I passed via the --yarnship option and, when the ship file is a directory, appends each contained file to the classpath individually. That ultimately has no effect, because, as we know, only the following entries can appear on a JVM classpath: .class files, .jar files, .zip files, or directories.
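To illustrate the point, here is a self-contained sketch (not Flink code; the class name and file names are invented): a plain file is only resolvable through a classloader when its parent directory (or a jar/zip) is the classpath entry, while an entry pointing at the file itself is treated as a jar and effectively ignored:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ResourceLookup {

    // Returns true if 'name' is resolvable through a classloader whose
    // single classpath entry is 'entry'.
    static boolean canResolve(URL entry, String name) throws Exception {
        try (URLClassLoader cl = new URLClassLoader(new URL[]{entry}, null)) {
            return cl.getResource(name) != null;
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("ship");
        Files.write(dir.resolve("application.conf"), "key=value".getBytes());

        // Directory entry (URL ends with "/"): the file inside it is
        // visible as a classpath resource.
        System.out.println(canResolve(dir.toUri().toURL(), "application.conf"));

        // Entry pointing at the .conf file itself: interpreted as a
        // (broken) jar and skipped, so the lookup fails.
        System.out.println(canResolve(
                dir.resolve("application.conf").toUri().toURL(), "application.conf"));
    }
}
```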

I believe that if the code above did not traverse directory contents, everything would work as expected.
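The change I have in mind could look roughly like this (a sketch only — buildClassPaths is a made-up helper, not the actual Flink method):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ShipClasspathSketch {

    // Hypothetical helper: every ship entry, whether file or directory, is
    // added to the classpath by name, without walking a directory's contents.
    static List<String> buildClassPaths(File... shipFiles) {
        List<String> classPaths = new ArrayList<>();
        for (File shipFile : shipFiles) {
            // A directory entry lets the JVM resolve the resources inside it;
            // a jar/zip entry is used as-is.
            classPaths.add(shipFile.getName());
        }
        return classPaths;
    }

    public static void main(String[] args) {
        // e.g. --yarnship conf  plus a job jar
        System.out.println(buildClassPaths(new File("conf"), new File("app.jar")));
        // prints: [conf, app.jar]
    }
}
```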

For instance, if I pass a file, it would be appended to the classpath as-is; if I specify a folder, it would go onto the classpath as a folder.
At the moment it is not possible to pass multiple --yarnship options, but I have also created another JIRA ticket [2] that proposes adding the ability to specify multiple yarnship folders.

What do you think about that?

[1] https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003
[2] https://issues.apache.org/jira/browse/FLINK-6950


Thanks in advance

Kind Regards,
Mike Pryakhin



> On 27 Sep 2017, at 18:30, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Mike,
> 
> For using the DistributedCache approach, you need to have HDFS or another 
> distributed FS available to distribute the files.
> 
> I would actually like to understand why you said " then this file is copied 
> to the yarn cluster and added to JVM class  [...] but is ignored by TM JVM as 
> it is neither jar(zip) file nor directory..."
> I believe you should be able to load non-class files through the classloader 
> as well.
> Did you see any code that excludes non-class files? Afaik the Taskmanagers 
> have access to all files (of any type) that are passed using the --ship 
> command (or in the lib/ folder).
> 
> 
> On Wed, Sep 27, 2017 at 3:42 PM, Mikhail Pryakhin <m.prya...@gmail.com 
> <mailto:m.prya...@gmail.com>> wrote:
> Hi Nico,
> 
> Thanks a lot for you help, but unfortunately, the workaround you suggested 
> doesn't work for me.
> I tried to leverage the StreamExecutionEnvironment#registerCachedFile method 
> but failed because this instance is created when the application master has 
> already been started therefore the classpath to run the application somewhere 
> on YARN cluster has already been created by means of 
> org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to 
> pass a local folder at the moment I submit the application so that it is 
> included in the application YARN classpath.
> The option you suggested works well if I need to cache a file that is 
> available for me at the moment I want to register it (for example a file on 
> HDFS).
> 
> Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to 
> pass user-specified folders to the YARN application classpath?
> 
> 
> 
> Kind Regards,
> Mike Pryakhin
> 
> 
> 
>> On 21 Jun 2017, at 16:55, Mikhail Pryakhin <m.prya...@gmail.com 
>> <mailto:m.prya...@gmail.com>> wrote:
>> 
>> Hi Nico!
>> Sounds great, will give it a try and return back with results soon.
>> 
>> Thank you so much for your help!!
>> 
>> Kind Regards,
>> Mike Pryakhin
>> 
>>> On 21 Jun 2017, at 16:36, Nico Kruber <n...@data-artisans.com 
>>> <mailto:n...@data-artisans.com>> wrote:
>>> 
>>> A workaround may be to use the DistributedCache. It apparently is not 
>>> documented much but the JavaDoc mentions roughly how to use it:
>>> 
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954
>>> 
>>> /**
>>> * Registers a file at the distributed cache under the given name. The file will be accessible
>>> * from any user-defined function in the (distributed) runtime under a local path. Files
>>> * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
>>> * The runtime will copy the files temporarily to a local cache, if needed.
>>> * <p>
>>> * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
>>> * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
>>> * {@link org.apache.flink.api.common.cache.DistributedCache} via
>>> * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
>>> *
>>> * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
>>> * @param name The name under which the file is registered.
>>> */
>>> public void registerCachedFile(String filePath, String name){
>>>     registerCachedFile(filePath, name, false);
>>> }
>>> 
>>> You could pass the actual file URL to use for each instance of your job 
>>> that 
>>> requires a different file via a simple job parameter:
>>> 
>>> 
>>> public static void main(String[] args) throws Exception {
>>>     ParameterTool params = ParameterTool.fromArgs(args);
>>> 
>>>     ...
>>> 
>>>     env.registerCachedFile(params.get("config_file", <default/path>), 
>>> "extConfig");
>>> 
>>>     ...
>>> }
>>> 
>>> Flink's DistributedCache will then cache the file locally and you can use 
>>> it in 
>>> a RichFunction like in
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99
>>> 
>>> public class MyFunction extends AbstractRichFunction {
>>>     private static final long serialVersionUID = 1L;
>>> 
>>>     @Override
>>>     public void open(Configuration conf) throws IOException {
>>>         File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
>>>         ...
>>>     }
>>> }
>>> 
>>> 
>>> Nico
>>> 
>>> On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
>>>> Hi guys,
>>>> 
>>>> any news?
>>>> I’ve created a jira-ticket 
>>>> https://issues.apache.org/jira/browse/FLINK-6949.
>>>> 
>>>> 
>>>> Kind Regards,
>>>> Mike Pryakhin
>>>> 
>>>>> On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.prya...@gmail.com 
>>>>> <mailto:m.prya...@gmail.com>> wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> I run my flink job on yarn cluster and need to supply job configuration
>>>>> parameters via configuration file alongside with the job jar.
>>>>> (configuration file can't be packaged into jobs jar file). I tried to put
>>>>> the configuration file into the folder that is passed via --yarnship
>>>>> option to the flink run command, then this file is copied to the yarn
>>>>> cluster and added to JVM class path like 'path/application.conf' but is
>>>>> ignored by TM JVM as it is neither jar(zip) file nor directory...
>>>>> 
>>>>> I looked through the YarnClusterDescriptor class where the
>>>>> ENV_FLINK_CLASSPATH is built and haven't found any option to tell
>>>>> flink (YarnClusterDescriptor especially) to add my configuration file to
>>>>> the TM JVM classpath... Is there any way to do so? If not, would you
>>>>> consider adding such an ability? (like in spark where I can just pass
>>>>> any files via the --files option)
>>>>> 
>>>>> Thanks in advance.
>>>>> 
>>>>> Kind Regards,
>>>>> Mike Pryakhin
>>> 
>> 
> 
> 
