[ 
https://issues.apache.org/jira/browse/FLINK-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028487#comment-16028487
 ] 

Stefan Richter commented on FLINK-6762:
---------------------------------------

FYI, I think we can can ALMOST support this by changing line 1434 into

{code}
        if (!hasExtraKeys) {
                // use the restore sst files as the base for succeeding 
checkpoints
                synchronized (stateBackend.materializedSstFiles) {
                        
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
                }
        }
{code}

The mistake here is currently that after rescaling, previous checkpoints can no 
longer serve as base version to the next incremental checkpoint, so 
`stateBackend.materializedSstFiles` should be empty. However, there is still a 
small problem in the once case where we rescale once and in the next recovery 
rescale twice, back to the old parallelism. In this case, the 
`SharedStateRegistry` can mistake new state for duplicate uploads, because they 
share the same key (key-group are part of the key, so scaling back to the same 
parallelism as a previous job will create the same key). This is obviously 
something that can be fixed, unfortunately not in 3 lines as the base fix :-(

> Cannot rescale externalized incremental checkpoints
> ---------------------------------------------------
>
>                 Key: FLINK-6762
>                 URL: https://issues.apache.org/jira/browse/FLINK-6762
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Gyula Fora
>            Priority: Critical
>
> When a job is rescaled from an externalized incremental checkpoint, the 
> subsequent checkpoints fail with the following error:
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3205.
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:861)
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:776)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: 
> class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199)
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164)
>       at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:286)
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:851)
> Full log:
> https://gist.github.com/gyfora/693b9a720aace843ff4570e504c4a242
> Rescaling with savepoints work.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to