[ 
https://issues.apache.org/jira/browse/FLINK-22067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17341037#comment-17341037
 ] 

Roman Khachatryan commented on FLINK-22067:
-------------------------------------------

I think the issue is caused by 
[ae402b|https://github.com/apache/flink/commit/ae402bc9134201892dac59e0421e5b3834430656]
 which fails checkpoints/savepoints if not all tasks are running (previously 
only sources were checked).

The change uses OperatorLatch to wait for non-source vertices to run. However, 
it doesn't take the degree of parallelism (DoP) into account. Previously, the 
test waited only for sources, which aren't parallel in these tests.
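
One way to picture the problem is the simplified model below. It is only a sketch: it does not reproduce the real OperatorLatch, and the class and method names (`ParallelismLatchSketch`, `released`) are hypothetical. It assumes each parallel subtask signals a shared latch once when it starts, and compares a latch sized for a single instance with one sized by the parallelism.

```java
import java.util.concurrent.CountDownLatch;

public class ParallelismLatchSketch {

    /**
     * Hypothetical model: returns true if a waiter on a latch created with
     * {@code latchCount} would be released after {@code started} subtasks
     * have each signalled once.
     */
    static boolean released(int latchCount, int started) {
        CountDownLatch latch = new CountDownLatch(latchCount);
        for (int i = 0; i < started; i++) {
            latch.countDown(); // one signal per started subtask
        }
        return latch.getCount() == 0;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Latch sized for one instance: the waiter proceeds as soon as the
        // first subtask starts, while the other three may still be deploying.
        System.out.println("count=1, 1 started:  " + released(1, 1));
        // Latch sized by parallelism: the waiter proceeds only when every
        // subtask has started.
        System.out.println("count=4, 1 started:  " + released(parallelism, 1));
        System.out.println("count=4, 4 started:  " + released(parallelism, parallelism));
    }
}
```

Under this model, a savepoint triggered right after the single-count latch is released can still find tasks that are not yet running.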

The same issue affects all SavepointTestBase descendants, so FLINK-22481 is 
likely a duplicate.

To fix this, I propose to wait for all vertices to be running using the Flink 
API. Draft PR: 
 [https://github.com/apache/flink/pull/15858]
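
The general shape of such a wait can be sketched as a polling loop. This is not the code from the draft PR and it does not call any Flink API: `SubtaskState`, `waitUntilAllRunning`, and the `Supplier` that reports the states are hypothetical stand-ins for whatever the cluster client exposes. The point it illustrates is that the check covers every parallel subtask, not one per vertex.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class WaitForAllSubtasksSketch {

    /** Hypothetical stand-in for a per-subtask execution state. */
    enum SubtaskState { CREATED, DEPLOYING, RUNNING }

    /** Polls until every reported subtask is RUNNING, or the timeout expires. */
    static void waitUntilAllRunning(
            Supplier<List<SubtaskState>> states, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            List<SubtaskState> current = states.get();
            boolean allRunning = !current.isEmpty()
                    && current.stream().allMatch(s -> s == SubtaskState.RUNNING);
            if (allRunning) {
                return;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Not all subtasks are running: " + current);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    /** Simulates two subtasks, the second of which reaches RUNNING on the third poll. */
    static int demoPolls() {
        AtomicInteger polls = new AtomicInteger();
        Supplier<List<SubtaskState>> states = () ->
                polls.incrementAndGet() < 3
                        ? Arrays.asList(SubtaskState.RUNNING, SubtaskState.DEPLOYING)
                        : Arrays.asList(SubtaskState.RUNNING, SubtaskState.RUNNING);
        try {
            waitUntilAllRunning(states, Duration.ofSeconds(5), Duration.ofMillis(10));
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
        return polls.get();
    }

    public static void main(String[] args) {
        System.out.println("polls needed: " + demoPolls());
    }
}
```

In a test base class, the savepoint would be triggered only after such a wait returns, so the test no longer depends on latches inside user functions.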

`WaitingFunction` and its descendants could then be deleted.

WDYT?

cc: [~akalashnikov], [~sjwiesman]
  

> SavepointWindowReaderITCase.testApplyEvictorWindowStateReader
> -------------------------------------------------------------
>
>                 Key: FLINK-22067
>                 URL: https://issues.apache.org/jira/browse/FLINK-22067
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Till Rohrmann
>            Priority: Blocker
>              Labels: auto-deprioritized-critical, test-stability
>         Attachments: isolated_logs_builD_9072.log
>
>
> The test case 
> {{SavepointWindowReaderITCase.testApplyEvictorWindowStateReader}} failed on 
> AZP with:
> {code}
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>       at 
> org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:69)
>       ... 33 more
> Caused by: java.util.concurrent.TimeoutException: Invocation of public 
> default java.util.concurrent.CompletableFuture 
> org.apache.flink.runtime.webmonitor.RestfulGateway.triggerSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time)
>  timed out.
>       at com.sun.proxy.$Proxy32.triggerSavepoint(Unknown Source)
>       at 
> org.apache.flink.runtime.minicluster.MiniCluster.lambda$triggerSavepoint$8(MiniCluster.java:716)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
>       at 
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
>       at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:751)
>       at 
> org.apache.flink.runtime.minicluster.MiniCluster.triggerSavepoint(MiniCluster.java:714)
>       at 
> org.apache.flink.client.program.MiniClusterClient.triggerSavepoint(MiniClusterClient.java:101)
>       at 
> org.apache.flink.state.api.utils.SavepointTestBase.triggerSavepoint(SavepointTestBase.java:93)
>       at 
> org.apache.flink.state.api.utils.SavepointTestBase.lambda$takeSavepoint$0(SavepointTestBase.java:68)
>       at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>       at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1646)
>       at 
> java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1632)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/rpc/dispatcher_2#-390276455]] after [10000 ms]. 
> Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A 
> typical reason for `AskTimeoutException` is that the recipient actor didn't 
> send a reply.
>       at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>       at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>       at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>       at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>       at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>       at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>       at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>       at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>       at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>       at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>       at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15809&view=logs&j=b2f046ab-ae17-5406-acdc-240be7e870e4&t=93e5ae06-d194-513d-ba8d-150ef6da1d7c&l=9197


