Hi Dadashov,

Regarding your questions.


> Q1: Do all those 800 nodes download in batches of 3 at a time?

The 800+ containers will be allocated on different YARN nodes. By default,
the LocalResourceVisibility is APPLICATION, so the jars are downloaded only
once per node and shared by all TaskManager containers of the same
application on that node. The batch size is not 3. Even though the
replication factor of your jars is 3 (the HDFS blocks are located on 3
different datanodes), a datanode can serve multiple downloads at the same
time. The limit is the bandwidth of the datanode. I guess the bandwidth of
your HDFS datanodes is not very good, so increasing the replication factor
of the fat jar should help reduce the download time. A JIRA ticket has been
created for this. [1]
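
To make the bandwidth argument concrete, here is a rough back-of-the-envelope
sketch. It assumes the only bottleneck is the outbound bandwidth of the
datanodes holding a replica and that downloads spread evenly across them. The
100 MB/s per-datanode figure and the class and method names are made up for
illustration; real clusters will behave differently (for example, a 400MB jar
spans several HDFS blocks that may sit on different datanodes).

```java
class JarDownloadEstimate {
    /**
     * Rough lower bound, in seconds, for localizing one jar onto many nodes,
     * assuming the datanodes' outbound bandwidth is the only limit and the
     * load is spread evenly over all replicas.
     */
    static double estimateSeconds(int nodes, double jarMb, int replicas, double datanodeMbPerSec) {
        double totalMb = nodes * jarMb;                          // data that must leave HDFS
        double aggregateMbPerSec = replicas * datanodeMbPerSec;  // combined serving capacity
        return totalMb / aggregateMbPerSec;
    }

    public static void main(String[] args) {
        double bandwidth = 100.0; // hypothetical MB/s per datanode, not a measured value
        // 800 nodes * 400 MB = 320000 MB to serve in total
        System.out.printf("replication=3:  %.0f s%n", estimateSeconds(800, 400, 3, bandwidth));  // ~1067 s
        System.out.printf("replication=10: %.0f s%n", estimateSeconds(800, 400, 10, bandwidth)); // 320 s
    }
}
```

Under these assumptions, going from 3 to 10 replicas cuts the estimate from
roughly 18 minutes to a bit over 5 minutes.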


> Q2: What is the recommended way of handling a 400MB+ Uberjar with 800+
> containers?

From our production experience, there are at least three possible
optimizations.

   1. Increase the replication factor of jars in the YARN distributed
   cache. [1]
   2. Increase the number of containers launched concurrently, or use
   NMClientAsync, so that the allocated containers can be started as soon as
   possible. Even though startContainer in the YARN NodeManager is
   asynchronous, launching a container in FlinkYarnResourceManager is a
   blocking call, so containers have to be started one by one. [2]
   3. Use the YARN public cache to eliminate unnecessary jar downloads. For
   example, flink-dist.jar would no longer have to be uploaded and then
   localized for each application. [3]
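
To illustrate point 2, here is a minimal sketch that uses only the Java
standard library. It does not use Flink or the real NMClient/NMClientAsync
APIs: `startContainerBlocking` is a hypothetical stand-in that just sleeps, and
the container count, latency and pool size are arbitrary. It only shows why
issuing blocking start calls from a single thread scales linearly with the
number of containers, while handing them to a pool does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContainerLaunchSketch {
    /** Hypothetical stand-in for a blocking start call; it only sleeps. */
    static void startContainerBlocking(int containerId, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while starting container " + containerId, e);
        }
    }

    /** Starts containers one by one on the calling thread; returns elapsed milliseconds. */
    static long launchSequentially(int containers, long latencyMs) {
        long begin = System.nanoTime();
        for (int id = 0; id < containers; id++) {
            startContainerBlocking(id, latencyMs);
        }
        return (System.nanoTime() - begin) / 1_000_000;
    }

    /** Hands each blocking start call to a fixed thread pool; returns elapsed milliseconds. */
    static long launchWithPool(int containers, long latencyMs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long begin = System.nanoTime();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int id = 0; id < containers; id++) {
                final int containerId = id;
                pending.add(CompletableFuture.runAsync(
                        () -> startContainerBlocking(containerId, latencyMs), pool));
            }
            // Wait until every start call has returned.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - begin) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("sequential ms: " + launchSequentially(40, 20));
        System.out.println("pooled ms:     " + launchWithPool(40, 20, 8));
    }
}
```

With 40 simulated containers at 20 ms each, the sequential loop needs at least
about 800 ms, while 8 pool threads finish in roughly a fifth of that or less.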


Unfortunately, the three features above are still under development. As a
workaround, you could set dfs.replication=10 in the hdfs-site.xml under
HADOOP_CONF_DIR on the Flink client machine.
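
For reference, the workaround would look roughly like this in the client-side
hdfs-site.xml. This is only a sketch of the property mentioned above; it
assumes the file is the one the Flink client actually picks up, and the setting
applies to every file created by a client using this configuration, not only
to the job jars.

```xml
<!-- hdfs-site.xml under HADOOP_CONF_DIR on the Flink client machine -->
<property>
  <name>dfs.replication</name>
  <value>10</value>
</property>
```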



[1].https://issues.apache.org/jira/browse/FLINK-12343

[2].https://issues.apache.org/jira/browse/FLINK-13184

[3].https://issues.apache.org/jira/browse/FLINK-13938



Best,

Yang

On Mon, Sep 2, 2019 at 10:42 AM Zhu Zhu <reed...@gmail.com> wrote:

> Hi Elkhan,
>
> >> Regarding "One optimization that we take is letting yarn to reuse the
> >> flink-dist jar which was localized when running previous jobs."
> >> We are intending to use Flink Real-time pipeline for Replay from
> >> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
> >> and real-time. So for batch Flink job, the containers will be released
> >> once the job is done.
> >> I guess your job is real-time flink, so you can share the jars from
> >> already long-running jobs.
>
> This optimization is done by making the flink-dist jar a public
> distributed cache resource in YARN.
> In this way, the localized dist jar can be shared by different YARN
> applications, and it will not be removed when the YARN application that
> localized it terminates.
> This requires some changes in Flink, though.
> We will open an issue to contribute this optimization to the community.
>
> Thanks,
> Zhu Zhu
>
> On Sat, Aug 31, 2019 at 12:57 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
>> Hi Dadashov,
>>
>> You may want to look at the method YarnResourceManager#onContainersAllocated,
>> which launches containers (via NMClient#startContainer) after they are
>> allocated.
>> The launching is performed in the main thread of YarnResourceManager and
>> is synchronous/blocking. Consequently, the containers are launched one by
>> one.
>>
>> Regards,
>> Xiaogang
>>
>> On Sat, Aug 31, 2019 at 2:37 AM Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:
>>
>>> Thanks, everyone, for the valuable input and for sharing your experience
>>> in tackling the issue.
>>>
>>> Regarding the suggestions:
>>> - We provision some common jars in all cluster nodes *-->* but this
>>> requires depending on the Infra Team's schedule for handling/updating
>>> common jars
>>> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
>>> the size), but it did not improve much. Only 100 containers could be
>>> started in time, and then I received:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>>> This token is expired. current time is 1566422713305 found 1566422560552
>>> Note: System times on machines may be out of sync. Check system time and time zones.
>>>
>>>
>>> - It would be nice to see FLINK-13184
>>> <https://issues.apache.org/jira/browse/FLINK-13184>, but the version it
>>> is expected to land in is 1.10
>>> - Increase replication factor --> It would be nice to have a Flink config
>>> option for setting the replication factor for only the Flink job jars, but
>>> not the output. It is also challenging to set the replication for a
>>> not-yet-existing directory, since new files will get the default
>>> replication factor. I will explore the HDFS cache option.
>>>
>>> Maybe another option could be:
>>> - Letting not-yet-started TaskManagers (or NodeManagers) download the
>>> jars from already started TaskManagers in a P2P fashion, so that HDFS
>>> replication is not a blocker.
>>>
>>> A Spark job with a jar of the exact same size and 800 executors, without
>>> any tuning, can start without any issue on the same cluster in less than a
>>> minute.
>>>
>>> *Further questions:*
>>>
>>> *@ SHI Xiaogang <shixiaoga...@gmail.com>:*
>>>
>>> I see that all 800 requests are sent concurrently:
>>>
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>>> container with resources <memory:16384, vCores:1>. Number pending requests
>>> 793.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>>> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>>>
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>>> container with resources <memory:16384, vCores:1>. Number pending requests
>>> 794.
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>>> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
>>> ...
>>>
>>> Can you please elaborate on the part "As containers are launched and
>>> stopped one after another"? Any pointer to a class/method in Flink?
>>>
>>> *@ Zhu Zhu <reed...@gmail.com>:*
>>>
>>> Regarding "One optimization that we take is letting yarn to reuse the
>>> flink-dist jar which was localized when running previous jobs."
>>>
>>> We intend to use a Flink real-time pipeline for replay from Hive/HDFS
>>> (an offline source), so that we have one single pipeline for both batch
>>> and real-time. So for a batch Flink job, the containers will be released
>>> once the job is done.
>>> I guess your job is a real-time Flink job, so you can share the jars from
>>> already long-running jobs.
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> I can think of 2 approaches:
>>>>
>>>> 1. Allow Flink to specify the replication factor of the submitted uber jar.
>>>> 2. Allow Flink to specify a config option flink.yarn.lib that points to
>>>> all the Flink-related jars hosted on HDFS. This way, users don't need to
>>>> build and submit a fat uber jar every time. The replication factor of
>>>> those Flink jars hosted on HDFS can also be specified separately.
>>>>
>>>>
>>>>
>>>> On Fri, Aug 30, 2019 at 3:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> For point 2, there already exists a JIRA issue [1] and a PR. I hope
>>>>> that we can merge it during this release cycle.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dadashov,
>>>>>>
>>>>>> We faced similar problems in our production clusters.
>>>>>>
>>>>>> Currently, both launching and stopping of containers are performed in
>>>>>> the main thread of YarnResourceManager. As containers are launched and
>>>>>> stopped one after another, it usually takes a long time to bootstrap
>>>>>> large jobs. Things get worse when some node managers get lost. Yarn will
>>>>>> retry many times to communicate with them, leading to heartbeat timeouts
>>>>>> of TaskManagers.
>>>>>>
>>>>>> The following are some efforts we made to help Flink deal with large jobs.
>>>>>>
>>>>>> 1. We provision some common jars on all cluster nodes and ask our
>>>>>> users not to include these jars in their uberjar. When containers
>>>>>> bootstrap, these jars are added to the classpath via JVM options. That
>>>>>> way, we can effectively reduce the size of uberjars.
>>>>>>
>>>>>> 2. We deploy some asynchronous threads to launch and stop containers
>>>>>> in YarnResourceManager. The bootstrap time can be significantly reduced
>>>>>> when launching a large number of containers. We'd like to contribute it
>>>>>> to the community very soon.
>>>>>>
>>>>>> 3. We deploy a timeout timer for each launching container. If a task
>>>>>> manager does not register in time after its container has been launched,
>>>>>> a new container will be allocated and launched. That leads to a certain
>>>>>> waste of resources, but reduces the effects of slow or problematic
>>>>>> nodes.
>>>>>>
>>>>>> The community is now considering refactoring the ResourceManager.
>>>>>> I think that will be a good time to improve its efficiency.
>>>>>>
>>>>>> Regards,
>>>>>> Xiaogang
>>>>>>
>>>>>> On Fri, Aug 30, 2019 at 7:10 AM Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:
>>>>>>
>>>>>>> Dear Flink developers,
>>>>>>>
>>>>>>> I am having difficulty getting a Flink job started.
>>>>>>>
>>>>>>> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
>>>>>>> containers.
>>>>>>>
>>>>>>> The default HDFS replication is 3.
>>>>>>>
>>>>>>> *The Yarn queue is empty, and 800 containers are allocated
>>>>>>> almost immediately by the Yarn RM.*
>>>>>>>
>>>>>>> It takes a very long time until all 800 nodes (node managers) have
>>>>>>> downloaded the Uberjar from HDFS to their local machines.
>>>>>>>
>>>>>>> *Q1:*
>>>>>>>
>>>>>>> a) Do all those 800 nodes download in batches of 3 at a time?
>>>>>>> (batch size = HDFS replication size)
>>>>>>>
>>>>>>> b) Or can Flink TMs replicate from each other? Or do already started
>>>>>>> TMs replicate to not-yet-started nodes?
>>>>>>>
>>>>>>> Most probably the answer is (a), but I want to confirm.
>>>>>>>
>>>>>>> *Q2:*
>>>>>>>
>>>>>>> What is the recommended way of handling a 400MB+ Uberjar with 800+
>>>>>>> containers?
>>>>>>>
>>>>>>> Any specific params to tune?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Because downloading the Uberjar takes a really long time, around
>>>>>>> 15 minutes after the job is kicked off I am facing this exception:
>>>>>>>
>>>>>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>>>>>>> This token is expired. current time is 1567116179193 found 1567116001610
>>>>>>> Note: System times on machines may be out of sync. Check system time and time zones.
>>>>>>>         at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>>>>>>>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>         at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>>>>>>         at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>>>>>>         at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>>>>>>         at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>>>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>>>>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>>>         at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
