[ https://issues.apache.org/jira/browse/FLINK-15194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995394#comment-16995394 ]
Wei Zhong commented on FLINK-15194:
-----------------------------------

[~fly_in_gis] I have tested it in a real Yarn cluster and it works. Great job!

> Directories in distributed caches are not extracted in Yarn Per Job Cluster Mode
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-15194
>                 URL: https://issues.apache.org/jira/browse/FLINK-15194
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.10.0
>            Reporter: Wei Zhong
>            Assignee: Yang Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we insert the following code into the WordCount batch example:
> {code:java}
> File testDirectory = new File("test_directory");
> testDirectory.mkdirs();
> env.registerCachedFile(testDirectory.getAbsolutePath(), "test_directory");
> text = text.map(new RichMapFunction<String, String>() {
>     @Override
>     public String map(String value) throws Exception {
>         File testDirectory = getRuntimeContext().getDistributedCache().getFile("test_directory");
>         if (!testDirectory.isDirectory()) {
>             throw new RuntimeException(
>                 String.format("the directory %s is not a directory!", testDirectory.getAbsolutePath()));
>         }
>         return value;
>     }
> });
> {code}
> It works well in standalone mode but fails in Yarn per-job cluster mode; the exception is:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:146)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:671)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:933)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1006)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1006)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:93)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:115)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>     ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:532)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>     ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:188)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:183)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:177)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:452)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: the directory /tmp/hadoop-zhongwei/nm-local-dir/usercache/zhongwei/appcache/application_1576030059607_0008/flink-dist-cache-bb275987-90cf-406a-9890-caed34983a04/da572c60eb63b13b7a90892f1958a7b7/test_directory.zip is not a directory!
>     at org.apache.flink.examples.java.wordcount.WordCount$1.map(WordCount.java:95)
>     at org.apache.flink.examples.java.wordcount.WordCount$1.map(WordCount.java:89)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> It seems the zip archive shipped through the distributed cache is not extracted in Yarn per-job mode, so user code is handed the raw test_directory.zip instead of a directory (a sketch of the missing extraction step follows the complete example below).
> Here is the complete code of the example:
> {code:java}
> public class WordCount {
>
>     public static void main(String[] args) throws Exception {
>         final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
>
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().setGlobalJobParameters(params);
>
>         DataSet<String> text = null;
>         if (params.has("input")) {
>             for (String input : params.getMultiParameterRequired("input")) {
>                 if (text == null) {
>                     text = env.readTextFile(input);
>                 } else {
>                     text = text.union(env.readTextFile(input));
>                 }
>             }
>             Preconditions.checkNotNull(text, "Input DataSet should not be null.");
>         } else {
>             System.out.println("Executing WordCount example with default input data set.");
>             System.out.println("Use --input to specify file input.");
>             text = WordCountData.getDefaultTextLineDataSet(env);
>         }
>
>         File testDirectory = new File("test_directory");
>         testDirectory.mkdirs();
>         env.registerCachedFile(testDirectory.getAbsolutePath(), "test_directory");
>         text = text.map(new RichMapFunction<String, String>() {
>             @Override
>             public String map(String value) throws Exception {
>                 File testDirectory = getRuntimeContext().getDistributedCache().getFile("test_directory");
>                 if (!testDirectory.isDirectory()) {
>                     throw new RuntimeException(
>                         String.format("the directory %s is not a directory!", testDirectory.getAbsolutePath()));
>                 }
>                 return value;
>             }
>         });
>
>         DataSet<Tuple2<String, Integer>> counts =
>             text.flatMap(new Tokenizer())
>                 .groupBy(0)
>                 .sum(1);
>
>         if (params.has("output")) {
>             counts.writeAsCsv(params.get("output"), "\n", " ");
>             env.execute("WordCount Example");
>         } else {
>             System.out.println("Printing result to stdout. Use --output to specify output path.");
>             counts.print();
>         }
>     }
>
>     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
>             String[] tokens = value.toLowerCase().split("\\W+");
>             for (String token : tokens) {
>                 if (token.length() > 0) {
>                     out.collect(new Tuple2<>(token, 1));
>                 }
>             }
>         }
>     }
> }
> {code}
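> The sketch below is only an illustration of the extraction step that appears to be missing on this code path, not Flink's actual implementation; the class and method names are hypothetical. It expands a shipped zip archive into a sibling directory so that DistributedCache.getFile() could hand user code a real directory instead of the raw test_directory.zip:
> {code:java}
> import java.io.File;
> import java.io.IOException;
> import java.io.InputStream;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
> import java.nio.file.StandardCopyOption;
> import java.util.Enumeration;
> import java.util.zip.ZipEntry;
> import java.util.zip.ZipFile;
>
> // Hypothetical helper, for illustration only: expand a distributed-cache
> // archive (e.g. .../test_directory.zip) into .../test_directory.
> public class CacheArchiveExtractor {
>
>     public static File extractIfArchive(File archive) throws IOException {
>         String path = archive.getAbsolutePath();
>         if (!path.endsWith(".zip")) {
>             return archive; // plain files are returned unchanged
>         }
>         // Strip the ".zip" suffix to obtain the target directory.
>         Path targetDir = Paths.get(path.substring(0, path.length() - 4));
>         try (ZipFile zip = new ZipFile(archive)) {
>             Enumeration<? extends ZipEntry> entries = zip.entries();
>             while (entries.hasMoreElements()) {
>                 ZipEntry entry = entries.nextElement();
>                 Path out = targetDir.resolve(entry.getName()).normalize();
>                 // Guard against zip-slip: every entry must stay below targetDir.
>                 if (!out.startsWith(targetDir)) {
>                     throw new IOException("Illegal zip entry: " + entry.getName());
>                 }
>                 if (entry.isDirectory()) {
>                     Files.createDirectories(out);
>                 } else {
>                     Files.createDirectories(out.getParent());
>                     try (InputStream in = zip.getInputStream(entry)) {
>                         Files.copy(in, out, StandardCopyOption.REPLACE_EXISTING);
>                     }
>                 }
>             }
>         }
>         return targetDir.toFile();
>     }
> }
> {code}
> An equivalent expansion evidently happens on the standalone code path (which would explain why the example above passes there), so the fix presumably needs the same step to run before user code accesses the cached entry in Yarn per-job mode.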