For point 2, there already exists a JIRA issue [1] and a PR. I hope that we
can merge it during this release cycle.
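The gist of that change is to take the blocking NMClient#startContainer
calls off the ResourceManager's main thread and run them on a dedicated
pool instead. A rough, hypothetical sketch of the idea (this is not the
code from the PR; the class and pool names are made up):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Function;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.client.api.NMClient;
    import org.apache.hadoop.yarn.exceptions.YarnException;

    class AsyncContainerLauncher {
        // dedicated pool so the blocking NM RPCs never occupy the main thread
        private final ExecutorService launcherPool = Executors.newFixedThreadPool(16);
        private final NMClient nmClient;

        AsyncContainerLauncher(NMClient nmClient) {
            this.nmClient = nmClient;
        }

        void launchAll(List<Container> containers,
                       Function<Container, ContainerLaunchContext> ctxFactory) {
            for (Container container : containers) {
                launcherPool.submit(() -> {
                    try {
                        // blocking RPC to the container's node manager
                        nmClient.startContainer(container, ctxFactory.apply(container));
                    } catch (YarnException | IOException e) {
                        // e.g. release the container and request a replacement
                    }
                });
            }
        }
    }

YARN's NMClientAsync would be a callback-based alternative to rolling
one's own pool here.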
[1] https://issues.apache.org/jira/browse/FLINK-13184

Cheers,
Till

On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com> wrote:

> Hi Dadashov,
>
> We faced similar problems in our production clusters.
>
> Currently, both the launching and stopping of containers are performed
> in the main thread of YarnResourceManager. As containers are launched
> and stopped one after another, it usually takes a long time to bootstrap
> large jobs. Things get worse when some node managers get lost: YARN
> retries many times to communicate with them, leading to heartbeat
> timeouts of TaskManagers.
>
> Following are some efforts we made to help Flink deal with large jobs.
>
> 1. We provision some common jars on all cluster nodes and ask our users
> not to include these jars in their uberjars. When containers bootstrap,
> these jars are added to the classpath via JVM options. That way, we can
> effectively reduce the size of the uberjars.
>
> 2. We deploy asynchronous threads to launch and stop containers in
> YarnResourceManager. The bootstrap time can be reduced significantly
> when launching a large number of containers. We'd like to contribute
> this to the community very soon.
>
> 3. We deploy a timeout timer for each launching container. If a task
> manager does not register in time after its container has been launched,
> a new container is allocated and launched. This wastes some resources,
> but reduces the impact of slow or problematic nodes.
>
> Now that the community is considering a refactoring of the
> ResourceManager, I think it will be a good time to improve its
> efficiency.
>
> Regards,
> Xiaogang
>
> Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Fri, Aug 30, 2019
> at 7:10 AM:
>
>> Dear Flink developers,
>>
>> I am having difficulty getting a Flink job started.
>>
>> The job's uberjar/fat jar is around 400 MB, and I need to launch 800+
>> containers.
>>
>> The default HDFS replication factor is 3.
>>
>> *The YARN queue is empty, and 800 containers are allocated almost
>> immediately by the YARN RM.*
>>
>> It then takes a very long time until all 800 nodes (node managers)
>> have downloaded the uberjar from HDFS to their local machines.
>>
>> *Q1:*
>>
>> a) Do all those 800 nodes download in batches of 3 at a time (batch
>> size = HDFS replication factor)?
>>
>> b) Or can Flink TMs replicate the jar from each other, i.e. do
>> already-started TMs replicate it to not-yet-started nodes?
>>
>> Most probably the answer is (a), but I want to confirm.
>>
>> *Q2:*
>>
>> What is the recommended way of handling a 400 MB+ uberjar with 800+
>> containers?
>>
>> Any specific params to tune?
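>>
>> For what it's worth, one mitigation I am considering is manually
>> raising the replication factor of the uploaded jar before the
>> containers start, so that localization is not bottlenecked on 3
>> datanodes. A minimal sketch, using the plain Hadoop FileSystem API
>> (the staging path below is made up for illustration):
>>
>>     import org.apache.hadoop.conf.Configuration;
>>     import org.apache.hadoop.fs.FileSystem;
>>     import org.apache.hadoop.fs.Path;
>>
>>     public class BumpJarReplication {
>>         public static void main(String[] args) throws Exception {
>>             FileSystem fs = FileSystem.get(new Configuration());
>>             // hypothetical HDFS staging path of the uploaded uberjar
>>             Path uberjar = new Path("/user/me/.flink/<appId>/job.jar");
>>             // with 50 replicas instead of 3, the 800 node managers
>>             // spread their reads across many more datanodes
>>             fs.setReplication(uberjar, (short) 50);
>>         }
>>     }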
>> Thanks.
>>
>> Because downloading the uberjar takes a really long time, around 15
>> minutes after the job is kicked off we run into this exception:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to
>> start container.
>> This token is expired. current time is 1567116179193 found 1567116001610
>> Note: System times on machines may be out of sync. Check system time and
>> time zones.
>> at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>> at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>> at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>> at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
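PS: Xiaogang's point 3 (a registration timeout per launching container)
could look roughly like the following hypothetical sketch; the class name
and the replacement logic are made up for illustration:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class RegistrationWatchdog {
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();
        // ids of containers whose TaskManager has registered
        private final Set<String> registered = ConcurrentHashMap.newKeySet();

        void onTaskManagerRegistered(String containerId) {
            registered.add(containerId);
        }

        void watch(String containerId, long timeoutMillis) {
            timer.schedule(() -> {
                if (!registered.contains(containerId)) {
                    // TaskManager never came up: release this container
                    // and ask YARN for a replacement (omitted in sketch)
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }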