[ https://issues.apache.org/jira/browse/FLINK-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744875#comment-15744875 ]
ASF GitHub Bot commented on FLINK-5051:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2962#discussion_r92144965

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
    +import org.apache.flink.core.io.IOReadableWritable;
    +import org.apache.flink.core.io.VersionedIOReadableWritable;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
    + * serialization logic here.
    + */
    +public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
    +
    +	private static final int VERSION = 1;
    +
    +	private TypeSerializerSerializationProxy<?> keySerializerProxy;
    +	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
    +
    +	private ClassLoader userCodeClassLoader;
    +
    +	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
    +		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
    +	}
    +
    +	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
    +		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
    +		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
    +		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
    +	}
    +
    +	public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
    +		return namedStateSerializationProxies;
    +	}
    +
    +	public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
    +		return keySerializerProxy;
    +	}
    +
    +	@Override
    +	public int getVersion() {
    +		return VERSION;
    +	}
    +
    +	@Override
    +	public void write(DataOutputView out) throws IOException {
    +		super.write(out);
    +
    +		keySerializerProxy.write(out);
    +
    +		out.writeShort(namedStateSerializationProxies.size());
    +		Map<String, Integer> kVStateToId = new HashMap<>(namedStateSerializationProxies.size());
    --- End diff --

    What is this Map used for?
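For readers following the review, here is a minimal round-trip sketch of how the proxy above appears intended to be used, based only on the two constructors and the {{write()}} method visible in the diff. The {{StateMetaInfo}} list is left empty because its API is not shown here, {{StateMetaInfo}} is assumed to live in {{org.apache.flink.runtime.state}}, and {{read()}} is assumed from {{IOReadableWritable}}; this is an illustration, not code from the PR:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.StateMetaInfo; // assumed package, per the diff

public class ProxyRoundTripSketch {

	public static void main(String[] args) throws IOException {
		// Write side: snapshot the key serializer plus per-state meta info.
		// StateMetaInfo construction is elided; its API is not shown in the diff.
		KeyedBackendSerializationProxy writeProxy = new KeyedBackendSerializationProxy(
				IntSerializer.INSTANCE, new ArrayList<StateMetaInfo<?, ?>>());

		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		writeProxy.write(new DataOutputViewStreamWrapper(bytes));

		// Restore side: only the user-code class loader is known up front;
		// key serializer and state meta info are read back from the stream.
		KeyedBackendSerializationProxy readProxy = new KeyedBackendSerializationProxy(
				ProxyRoundTripSketch.class.getClassLoader());
		readProxy.read(new DataInputViewStreamWrapper(
				new ByteArrayInputStream(bytes.toByteArray())));
	}
}
{code}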
> Backwards compatibility for serializers in backend state
> ---------------------------------------------------------
>
>                 Key: FLINK-5051
>                 URL: https://issues.apache.org/jira/browse/FLINK-5051
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> When a new state is registered, e.g. in a keyed backend via `getPartitionedState`, the caller has to provide all type serializers required for the persistence of state components. Explicitly passing the serializers on state creation already allows for potential version upgrades of serializers.
> However, those serializers are currently not part of any snapshot and are only provided at runtime, when the state is newly registered or restored. For backwards compatibility, this has strong implications: checkpoints are not self-contained, in that state is currently a black box without knowledge about its corresponding serializers. Most cases where we would need to restructure the state are effectively lost. We could only convert state lazily at runtime, and only once the user registers the concrete state, which might happen at unpredictable points.
> I suggest adapting our solution as follows:
> - As now, all states are registered with their set of serializers.
> - Unlike now, all serializers are written to the snapshot. This makes savepoints self-contained and also allows building inspection tools for savepoints at some point in the future.
> - Introduce an interface {{Versioned}} with {{long getVersion()}} and {{boolean isCompatible(Versioned v)}}, which is then implemented by serializers. Compatible serializers must ensure that they can deserialize older versions, and can then serialize the data in their new format. This is how we upgrade.
> We need to find the right tradeoff in how many places we store the serializers. I suggest writing them once per parallel operator instance for each state, i.e. we have a map with state_name -> tuple3<serializer<KEY>, serializer<NAMESPACE>, serializer<STATE>>. This could go before all key-groups are written, right at the head of the file. Then, for each file we see on restore, we can first read the serializer map from the head of the stream, and then go through the key groups by offset.
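A minimal sketch of the proposed {{Versioned}} interface, using only the two methods named in the description above. The implementing class below is hypothetical and merely illustrates the intended compatibility contract, not a committed Flink API:

{code:java}
/** The proposed interface, as described in the issue above. */
public interface Versioned {

	/** The version of this implementation's serialization format. */
	long getVersion();

	/** True iff this instance can read data written by {@code v}. */
	boolean isCompatible(Versioned v);
}

/**
 * Hypothetical implementor: a serializer that can deserialize all older
 * versions of its own format and re-serializes data in the current format.
 */
class ExampleVersionedSerializer implements Versioned {

	private static final long CURRENT_VERSION = 2L;

	@Override
	public long getVersion() {
		return CURRENT_VERSION;
	}

	@Override
	public boolean isCompatible(Versioned v) {
		// Compatible serializers must be able to deserialize older versions,
		// so anything up to and including the current version is accepted.
		return v instanceof ExampleVersionedSerializer
				&& v.getVersion() <= CURRENT_VERSION;
	}
}
{code}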