Hi Dadashov,
Regarding your questions:

> Q1: Do all those 800 nodes download a batch of 3 at a time?

The 800+ containers will be allocated on different YARN nodes. By default, the
LocalResourceVisibility is APPLICATION, so the jars will be downloaded only once
per node and shared by all TaskManager containers of the same application on
that node. And the batch size is not 3: even though the replication of your jars
is 3 (the HDFS blocks are located on 3 different datanodes), a single datanode
can serve multiple downloads. The limit is the bandwidth of the datanode. I
guess the bandwidth of your HDFS datanodes is not very good, so increasing the
replication of the fat jar will help reduce the download time. A JIRA ticket has
been created for this. [1]

> Q2: What is the recommended way of handling a 400MB+ uberjar with 800+
> containers?

From our online production experience, there are at least 3 ways to optimize:

1. Increase the replication of the jars in the YARN distributed cache. [1]
2. Increase the number of concurrent container launches, or use NMClientAsync,
   so that allocated containers can be started as soon as possible. Even though
   startContainer in the YARN NodeManager is asynchronous, launching a container
   in Flink's YarnResourceManager is a blocking call, so we have to start
   containers one by one. [2]
3. Use the YARN public cache to eliminate unnecessary jar downloads. For
   example, flink-dist.jar would then not have to be uploaded and localized
   again for every application. [3]

Unfortunately, the three features above are still under development. As a
workaround, you could set dfs.replication=10 in the hdfs-site.xml of
HADOOP_CONF_DIR on the Flink client machine.

[1] https://issues.apache.org/jira/browse/FLINK-12343
[2] https://issues.apache.org/jira/browse/FLINK-13184
[3] https://issues.apache.org/jira/browse/FLINK-13938

Best,
Yang

Zhu Zhu <reed...@gmail.com> wrote on Mon, Sep 2, 2019 at 10:42 AM:

> Hi Elkhan,
>
> >> Regarding "One optimization that we take is letting yarn to reuse the
> >> flink-dist jar which was localized when running previous jobs."
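Yang's workaround above amounts to raising the default block replication in the
client's Hadoop configuration. A minimal sketch of the relevant hdfs-site.xml
fragment (the value 10 follows the suggestion above; tune it to your cluster,
and note it applies to all files written through this client config, not only
the Flink jars):

```xml
<!-- hdfs-site.xml in HADOOP_CONF_DIR on the Flink client machine -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <!-- Default replication for newly written files, e.g. the uploaded uberjar -->
    <value>10</value>
  </property>
</configuration>
```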
> >> We are intending to use a Flink real-time pipeline for replay from
> >> Hive/HDFS (an offline source), to have one single pipeline for both batch
> >> and real-time. So for a batch Flink job, the containers will be released
> >> once the job is done.
>
> >> I guess your job is real-time flink, so you can share the jars from
> >> already long-running jobs.
>
> This optimization is conducted by making the flink-dist jar a public
> distributed cache entry of YARN. In this way, the localized dist jar can be
> shared by different YARN applications, and it will not be removed when the
> YARN application which localized it terminates.
> This requires some changes in Flink though.
> We will open an issue to contribute this optimization to the community.
>
> Thanks,
> Zhu Zhu
>
> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Sat, Aug 31, 2019 at 12:57 PM:
>
>> Hi Dadashov,
>>
>> You may have a look at the method YarnResourceManager#onContainersAllocated,
>> which launches containers (via NMClient#startContainer) after containers
>> are allocated.
>> The launching is performed in the main thread of YarnResourceManager and is
>> synchronous/blocking. Consequently, the containers will be launched one by
>> one.
>>
>> Regards,
>> Xiaogang
>>
>> Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Sat, Aug 31, 2019 at 2:37 AM:
>>
>>> Thanks everyone for the valuable input and for sharing your experience
>>> tackling the issue.
>>>
>>> Regarding the suggestions:
>>> - Provisioning some common jars on all cluster nodes --> this requires
>>> depending on the Infra Team's schedule for handling/updating the common
>>> jars.
>>> - Making the uberjar slimmer --> tried even with a 200 MB uberjar (half
>>> the size); it did not improve much. Only 100 containers could start in
>>> time, and then we receive:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to
>>> start container.
>>> This token is expired. current time is 1566422713305 found 1566422560552
>>> Note: System times on machines may be out of sync.
>>> Check system time and time zones.
>>>
>>> - It would be nice to see FLINK-13184
>>> <https://issues.apache.org/jira/browse/FLINK-13184>, but the expected
>>> version it will land in is 1.10.
>>> - Increasing the replication factor --> It would be nice to have a Flink
>>> conf for setting the replication factor only for the Flink job jars, but
>>> not the output. It is also challenging to set a replication factor for a
>>> not-yet-existing directory; the new files will get the default replication
>>> factor. Will explore the HDFS cache option.
>>>
>>> Maybe another option can be:
>>> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
>>> jars from already-started TaskManagers in P2P fashion, so as not to
>>> bottleneck on HDFS replication.
>>>
>>> A Spark job with the exact same size of jar and 800 executors, without any
>>> tuning, can start without any issue on the same cluster in less than a
>>> minute.
>>>
>>> Further questions:
>>>
>>> @ SHI Xiaogang <shixiaoga...@gmail.com>:
>>>
>>> I see that all 800 requests are sent concurrently:
>>>
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>> org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor
>>> container with resources <memory:16384, vCores:1>. Number pending requests
>>> 793.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>> org.apache.flink.yarn.YarnResourceManager - Request slot with profile
>>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>>> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>> org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor
>>> container with resources <memory:16384, vCores:1>. Number pending requests
>>> 794.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>> org.apache.flink.yarn.YarnResourceManager - Request slot with profile
>>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>>> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
>>> ...
>>>
>>> Can you please elaborate on the part "As containers are launched and
>>> stopped one after another"? Any pointer to the class/method in Flink?
>>>
>>> @ Zhu Zhu <reed...@gmail.com>:
>>>
>>> Regarding "One optimization that we take is letting yarn to reuse the
>>> flink-dist jar which was localized when running previous jobs."
>>>
>>> We are intending to use a Flink real-time pipeline for replay from
>>> Hive/HDFS (an offline source), to have one single pipeline for both batch
>>> and real-time. So for a batch Flink job, the containers will be released
>>> once the job is done.
>>> I guess your job is real-time flink, so you can share the jars from
>>> already long-running jobs.
>>>
>>> Thanks.
>>>
>>> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> I can think of 2 approaches:
>>>>
>>>> 1. Allow Flink to specify the replication of the submitted uberjar.
>>>> 2. Allow Flink to specify a config flink.yarn.lib listing all the
>>>> Flink-related jars that are hosted on HDFS. This way users don't need to
>>>> build and submit a fat uberjar every time, and the replication of those
>>>> Flink jars hosted on HDFS can also be specified separately.
>>>>
>>>> Till Rohrmann <trohrm...@apache.org> wrote on Fri, Aug 30, 2019 at 3:33 PM:
>>>>
>>>>> For point 2 there already exists a JIRA issue [1] and a PR. I hope
>>>>> that we can merge it during this release cycle.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dadashov,
>>>>>>
>>>>>> We faced similar problems in our production clusters.
>>>>>>
>>>>>> Currently both launching and stopping of containers are performed in
>>>>>> the main thread of YarnResourceManager. As containers are launched and
>>>>>> stopped one after another, it usually takes a long time to bootstrap
>>>>>> large jobs. Things get worse when some node managers get lost: YARN
>>>>>> will retry many times to communicate with them, leading to heartbeat
>>>>>> timeouts of TaskManagers.
>>>>>>
>>>>>> The following are some efforts we made to help Flink deal with large
>>>>>> jobs:
>>>>>>
>>>>>> 1. We provision some common jars on all cluster nodes and ask our
>>>>>> users not to include these jars in their uberjar. When containers
>>>>>> bootstrap, these jars are added to the classpath via JVM options. That
>>>>>> way, we can effectively reduce the size of uberjars.
>>>>>>
>>>>>> 2. We deploy some asynchronous threads to launch and stop containers
>>>>>> in YarnResourceManager. The bootstrap time can be effectively reduced
>>>>>> when launching a large number of containers. We'd like to contribute
>>>>>> this to the community very soon.
>>>>>>
>>>>>> 3. We deploy a timeout timer for each launching container. If a task
>>>>>> manager does not register in time after its container has been
>>>>>> launched, a new container will be allocated and launched. That leads to
>>>>>> a certain waste of resources, but reduces the effects caused by slow or
>>>>>> problematic nodes.
>>>>>>
>>>>>> The community is now considering a refactoring of the ResourceManager.
>>>>>> I think that will be the time to improve its efficiency.
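The asynchronous launching described in point 2 above can be illustrated with a
generic sketch. This is not Flink's actual implementation: `startContainer`
below is a hypothetical stand-in for the blocking NMClient#startContainer call,
and the thread pool replaces the single main thread that currently launches
containers one by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Generic sketch: launching containers from a fixed thread pool instead of
// one blocking call at a time in a single main thread.
public class AsyncLaunchSketch {

    // Hypothetical stand-in for a blocking NMClient#startContainer call.
    static String startContainer(int containerId) throws InterruptedException {
        Thread.sleep(10); // simulate a network round trip to the NodeManager
        return "container-" + containerId;
    }

    // Submit all launches to the pool, then wait for every launch to finish.
    // With `threads` workers, total launch latency is roughly n/threads round
    // trips instead of n sequential ones.
    public static List<String> launchAll(int n, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int id = i;
                futures.add(pool.submit(() -> startContainer(id)));
            }
            List<String> launched = new ArrayList<>();
            for (Future<String> f : futures) {
                launched.add(f.get()); // blocks until that launch completes
            }
            return launched;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("launched: " + launchAll(8, 4).size());
    }
}
```

The same idea underlies NMClientAsync mentioned earlier in the thread, where
completion is reported through a callback handler instead of futures.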
>>>>>>
>>>>>> Regards,
>>>>>> Xiaogang
>>>>>>
>>>>>> Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Fri, Aug 30, 2019 at 7:10 AM:
>>>>>>
>>>>>>> Dear Flink developers,
>>>>>>>
>>>>>>> I am having difficulty getting a Flink job started.
>>>>>>>
>>>>>>> The job's uberjar/fat jar is around 400 MB, and I need to kick off
>>>>>>> 800+ containers.
>>>>>>>
>>>>>>> The default HDFS replication is 3.
>>>>>>>
>>>>>>> The YARN queue is empty, and 800 containers are allocated almost
>>>>>>> immediately by the YARN RM.
>>>>>>>
>>>>>>> It takes a very long time until all 800 nodes (node managers)
>>>>>>> download the uberjar from HDFS to the local machines.
>>>>>>>
>>>>>>> Q1:
>>>>>>>
>>>>>>> a) Do all those 800 nodes download in batches of 3 at a time?
>>>>>>> (batch size = HDFS replication factor)
>>>>>>>
>>>>>>> b) Or can Flink TMs replicate from each other? Or do already-started
>>>>>>> TMs replicate to not-yet-started nodes?
>>>>>>>
>>>>>>> Most probably the answer is (a), but I want to confirm.
>>>>>>>
>>>>>>> Q2:
>>>>>>>
>>>>>>> What is the recommended way of handling a 400MB+ uberjar with 800+
>>>>>>> containers?
>>>>>>>
>>>>>>> Any specific params to tune?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Because downloading the uberjar takes a really long time, around 15
>>>>>>> minutes after the job is kicked off we face this exception:
>>>>>>>
>>>>>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
>>>>>>> to start container.
>>>>>>> This token is expired. current time is 1567116179193 found 1567116001610
>>>>>>> Note: System times on machines may be out of sync. Check system time
>>>>>>> and time zones.
>>>>>>>     at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>>>>>>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>>>>>>     at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>>>>>>     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
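To put rough numbers on the bandwidth argument from the thread above, here is a
back-of-the-envelope estimate. The jar size (400 MB), node count (800), and
replication factors come from the thread; the 120 MB/s of usable bandwidth per
datanode is an assumption for illustration, not a measurement.

```java
// Back-of-the-envelope estimate of jar localization time: total bytes served
// divided by the aggregate bandwidth of the datanodes holding the replicas.
public class LocalizationEstimate {

    static double estimateSeconds(int nodes, double jarMb, int replication,
                                  double mbPerSecPerDatanode) {
        double totalMb = nodes * jarMb;                  // every node downloads the jar once
        double aggregate = replication * mbPerSecPerDatanode; // replicas' datanodes serve in parallel
        return totalMb / aggregate;
    }

    public static void main(String[] args) {
        // 800 nodes, 400 MB jar, assumed 120 MB/s per datanode
        System.out.printf("replication 3:  ~%.0f s%n", estimateSeconds(800, 400, 3, 120));
        System.out.printf("replication 10: ~%.0f s%n", estimateSeconds(800, 400, 10, 120));
    }
}
```

Under these assumed numbers, raising the replication from 3 to 10 cuts the
estimate from roughly 15 minutes to under 5, consistent with the ~15 minutes
observed in the thread before the container tokens expired.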