This is a very useful optimization for large-scale deployments. In Blink we use a thread pool to launch containers, and a Flink cluster with more than 5000 YARN containers works well. I hope Xintong Song can contribute it to Flink soon.
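To make the thread-pool idea concrete, here is a minimal sketch of the pattern. It is not the Blink code and not the patch tracked in FLINK-13184. The class `AsyncContainerLauncher`, the method names, and the simulated `launch` call are all hypothetical. The blocking container start runs on a worker pool, and only the cheap bookkeeping is handed back to a single "main thread" executor, so that thread stays free for heartbeats and registrations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: offload blocking container launches to a thread pool.
final class AsyncContainerLauncher {
    private final ExecutorService ioExecutor;   // worker pool for blocking launches
    private final ExecutorService mainThread;   // stand-in for the RM main thread
    private final List<String> started = new ArrayList<>(); // mutated on mainThread only

    AsyncContainerLauncher(ExecutorService ioExecutor, ExecutorService mainThread) {
        this.ioExecutor = ioExecutor;
        this.mainThread = mainThread;
    }

    // Called when containers are allocated; returns immediately with a future.
    CompletableFuture<Void> onContainersAllocated(List<String> containerIds) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String id : containerIds) {
            futures.add(CompletableFuture
                    .runAsync(() -> launch(id), ioExecutor)           // slow part, off the main thread
                    .thenRunAsync(() -> started.add(id), mainThread)); // bookkeeping, back on the main thread
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    // Assumption: simulates the blocking call that starts a TM in a container.
    private static void launch(String containerId) {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while launching " + containerId, e);
        }
    }

    // Launches the given number of simulated containers and returns how many were recorded.
    static int launchAll(int containers, int poolSize) {
        ExecutorService io = Executors.newFixedThreadPool(poolSize);
        ExecutorService main = Executors.newSingleThreadExecutor();
        try {
            AsyncContainerLauncher launcher = new AsyncContainerLauncher(io, main);
            List<String> ids = new ArrayList<>();
            for (int i = 0; i < containers; i++) {
                ids.add("container_" + i);
            }
            launcher.onContainersAllocated(ids).join(); // wait only for the demo
            return launcher.started.size();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("started: " + launchAll(100, 8));
    }
}
```

Keeping all state changes on the single main-thread executor avoids locking; the pool size would need tuning against what the YARN NodeManagers can absorb.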
Xintong Song <tonysong...@gmail.com> wrote on Wed, Jul 10, 2019 at 2:20 PM:

> Thanks for the kindly offer, Qi.
>
> I think this work should not take much time, so I can take care of it.
> It's just the community is currently under feature freeze for release 1.9,
> so we need to wait until the release code branch being cut.
>
> Thank you~
>
> Xintong Song
>
> On Wed, Jul 10, 2019 at 1:55 PM qi luo <luoqi...@gmail.com> wrote:
>
>> Thanks Xintong and Haibo, I’ve found the fix in the blink branch.
>>
>> We’re also glad to help contribute this patch to community version, in
>> case you don’t have time.
>>
>> Regards,
>> Qi
>>
>> On Jul 10, 2019, at 11:51 AM, Haibo Sun <sunhaib...@163.com> wrote:
>>
>> Hi, Qi
>>
>> Sorry, by talking to Xintong Song offline, I made sure that some of what
>> I said before is wrong. Please refer to Xintong's answer.
>>
>> Best,
>> Haibo
>>
>> At 2019-07-10 11:37:16, "Xintong Song" <tonysong...@gmail.com> wrote:
>>
>> Hi Qi,
>>
>> Thanks for reporting this problem. I think you are right. Starting large
>> amount of TMs in main thread on YARN could take relative long time, causing
>> RM to become unresponsive. In our enterprise version Blink, we actually
>> have a thread pool for starting TMs. I think we should contribute this
>> feature to the community version as well. I created a JIRA ticket
>> <https://issues.apache.org/jira/browse/FLINK-13184> from which you can
>> track the progress of this issue.
>>
>> For solving your problem at the moment, I agree with Haibo that configure
>> a larger registration timeout could be a workaround.
>>
>> Thank you~
>> Xintong Song
>>
>> On Wed, Jul 10, 2019 at 10:37 AM Haibo Sun <sunhaib...@163.com> wrote:
>>
>>> Hi, Qi
>>>
>>> According to our experience, it is no problem to allocate more than 1000
>>> containers when the registration timeout is set 5 minutes. Perhaps there
>>> are other reasons? Or you can try to increase the value of
>>> `taskmanager.registration.timeout`.
>>> For allocating containers using
>>> multi-thread, I personally think it's going to get very complicated, and
>>> the more recommended way is to put some waiting works into asynchronous
>>> processing, so as to liberate the main thread.
>>>
>>> Best,
>>> Haibo
>>>
>>> At 2019-07-09 21:05:51, "qi luo" <luoqi...@gmail.com> wrote:
>>>
>>> Hi guys,
>>>
>>> We’re using latest version Flink YarnResourceManager, but our job
>>> startup occasionally hangs when allocating many Yarn containers (e.g.
>>> >1000). I checked the related code in YarnResourceManager as below:
>>>
>>> <PastedGraphic-1.tiff>
>>>
>>> It seems that it handles all allocated containers and starts TM in main
>>> thread. Thus when containers allocations are heavy, the RM thread becomes
>>> unresponsive (such as no response to TM heartbeats, see TM logs as below).
>>>
>>> Any idea on how to better handle such case (e.g. multi-threading to
>>> handle allocated containers) would be very appreciated. Thanks!
>>>
>>> Regards,
>>> Qi
>>>
>>> ————————————————————————
>>> TM log:
>>>
>>> 2019-07-09 13:56:59,110 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Connecting to ResourceManager akka.tcp://flink@xxx/user/resourcemanager(00000000000000000000000000000000).
>>> 2019-07-09 14:00:01,138 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@xxx/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@xxx/), Path(/user/resourcemanager)]] after [182000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
>>> 2019-07-09 14:01:59,137 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
>>> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 300000 ms. This indicates a problem with this instance. Terminating now.
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1023)
>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1009)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)