aljoscha commented on a change in pull request #14809:
URL: https://github.com/apache/flink/pull/14809#discussion_r572169549



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointStrategy.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
+import org.apache.flink.runtime.state.FullSnapshotResources;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateSerializerProvider;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+
+/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
+class HeapSavepointStrategy<K>
+        implements SnapshotStrategy<KeyedStateHandle, 
FullSnapshotResources<K>> {
+
+    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates;
+    private final StreamCompressionDecorator keyGroupCompressionDecorator;
+    private final LocalRecoveryConfig localRecoveryConfig;
+    private final KeyGroupRange keyGroupRange;
+    private final StateSerializerProvider<K> keySerializerProvider;
+    private final int totalKeyGroups;
+
+    HeapSavepointStrategy(
+            Map<String, StateTable<K, ?, ?>> registeredKVStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates,
+            StreamCompressionDecorator keyGroupCompressionDecorator,
+            LocalRecoveryConfig localRecoveryConfig,
+            KeyGroupRange keyGroupRange,
+            StateSerializerProvider<K> keySerializerProvider,
+            int totalKeyGroups) {
+        this.registeredKVStates = registeredKVStates;
+        this.registeredPQStates = registeredPQStates;
+        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+        this.localRecoveryConfig = localRecoveryConfig;
+        this.keyGroupRange = keyGroupRange;
+        this.keySerializerProvider = keySerializerProvider;
+        this.totalKeyGroups = totalKeyGroups;
+    }
+
+    @Override
+    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) {
+        return HeapSnapshotResources.create(
+                registeredKVStates,
+                registeredPQStates,
+                keyGroupCompressionDecorator,
+                keyGroupRange,
+                getKeySerializer(),
+                totalKeyGroups);
+    }
+
+    @Override
+    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
+            FullSnapshotResources<K> syncPartResource,
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory streamFactory,
+            @Nonnull CheckpointOptions checkpointOptions) {
+        return new FullSnapshotAsyncWriter<>(
+                checkpointOptions.getCheckpointType(),
+                createCheckpointStreamSupplier(checkpointId, streamFactory, 
checkpointOptions),
+                syncPartResource);
+    }
+
+    private SupplierWithException<CheckpointStreamWithResultProvider, 
Exception>
+            createCheckpointStreamSupplier(
+                    long checkpointId,
+                    CheckpointStreamFactory primaryStreamFactory,
+                    CheckpointOptions checkpointOptions) {
+
+        return localRecoveryConfig.isLocalRecoveryEnabled()
+                        && !checkpointOptions.getCheckpointType().isSavepoint()
+                ? () ->
+                        
CheckpointStreamWithResultProvider.createDuplicatingStream(

Review comment:
       I don't think this branch can (should) happen because this is the 
"savepoint" snapshot strategy.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IterableStateSnapshot;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link org.apache.flink.runtime.state.KeyValueStateIterator} over a {@link
+ * HeapKeyedStateBackend}.

Review comment:
       Also, it's very obvious, but should we warn people that this is not 
thread-safe?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IterableStateSnapshot;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link org.apache.flink.runtime.state.KeyValueStateIterator} over a {@link
+ * HeapKeyedStateBackend}.
+ */
+@Internal
+public final class HeapKeyValueStateIterator implements KeyValueStateIterator {
+
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+    private final Map<StateUID, Integer> stateNamesToId;
+    private final Map<StateUID, StateSnapshot> stateStableSnapshots;
+    private final int keyGroupPrefixBytes;
+
+    private boolean isValid;
+    private boolean newKeyGroup;
+    private boolean newKVState;
+    private byte[] currentKey;
+    private byte[] currentValue;
+
+    /** Iterator over the key groups of the corresponding key group range. */
+    private final Iterator<Integer> keyGroupIterator;
+    /** The current value of the keyGroupIterator. */
+    private int currentKeyGroup;
+
+    /** Iterator over all states present in the snapshots. */
+    private Iterator<StateUID> statesIterator;
+    /** The current value of the statesIterator. */
+    private StateUID currentState;
+    /**
+     * An iterator over the values of the current state. It can be one of 
three:
+     *
+     * <ul>
+     *   <li>{@link QueueIterator} for iterating over entries in a priority 
queue
+     *   <li>{@link StateTableIterator} for iterating over entries in a 
StateTable
+     *   <li>{@link MapStateIterator} for iterating over a entries in a user 
map, this one falls

Review comment:
       ```suggestion
        *   <li>{@link MapStateIterator} for iterating over entries in a user 
map, this one falls
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.java
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IterableStateSnapshot;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyValueStateIterator;
+import org.apache.flink.runtime.state.ListDelimitedSerializer;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link org.apache.flink.runtime.state.KeyValueStateIterator} over a {@link
+ * HeapKeyedStateBackend}.

Review comment:
       Maybe "over a Heap backend snapshot" or "over Heap backend snapshot 
resources" would be more accurate here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to