[ https://issues.apache.org/jira/browse/FLINK-14037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934315#comment-16934315 ]
liupengcheng commented on FLINK-14037:
--------------------------------------

[~zhuzh] I retested my case: I enabled `-verbose:class` in the client, ran the app with and without my PR, and compared the class loading output. I finally found that the conflict has nothing to do with `FileInputFormat`; it comes from the guava classes. I rechecked my user code: it uses guava classes and shades the package to `org.spark-project.xxx`, which then conflicts with a jar of the runtime Hadoop cluster (the Spark external shuffle service jar).

So that's why my fix resolves this issue, and keeping the classloaders of the client and the remote side consistent is the right thing to do. I also checked the `FileInputFormat` class; it has always been loaded by the parent loader, so I think your understanding of _alwaysParentLoaderFirst_ is right.

Thank you for your help [~zhuzh]

{code:java}
6125,6129c6087,6093
< [Loaded org.spark-project.guava.primitives.Longs from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1 from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
---
> [Loaded org.spark-project.guava.base.Converter from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.Longs from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.Longs$LongConverter from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1 from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
8279a8244,8246
{code}

> Deserializing the input/output formats failed: unread block data
> ----------------------------------------------------------------
>
> Key: FLINK-14037
> URL: https://issues.apache.org/jira/browse/FLINK-14037
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.9.0
> Environment: flink 1.9.0
> The app jar uses `flink-shaded-hadoop-2` dependencies to avoid some conflicts
>
> Reporter: liupengcheng
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Recently, we encountered the following issue when testing flink 1.9.0:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
> (JobID: 8ffbc071dda81d6f8005c02be8adde6b)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
>     at com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
>     at com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
>     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>     ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390) (org.apache.flink.api.scala.hadoop.mapreduce.HadoopInpu)': Loading the input/output formats failed: org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
>     at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
>     at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
>     ... 10 more
> Caused by: java.lang.Exception: Loading the input/output formats failed: org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>     at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
>     at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
>     ... 20 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: unread block data
>     at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
>     at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
>     ... 22 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>     at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>     ... 23 more
> End of exception on server side>]
>     at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>     ... 4 more
> {code}
> I checked the classpath:
> {noformat}
> Classpath:
> lib/flink-table-blink_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/flink-table_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:plugins/README.txt:flink.jar:flink-conf.yaml:
> {OTHER HADOOP CLASSPATH}
> {noformat}
> Our app code:
> {code:java}
> val env = ExecutionEnvironment.getExecutionEnvironment
> // env.getConfig.enableObjectReuse()
> val dataSet = env.createInput(HadoopInputs.readHadoopFile(
>     new TeraInputFormat, classOf[Array[Byte]], classOf[Array[Byte]], inputFile))
>   .partitionCustom(new FlinkTeraSortPartitioner(new TeraSortPartitioner(partitions)), 0)
>   .sortPartition(0, Order.ASCENDING)
> val job = Job.getInstance(new JobConf)
> val outputFormat = new HadoopOutputFormat[Array[Byte], Array[Byte]](
>   new TeraOutputFormat, job)
> FileOutputFormat.setOutputPath(job, new Path(outputFile))
> dataSet.output(outputFormat)
> env.execute("TeraSort")
> {code}
> I'm trying to find the root cause; for now I suspect that we are missing the `flink-hadoop-compatibility_2.11` package under the flink-dist directory.
>
> What's more, I think we should give users more useful information than messages like this, which are hard to understand.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
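The check described in the comment (finding out which jar a class is really loaded from) can also be done from code rather than by diffing `-verbose:class` output. Below is a minimal sketch that uses only plain JDK APIs. The class `ClassOrigin` and its methods are hypothetical helpers, not part of Flink, and the default class name is simply one of the shaded guava classes that appears in the diff.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper (an assumption for illustration, not a Flink API).
final class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or a marker if the JVM exposes none. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        // Classes loaded by the bootstrap loader typically have no code source.
        return (location == null) ? "<bootstrap or unknown>" : location.toString();
    }

    /** Resolves a class by name through the given loader and reports its origin and defining loader. */
    static String describe(String className, ClassLoader loader) {
        try {
            // initialize=false: only load the class, do not run its static initializers.
            Class<?> clazz = Class.forName(className, false, loader);
            return className + " <- " + locationOf(clazz)
                    + " (loader: " + clazz.getClassLoader() + ")";
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " <- not found";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Default taken from the diff in the comment; pass other class names as arguments.
        String[] names = args.length > 0
                ? args
                : new String[] {"org.spark-project.guava.primitives.UnsignedBytes"};
        for (String name : names) {
            System.out.println(describe(name, loader));
        }
    }
}
```

Calling `describe` once on the client side and once from inside the job, with the user-code class loader, should show whether both sides resolve the shaded guava classes from the same jar. That is the same comparison the diff makes.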