Hi,

We are running our Flink streaming pipeline with the following configuration. It runs on Flink 1.19.1 on AWS EMR (YARN):
taskmanager.numberOfTaskSlots: 4
job.autoscaler.enabled: 'true'
jobmanager.scheduler: adaptive
jobmanager.adaptive-scheduler.combined-restart.enabled: true
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
state.backend.type: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: hdfs:///changelog
dstl.dfs.compression.enabled: 'true'

After the job has been running for a few minutes, a checkpoint is taken (at around the 15-minute mark). Then, at around 17 minutes, when the job tries to autoscale, it simply crashes. In the Job Manager log I see the following exception:

2025-01-07 05:10:29,287 INFO org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Failed to go from CreatingExecutionGraph to Executing because the ExecutionGraph creation failed.
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
    at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
    at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner. Did you change the partitioner to forward or rescale? It may also help to add an explicit shuffle().
    at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper$6.getOldSubtasks(SubtaskStateMapper.java:180) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.lambda$getNewToOldSubtasksMapping$0(SubtaskStateMapper.java:202) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180) ~[?:?]
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104) ~[?:?]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[?:?]
    at org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:139) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:410) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:466) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:208) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:148) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1973) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1893) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:1407) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$27(AdaptiveScheduler.java:1397) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]

Can anyone please tell me which area to look into to fix this? I have also put a small sketch of how I read the "explicit shuffle()" hint in a P.S. below.

Thanks,
Sachin
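P.S. The hint in the exception ("It may also help to add an explicit shuffle().") reads to me like the toy sketch below. This is a hypothetical pipeline, not our actual topology, and every name in it is made up; I am only trying to confirm whether breaking a pointwise (forward/rescale) edge with an explicit shuffle()/rebalance() is the direction to investigate:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitShuffleSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical upstream stage.
        DataStream<String> parsed = env
                .fromElements("a", "b", "c")
                .map(String::toUpperCase);

        // Without the shuffle(), the edge between the two map stages (same
        // parallelism) would be a forward/pointwise connection. My reading of
        // the hint is to break that edge with an explicit redistribution.
        parsed
                .shuffle()                 // or .rebalance() for round-robin
                .map(String::toLowerCase)  // hypothetical downstream stage
                .print();

        env.execute("explicit-shuffle-sketch");
    }
}

The sketch is only meant to show where such an explicit redistribution would sit between two stages that are otherwise forward-connected, not to claim that this is the correct fix for our job.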