[ https://issues.apache.org/jira/browse/FLINK-15194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994356#comment-16994356 ]

Yang Wang commented on FLINK-15194:
-----------------------------------

[~zhongwei] Hi Wei, thanks for creating this ticket.

 

I have taken a look at the code and found that Flink does not support registering a 
directory as a cached file in a YARN per-job cluster. When we generate the job 
graph, if a cached file is a directory, it is zipped and then uploaded to 
HDFS, and the user artifact entry is updated to point to the remote file. However, on the 
taskmanager side the remote file is never unzipped, so user code receives the .zip 
instead of the directory. It is a bug and I will provide a fix ASAP.
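
For context, the missing piece on the taskmanager side is essentially "extract the downloaded zip before handing it to user code". The following is a minimal sketch of that step using plain java.util.zip, for illustration only; it is not Flink's actual internal code, and the class and method names are made up:

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public final class ZipExtractionSketch {

   /**
    * Extracts "name.zip" into a sibling directory "name" and returns that
    * directory, which is what DistributedCache.getFile() should hand back
    * for a directory entry.
    */
   static File extractZippedDirectory(File zip) throws IOException {
      String name = zip.getName();
      Path targetDir = zip.toPath().resolveSibling(
         name.substring(0, name.length() - ".zip".length()));
      try (ZipFile zipFile = new ZipFile(zip)) {
         Enumeration<? extends ZipEntry> entries = zipFile.entries();
         while (entries.hasMoreElements()) {
            ZipEntry entry = entries.nextElement();
            Path out = targetDir.resolve(entry.getName()).normalize();
            if (!out.startsWith(targetDir)) {
               // guard against zip-slip entries such as "../evil"
               throw new IOException("Illegal zip entry: " + entry.getName());
            }
            if (entry.isDirectory()) {
               Files.createDirectories(out);
            } else {
               Files.createDirectories(out.getParent());
               try (InputStream in = zipFile.getInputStream(entry)) {
                  Files.copy(in, out);
               }
            }
         }
      }
      return targetDir.toFile();
   }
}
{code}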

 

For session clusters (YARN session and standalone), all cached files are 
distributed via the blob server, so directories work fine there.

 

> Directories in distributed caches are not extracted in Yarn Per Job Cluster Mode
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-15194
>                 URL: https://issues.apache.org/jira/browse/FLINK-15194
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.10.0
>            Reporter: Wei Zhong
>            Assignee: Yang Wang
>            Priority: Blocker
>             Fix For: 1.10.0
>
>
> If we insert the following code into the WordCount batch example:
> {code:java}
> File testDirectory = new File("test_directory");
> testDirectory.mkdirs();
> env.registerCachedFile(testDirectory.getAbsolutePath(), "test_directory");
> text = text.map(new RichMapFunction<String, String>() {
>    @Override
>    public String map(String value) throws Exception {
>       File testDirectory = getRuntimeContext().getDistributedCache().getFile("test_directory");
>       if (!testDirectory.isDirectory()) {
>          throw new RuntimeException(
>             String.format("the directory %s is not a directory!", testDirectory.getAbsolutePath()));
>       }
>       return value;
>    }
> });
> {code}
> It works well in standalone mode but fails in YARN per-job cluster mode; the exception is:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>       at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:146)
>       at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:671)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
>       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:933)
>       at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1006)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1006)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
>       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:93)
>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>       at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>       at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:115)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>       ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
>       at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>       at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>       at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>       at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:532)
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>       at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>       at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>       at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>       ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>       at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>       at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:188)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:183)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:177)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:452)
>       at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: the directory /tmp/hadoop-zhongwei/nm-local-dir/usercache/zhongwei/appcache/application_1576030059607_0008/flink-dist-cache-bb275987-90cf-406a-9890-caed34983a04/da572c60eb63b13b7a90892f1958a7b7/test_directory.zip is not a directory!
>       at org.apache.flink.examples.java.wordcount.WordCount$1.map(WordCount.java:95)
>       at org.apache.flink.examples.java.wordcount.WordCount$1.map(WordCount.java:89)
>       at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>       at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>       at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> It seems the zip file is not extracted in YARN per-job mode: the distributed cache hands back the raw test_directory.zip instead of a directory.
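> Until this is fixed, a user-side workaround is possible in the map function (a minimal sketch; unzipToSibling() is a hypothetical plain java.util.zip helper along the lines of the extraction sketch in the comment above, not a Flink API):
> {code:java}
> File cached = getRuntimeContext().getDistributedCache().getFile("test_directory");
> // In standalone/session mode this is already a directory; in YARN per-job
> // mode it is currently the un-extracted "test_directory.zip" artifact.
> File testDirectory = cached.isDirectory()
>       ? cached
>       : unzipToSibling(cached); // hypothetical helper, see sketch above
> {code}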
> Here is the complete code of the example:
> {code:java}
> public class WordCount {
>    public static void main(String[] args) throws Exception {
>       final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>       env.getConfig().setGlobalJobParameters(params);
>       DataSet<String> text = null;
>       if (params.has("input")) {
>          for (String input : params.getMultiParameterRequired("input")) {
>             if (text == null) {
>                text = env.readTextFile(input);
>             } else {
>                text = text.union(env.readTextFile(input));
>             }
>          }
>          Preconditions.checkNotNull(text, "Input DataSet should not be null.");
>       } else {
>          System.out.println("Executing WordCount example with default input data set.");
>          System.out.println("Use --input to specify file input.");
>          text = WordCountData.getDefaultTextLineDataSet(env);
>       }
>       File testDirectory = new File("test_directory");
>       testDirectory.mkdirs();
>       env.registerCachedFile(testDirectory.getAbsolutePath(), "test_directory");
>       text = text.map(new RichMapFunction<String, String>() {
>          @Override
>          public String map(String value) throws Exception {
>             File testDirectory = getRuntimeContext().getDistributedCache().getFile("test_directory");
>             if (!testDirectory.isDirectory()) {
>                throw new RuntimeException(
>                   String.format("the directory %s is not a directory!", testDirectory.getAbsolutePath()));
>             }
>             return value;
>          }
>       });
>       DataSet<Tuple2<String, Integer>> counts =
>             text.flatMap(new Tokenizer())
>             .groupBy(0)
>             .sum(1);
>       if (params.has("output")) {
>          counts.writeAsCsv(params.get("output"), "\n", " ");
>          env.execute("WordCount Example");
>       } else {
>          System.out.println("Printing result to stdout. Use --output to specify output path.");
>          counts.print();
>       }
>    }
>    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>       @Override
>       public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>          String[] tokens = value.toLowerCase().split("\\W+");
>          for (String token : tokens) {
>             if (token.length() > 0) {
>                out.collect(new Tuple2<>(token, 1));
>             }
>          }
>       }
>    }
> }
> {code}


