Hi,
We are running our Flink streaming pipeline (Flink 1.19.1 on AWS EMR on YARN) with the following configuration:

taskmanager.numberOfTaskSlots: 4
job.autoscaler.enabled: 'true'
jobmanager.scheduler: adaptive
jobmanager.adaptive-scheduler.combined-restart.enabled: true
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
state.backend.type: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: hdfs:///changelog
dstl.dfs.compression.enabled: 'true'
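
For context, these options are set in flink-conf.yaml; a minimal, purely illustrative Java sketch of the equivalent programmatic setup (not our actual launcher, class name ConfigSketch is hypothetical) would be:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ConfigSketch {
        public static void main(String[] args) throws Exception {
            // Equivalent of the flink-conf.yaml entries above (illustrative only).
            Configuration conf = new Configuration();
            conf.setString("taskmanager.numberOfTaskSlots", "4");
            conf.setString("jobmanager.scheduler", "adaptive");
            conf.setString("state.backend.type", "rocksdb");
            conf.setString("state.checkpoints.dir", "hdfs:///flink-checkpoints");
            conf.setString("state.backend.incremental", "true");
            conf.setString("state.backend.local-recovery", "true");
            conf.setString("state.backend.changelog.enabled", "true");
            conf.setString("state.backend.changelog.storage", "filesystem");
            conf.setString("dstl.dfs.base-path", "hdfs:///changelog");
            conf.setString("dstl.dfs.compression.enabled", "true");

            // The environment picks these up the same way the cluster config does.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... job graph would be built here ...
        }
    }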

After running for a few minutes, shortly after a checkpoint is taken (at the 15-minute mark), the job tries to autoscale at around 17 minutes and simply crashes. In the Job Manager log I see the following exception:


2025-01-07 05:10:29,287 INFO  org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Failed to go from CreatingExecutionGraph to Executing because the ExecutionGraph creation failed.
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
	at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka73585e1b-0cff-452c-ace1-0c31eb56bd7f.jar:1.19.1-amzn-1]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
	at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
	at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper$6.getOldSubtasks(SubtaskStateMapper.java:180) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.lambda$getNewToOldSubtasksMapping$0(SubtaskStateMapper.java:202) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180) ~[?:?]
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104) ~[?:?]
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[?:?]
	at org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:139) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getInputMapping(TaskStateAssignment.java:410) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeInputChannelStates(StateAssignmentOperation.java:466) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:208) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:148) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1973) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1893) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:210) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:1407) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$27(AdaptiveScheduler.java:1397) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist-1.19.1-amzn-1.jar:1.19.1-amzn-1]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
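
The exception hints at changing the partitioner or adding an explicit shuffle. I assume that means breaking the pointwise (forward/rescale) edge between operators with an explicit redistribution, roughly like this simplified, self-contained sketch (not our actual topology; the class and strings are hypothetical):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ShuffleSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in source; the real job reads from an external system.
            DataStream<String> source = env.fromElements("a", "b", "c");

            // With default chaining, this edge is connected pointwise (forward).
            DataStream<String> parsed = source.map(String::toUpperCase);

            // The error message's suggestion: make the edge all-to-all with an
            // explicit redistribution (rebalance() or shuffle()) so channel
            // state can be remapped when the parallelism changes.
            parsed.rebalance()
                  .map(s -> s + "!")
                  .print();

            env.execute("shuffle-sketch");
        }
    }

Is that the right direction, or is this expected to work without touching the job graph when the adaptive scheduler rescales from a checkpoint?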

Can anyone please tell me which area to look into to figure out how to fix this?

Thanks
Sachin
