Good point with the collect() docs. Would you mind opening a JIRA issue for that?
I'm not sure whether you can specify it via that key for YARN. Can you try to use -yjm 8192 when submitting the job? Looping in Robert, who knows best whether this config key is picked up for YARN.

– Ufuk

On 8 December 2016 at 14:05:41, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> Hi Ufuk,
>
> Yes, I have a large set of data to collect for a data science job that cannot
> be distributed easily. Increasing akka.framesize got rid of the collect()
> hang (maybe you should highlight this parameter in the collect()
> documentation, 10 MB is not that big), thanks.
>
> However, my job manager now fails with OutOfMemory.
>
> Despite the fact that I have set up
> jobmanager.heap.mb: 8192
> in my flink-conf.yaml, the logs show that it was created with less memory (1374 MB):
>
> 2016-12-08 13:50:13,808 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - --------------------------------------------------------------------------------
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: datcrypt
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 1374 MiBytes
> 2016-12-08 13:50:13,810 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/java/default
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.6.3
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner -    -Xmx1434M
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner -    -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/container_e17_1480512120243_3635_01_000001/jobmanager.log
>
> Is there a command line option of flink / env variable that overrides it, or am I missing something?
> -- Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, 8 December 2016 10:49
> To: LINZ, Arnaud; user@flink.apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
>
> I also don't get why the job is recovering, but the oversized message is very likely
> the cause of the freezing collect, because the data set is gathered via Akka.
>
> You can configure the frame size via "akka.framesize", which defaults to 10485760b (10 MB).
>
> Is the collected result larger than that? Could you try to increase the frame size and report back?
>
> – Ufuk
>
> On 7 December 2016 at 17:57:22, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> > Hi,
> >
> > Any news? It's maybe caused by an oversized akka payload (many
> > akka.remote.OversizedPayloadException: Discarding oversized payload sent to
> > Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]:
> > max allowed size 10485760 bytes, actual size of encoded class
> > org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
> > was 69074412 bytes in the log)
> >
> > How do I set akka's maximum-payload-bytes in my flink cluster?
> >
> > https://issues.apache.org/jira/browse/FLINK-2373 is not clear about that.
> > I do not use ExecutionEnvironment.createRemoteEnvironment() but
> > ExecutionEnvironment.getExecutionEnvironment().
> >
> > Do I have to change the way I'm doing things? How?
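For reference, the two settings this exchange keeps coming back to both live in flink-conf.yaml. The snippet below is a sketch using the Flink 1.1.x key names that appear in this thread; the values are examples from the discussion, not recommendations:

```yaml
# flink-conf.yaml (Flink 1.1.x key names)

# JobManager heap in MB. As discussed above, on YARN this value may not
# be picked up; the container size requested at submission time (-yjm)
# can take precedence.
jobmanager.heap.mb: 8192

# Maximum Akka frame size. collect() ships the entire result back through
# Akka messages, so this must exceed the serialized result size (the log
# above shows a 69074412-byte message against the 10485760b default).
akka.framesize: 104857600b
```

On YARN, the JobManager memory would then be requested at submission time with the `-yjm` flag Ufuk mentions, e.g. `flink run -m yarn-cluster -yjm 8192 ...`.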
> >
> > Thanks,
> > Arnaud
> >
> > -----Original Message-----
> > From: LINZ, Arnaud
> > Sent: Wednesday, 30 November 2016 08:59
> > To: user@flink.apache.org
> > Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
> >
> > Hi,
> >
> > Don't think so. I always delete the ZK path before launching the batch
> > (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS
> > rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears
> > only before the collect() phase, not at the beginning.
> >
> > Full log is available here:
> > https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r
> >
> > Thanks,
> > Arnaud
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Tuesday, 29 November 2016 18:43
> > To: LINZ, Arnaud; user@flink.apache.org
> > Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error
> >
> > Hey Arnaud,
> >
> > could this be a left-over job that is recovered from ZooKeeper?
> > Recovery only happens if the configured ZK root contains data.
> >
> > A job is removed from ZooKeeper only if it terminates (e.g. finishes,
> > fails terminally w/o restarting, is cancelled). If you just shut down
> > the cluster, this is treated as a failure.
> >
> > – Ufuk
> >
> > The complete JM logs will be helpful to further check what's happening there.
> >
> > On 29 November 2016 at 18:15:16, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> > > Hello,
> > >
> > > I have a Flink 1.1.3 batch application that makes a simple aggregation
> > > but freezes when collect() is called when the app is deployed on an
> > > HA-enabled YARN cluster (it works on a local cluster).
> > > Just before it hangs, I have the following deserialization error in the logs:
> > >
> > > (...)
> > > 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> > > 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> > > 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> > > 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> > > 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> > > java.io.StreamCorruptedException: invalid type code: 00
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >     at java.util.HashMap.readObject(HashMap.java:1184)
> > >     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> > >     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> > >     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > > Do you have an idea of what can be wrong? I have no problems with
> > > other batch applications, just with this one. Why is it trying to
> > > recover the jobs in the first place?
> > > Thanks,
> > > Arnaud
> > >
> > > ________________________________
> > >
> > > The integrity of this message cannot be guaranteed on the Internet.
> > > The company that sent this message cannot therefore be held liable
> > > for its content nor attachments. Any unauthorized use or
> > > dissemination is prohibited. If you are not the intended recipient
> > > of this message, then please delete it and notify the sender.
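Stepping back from the thread: when a result is too large for collect() (the Akka frame size on the way out, the JobManager heap on arrival), the usual alternative is to write it to a distributed sink and read it back outside the job. The sketch below illustrates that with the DataSet API of the Flink 1.1.x era; the pipeline and the output path are placeholders, not taken from Arnaud's job, and the class must be run with the Flink dependencies on the classpath.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class WriteInsteadOfCollect {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline standing in for the aggregation in the thread.
        DataSet<String> result = env.fromElements("a", "b", "c");

        // writeAsText streams results to the file system directly from the
        // TaskManagers, so nothing has to fit into a single Akka message or
        // into the JobManager heap, unlike collect().
        result.writeAsText("hdfs:///tmp/result", WriteMode.OVERWRITE);

        // With a sink, execute() must be called explicitly; collect()
        // triggers execution by itself.
        env.execute("write-instead-of-collect");
    }
}
```

This trades the convenience of an in-memory List for scalability: the result is sharded across files (one per sink parallel subtask) and can be post-processed without ever passing through the JobManager.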