[ 
https://issues.apache.org/jira/browse/FLINK-14037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934315#comment-16934315
 ] 

liupengcheng commented on FLINK-14037:
--------------------------------------

[~zhuzh] I retested my case: I enabled `-verbose:class` on the client, ran the app 
with and without my PR, and compared the class loading info. I finally found that 
the conflict has nothing to do with `FileInputFormat`; it is about the guava 
classes. I rechecked my user code: it uses guava and shades the package to 
`org.spark-project.xxx`, which then collides with a jar already on the runtime 
Hadoop cluster (the Spark external shuffle service jar, spark-network-common).
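For context, the relocation in the user jar looks roughly like this (a minimal 
sbt-assembly sketch, not our actual build file; the rename pattern here is an 
assumption). Shading into `org.spark-project.guava` lands the user classes in 
exactly the namespace that spark-network-common already occupies:
{code:scala}
// build.sbt -- hedged sketch of the relocation that causes the clash.
// Renaming guava into org.spark-project.guava means the user jar and the
// cluster's spark-network-common jar now carry classes with identical names.
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.common.**" -> "org.spark-project.guava.@1").inAll
)
{code}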

That is also why my fix resolves this issue: keeping the classloading of the 
client and the remote side consistent is the right thing to do.
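
To make the failure mode concrete: the job graph carries the input/output formats 
as serialized bytes, and the server side deserializes them against its own 
classloader. A minimal sketch of that resolution step (a hypothetical helper, not 
Flink's code; Flink does something similar in `InstantiationUtil.deserializeObject`):
{code:scala}
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

// Hypothetical helper: deserialize against an explicit classloader, the way
// the server side does. If `loader` resolves a (shaded) class name to a
// different definition than the one the client serialized with, readObject
// fails -- e.g. with the "unread block data" IllegalStateException above.
def deserializeWith(bytes: Array[Byte], loader: ClassLoader): AnyRef = {
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] =
      Class.forName(desc.getName, false, loader)
  }
  try in.readObject() finally in.close()
}
{code}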

I also checked the `FileInputFormat` class: it has always been loaded by the 
parent loader, so I think your understanding of _alwaysParentLoaderFirst_ is 
right. Thank you for your help [~zhuzh]. The relevant `-verbose:class` diff 
(`<` entries resolve from the cluster's spark-network-common jar, `>` entries 
from my shaded user jar):
{code:java}
6125,6129c6087,6093
< [Loaded org.spark-project.guava.primitives.Longs from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1 from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
< [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/zjyprc-hadoop-flink1.7-hadoop-pack-2.6.0-mdh2.6.0.4/share/hadoop/yarn/lib/spark-network-common_2.10-1.6.1-mdh1.6.1.11-20180404.052559-1.jar]
---
> [Loaded org.spark-project.guava.base.Converter from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.Longs from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.Longs$LongConverter from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator$1 from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
> [Loaded org.spark-project.guava.primitives.UnsignedBytes$LexicographicalComparatorHolder$UnsafeComparator from file:/home/liupengcheng/git/infra-client/bin/packages/common-infra_client-pack-openjdk8/bin/packages/flink-1.9.0-mdh1.9.0.0-SNAPSHOT/benchmark-test-1.1-SNAPSHOT-shaded.jar]
8279a8244,8246
{code}
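
For reference, this is roughly the child-first resolution order that 
_alwaysParentLoaderFirst_ disables (a minimal sketch only; it is not Flink's 
actual FlinkUserCodeClassLoader, and the names and patterns are illustrative):
{code:scala}
import java.net.{URL, URLClassLoader}

// Sketch of child-first loading with parent-first exceptions. Patterns like
// "java." or "org.apache.flink." always delegate to the parent; everything
// else (e.g. the user's shaded guava) is looked up in the user jar first.
class ChildFirstClassLoader(urls: Array[URL],
                            parent: ClassLoader,
                            parentFirstPatterns: Seq[String])
    extends URLClassLoader(urls, parent) {

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    synchronized {
      val already = findLoadedClass(name)
      if (already != null) already
      else if (parentFirstPatterns.exists(name.startsWith))
        super.loadClass(name, resolve)
      else
        try findClass(name)
        catch { case _: ClassNotFoundException => super.loadClass(name, resolve) }
    }
}
{code}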

> Deserializing the input/output formats failed: unread block data
> ----------------------------------------------------------------
>
>                 Key: FLINK-14037
>                 URL: https://issues.apache.org/jira/browse/FLINK-14037
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.9.0
>         Environment: flink 1.9.0
> app jar uses `flink-shaded-hadoop-2` dependencies to avoid some conflicts
>  
>            Reporter: liupengcheng
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently, we encountered the following issue when testing flink 1.9.0:
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 8ffbc071dda81d6f8005c02be8adde6b)
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>       at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>       at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
> server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
>       at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>       at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>       ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
>       at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
>       at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>       ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
> initialize task 'DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390)
>  (org.apache.flink.api.scala.hadoop.mapreduce.HadoopInpu)': Loading the 
> input/output formats failed: 
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
>       at 
> org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
>       at 
> org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
>       ... 10 more
> Caused by: java.lang.Exception: Loading the input/output formats failed: 
> org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat@2e179f3e
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
>       ... 20 more
> Caused by: java.lang.RuntimeException: Deserializing the input/output formats 
> failed: unread block data
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
>       ... 22 more
> Caused by: java.lang.IllegalStateException: unread block data
>       at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>       at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>       at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>       at 
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>       at 
> org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
>       ... 23 more
> End of exception on server side>]
>       at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>       at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>       ... 4 more
> {code}
> I checked the classpath:
> {noformat}
> Classpath: 
> lib/flink-table-blink_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/flink-table_2.11-1.9.0-mdh1.9.0.0-SNAPSHOT.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:plugins/README.txt:flink.jar:flink-conf.yaml:
>  {OTHER HADOOP CLASSPATH}{noformat}
> Our app code:
> {code:java}
>     val env = ExecutionEnvironment.getExecutionEnvironment
> //    env.getConfig.enableObjectReuse()
>     val dataSet = env.createInput(HadoopInputs.readHadoopFile(
>       new TeraInputFormat, classOf[Array[Byte]], classOf[Array[Byte]], inputFile))
>       .partitionCustom(new FlinkTeraSortPartitioner(new TeraSortPartitioner(partitions)), 0)
>       .sortPartition(0, Order.ASCENDING)
>     val job = Job.getInstance(new JobConf)
>     val outputFormat = new HadoopOutputFormat[Array[Byte], Array[Byte]](new TeraOutputFormat, job)
>     FileOutputFormat.setOutputPath(job, new Path(outputFile))
>     dataSet.output(outputFormat)
>     env.execute("TeraSort")
> {code}
> I'm still trying to find the root cause; for now I suspect that the 
> `flink-hadoop-compatibility_2.11` package is missing under the flink-dist 
> directory.
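> If that is the cause, one possible workaround (a hedged sketch; the 
> coordinates are the stock 1.9.0 artifact, adjust to the actual build) is to 
> bundle the compatibility module into the job jar instead of relying on 
> flink-dist:
> {code:scala}
> // build.sbt -- hypothetical: ship flink-hadoop-compatibility in the fat jar
> // rather than expecting it under flink-dist/lib on the cluster.
> libraryDependencies +=
>   "org.apache.flink" % "flink-hadoop-compatibility_2.11" % "1.9.0"
> {code}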
>  
> What's more, I think we should give users more useful information than such 
> hard-to-understand messages.
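> For example, the deserialization path could wrap the raw failure with a hint 
> about the usual root cause (a rough sketch; the helper is hypothetical and 
> only the hint text is the point):
> {code:scala}
> // Hypothetical wrapper: `deserialize` stands in for the existing low-level
> // call; on failure, point the user at the likely classloading mismatch.
> def explainFormatErrors(deserialize: () => AnyRef): AnyRef =
>   try deserialize()
>   catch {
>     case e @ (_: IllegalStateException | _: ClassNotFoundException) =>
>       throw new RuntimeException(
>         "Deserializing the input/output formats failed: " + e.getMessage +
>           ". This often means the client and the cluster resolved the same " +
>           "class name to different definitions; check for duplicate or " +
>           "shaded jars on the cluster classpath.", e)
>   }
> {code}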
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
