[ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999777#comment-15999777 ]
ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139379

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/MigrationStrategy.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@code MigrationStrategy} contains information about how to perform migration of data written
+ * by an older serializer so that new serializers can continue to work on them.
+ *
+ * @param <T> the type of the data being migrated.
+ */
+@PublicEvolving
+public final class MigrationStrategy<T> {
+
+	/** Whether or not migration is required. */
+	private final boolean requiresStateMigration;
+
+	/**
+	 * The fallback deserializer to use, in the case the preceding serializer cannot be found.
+	 *
+	 * <p>This is only relevant if migration is required.
+	 */
+	private final TypeSerializer<T> fallbackDeserializer;
+
+	/**
+	 * Returns a strategy that simply signals that no migration needs to be performed.
+	 *
+	 * @return a strategy that does not perform migration
+	 */
+	public static <T> MigrationStrategy<T> noMigration() {
+		return new MigrationStrategy<>(false, null);
+	}
+
+	/**
+	 * Returns a strategy that signals migration to be performed, and in the case that the
+	 * preceding serializer cannot be found, a provided fallback deserializer can be
+	 * used.
+	 *
+	 * @param fallbackDeserializer a fallback deserializer that can be used to read old data for the migration
+	 *                             in the case that the preceding serializer cannot be found.
+	 *
+	 * @return a strategy that performs migration with a fallback deserializer to read old data.
+	 */
+	public static <T> MigrationStrategy<T> migrateWithFallbackDeserializer(TypeSerializer<T> fallbackDeserializer) {
+		return new MigrationStrategy<>(true, fallbackDeserializer);
+	}
+
+	/**
+	 * Returns a strategy that signals migration to be performed, without a fallback deserializer.
+	 * If the preceding serializer cannot be found, the migration fails because the old data cannot be read.
+	 *
+	 * @return a strategy that performs migration, without a fallback deserializer.
+	 */
+	public static <T> MigrationStrategy<T> migrate() {
--- End diff --

Maybe this should be named `migrationRequired`?

> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as a mechanism to determine how serializer upgrades should be handled.
>
> The general idea is that state checkpoints should contain the following as the state's metainfo:
> - the previous serializer
> - a snapshot of the previous serializer's configuration
>
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. Deserialization may fail if a) the serializer no longer exists in the classpath, or b) the serializer class is no longer valid (i.e., its implementation changed and resulted in a different serialVersionUID). In this case, use a dummy serializer as a placeholder. This dummy serializer is currently the {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The configuration snapshot must be successfully deserialized; otherwise the state restore fails.
> 3. When we get the new registered serializer for the state (which could be a completely new serializer, the same serializer with a different implementation, or the exact same serializer untouched; in every case it is treated as a new serializer), we use the configuration snapshot of the old serializer to reconfigure the new serializer.
>
> This completes the upgrade of the old serializer. However, depending on the result of the upgrade, state conversion may need to take place (for now, if state conversion is required, we simply fail the job, as this functionality isn't available yet). The possible results are:
> - Compatible: restore succeeds and the serializer is upgraded.
> - Compatible, but serialization schema changed: the serializer is upgraded but requires state conversion, without requiring the old serializer to be present.
> - Incompatible: the serializer is upgraded but requires state conversion, and the old serializer must be present (i.e., it cannot be the dummy {{ClassNotFoundProxySerializer}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
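For illustration, here is a minimal, self-contained Java sketch of how a state backend might act on the three `MigrationStrategy` factories from the diff during restore, following the upgrade flow in the issue description. It is not Flink's implementation and makes several assumptions: `Deserializer<T>` is a hypothetical stand-in for `TypeSerializer<T>`, `resolve` is a hypothetical helper, and a previous serializer that could not be deserialized (the `ClassNotFoundProxySerializer` placeholder case) is modeled as an empty `Optional`.

```java
import java.util.Optional;

public class MigrationStrategySketch {

    /** Hypothetical stand-in for the read side of Flink's TypeSerializer; not a Flink API. */
    interface Deserializer<T> {
        String name();
    }

    /** Mirrors the fields and factory methods of the MigrationStrategy class in the diff. */
    static final class MigrationStrategy<T> {
        /** Whether or not migration is required. */
        final boolean requiresStateMigration;
        /** Used only if migration is required and the preceding serializer cannot be found. */
        final Deserializer<T> fallbackDeserializer;

        private MigrationStrategy(boolean requiresStateMigration, Deserializer<T> fallbackDeserializer) {
            this.requiresStateMigration = requiresStateMigration;
            this.fallbackDeserializer = fallbackDeserializer;
        }

        static <T> MigrationStrategy<T> noMigration() {
            return new MigrationStrategy<>(false, null);
        }

        static <T> MigrationStrategy<T> migrateWithFallbackDeserializer(Deserializer<T> fallback) {
            return new MigrationStrategy<>(true, fallback);
        }

        static <T> MigrationStrategy<T> migrate() {
            return new MigrationStrategy<>(true, null);
        }
    }

    /**
     * Hypothetical helper: decides how old state would be read on restore.
     *
     * @param oldSerializer the restored previous serializer, or empty if it could not be
     *     deserialized and only a placeholder (such as ClassNotFoundProxySerializer) exists
     * @return a description of the chosen outcome
     * @throws IllegalStateException if migration is required but old data cannot be read
     */
    static <T> String resolve(MigrationStrategy<T> strategy, Optional<Deserializer<T>> oldSerializer) {
        if (!strategy.requiresStateMigration) {
            return "no migration";
        }
        if (oldSerializer.isPresent()) {
            // Preferred: read old data with the real preceding serializer.
            return "migrate using " + oldSerializer.get().name();
        }
        if (strategy.fallbackDeserializer != null) {
            // Preceding serializer missing, but a fallback deserializer was supplied.
            return "migrate using " + strategy.fallbackDeserializer.name();
        }
        // Only migrate() reaches here: without the old serializer, old data cannot be read.
        throw new IllegalStateException("preceding serializer not found and no fallback deserializer");
    }

    public static void main(String[] args) {
        Deserializer<String> oldSerializer = () -> "OldSerializer";
        Deserializer<String> fallback = () -> "FallbackDeserializer";

        System.out.println(resolve(MigrationStrategy.<String>noMigration(), Optional.empty()));
        System.out.println(resolve(MigrationStrategy.<String>migrate(), Optional.of(oldSerializer)));
        System.out.println(resolve(MigrationStrategy.migrateWithFallbackDeserializer(fallback), Optional.empty()));
        try {
            resolve(MigrationStrategy.<String>migrate(), Optional.empty());
        } catch (IllegalStateException e) {
            System.out.println("restore fails: " + e.getMessage());
        }
    }
}
```

Under these assumptions, the factories appear to line up with the issue's three results: `noMigration()` with "Compatible", `migrateWithFallbackDeserializer(...)` with "Compatible, but serialization schema changed" (old data stays readable even when the old serializer is missing), and `migrate()` with "Incompatible" (the real old serializer is required). Renaming `migrate()` to `migrationRequired()`, as suggested in the review, would not change this logic.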