aljoscha commented on a change in pull request #14648:
URL: https://github.com/apache/flink/pull/14648#discussion_r567776152



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBTestUtils;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.HeapStateBackendTestBase;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the unified savepoint format. They verify you can switch a state 
backend through a
+ * savepoint.
+ */
+public class SavepointStateBackendSwitchTest {
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void switchFromRocksToHeap() throws Exception {
+
+        final File pathToWrite = tempFolder.newFile("tmp_rocks_map_state");
+
+        final MapStateDescriptor<Long, Long> mapStateDescriptor =
+                new MapStateDescriptor<>("my-map-state", Long.class, 
Long.class);
+        mapStateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+        final ValueStateDescriptor<Long> valueStateDescriptor =
+                new ValueStateDescriptor<>("my-value-state", Long.class);
+        valueStateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+        ListStateDescriptor<Long> listStateDescriptor =
+                new ListStateDescriptor<>("my-list-state", Long.class);
+        listStateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+        final Integer namespace1 = 1;
+        final Integer namespace2 = 2;
+        final Integer namespace3 = 3;
+
+        takeRocksSavepoint(
+                pathToWrite,
+                mapStateDescriptor,
+                valueStateDescriptor,
+                listStateDescriptor,
+                namespace1,
+                namespace2,
+                namespace3);
+
+        final SnapshotResult<KeyedStateHandle> stateHandles;
+        try (BufferedInputStream bis =
+                new BufferedInputStream((new FileInputStream(pathToWrite)))) {
+            stateHandles =
+                    InstantiationUtil.deserializeObject(
+                            bis, 
Thread.currentThread().getContextClassLoader());
+        }
+        final KeyedStateHandle stateHandle = 
stateHandles.getJobManagerOwnedSnapshot();
+        try (final HeapKeyedStateBackend<String> keyedBackend =
+                
createHeapKeyedStateBackend(StateObjectCollection.singleton(stateHandle))) {
+
+            InternalMapState<String, Integer, Long, Long> state =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
mapStateDescriptor);
+
+            InternalValueState<String, Integer, Long> valueState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
valueStateDescriptor);
+
+            InternalListState<String, Integer, Long> listState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
listStateDescriptor);
+
+            keyedBackend.setCurrentKey("abc");
+            state.setCurrentNamespace(namespace1);
+            assertEquals(33L, (long) state.get(33L));
+            assertEquals(55L, (long) state.get(55L));
+            assertEquals(2, getStateSize(state));
+
+            state.setCurrentNamespace(namespace2);
+            assertEquals(22L, (long) state.get(22L));
+            assertEquals(11L, (long) state.get(11L));
+            assertEquals(2, getStateSize(state));
+            listState.setCurrentNamespace(namespace2);
+            assertThat(listState.get(), contains(4L, 5L, 6L));
+
+            state.setCurrentNamespace(namespace3);
+            assertEquals(44L, (long) state.get(44L));
+            assertEquals(1, getStateSize(state));
+
+            keyedBackend.setCurrentKey("mno");
+            state.setCurrentNamespace(namespace3);
+            assertEquals(11L, (long) state.get(11L));
+            assertEquals(22L, (long) state.get(22L));
+            assertEquals(33L, (long) state.get(33L));
+            assertEquals(44L, (long) state.get(44L));
+            assertEquals(55L, (long) state.get(55L));
+            assertEquals(5, getStateSize(state));
+            valueState.setCurrentNamespace(namespace3);
+            assertEquals(1239L, (long) valueState.value());
+            listState.setCurrentNamespace(namespace3);
+            assertThat(listState.get(), contains(1L, 2L, 3L));
+
+            KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, 
Integer>> priorityQueue =
+                    keyedBackend.create(
+                            "event-time",
+                            new TimerSerializer<>(
+                                    keyedBackend.getKeySerializer(), 
IntSerializer.INSTANCE));
+
+            assertThat(priorityQueue.size(), equalTo(1));
+            assertThat(
+                    priorityQueue.poll(),
+                    equalTo(new TimerHeapInternalTimer<>(1234L, "mno", 
namespace3)));
+        }
+    }
+
+    private HeapKeyedStateBackend<String> createHeapKeyedStateBackend(
+            Collection<KeyedStateHandle> stateHandles) throws 
BackendBuildingException {
+        // must be synchronized with the KeyGroup of the RocksDB state backend
+        final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+        final int numKeyGroups = keyGroupRange.getNumberOfKeyGroups();
+        ExecutionConfig executionConfig = new ExecutionConfig();
+
+        return new HeapKeyedStateBackendBuilder<>(
+                        mock(TaskKvStateRegistry.class),
+                        StringSerializer.INSTANCE,
+                        HeapStateBackendTestBase.class.getClassLoader(),
+                        numKeyGroups,
+                        keyGroupRange,
+                        executionConfig,
+                        TtlTimeProvider.DEFAULT,
+                        stateHandles,
+                        
AbstractStateBackend.getCompressionDecorator(executionConfig),
+                        TestLocalRecoveryConfig.disabled(),
+                        new HeapPriorityQueueSetFactory(keyGroupRange, 
numKeyGroups, 128),
+                        true,
+                        new CloseableRegistry())
+                .build();
+    }
+
+    private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> 
mapState)
+            throws Exception {
+        int i = 0;
+        Iterator<Map.Entry<UK, UV>> itt = mapState.iterator();
+        while (itt.hasNext()) {
+            i++;
+            itt.next();
+        }
+        return i;
+    }
+
+    private void takeRocksSavepoint(
+            File pathToWrite,
+            MapStateDescriptor<Long, Long> stateDescr,
+            ValueStateDescriptor<Long> valueStateDescriptor,
+            ListStateDescriptor<Long> listStateDescriptor,
+            Integer namespace1,
+            Integer namespace2,
+            Integer namespace3)
+            throws Exception {
+        try (final RocksDBKeyedStateBackend<String> keyedBackend =
+                RocksDBTestUtils.builderForTestDefaults(
+                                tempFolder.newFolder(),
+                                StringSerializer.INSTANCE,
+                                
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB)
+                        .build()) {
+            InternalMapState<String, Integer, Long, Long> mapState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
stateDescr);
+
+            InternalValueState<String, Integer, Long> valueState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
valueStateDescriptor);
+
+            InternalListState<String, Integer, Long> listState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
listStateDescriptor);
+
+            keyedBackend.setCurrentKey("abc");
+            mapState.setCurrentNamespace(namespace1);
+            mapState.put(33L, 33L);
+            mapState.put(55L, 55L);
+
+            mapState.setCurrentNamespace(namespace2);
+            mapState.put(22L, 22L);
+            mapState.put(11L, 11L);
+            listState.setCurrentNamespace(namespace2);
+            listState.add(4L);
+            listState.add(5L);
+            listState.add(6L);
+
+            mapState.setCurrentNamespace(namespace3);
+            mapState.put(44L, 44L);
+
+            keyedBackend.setCurrentKey("mno");
+            mapState.setCurrentNamespace(namespace3);
+            mapState.put(11L, 11L);
+            mapState.put(22L, 22L);
+            mapState.put(33L, 33L);
+            mapState.put(44L, 44L);
+            mapState.put(55L, 55L);
+            valueState.setCurrentNamespace(namespace3);
+            valueState.update(1239L);
+            listState.setCurrentNamespace(namespace3);
+            listState.add(1L);
+            listState.add(2L);
+            listState.add(3L);
+
+            KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, 
Integer>> priorityQueue =
+                    keyedBackend.create(
+                            "event-time",
+                            new TimerSerializer<>(
+                                    keyedBackend.getKeySerializer(), 
IntSerializer.INSTANCE));
+            priorityQueue.add(new TimerHeapInternalTimer<>(1234L, "mno", 
namespace3));

Review comment:
       Should we maybe have more than one timer?

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
##########
@@ -124,129 +96,63 @@ public RocksDBFullRestoreOperation(
                 writeBufferManagerCapacity);
         checkArgument(writeBatchSize >= 0, "Write batch size have to be no 
negative.");
         this.writeBatchSize = writeBatchSize;
+        this.savepointRestoreOperation =
+                new SavepointRestoreOperation<>(
+                        keyGroupRange,
+                        cancelStreamRegistry,
+                        userCodeClassLoader,
+                        restoreStateHandles,
+                        keySerializerProvider);
     }
 
     /** Restores all key-groups data that is referenced by the passed state 
handles. */
     @Override
     public RocksDBRestoreResult restore()
             throws IOException, StateMigrationException, RocksDBException {
         openDB();
-        for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
-            if (keyedStateHandle != null) {
-
-                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-                    throw unexpectedStateHandleException(
-                            KeyGroupsStateHandle.class, 
keyedStateHandle.getClass());
-                }
-                this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) 
keyedStateHandle;
-                restoreKeyGroupsInStateHandle();
-            }
+        try (ThrowingIterator<SavepointRestoreResult> restore =
+                savepointRestoreOperation.restore()) {
+            applyRestoreResult(restore.next());
         }
         return new RocksDBRestoreResult(
                 this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, 
null, null);
     }
 
-    /** Restore one key groups state handle. */
-    private void restoreKeyGroupsInStateHandle()
-            throws IOException, StateMigrationException, RocksDBException {
-        try {
-            logger.info("Starting to restore from state handle: {}.", 
currentKeyGroupsStateHandle);
-            currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-            cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-            currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
-            restoreKVStateMetaData();
-            restoreKVStateData();
-            logger.info("Finished restoring from state handle: {}.", 
currentKeyGroupsStateHandle);
-        } finally {
-            if 
(cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
-                IOUtils.closeQuietly(currentStateHandleInStream);
-            }
-        }
-    }
-
-    /**
-     * Restore the KV-state / ColumnFamily meta data for all key-groups 
referenced by the current
-     * state handle.
-     */
-    private void restoreKVStateMetaData() throws IOException, 
StateMigrationException {
-        KeyedBackendSerializationProxy<K> serializationProxy =
-                readMetaData(currentStateHandleInView);
-
-        this.keygroupStreamCompressionDecorator =
-                serializationProxy.isUsingKeyGroupCompression()
-                        ? SnappyStreamCompressionDecorator.INSTANCE
-                        : UncompressedStreamCompressionDecorator.INSTANCE;
-
+    private void applyRestoreResult(SavepointRestoreResult 
savepointRestoreResult)
+            throws IOException, RocksDBException, StateMigrationException {
         List<StateMetaInfoSnapshot> restoredMetaInfos =
-                serializationProxy.getStateMetaInfoSnapshots();
-        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-
-        for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
-            RocksDbKvStateInfo registeredStateCFHandle =
-                    getOrRegisterStateColumnFamilyHandle(null, 
restoredMetaInfo);
-            
currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
-        }
+                savepointRestoreResult.getStateMetaInfoSnapshots();
+        List<ColumnFamilyHandle> columnFamilyHandles =
+                restoredMetaInfos.stream()
+                        .map(
+                                stateMetaInfoSnapshot -> {
+                                    RocksDbKvStateInfo registeredStateCFHandle 
=
+                                            
getOrRegisterStateColumnFamilyHandle(
+                                                    null, 
stateMetaInfoSnapshot);
+                                    return 
registeredStateCFHandle.columnFamilyHandle;
+                                })
+                        .collect(Collectors.toList());
+        restoreKVStateData(savepointRestoreResult.getRestoredKeyGroups(), 
columnFamilyHandles);
     }
 
     /**
      * Restore the KV-state / ColumnFamily data for all key-groups referenced 
by the current state
      * handle.
      */
-    private void restoreKVStateData() throws IOException, RocksDBException {
+    private void restoreKVStateData(
+            ThrowingIterator<KeyGroup> keyGroups, List<ColumnFamilyHandle> 
columnFamilies)
+            throws IOException, RocksDBException, StateMigrationException {
         // for all key-groups in the current state handle...
         try (RocksDBWriteBatchWrapper writeBatchWrapper =
                 new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
-            for (Tuple2<Integer, Long> keyGroupOffset :
-                    currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-                int keyGroup = keyGroupOffset.f0;
-
-                // Check that restored key groups all belong to the backend
-                Preconditions.checkState(
-                        keyGroupRange.contains(keyGroup),
-                        "The key group must belong to the backend");
-
-                long offset = keyGroupOffset.f1;
-                // not empty key-group?
-                if (0L != offset) {
-                    currentStateHandleInStream.seek(offset);
-                    try (InputStream compressedKgIn =
-                            
keygroupStreamCompressionDecorator.decorateWithCompression(
-                                    currentStateHandleInStream)) {
-                        DataInputViewStreamWrapper compressedKgInputView =
-                                new DataInputViewStreamWrapper(compressedKgIn);
-                        // TODO this could be aware of keyGroupPrefixBytes and 
write only one byte
-                        // if possible
-                        int kvStateId = compressedKgInputView.readShort();
-                        ColumnFamilyHandle handle =
-                                
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                        // insert all k/v pairs into DB
-                        boolean keyGroupHasMoreKeys = true;
-                        while (keyGroupHasMoreKeys) {
-                            byte[] key =
-                                    
BytePrimitiveArraySerializer.INSTANCE.deserialize(
-                                            compressedKgInputView);
-                            byte[] value =
-                                    
BytePrimitiveArraySerializer.INSTANCE.deserialize(
-                                            compressedKgInputView);
-                            if (hasMetaDataFollowsFlag(key)) {
-                                // clear the signal bit in the key to make it 
ready for insertion
-                                // again
-                                clearMetaDataFollowsFlag(key);
-                                writeBatchWrapper.put(handle, key, value);
-                                // TODO this could be aware of 
keyGroupPrefixBytes and write only
-                                // one byte if possible
-                                kvStateId =
-                                        END_OF_KEY_GROUP_MARK & 
compressedKgInputView.readShort();
-                                if (END_OF_KEY_GROUP_MARK == kvStateId) {
-                                    keyGroupHasMoreKeys = false;
-                                } else {
-                                    handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                }
-                            } else {
-                                writeBatchWrapper.put(handle, key, value);
-                            }
-                        }
-                    }
+            while (keyGroups.hasNext()) {
+                KeyGroup keyGroup = keyGroups.next();
+                ThrowingIterator<KeyGroupEntry> groupEntries = 
keyGroup.getKeyGroupEntries();
+                while (groupEntries.hasNext()) {
+                    // TODO we call it more times than before

Review comment:
       What's up with this one?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/state/SavepointStateBackendSwitchTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBTestUtils;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.HeapStateBackendTestBase;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the unified savepoint format. They verify you can switch a state 
backend through a
+ * savepoint.
+ */
+public class SavepointStateBackendSwitchTest {
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void switchFromRocksToHeap() throws Exception {
+
+        final File pathToWrite = tempFolder.newFile("tmp_rocks_map_state");
+
+        final MapStateDescriptor<Long, Long> mapStateDescriptor =
+                new MapStateDescriptor<>("my-map-state", Long.class, 
Long.class);
+        mapStateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+        final ValueStateDescriptor<Long> valueStateDescriptor =
+                new ValueStateDescriptor<>("my-value-state", Long.class);
+        valueStateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+        ListStateDescriptor<Long> listStateDescriptor =
+                new ListStateDescriptor<>("my-list-state", Long.class);
+        listStateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+        final Integer namespace1 = 1;
+        final Integer namespace2 = 2;
+        final Integer namespace3 = 3;
+
+        takeRocksSavepoint(
+                pathToWrite,
+                mapStateDescriptor,
+                valueStateDescriptor,
+                listStateDescriptor,
+                namespace1,
+                namespace2,
+                namespace3);
+
+        final SnapshotResult<KeyedStateHandle> stateHandles;
+        try (BufferedInputStream bis =
+                new BufferedInputStream((new FileInputStream(pathToWrite)))) {
+            stateHandles =
+                    InstantiationUtil.deserializeObject(
+                            bis, 
Thread.currentThread().getContextClassLoader());
+        }
+        final KeyedStateHandle stateHandle = 
stateHandles.getJobManagerOwnedSnapshot();
+        try (final HeapKeyedStateBackend<String> keyedBackend =
+                
createHeapKeyedStateBackend(StateObjectCollection.singleton(stateHandle))) {
+
+            InternalMapState<String, Integer, Long, Long> state =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
mapStateDescriptor);
+
+            InternalValueState<String, Integer, Long> valueState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
valueStateDescriptor);
+
+            InternalListState<String, Integer, Long> listState =
+                    keyedBackend.createInternalState(IntSerializer.INSTANCE, 
listStateDescriptor);
+
+            keyedBackend.setCurrentKey("abc");
+            state.setCurrentNamespace(namespace1);
+            assertEquals(33L, (long) state.get(33L));
+            assertEquals(55L, (long) state.get(55L));
+            assertEquals(2, getStateSize(state));
+
+            state.setCurrentNamespace(namespace2);
+            assertEquals(22L, (long) state.get(22L));
+            assertEquals(11L, (long) state.get(11L));
+            assertEquals(2, getStateSize(state));
+            listState.setCurrentNamespace(namespace2);
+            assertThat(listState.get(), contains(4L, 5L, 6L));
+
+            state.setCurrentNamespace(namespace3);
+            assertEquals(44L, (long) state.get(44L));
+            assertEquals(1, getStateSize(state));
+
+            keyedBackend.setCurrentKey("mno");
+            state.setCurrentNamespace(namespace3);
+            assertEquals(11L, (long) state.get(11L));
+            assertEquals(22L, (long) state.get(22L));
+            assertEquals(33L, (long) state.get(33L));
+            assertEquals(44L, (long) state.get(44L));
+            assertEquals(55L, (long) state.get(55L));
+            assertEquals(5, getStateSize(state));
+            valueState.setCurrentNamespace(namespace3);
+            assertEquals(1239L, (long) valueState.value());
+            listState.setCurrentNamespace(namespace3);
+            assertThat(listState.get(), contains(1L, 2L, 3L));
+
+            KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<String, 
Integer>> priorityQueue =
+                    keyedBackend.create(
+                            "event-time",
+                            new TimerSerializer<>(
+                                    keyedBackend.getKeySerializer(), 
IntSerializer.INSTANCE));
+
+            assertThat(priorityQueue.size(), equalTo(1));
+            assertThat(
+                    priorityQueue.poll(),
+                    equalTo(new TimerHeapInternalTimer<>(1234L, "mno", 
namespace3)));
+        }
+    }
+
+    private HeapKeyedStateBackend<String> createHeapKeyedStateBackend(
+            Collection<KeyedStateHandle> stateHandles) throws 
BackendBuildingException {
+        // must be synchronized with the KeyGroup of the RocksDB state backend

Review comment:
       Should this be a constant in the test that is passed also when creating 
the Rocks backend?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to