[ 
https://issues.apache.org/jira/browse/FLINK-21023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tang Yan updated FLINK-21023:
-----------------------------
    Description: 
I want to use the -yt (yarnship) option to ship my config files to the 
YARN cluster and read them in my code. For this test I used the Flink 
WordCount example.

Here is my submit command:

/opt/Flink/bin/flink run -m yarn-cluster -p 1 -yt /path/to/conf -c 
org.apache.flink.examples.java.wordcount.WordCount 
/opt/Flink/examples/batch/WordCount.jar --input conf/cmp_online.cfg

Test Result:

I found that if the job manager and task manager are launched on the same node, 
the job runs successfully. But when they run on different nodes, the 
job fails with the errors below. I can see that the conf folder has been 
distributed to the container cache dirs, such as 
file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf
 on the job manager node, and 
file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000002/conf
 on the task manager node. So why does the task manager load the conf file from 
the container_eXXX_000001 path (which is located on the job manager node)?

2021-01-19 04:19:11,405 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e283_1609125504851_3620_01_000002 (akka.tcp://fl...@rphf1hsn026.qa.webex.com:46785/user/rpc/taskmanager_0) at ResourceManager
2021-01-19 04:19:11,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from SCHEDULED to DEPLOYING.
2021-01-19 04:19:11,507 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (attempt #0) to container_e283_1609125504851_3620_01_000002 @ rphf1hsn026.qa.webex.com (dataPort=46647)
2021-01-19 04:19:11,608 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from DEPLOYING to RUNNING.
2021-01-19 04:19:11,792 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner [] - Assigning remote split to host rphf1hsn026
2021-01-19 04:19:11,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@3e19cc76.
java.io.IOException: Error opening the Input Split file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg [0,71]: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_272]
Caused by: java.io.FileNotFoundException: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
    at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_272]
    at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_272]
    at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_272]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:996) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
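
The failing path in the trace points at the job manager's container directory, which suggests (this is an assumption, not something confirmed in this report) that the relative path conf/cmp_online.cfg was turned into an absolute path on the job manager side before the input split reached the task manager. The sketch below is not Flink code: it is a plain-Java simulation in which two temporary directories stand in for the two containers, and the shipped file exists only in the second one, as it would on a separate node. The class and method names (ShippedPathDemo, resolveEagerly, readOrNull, simulate) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class ShippedPathDemo {

    /** Makes a relative path absolute against one container's working directory. */
    static Path resolveEagerly(Path containerDir, String relative) {
        return containerDir.resolve(relative).toAbsolutePath();
    }

    /** Reads a file as UTF-8 text, or returns null when the file does not exist. */
    static String readOrNull(Path file) throws IOException {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (NoSuchFileException e) {
            return null;
        }
    }

    /**
     * Returns two results: [0] is the read through the path resolved against
     * the "job manager" directory, [1] is the read through the path resolved
     * against the "task manager" directory.
     */
    static String[] simulate() {
        try {
            Path root = Files.createTempDirectory("appcache");
            try {
                Path jmDir = Files.createDirectories(root.resolve("container_000001"));
                Path tmDir = Files.createDirectories(root.resolve("container_000002"));

                // On the task manager's node only its own container holds the shipped files.
                Path conf = Files.createDirectories(tmDir.resolve("conf"));
                Files.write(conf.resolve("cmp_online.cfg"),
                        "key=value".getBytes(StandardCharsets.UTF_8));

                // Path fixed against the job manager's directory, then used by the task.
                String viaJobManagerPath = readOrNull(resolveEagerly(jmDir, "conf/cmp_online.cfg"));
                // Path resolved against the task's own working directory.
                String viaLocalPath = readOrNull(resolveEagerly(tmDir, "conf/cmp_online.cfg"));
                return new String[] {viaJobManagerPath, viaLocalPath};
            } finally {
                // Remove the temporary tree, children before parents.
                try (Stream<Path> walk = Files.walk(root)) {
                    walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = simulate();
        System.out.println("job manager path read: " + result[0]);
        System.out.println("local path read: " + result[1]);
    }
}
```

If this reading of the failure is right, two workarounds seem plausible: put the input on a file system that every node can reach (for example HDFS), or open the shipped file relative to the task's own working directory at run time, for example inside a rich function's open() method, instead of passing it as a job input path.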

> Task Manager uses the container dir of Job Manager when running flink job on 
> yarn-cluster.
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21023
>                 URL: https://issues.apache.org/jira/browse/FLINK-21023
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: Tang Yan
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.3.4#803005)