Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174454169 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java --- @@ -140,19 +159,28 @@ public boolean contains(UK userKey) { } @Override - public byte[] getSerializedValue(K key, N namespace) throws IOException { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + byte[] serializedKeyAndNamespace, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception { - HashMap<UK, UV> result = stateTable.get(key, namespace); + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - if (null == result) { + Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, keySerializer, namespaceSerializer); + + Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); + + if (result == null) { return null; } - TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer(); - TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer(); + final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer; --- End diff -- The reason is that RocksDB returns an iterator that gets lazily populated as you call next() while the serialize() of the MapSerializer expects a Map. If it were to go with your option, we would have to iterate over the map twice, once to create the map, and then to serialize it.
---