Hey Arnaud, could this be a leftover job that is being recovered from ZooKeeper? Recovery only happens if the configured ZooKeeper root path contains data.
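This is a toy in-memory model of the recovery rule described in this reply, included only to illustrate it. It is not Flink's API: `RecoveryModel`, `JobGraphStore`, `JobStatus` and their methods are hypothetical names, and the map stands in for the job graphs stored under the ZooKeeper root. In the model, a job graph is added on submission and removed only when the job reaches a terminal state (finished, failed terminally, cancelled). A cluster shutdown counts as a failure, so it leaves the entry behind, and the next JobManager finds it and tries to recover it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes are hypothetical and are NOT Flink's
// ZooKeeperSubmittedJobGraphStore API. The map stands in for the ZK root.
public class RecoveryModel {

    enum JobStatus { RUNNING, FINISHED, FAILED, CANCELED }

    static final class JobGraphStore {
        // Stand-in for the job graphs stored under the configured ZK root.
        private final Map<String, JobStatus> root = new LinkedHashMap<>();

        void submit(String jobId) {
            root.put(jobId, JobStatus.RUNNING);
        }

        // Entries are removed only when the job reaches a terminal state
        // (FINISHED, FAILED without restart, CANCELED); RUNNING is ignored.
        void onTerminalState(String jobId, JobStatus status) {
            if (status != JobStatus.RUNNING) {
                root.remove(jobId);
            }
        }

        // Shutting down the cluster is treated like a failure,
        // so nothing is removed from the root here.
        void clusterShutdown() {
            // intentionally a no-op
        }

        // A new JobManager recovers whatever is still under the root;
        // if the root is empty, there is nothing to recover.
        List<String> recoverAll() {
            return new ArrayList<>(root.keySet());
        }
    }

    public static void main(String[] args) {
        JobGraphStore store = new JobGraphStore();

        store.submit("job-a");
        store.onTerminalState("job-a", JobStatus.FINISHED); // cleaned up

        store.submit("job-b");
        store.clusterShutdown(); // job-b is left behind

        System.out.println("recovered: " + store.recoverAll()); // prints "recovered: [job-b]"
    }
}
```

In this model, a job that was running when the cluster was stopped shows up again in `recoverAll()`, which matches the "Attempting to recover all jobs" line in the log quoted below.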
A job is removed from ZooKeeper only if it terminates (e.g. it finishes, fails terminally without restarting, or is cancelled). If you just shut down the cluster, this is treated as a failure, so the job stays in ZooKeeper and is recovered the next time a JobManager starts.

– Ufuk

The complete JobManager (JM) logs would help to check further what is happening there.

On 29 November 2016 at 18:15:16, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> Hello,
>
> I have a Flink 1.1.3 batch application that performs a simple aggregation but freezes when collect() is called if the app is deployed on an HA-enabled YARN cluster (it works on a local cluster).
> Just before it hangs, I get the following deserialization error in the logs:
>
> (...)
> 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1184)
>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
>     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Do you have an idea of what could be wrong? I have no problems with other batch applications, just with this one. Why is it trying to recover the jobs in the first place?
>
> Thanks,
> Arnaud
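The `StreamCorruptedException: invalid type code: 00` in the trace above comes from plain Java deserialization. `ObjectInputStream` throws it when the byte where it expects a type code (such as `TC_OBJECT`) is `0x00`. The standalone sketch below does not involve Flink, and `InvalidTypeCodeDemo` and `readCorrupted` are hypothetical names. It reproduces the exact message by appending a zero byte to a valid but otherwise empty serialization stream. One possible reading, not a confirmed diagnosis, is that the job graph file recovered via `FileSerializableStateHandle` contains bytes the reader does not expect at that position. This could happen with a leftover file written by a different build or a truncated or zero-filled file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;

// Standalone sketch (no Flink involved): reproduces the message
// "invalid type code: 00" seen in the JobManager log.
public class InvalidTypeCodeDemo {

    static String readCorrupted() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            // Constructing an ObjectOutputStream writes only the stream
            // header (magic 0xACED, version 5); no object is written.
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.flush();
            // Put 0x00 where the reader expects a type code such as TC_OBJECT.
            bytes.write(0x00);

            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                in.readObject();
                return "no error";
            }
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            return "unexpected: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(readCorrupted()); // prints "invalid type code: 00"
    }
}
```

The header is valid, so the reader gets past the stream header and fails only on the first type code. In the quoted trace, the same failure occurs several levels deep, inside `HashMap.readObject`.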