Good point with the collect() docs. Would you mind opening a JIRA issue for 
that?

I'm not sure whether you can specify it via that key for YARN. Can you try to 
use -yjm 8192 when submitting the job?
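
For reference, a submission along those lines might look like the sketch below. It assumes a per-job YARN deployment started with "-m yarn-cluster"; the container count and the jar path are placeholders, not values from this thread.

```shell
# Hypothetical per-job YARN submission; -yjm requests 8192 MB for the JobManager container.
# "-yn 4" and the jar path are placeholders and need to be adapted.
./bin/flink run -m yarn-cluster -yn 4 -yjm 8192 /path/to/your-job.jar
```

The "Maximum heap size" and "-Xmx" lines in the JobManager log should then show whether the larger setting was picked up.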

Looping in Robert who knows best whether this config key is picked up or not 
for YARN.

– Ufuk

On 8 December 2016 at 14:05:41, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> Hi Ufuk,
>  
> Yes, I have a large set of data to collect for a data science job that cannot
> be distributed easily. Increasing akka.framesize does get rid of the collect
> hang (maybe you should highlight this parameter in the collect()
> documentation, 10 MB is not that big), thanks.
>  
> However, my job manager now fails with an OutOfMemory error.
>  
> Despite the fact that I have set
> jobmanager.heap.mb: 8192
>  
> in my flink-conf.yaml, the logs show that it was created with less memory
> (1374 MiB):
>  
> 2016-12-08 13:50:13,808 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - --------------------------------------------------------------------------------
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: datcrypt
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
> 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 1374 MiBytes
> 2016-12-08 13:50:13,810 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/java/default
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.6.3
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx1434M
> 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/container_e17_1480512120243_3635_01_000001/jobmanager.log
>  
> Is there a Flink command-line option or environment variable that overrides
> it, or am I missing something?
> -- Arnaud
>  
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Thursday, 8 December 2016 10:49
> To: LINZ, Arnaud ; user@flink.apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
>  
> I also don't get why the job is recovering, but the oversized message is very
> likely the cause of the freezing collect, because the data set is gathered
> via Akka.
>  
> You can configure the frame size via "akka.framesize", which defaults to
> 10485760b (10 MB).
>  
> Is the collected result larger than that? Could you try to increase the frame
> size and report back?
>  
> – Ufuk
>  
> On 7 December 2016 at 17:57:22, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> > Hi,
> >
> > Any news? It may be caused by an oversized Akka payload. The log contains
> > many entries like this one:
> >
> > akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 69074412 bytes
> >
> > How do I set akka's maximum-payload-bytes in my flink cluster?
> >
> > https://issues.apache.org/jira/browse/FLINK-2373 is not clear about
> > that. I do not use ExecutionEnvironment.createRemoteEnvironment() but
> > ExecutionEnvironment.getExecutionEnvironment().
> >
> > Do I have to change the way I'm doing things? If so, how?
> >
> > Thanks,
> > Arnaud
> >
> > -----Original Message-----
> > From: LINZ, Arnaud
> > Sent: Wednesday, 30 November 2016 08:59
> > To: user@flink.apache.org
> > Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
> >
> > Hi,
> >
> > I don't think so. I always delete the ZK path before launching the batch
> > (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS
> > rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears
> > only before the collect() phase, not at the beginning.
> >
> > The full log is available here:
> > https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r
> >
> > Thanks,
> > Arnaud
> >
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Tuesday, 29 November 2016 18:43
> > To: LINZ, Arnaud ; user@flink.apache.org
> > Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error
> >
> > Hey Arnaud,
> >
> > could this be a leftover job that is recovered from ZooKeeper?
> > Recovery only happens if the configured ZK root contains data.
> >
> > A job is removed from ZooKeeper only if it terminates (e.g. finishes,
> > fails terminally w/o restarting, or is cancelled). If you just shut down
> > the cluster, this is treated as a failure.
> >
> > – Ufuk
> >
> > The complete JM logs will be helpful to further check what's happening 
> > there.
> >
> >
> > On 29 November 2016 at 18:15:16, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> > > Hello,
> > >
> > > I have a Flink 1.1.3 batch application that performs a simple
> > > aggregation but freezes when collect() is called while the app is
> > > deployed on an HA-enabled YARN cluster (it works on a local cluster).
> > > Just before it hangs, I see the following deserialization error in the
> > > logs:
> > >
> > > (...)
> > > 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> > > 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> > > 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> > > 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> > > 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> > > java.io.StreamCorruptedException: invalid type code: 00
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >     at java.util.HashMap.readObject(HashMap.java:1184)
> > >     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> > >     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> > >     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > >
> > > Do you have an idea of what can be wrong? I have no problems with
> > > other batch applications, just with this one. Why is it trying to
> > > recover the jobs in the first place?
> > > Thanks,
> > > Arnaud
> > >
