[ https://issues.apache.org/jira/browse/FLINK-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725664#comment-15725664 ]
ASF GitHub Bot commented on FLINK-5041: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2781#discussion_r91059047 --- Diff: flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java --- @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.migration.runtime.checkpoint.savepoint; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.runtime.checkpoint.KeyGroupState; +import org.apache.flink.migration.runtime.checkpoint.SubtaskState; +import org.apache.flink.migration.runtime.checkpoint.TaskState; +import org.apache.flink.migration.runtime.state.KvStateSnapshot; +import org.apache.flink.migration.runtime.state.StateHandle; +import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle; +import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle; +import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; +import org.apache.flink.migration.state.MigrationStreamStateHandle; +import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState; +import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList; +import org.apache.flink.migration.util.SerializedValue; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.MultiStreamStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.util.InstantiationUtil; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; 
+import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * <p> + * <p>In contrast to previous savepoint versions, this serializer makes sure + * that no default Java serialization is used for serialization. Therefore, we + * don't rely on any involved Java classes to stay the same. + */ +public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> { + + public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer(); + private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0}); + private static final StreamStateHandle SIGNAL_1 = new ByteStreamStateHandle("SIGNAL_1", new byte[]{1}); + + private static final int MAX_SIZE = 4 * 1024 * 1024; + + private ClassLoader userClassLoader; + private long checkpointID; + + private SavepointV0Serializer() { + } + + + @Override + public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException { + throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility"); + } + + @Override + public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException { + + this.checkpointID = dis.readLong(); + this.userClassLoader = userClassLoader; + + // Task states + int numTaskStates = dis.readInt(); + List<TaskState> taskStates = new ArrayList<>(numTaskStates); + + for (int i = 0; i < numTaskStates; i++) { + JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong()); + int parallelism = dis.readInt(); + + // Add task state + TaskState taskState = new TaskState(jobVertexId, parallelism); + taskStates.add(taskState); + + // Sub task states + int numSubTaskStates = dis.readInt(); + for (int j = 0; j < numSubTaskStates; j++) { + int subtaskIndex = dis.readInt(); + + int length = dis.readInt(); + + SerializedValue<StateHandle<?>> 
serializedValue; + if (length == -1) { + serializedValue = new SerializedValue<>(null); + } else { + byte[] serializedData = new byte[length]; + dis.readFully(serializedData, 0, length); + serializedValue = SerializedValue.fromBytes(serializedData); + } + + long stateSize = dis.readLong(); + long duration = dis.readLong(); + + SubtaskState subtaskState = new SubtaskState( + serializedValue, + stateSize, + duration); + + taskState.putState(subtaskIndex, subtaskState); + } + + // Key group states + int numKvStates = dis.readInt(); + for (int j = 0; j < numKvStates; j++) { + int keyGroupIndex = dis.readInt(); + + int length = dis.readInt(); + + SerializedValue<StateHandle<?>> serializedValue; + if (length == -1) { + serializedValue = new SerializedValue<>(null); + } else { + byte[] serializedData = new byte[length]; + dis.readFully(serializedData, 0, length); + serializedValue = SerializedValue.fromBytes(serializedData); + } + + long stateSize = dis.readLong(); + long duration = dis.readLong(); + + KeyGroupState keyGroupState = new KeyGroupState( + serializedValue, + stateSize, + duration); + + taskState.putKvState(keyGroupIndex, keyGroupState); + } + } + + try { + return convertSavepoint(taskStates); + } catch (Exception e) { + throw new IOException(e); + } + } + + private SavepointV1 convertSavepoint(List<TaskState> taskStates) throws Exception { + + List<org.apache.flink.runtime.checkpoint.TaskState> newTaskStates = new ArrayList<>(taskStates.size()); + + for (TaskState taskState : taskStates) { + newTaskStates.add(convertTaskState(taskState)); + } + + return new SavepointV1(checkpointID, newTaskStates); + } + + private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(TaskState taskState) throws Exception { + + JobVertexID jobVertexID = taskState.getJobVertexID(); + int parallelism = taskState.getParallelism(); + int chainLength = determineOperatorChainLength(taskState); + + org.apache.flink.runtime.checkpoint.TaskState newTaskState = + new 
org.apache.flink.runtime.checkpoint.TaskState( + jobVertexID, + parallelism, + parallelism, + chainLength); + + if (chainLength > 0) { + + int parallelInstanceIdx = 0; + Collection<SubtaskState> subtaskStates = taskState.getStates(); + + for (SubtaskState subtaskState : subtaskStates) { + newTaskState.putState(parallelInstanceIdx, convertSubtaskState(subtaskState, parallelInstanceIdx)); + ++parallelInstanceIdx; + } + } + + return newTaskState; + } + + private org.apache.flink.runtime.checkpoint.SubtaskState convertSubtaskState( + SubtaskState subtaskState, int parallelInstanceIdx) throws Exception { + + SerializedValue<StateHandle<?>> serializedValue = subtaskState.getState(); + + StreamTaskStateList stateList = (StreamTaskStateList) serializedValue.deserializeValue(userClassLoader); + StreamTaskState[] streamTaskStates = stateList.getState(userClassLoader); + + List<StreamStateHandle> newChainStateList = Arrays.asList(new StreamStateHandle[streamTaskStates.length]); + KeyGroupsStateHandle newKeyedState = null; + + for (int chainIdx = 0; chainIdx < streamTaskStates.length; ++chainIdx) { + + StreamTaskState streamTaskState = streamTaskStates[chainIdx]; + if (streamTaskState == null) { + continue; + } + + newChainStateList.set(chainIdx, convertOperatorAndFunctionState(streamTaskState)); + + if (null == newKeyedState) { --- End diff -- How about adding a `checkState` sanity check to make sure that the keyed state is always associated with position 0 in the chain of states? Would help to fail fast in the presence of corrupt savepoints or bugs in the interoperability with the old code. 
> Implement savepoint backwards compatibility 1.1 -> 1.2
> ------------------------------------------------------
>
> Key: FLINK-5041
> URL: https://issues.apache.org/jira/browse/FLINK-5041
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> This issue tracks the implementation of backwards compatibility between Flink
> 1.1 and 1.2 releases.
> This task subsumes:
> - Converting old savepoints to new savepoints, including a conversion of
> state handles to their new replacements.
> - Converting keyed state from old backend implementations to their new
> counterparts.
> - Converting operator and function state for all changed operators.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)