Hi Dadashov,

Have a look at the method YarnResourceManager#onContainersAllocated,
which launches containers (via NMClient#startContainer) once they have
been allocated.
The launch is performed in the main thread of YarnResourceManager and is
synchronous/blocking. Consequently, the containers are launched one by
one.
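
To illustrate the effect, here is a minimal sketch; it is not the actual
Flink code. The class LaunchSketch and its methods are hypothetical, and
the sleep in startContainer merely stands in for the blocking
NMClient#startContainer call. It contrasts launching from a single thread
with handing the same blocking calls to a small thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LaunchSketch {

    // Hypothetical stand-in for the blocking NMClient#startContainer call.
    static String startContainer(int containerId, long rpcMillis) {
        try {
            Thread.sleep(rpcMillis); // simulated round trip to a NodeManager
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "container-" + containerId;
    }

    // One thread, one blocking call after another:
    // total time grows roughly as n * rpcMillis.
    static List<String> launchSequentially(int n, long rpcMillis) {
        List<String> started = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            started.add(startContainer(i, rpcMillis));
        }
        return started;
    }

    // The same blocking calls handed to a fixed pool (assumed size `threads`):
    // total time grows roughly as ceil(n / threads) * rpcMillis.
    static List<String> launchConcurrently(int n, long rpcMillis, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int id = i;
                futures.add(CompletableFuture.supplyAsync(
                        () -> startContainer(id, rpcMillis), pool));
            }
            List<String> started = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                started.add(future.join()); // results collected in submission order
            }
            return started;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int n = 20;
        long rpcMillis = 50;
        long t0 = System.nanoTime();
        launchSequentially(n, rpcMillis);
        long t1 = System.nanoTime();
        launchConcurrently(n, rpcMillis, 10);
        long t2 = System.nanoTime();
        System.out.println("sequential ms: " + (t1 - t0) / 1_000_000);
        System.out.println("concurrent ms: " + (t2 - t1) / 1_000_000);
    }
}
```

With 800 containers, the sequential variant pays the per-call latency 800
times in a row, which is why a slow or unreachable NodeManager delays
every container queued behind it.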

Regards,
Xiaogang

Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Sat, Aug 31, 2019 at 2:37 AM:

> Thanks, everyone, for the valuable input and for sharing your experience
> in tackling the issue.
>
> Regarding the suggestions:
> - We provision some common jars in all cluster nodes  *-->*  but this
> requires depending on the Infra Team's schedule for handling/updating the
> common jars
> - Making the uberjar slimmer *-->* tried even with a 200 MB uberjar (half
> the size); it did not improve much. Only 100 containers could be started
> in time, and then we received:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
> This token is expired. current time is 1566422713305 found 1566422560552
> Note: System times on machines may be out of sync. Check system time and time zones.
>
>
> - It would be nice to see FLINK-13184
> <https://issues.apache.org/jira/browse/FLINK-13184>, but the expected
> version it will land in is 1.10
> - Increase the replication factor --> It would be nice to have a Flink conf
> for setting the replication factor only for the Flink job jars, not for the
> output. It is also challenging to set the replication for a directory that
> does not exist yet; new files will get the default replication factor. Will
> explore the HDFS cache option.
>
> Maybe another option could be:
> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
> jars from already started TaskManagers in P2P fashion, so that HDFS
> replication is not a blocker.
>
> A Spark job without any tuning, with a jar of exactly the same size and 800
> executors, can start without any issue on the same cluster in less than a
> minute.
>
> *Further questions:*
>
> *@ SHI Xiaogang <shixiaoga...@gmail.com>:*
>
> I see that all 800 requests are sent concurrently:
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources <memory:16384, vCores:1>. Number pending requests
> 793.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources <memory:16384, vCores:1>. Number pending requests
> 794.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
> ...
>
> Can you please elaborate on the part "As containers are launched and stopped
> one after another"? Any pointer to a class/method in Flink?
>
> *@ Zhu Zhu <reed...@gmail.com>:*
>
> Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
>
> We intend to use the Flink real-time pipeline for replay from Hive/HDFS
> (an offline source), to have a single pipeline for both batch and
> real-time. So for a batch Flink job, the containers will be released once
> the job is done.
> I guess your job is a real-time Flink job, so you can share the jars from
> already long-running jobs.
>
> Thanks.
>
>
> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
>> I can think of 2 approaches:
>>
>> 1. Allow Flink to specify the replication of the submitted uber jar.
>> 2. Allow Flink to specify a config flink.yarn.lib that points to all the
>> Flink-related jars hosted on HDFS. This way users don't need to build
>> and submit a fat uber jar every time. The replication of those Flink jars
>> hosted on HDFS can also be specified separately.
>>
>>
>>
>> Till Rohrmann <trohrm...@apache.org> wrote on Fri, Aug 30, 2019 at 3:33 PM:
>>
>>> For point 2, there already exists a JIRA issue [1] and a PR. I hope that
>>> we can merge it during this release cycle.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com>
>>> wrote:
>>>
>>>> Hi Dadashov,
>>>>
>>>> We faced similar problems in our production clusters.
>>>>
>>>> Currently, both launching and stopping of containers are performed in the
>>>> main thread of YarnResourceManager. As containers are launched and stopped
>>>> one after another, it usually takes a long time to bootstrap large jobs.
>>>> Things get worse when some node managers are lost: Yarn will retry many
>>>> times to communicate with them, leading to heartbeat timeouts of
>>>> TaskManagers.
>>>>
>>>> The following are some efforts we made to help Flink deal with large jobs.
>>>>
>>>> 1. We provision some common jars in all cluster nodes and ask our users
>>>> not to include these jars in their uberjar. When containers bootstrap,
>>>> these jars are added to the classpath via JVM options. That way, we can
>>>> effectively reduce the size of uberjars.
>>>>
>>>> 2. We deploy some asynchronous threads to launch and stop containers
>>>> in YarnResourceManager. The bootstrap time can be reduced significantly
>>>> when launching a large number of containers. We'd like to contribute it to
>>>> the community very soon.
>>>>
>>>> 3. We deploy a timeout timer for each launching container. If a task
>>>> manager does not register in time after its container has been launched, a
>>>> new container will be allocated and launched. That will lead to a certain
>>>> waste of resources, but can reduce the effects caused by slow or
>>>> problematic nodes.
>>>>
>>>> The community is now considering refactoring the ResourceManager. I
>>>> think that will be the time to improve its efficiency.
>>>>
>>>> Regards,
>>>> Xiaogang
>>>>
>>>> Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Fri, Aug 30, 2019 at 7:10 AM:
>>>>
>>>>> Dear Flink developers,
>>>>>
>>>>> I am having difficulty getting a Flink job started.
>>>>>
>>>>> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
>>>>> containers.
>>>>>
>>>>> The default HDFS replication is 3.
>>>>>
>>>>> *The Yarn queue is empty, and 800 containers are allocated
>>>>> almost immediately by the Yarn RM.*
>>>>>
>>>>> It takes a very long time until all 800 nodes (node managers) have
>>>>> downloaded the uberjar from HDFS to their local machines.
>>>>>
>>>>> *Q1:*
>>>>>
>>>>> a) Do all those 800 nodes download in batches of 3 at a time?
>>>>> (batch size = HDFS replication factor)
>>>>>
>>>>> b) Or can Flink TMs replicate from each other? Or do already
>>>>> started TMs replicate to not-yet-started nodes?
>>>>>
>>>>> Most probably the answer is (a), but I want to confirm.
>>>>>
>>>>> *Q2:*
>>>>>
>>>>> What is the recommended way of handling a 400MB+ uberjar with 800+
>>>>> containers?
>>>>>
>>>>> Any specific params to tune?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Because downloading the uberjar takes a really long time, around
>>>>> 15 minutes after the job was kicked off I am facing this exception:
>>>>>
>>>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>>>>> This token is expired. current time is 1567116179193 found 1567116001610
>>>>> Note: System times on machines may be out of sync. Check system time and time zones.
>>>>>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>>>>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>   at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>>>>   at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>>>>   at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>>>>   at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>>>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>>
>>>>>
>>>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
