Hi all,

Is there a known issue with stop-with-savepoint when a job relies on
UnionListState? We hit the following error during a job upgrade (Flink
1.19, Flink Kubernetes Operator 1.11). The savepoint failed, probably
because some subtasks finished before others during the stop, and this
caused the upgrade to fail.

The full stack trace is below. Is there any workaround for this?

org.apache.flink.util.SerializedThrowable: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider$PartialFinishingNotSupportedByStateException: The vertex watermark_persistent_filter (id = 8b917f4e69e3c3d9ade4e68a199c9d1f) has used UnionListState, but part of its tasks has called operators' finish method.
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1097) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:261) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1285) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:45) ~[?:?]
	at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:309) ~[?:?]
	at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307) ~[?:?]
	at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234) ~[?:?]
	at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231) ~[?:?]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[?:?]
	at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65) ~[?:?]
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) ~[?:?]
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) ~[?:?]
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) ~[?:?]
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) ~[?:?]
	at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624) ~[?:?]
	at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34) ~[?:?]
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536) ~[?:?]
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[?:?]
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[?:?]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[?:?]
	at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73) ~[?:?]
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110) ~[?:?]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[?:?]
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85) ~[?:?]
	at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110) ~[?:?]
	at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) ~[?:?]
	at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61) ~[?:?]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider$PartialFinishingNotSupportedByStateException: The vertex watermark_persistent_filter (id = 8b917f4e69e3c3d9ade4e68a199c9d1f) has used UnionListState, but part of its tasks has called operators' finish method.
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.checkNoPartlyOperatorsFinishedVertexUsedUnionListState(DefaultCheckpointPlan.java:186) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan.fulfillFinishedTaskStatus(DefaultCheckpointPlan.java:137) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1473) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1374) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1266) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-kubernetes-operator-1.11.0-shaded.jar:1.11.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]

Thank you in advance for your help!

Best Regards,

Lucas
