Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174383696 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java --- @@ -140,19 +159,28 @@ public boolean contains(UK userKey) { } @Override - public byte[] getSerializedValue(K key, N namespace) throws IOException { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + byte[] serializedKeyAndNamespace, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception { - HashMap<UK, UV> result = stateTable.get(key, namespace); + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - if (null == result) { + Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, keySerializer, namespaceSerializer); + + Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); + + if (result == null) { return null; } - TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer(); - TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer(); + final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer; --- End diff -- this shouldn't be necessary. why can't we just pass the map serialzer into `serializeMap`?
---