Hi Dadashov,

You may have a look at the method YarnResourceManager#onContainersAllocated, which launches containers (via NMClient#startContainer) after they are allocated. The launching is performed in the main thread of YarnResourceManager and is synchronous/blocking. Consequently, the containers are launched one by one.
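To make the sequencing concrete, below is a rough, simplified Java sketch (my own illustration, not Flink's actual code; the class name SequentialLaunchSketch and the createLaunchContext helper are placeholders) of how a blocking per-container launch loop behaves when it runs in a single thread:

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// Hedged sketch only: it illustrates why launching is serialized when the
// allocation callback runs in a single thread and uses the blocking NMClient.
class SequentialLaunchSketch {

    private final NMClient nodeManagerClient;                 // blocking NodeManager client
    private final AMRMClientAsync<?> resourceManagerClient;   // async AM-RM client

    SequentialLaunchSketch(NMClient nmClient, AMRMClientAsync<?> rmClient) {
        this.nodeManagerClient = nmClient;
        this.resourceManagerClient = rmClient;
    }

    /** Invoked when YARN hands over a batch of allocated containers. */
    public void onContainersAllocated(List<Container> containers) {
        for (Container container : containers) {
            try {
                // Placeholder helper: builds the launch context (localized uberjar,
                // classpath, JVM options). This cost is also paid per container.
                ContainerLaunchContext launchContext = createLaunchContext(container);

                // Blocking call: the next container in the list is not touched until
                // this RPC returns. With hundreds of containers, the whole loop can
                // outlive the container token lifetime, producing the
                // "Unauthorized request to start container ... token is expired" error.
                nodeManagerClient.startContainer(container, launchContext);
            } catch (Throwable t) {
                // On failure, give the container back so a replacement can be requested.
                resourceManagerClient.releaseAssignedContainer(container.getId());
            }
        }
    }

    private ContainerLaunchContext createLaunchContext(Container container) {
        // Placeholder for illustration only.
        return ContainerLaunchContext.newInstance(null, null, null, null, null, null);
    }
}

Because every startContainer call must complete before the next one begins, the launch time grows linearly with the number of containers, which matches the container-token expiry you reported for large jobs.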
Regards,
Xiaogang

On Sat, Aug 31, 2019 at 2:37 AM, Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:

> Thanks everyone for the valuable input and for sharing your experience in
> tackling the issue.
>
> Regarding the suggestions:
> - We provision some common jars in all cluster nodes --> but this requires
> depending on the Infra Team's schedule for handling/updating the common jars.
> - Making the uberjar slimmer --> tried even with a 200 MB uberjar (half the
> size); it did not improve much. Only 100 containers could be started in time,
> but then we receive:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
> This token is expired. current time is 1566422713305 found 1566422560552
> Note: System times on machines may be out of sync. Check system time and time zones.
>
> - It would be nice to see FLINK-13184
> <https://issues.apache.org/jira/browse/FLINK-13184>, but the expected
> version it will get into is 1.10.
> - Increase the replication factor --> It would be nice to have a Flink conf for
> setting the replication factor for only the Flink job jars, but not the output.
> It is also challenging to set a replication factor for a not-yet-existing
> directory; the new files will have the default replication factor. Will explore
> the HDFS cache option.
>
> Maybe another option can be:
> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
> jars from already started TaskManagers in a P2P fashion, so as not to be
> blocked on HDFS replication.
>
> A Spark job with the exact same size jar and 800 executors, without any
> tuning, can start without any issue on the same cluster in less than a minute.
>
> *Further questions:*
>
> *@ SHI Xiaogang <shixiaoga...@gmail.com>:*
>
> I see that all 800 requests are sent concurrently:
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
> org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor
> container with resources <memory:16384, vCores:1>. Number pending requests 793.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
> org.apache.flink.yarn.YarnResourceManager - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
> org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor
> container with resources <memory:16384, vCores:1>. Number pending requests 794.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
> org.apache.flink.yarn.YarnResourceManager - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
> ...
>
> Can you please elaborate on the part "As containers are launched and stopped
> one after another"? Any pointer to a class/method in Flink?
>
> *@ Zhu Zhu <reed...@gmail.com>:*
>
> Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
>
> We intend to use a Flink real-time pipeline for replay from Hive/HDFS
> (an offline source), to have one single pipeline for both batch and
> real-time. So for a batch Flink job, the containers will be released once the
> job is done.
> I guess your job is a real-time Flink job, so you can share the jars from
> already long-running jobs.
>
> Thanks.
>
>
> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
>> I can think of 2 approaches:
>>
>> 1. Allow Flink to specify the replication of the submitted uber jar.
>> 2. Allow Flink to specify a config flink.yarn.lib which lists all the Flink
>> related jars that are hosted on HDFS. This way users don't need to build
>> and submit a fat uber jar every time. And those Flink jars hosted on HDFS
>> can also have their replication specified separately.
>>
>>
>> On Fri, Aug 30, 2019 at 3:33 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> For point 2 there already exists a JIRA issue [1] and a PR. I hope that
>>> we can merge it during this release cycle.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>>>
>>>> Hi Dadashov,
>>>>
>>>> We faced similar problems in our production clusters.
>>>>
>>>> Currently both launching and stopping of containers are performed in the
>>>> main thread of YarnResourceManager. As containers are launched and stopped
>>>> one after another, it usually takes a long time to bootstrap large jobs.
>>>> Things get worse when some node managers get lost: Yarn will retry many
>>>> times to communicate with them, leading to heartbeat timeouts of
>>>> TaskManagers.
>>>>
>>>> The following are some efforts we made to help Flink deal with large jobs.
>>>>
>>>> 1. We provision some common jars in all cluster nodes and ask our users
>>>> not to include these jars in their uberjar. When containers bootstrap,
>>>> these jars are added to the classpath via JVM options. That way, we can
>>>> effectively reduce the size of uberjars.
>>>>
>>>> 2. We deploy some asynchronous threads to launch and stop containers
>>>> in YarnResourceManager. The bootstrap time can be effectively reduced when
>>>> launching a large number of containers. We'd like to contribute this to
>>>> the community very soon.
>>>>
>>>> 3. We deploy a timeout timer for each launching container. If a task
>>>> manager does not register in time after its container has been launched,
>>>> a new container will be allocated and launched. That leads to a certain
>>>> waste of resources, but reduces the effects caused by slow or problematic
>>>> nodes.
>>>>
>>>> Now the community is considering a refactoring of the ResourceManager.
>>>> I think that will be the time to improve its efficiency.
>>>>
>>>> Regards,
>>>> Xiaogang
>>>>
>>>> On Fri, Aug 30, 2019 at 7:10 AM, Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:
>>>>
>>>>> Dear Flink developers,
>>>>>
>>>>> I am having difficulty getting a Flink job started.
>>>>>
>>>>> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
>>>>> containers.
>>>>>
>>>>> The default HDFS replication is 3.
>>>>>
>>>>> *The Yarn queue is empty, and 800 containers are allocated
>>>>> almost immediately by the Yarn RM.*
>>>>>
>>>>> It takes a very long time until all 800 nodes (node managers)
>>>>> download the uberjar from HDFS to their local machines.
>>>>>
>>>>> *Q1:*
>>>>>
>>>>> a) Do all those 800 nodes download in batches of 3 at a time?
>>>>> (batch size = HDFS replication factor)
>>>>>
>>>>> b) Or can Flink TMs replicate from each other? Or do already
>>>>> started TMs replicate to yet-to-be-started nodes?
>>>>>
>>>>> Most probably the answer is (a), but I want to confirm.
>>>>>
>>>>> *Q2:*
>>>>>
>>>>> What is the recommended way of handling a 400MB+ uberjar with 800+
>>>>> containers?
>>>>>
>>>>> Any specific params to tune?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Because downloading the uberjar takes a really long time, around
>>>>> 15 minutes after the job was kicked off we face this exception:
>>>>>
>>>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>>>>> This token is expired. current time is 1567116179193 found 1567116001610
>>>>> Note: System times on machines may be out of sync. Check system time and time zones.
>>>>>     at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>>>>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>>>>     at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>>>>     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> --
>> Best Regards
>>
>> Jeff Zhang