[ https://issues.apache.org/jira/browse/FLINK-22067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17341037#comment-17341037 ]
Roman Khachatryan commented on FLINK-22067:
-------------------------------------------

I think the issue is caused by [ae402b|https://github.com/apache/flink/commit/ae402bc9134201892dac59e0421e5b3834430656], which fails checkpoints/savepoints if not all tasks are running (previously we checked only sources).

The change uses OperatorLatch to wait for non-source vertices to run. However, it doesn't take the degree of parallelism (DoP) into account. Previously, the test waited only for sources, which aren't parallel in these tests.

The same issue affects all SavepointTestBase descendants, so FLINK-22481 is likely a duplicate.

To fix this, I propose to wait for the vertices using the Flink API. Draft PR: [https://github.com/apache/flink/pull/15858]

`WaitingFunction` and its descendants could then be deleted.

WDYT?

cc: [~akalashnikov], [~sjwiesman]

> SavepointWindowReaderITCase.testApplyEvictorWindowStateReader
> -------------------------------------------------------------
>
>                 Key: FLINK-22067
>                 URL: https://issues.apache.org/jira/browse/FLINK-22067
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Till Rohrmann
>            Priority: Blocker
>              Labels: auto-deprioritized-critical, test-stability
>         Attachments: isolated_logs_builD_9072.log
>
>
> The test case {{SavepointWindowReaderITCase.testApplyEvictorWindowStateReader}} failed on AZP with:
> {code}
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:69)
>     ... 33 more
> Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.triggerSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time) timed out.
>     at com.sun.proxy.$Proxy32.triggerSavepoint(Unknown Source)
>     at org.apache.flink.runtime.minicluster.MiniCluster.lambda$triggerSavepoint$8(MiniCluster.java:716)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>     at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>     at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:751)
>     at org.apache.flink.runtime.minicluster.MiniCluster.triggerSavepoint(MiniCluster.java:714)
>     at org.apache.flink.client.program.MiniClusterClient.triggerSavepoint(MiniClusterClient.java:101)
>     at org.apache.flink.state.api.utils.SavepointTestBase.triggerSavepoint(SavepointTestBase.java:93)
>     at org.apache.flink.state.api.utils.SavepointTestBase.lambda$takeSavepoint$0(SavepointTestBase.java:68)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1646)
>     at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1632)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_2#-390276455]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15809&view=logs&j=b2f046ab-ae17-5406-acdc-240be7e870e4&t=93e5ae06-d194-513d-ba8d-150ef6da1d7c&l=9197

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
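
For illustration only, here is a rough model of the parallelism problem described in the comment above. It is not the actual OperatorLatch or WaitingFunction code, and it does not use any Flink API. The class and method names (`ParallelismAwareWait`, `latchFor`, `opensAfter`) are hypothetical. The sketch assumes the wait is released by a fixed number of "I am running" signals, and shows why a count that ignores the DoP can release the test before every subtask is running.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model; not taken from the Flink code base.
public class ParallelismAwareWait {

    // Creates a latch that opens after `expectedSignals` subtasks have reported in.
    public static CountDownLatch latchFor(int expectedSignals) {
        return new CountDownLatch(expectedSignals);
    }

    // Simulates `startedSubtasks` subtasks signalling that they are running,
    // then reports whether the latch has opened.
    public static boolean opensAfter(CountDownLatch latch, int startedSubtasks) {
        for (int i = 0; i < startedSubtasks; i++) {
            latch.countDown();
        }
        return latch.getCount() == 0;
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumed DoP of a non-source vertex

        // Parallelism-unaware wait: the first subtask opens the latch
        // although three subtasks may not be running yet.
        System.out.println(opensAfter(latchFor(1), 1));

        // Parallelism-aware wait: one running subtask is not enough ...
        System.out.println(opensAfter(latchFor(parallelism), 1));

        // ... the latch opens only once all subtasks have reported in.
        System.out.println(opensAfter(latchFor(parallelism), parallelism));
    }
}
```

In this model, a savepoint triggered right after the first latch opens would race with the subtasks that are still starting, which is consistent with the timeout in the quoted stack trace. Querying the job state through the Flink API, as proposed in the draft PR, avoids having the test track the signal count itself.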