Hi Alex, could you share with us the full logs of the client and the cluster entrypoint? That would be tremendously helpful.
Cheers, Till On Wed, Jul 25, 2018 at 4:08 AM vino yang <[email protected]> wrote: > Hi Alex, > > Is it possible that the data has been corrupted? > > Or have you confirmed that the avro version is consistent in different > Flink versions? > > Also, if you don't upgrade Flink and still use version 1.3.1, can it be > recovered? > > Thanks, vino. > > > 2018-07-25 8:32 GMT+08:00 Alex Vinnik <[email protected]>: > >> Vino, >> >> Upgraded flink to Hadoop 2.8.1 >> >> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep >> entrypoint | grep 'Hadoop version' >> 2018-07-25T00:19:46.142+0000 >> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop >> version: 2.8.1 >> >> but job still fails to start >> >> Ideas? >> >> Caused by: org.apache.flink.util.FlinkException: Failed to submit job >> d84cccd3bffcba1f243352a5e5ef99a9. >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) >> at >> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) >> at >> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) >> at >> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) >> at akka.actor.Actor$class.aroundReceive(Actor.scala:502) >> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >> at akka.actor.ActorCell.invoke(ActorCell.scala:495) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >> at akka.dispatch.Mailbox.run(Mailbox.scala:224) >> at akka.dispatch.Mailbox.exec(Mailbox.scala:234) >> ... 4 more >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could >> not set up JobManager >> at >> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169) >> at >> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885) >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287) >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277) >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262) >> at >> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249) >> ... 21 more >> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot >> initialize task 'DataSink >> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': >> Deserializing the OutputFormat >> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) >> failed: unread block data >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) >> at >> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150) >> at >> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130) >> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298) >> at >> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151) >> ... 26 more >> Caused by: java.lang.Exception: Deserializing the OutputFormat >> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) >> failed: unread block data >> at >> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) >> ... 31 more >> Caused by: java.lang.IllegalStateException: unread block data >> at >> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603) >> at >> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) >> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) >> at >> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) >> at >> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488) >> at >> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475) >> at >> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463) >> at >> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424) >> at >> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) >> at >> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60) >> ... 32 more >> >> >> On Tue, Jul 24, 2018 at 10:32 AM vino yang <[email protected]> wrote: >> >>> Hi Alex, >>> >>> Based on your log information, the potential reason is Hadoop version. >>> To troubleshoot the exception comes from different Hadoop version. I >>> suggest you match the both side of Hadoop version. >>> >>> You can : >>> >>> 1. Upgrade the Hadoop version which Flink Cluster depends on, Flink's >>> official website provides the binary binding Hadoop 2.8.[1] >>> 2. downgrade your fat jar's Hadoop client dependency's version to match >>> Flink Cluster's hadoop dependency's version. >>> >>> [1]: >>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz >>> >>> Thanks, vino. >>> >>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <[email protected]>: >>> >>>> Hi Till, >>>> >>>> Thanks for responding. Below is entrypoint logs. One thing I noticed >>>> that "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. >>>> Could it be a reason for that error? If so how can i use same hadoop >>>> version 2.8 on flink server side? BTW job runs fine locally reading from >>>> the same s3a buckets when executed using createLocalEnvironment via java >>>> -jar my-fat.jar --input s3a://foo --output s3a://bar >>>> >>>> Regarding java version. The job is submitted via Flink UI, so it should >>>> not be a problem. >>>> >>>> Thanks a lot in advance. >>>> >>>> 2018-07-24T12:09:38.083+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -------------------------------------------------------------------------------- >>>> 2018-07-24T12:09:38.085+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting >>>> StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, >>>> Date:24.05.2018 @ 14:54:44 UTC) >>>> 2018-07-24T12:09:38.085+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current >>>> user: flink >>>> 2018-07-24T12:09:38.844+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current >>>> Hadoop/Kerberos user: flink >>>> 2018-07-24T12:09:38.844+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK >>>> 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11 >>>> 2018-07-24T12:09:38.844+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap >>>> size: 1963 MiBytes >>>> 2018-07-24T12:09:38.844+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME: >>>> /docker-java-home/jre >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop >>>> version: 2.7.3 >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options: >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -Dcom.amazonaws.sdk.disableCertChecking >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015 >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml >>>> 2018-07-24T12:09:38.851+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program >>>> Arguments: >>>> 2018-07-24T12:09:38.852+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> --configDir >>>> 2018-07-24T12:09:38.852+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> /opt/flink/conf >>>> 2018-07-24T12:09:38.852+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> --executionMode >>>> 2018-07-24T12:09:38.853+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster >>>> 2018-07-24T12:09:38.853+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host >>>> 2018-07-24T12:09:38.853+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster >>>> 2018-07-24T12:09:38.853+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath: >>>> /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar::: >>>> 2018-07-24T12:09:38.853+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO >>>> -------------------------------------------------------------------------------- >>>> 2018-07-24T12:09:38.854+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered >>>> UNIX signal handlers for [TERM, HUP, INT] >>>> 2018-07-24T12:09:38.895+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting >>>> StandaloneSessionClusterEntrypoint. >>>> 2018-07-24T12:09:38.895+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install >>>> default filesystem. >>>> 2018-07-24T12:09:38.927+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install >>>> security context. >>>> 2018-07-24T12:09:39.034+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing >>>> cluster services. >>>> 2018-07-24T12:09:39.059+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to >>>> start actor system at flink-jobmanager:6123 >>>> 2018-07-24T12:09:40.335+0000 >>>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system >>>> started at akka.tcp://flink@flink-jobmanager:6123 >>>> >>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[email protected]> >>>> wrote: >>>> >>>>> Hi Alex, >>>>> >>>>> I'm not entirely sure what causes this problem because it is the first >>>>> time I see it. >>>>> >>>>> First question would be if the problem also arises if using a >>>>> different Hadoop version. >>>>> >>>>> Are you using the same Java versions on the client as well as on the >>>>> server? >>>>> >>>>> Could you provide us with the cluster entrypoint logs? >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[email protected]> >>>>> wrote: >>>>> >>>>>> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink >>>>>> 1.5 and getting a weird exception. >>>>>> >>>>>> Job reads json from s3a and writes parquet files to s3a with avro >>>>>> model. Job is uber jar file built with hadoop-aws-2.8.0 in order to have >>>>>> access to S3AFileSystem class. >>>>>> >>>>>> Fails here >>>>>> >>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288 >>>>>> with >>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat >>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) >>>>>> failed: unread block data >>>>>> >>>>>> To be exact it fails right on that line. >>>>>> >>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488 >>>>>> >>>>>> Not sure how to resolve this problem. Looking for an advice. Let me >>>>>> know if more info is needed. Full stack is below. Thanks. >>>>>> >>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException: >>>>>> org.apache.flink.util.FlinkException: Failed to submit job >>>>>> 13a1478cbc7ec20f93f9ee0947856bfd. >>>>>> at >>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >>>>>> at >>>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811) >>>>>> at akka.dispatch.OnComplete.internal(Future.scala:258) >>>>>> at akka.dispatch.OnComplete.internal(Future.scala:256) >>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) >>>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) >>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) >>>>>> at >>>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) >>>>>> at >>>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) >>>>>> at >>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) >>>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) >>>>>> at >>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20) >>>>>> at >>>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18) >>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) >>>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) >>>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) >>>>>> at >>>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) >>>>>> at >>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) >>>>>> at >>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >>>>>> at >>>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) >>>>>> at >>>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) >>>>>> at >>>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) >>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) >>>>>> at >>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>> Caused by: java.util.concurrent.CompletionException: >>>>>> org.apache.flink.util.FlinkException: Failed to submit job >>>>>> 13a1478cbc7ec20f93f9ee0947856bfd. >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) >>>>>> at >>>>>> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) >>>>>> ... 29 more >>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job >>>>>> 13a1478cbc7ec20f93f9ee0947856bfd. >>>>>> at >>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254) >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>> at >>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>> at >>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>> at >>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) >>>>>> at >>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) >>>>>> at >>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) >>>>>> at >>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) >>>>>> at >>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) >>>>>> at >>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) >>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502) >>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) >>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) >>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495) >>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224) >>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234) >>>>>> ... 4 more >>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: >>>>>> Could not set up JobManager >>>>>> at >>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169) >>>>>> at >>>>>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885) >>>>>> at >>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287) >>>>>> at >>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277) >>>>>> at >>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262) >>>>>> at >>>>>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249) >>>>>> ... 21 more >>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: >>>>>> Cannot initialize task 'DataSink >>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': >>>>>> Deserializing the OutputFormat >>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) >>>>>> failed: unread block data >>>>>> at >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220) >>>>>> at >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) >>>>>> at >>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150) >>>>>> at >>>>>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130) >>>>>> at >>>>>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298) >>>>>> at >>>>>> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151) >>>>>> ... 26 more >>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat >>>>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) >>>>>> failed: unread block data >>>>>> at >>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63) >>>>>> at >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) >>>>>> ... 31 more >>>>>> Caused by: java.lang.IllegalStateException: unread block data >>>>>> at >>>>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781) >>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603) >>>>>> at >>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) >>>>>> at >>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) >>>>>> at >>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) >>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) >>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) >>>>>> at >>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488) >>>>>> at >>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475) >>>>>> at >>>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463) >>>>>> at >>>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424) >>>>>> at >>>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) >>>>>> at >>>>>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60) >>>>>> ... 32 more >>>>>> >>>>>> >>> >
