Hi all,

Is there a known issue with stop-with-savepoint when a job relies on UnionListState? We hit the error below during a job upgrade (Flink 1.19, Flink Kubernetes Operator 1.11). The savepoint failed, probably because some subtasks finished before others during the stop, and that caused the upgrade to fail.
The stack trace for the error is attached below. Is there any workaround for this?

org.apache.flink.util.SerializedThrowable: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider$PartialFinishingNotSupportedByStateException: The vertex watermark_persistent_filter (id = 8b917f4e69e3c3d9ade4e68a199c9d1f) has used UnionListState, but part of its tasks has called operators' finish method.
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1097) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:261) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1285) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
    at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:45) ~[?:?]
    at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:309) ~[?:?]
    at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307) ~[?:?]
    at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234) ~[?:?]
    at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231) ~[?:?]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[?:?]
    at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65) ~[?:?]
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) ~[?:?]
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) ~[?:?]
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) ~[?:?]
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) ~[?:?]
    at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624) ~[?:?]
    at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34) ~[?:?]
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536) ~[?:?]
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[?:?]
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[?:?]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[?:?]
    at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73) ~[?:?]
    at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110) ~[?:?]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[?:?]
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) ~[?:?]
    at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110) ~[?:?]
    at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) ~[?:?]
    at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61) ~[?:?]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider$PartialFinishingNotSupportedByStateException: The vertex watermark_persistent_filter (id = 8b917f4e69e3c3d9ade4e68a199c9d1f) has used UnionListState, but part of its tasks has called operators' finish method.
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.checkNoPartlyOperatorsFinishedVertexUsedUnionListState(DefaultCheckpointPlan.java:186) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.fulfillFinishedTaskStatus(DefaultCheckpointPlan.java:137) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1473) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1374) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1266) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]

Thank you in advance for your help!

Best Regards,
Lucas