Hi, I guess you can work around the current limitation by increasing client.timeout. [1]
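A toy model of the problem discussed in this thread may show why a larger timeout helps. If slow work occupies the single thread that must also answer the client's job-status request, the request times out unless the timeout outlasts the work. This sketch is not Flink code. The class MainThreadTimeoutDemo and the method statusAnsweredInTime are hypothetical, a single-threaded ExecutorService stands in for the JobManager main thread, and Thread.sleep stands in for split enumeration. The offload flag shows the generic alternative of running slow work on a separate thread; it makes no claim about how FLINK-22282 is resolved.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model, not Flink code: a single-threaded executor plays the role of
// the busy main thread, and Thread.sleep plays the role of split enumeration.
final class MainThreadTimeoutDemo {

    // Hypothetical stand-in for slow work such as enumerating many small files.
    private static Runnable slowWork(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Returns true if a status request queued on the main thread is answered
    // within timeoutMillis. If offload is true, the slow work runs on a
    // separate worker thread instead of the main thread.
    static boolean statusAnsweredInTime(long workMillis, long timeoutMillis, boolean offload) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            (offload ? worker : mainThread).submit(slowWork(workMillis));
            // The status request waits behind anything already queued on mainThread.
            Future<String> status = mainThread.submit(() -> "INITIALIZING");
            status.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // analogous to the AskTimeoutException in the logs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Stop both executors and interrupt any still-sleeping work so
            // their threads do not keep the JVM alive.
            mainThread.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(statusAnsweredInTime(1000, 200, false));  // false: work blocks the main thread
        System.out.println(statusAnsweredInTime(1000, 3000, false)); // true: timeout outlasts the work
        System.out.println(statusAnsweredInTime(1000, 200, true));   // true: main thread stays free
    }
}
```

The first call times out and the other two succeed, which mirrors the choice between raising the timeout (a workaround) and keeping slow work off the busy thread (a structural fix).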
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#client-timeout

On Thu, Apr 15, 2021 at 7:06 AM 太平洋 <495635...@qq.com> wrote:

> Thanks. My program reads hundreds of small files from S3 via SQL. What
> happens during the instantiation of the SplitEnumerator? What can I do to
> reduce this time for now?
>
> ------------------ Original Message ------------------
> *From:* "Becket Qin" <becket....@gmail.com>;
> *Sent:* Thursday, April 15, 2021, 9:55 AM
> *To:* "Piotr Nowojski"<pnowoj...@apache.org>;
> *Cc:* "太平洋"<495635...@qq.com>;"user"<user@flink.apache.org>;
> *Subject:* Re: flink1.12.2 "Failed to execute job"
>
> Hi,
>
> Piotr is correct. This issue is likely caused by the instantiation of the
> SplitEnumerator being done in the JM main thread. FLINK-22282 has been
> created to address this issue.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Apr 14, 2021 at 10:32 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> I haven't found anything strange in the logs (I've received the logs in a
>> separate message). It looks like the problem is that split enumeration is
>> taking a long time, and currently this is done in the Job Manager's main
>> thread, blocking other things from executing. For the time being, I think
>> the only thing you can do is either speed up the split enumeration
>> (probably difficult) or increase the timeouts that are failing. I don't
>> know if there is any other workaround at the moment (Becket?).
>>
>> Piotrek
>>
>> On Wed, Apr 14, 2021 at 15:57 Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hey,
>>>
>>> Could you provide the full logs from both the task managers and the job
>>> managers?
>>>
>>> Piotrek
>>>
>>> On Wed, Apr 14, 2021 at 15:43 太平洋 <495635...@qq.com> wrote:
>>>
>>>> After submitting the job, I received a 'Failed to execute job' error,
>>>> and the time between initialization and scheduling lasted 214 s. What
>>>> happened during this period?
>>>>
>>>> version: flink: 1.12.2
>>>> deployment: k8s standalone
>>>> logs:
>>>>
>>>> 2021-04-14 12:47:58,547 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified. Setting it to 3600000 ms
>>>> 2021-04-14 12:48:04,175 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 1276000e99efdb77bdae0df88ab91da3 is submitted.
>>>> 2021-04-14 12:48:04,175 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=1276000e99efdb77bdae0df88ab91da3.
>>>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 1276000e99efdb77bdae0df88ab91da3 (Prediction Program).
>>>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 1276000e99efdb77bdae0df88ab91da3 (Prediction Program).
>>>> 2021-04-14 12:48:04,250 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_8 .
>>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>>> 2021-04-14 12:48:04,252 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
>>>> 2021-04-14 12:48:04,254 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 10 pipelined regions in 0 ms
>>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend@3ea8cd5a
>>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
>>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@26845997 for Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - JobManager runner for job Prediction Program (1276000e99efdb77bdae0df88ab91da3) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_8.
>>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job Prediction Program (1276000e99efdb77bdae0df88ab91da3) under job master id 00000000000000000000000000000000.
>>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[default_catalog, default_database, cpu_util, filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id, value, timestamp]) -> Calc(select=[instance_id, value, timestamp], where=[(timestamp > 1618145278)]) -> SinkConversionToDataPoint -> Map.
>>>> org.apache.flink.util.FlinkException: Failed to execute job 'Prediction Program'.
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
>>>>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>>>>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
>>>>     at com.jd.app.StreamingJob.main(StreamingJob.java:265)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>>>>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
>>>>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
>>>>     at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitAndGetJobClientFuture$2(EmbeddedExecutor.java:140)
>>>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>>>     ... 1 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
>>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>>     at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$null$0(EmbeddedExecutor.java:145)
>>>>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
>>>>     at com.sun.proxy.$Proxy26.requestJobStatus(Unknown Source)
>>>>     at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$null$0(EmbeddedExecutor.java:143)
>>>>     ... 7 more
>>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#1243668943]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>>>>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>>>>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>>>>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>>>>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>>>>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>>>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>>>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>>>>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>>>>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>>>>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
>>>>     ... 1 more
>>>> 2021-04-14 12:49:04,321 ERROR com.jd.app.StreamingJob [] - xxxx exec error org.apache.flink.util.FlinkException: Failed to execute job 'xxxxxx'.
>>>>
>>>> 2021-04-14 12:51:38,327 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
>>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Prediction Program (1276000e99efdb77bdae0df88ab91da3) switched from state CREATED to RUNNING.
>>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, cpu_util, filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id, value, timestamp]) -> Calc(select=[instance_id, value, timestamp], where=[(timestamp > 1618145278)]) -> SinkConversionToDataPoint -> Map (1/5) (52ad5c769b4836498fadf954d5921401) switched from CREATED to SCHEDULED.
>>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{90a7db543b771ed399f0b2b0414ef288}]
>>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, cpu_util, filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id, value, timestamp]) -> Calc(select=[instance_id, value, timestamp], where=[(timestamp > 1618145278)]) -> SinkConversionToDataPoint -> Map (2/5) (1f877463154f27d6f0aa7a9af9c2f64b) switched from CREATED to SCHEDULED.
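The timestamps in the quoted logs support the diagnosis in this thread. The small sketch below uses java.time to compute two gaps: from "Starting split enumerator" to "Starting scheduling", and from "Received JobGraph submission" to the client-side ERROR line logged by com.jd.app.StreamingJob. The class LogGap and its method between are hypothetical names; the parse pattern simply matches the timestamp layout visible in these logs.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Computes the time between two log lines, using the timestamp layout that
// appears in the logs quoted above (e.g. "2021-04-14 12:48:04,255").
final class LogGap {
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static Duration between(String start, String end) {
        return Duration.between(
                LocalDateTime.parse(start, LOG_TIME),
                LocalDateTime.parse(end, LOG_TIME));
    }

    public static void main(String[] args) {
        // "Starting split enumerator ..." -> "Starting scheduling ..."
        Duration enumeration =
                between("2021-04-14 12:48:04,255", "2021-04-14 12:51:38,327");
        // "Received JobGraph submission ..." -> client-side "exec error"
        Duration clientWait =
                between("2021-04-14 12:48:04,249", "2021-04-14 12:49:04,321");
        System.out.println(enumeration.toMillis()); // 214072
        System.out.println(clientWait.toMillis());  // 60072
    }
}
```

The first gap, about 214 s, matches the reported delay between initialization and scheduling. The second, about 60 s, lines up with the [60000 ms] ask timeout in the stack trace, which is consistent with the client giving up while split enumeration was still running.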