[ https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise updated FLINK-17921:
--------------------------------
    Component/s: Runtime / Coordination

> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected akka.timeout
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-17921
>                 URL: https://issues.apache.org/jira/browse/FLINK-17921
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination, Runtime / Task
>    Affects Versions: 1.8.1, 1.10.1
>            Reporter: zhangminglei
>            Priority: Major
>
> As described in the summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}}
> can fail with an akka timeout, but that is not the message we want to see.
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}}
> returns {{null}} and is used in
> {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following exception
> is thrown, which is not expected there. Increasing {{akka.ask.timeout}} to
> another value does not make the exception go away.
> {code:java}
> java.io.IOException: Error updating global aggregate.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
>     ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stack trace shows the root cause. The key point is
> {{CompletableFuture.waitingGet}}: it implies that the {{CompletableFuture}}
> parks the current thread in a wait, which leads to the timeout of Flink's
> akka communication. Therefore, even a timeout of 1 hour does not solve the
> problem.
> {code:java}
> java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007b76617a8> (a java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Making {{RpcGlobalAggregateManager#updateGlobalAggregate}} use
> {{get(long timeout, TimeUnit unit)}} would be a good choice. That way the
> timeout information can truly reflect the current state of the program. The
> {{akka.time.out}} error is too broad, which is not conducive to user
> troubleshooting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
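
For illustration only, the bounded wait proposed in the description could look roughly like the sketch below. It is not taken from Flink: the class {{BoundedWait}}, the helper {{awaitResult}} and the wording of the messages are hypothetical, and only the standard {{CompletableFuture#get(long, TimeUnit)}} call is assumed. The point is that a caller-side timeout raises its own {{TimeoutException}}, which can be reported with a message specific to the global aggregate update.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    // Hypothetical helper (not Flink code): waits for the result for at most
    // the given time and reports a timeout with a dedicated message.
    public static <T> T awaitResult(CompletableFuture<T> future, long timeout, TimeUnit unit)
            throws IOException {
        try {
            // Bounded wait: throws TimeoutException instead of parking indefinitely.
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IOException(
                    "No result for the global aggregate update within " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            // The future completed exceptionally; pass on the underlying cause.
            throw new IOException("Error updating global aggregate.", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for the global aggregate update.", e);
        }
    }

    public static void main(String[] args) throws Exception {
        // A future that is already complete returns its value immediately.
        System.out.println(
                awaitResult(CompletableFuture.completedFuture("ok"), 1, TimeUnit.SECONDS));

        // A future that never completes fails after the caller-side timeout.
        try {
            awaitResult(new CompletableFuture<String>(), 100, TimeUnit.MILLISECONDS);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether the timeout value should come from an existing configuration option or a new one is left open here; the sketch takes it as a plain parameter.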