Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174450032 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java --- @@ -140,19 +159,28 @@ public boolean contains(UK userKey) { } @Override - public byte[] getSerializedValue(K key, N namespace) throws IOException { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + public byte[] getSerializedValue( + byte[] serializedKeyAndNamespace, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception { - HashMap<UK, UV> result = stateTable.get(key, namespace); + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - if (null == result) { + Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, keySerializer, namespaceSerializer); + + Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1); + + if (result == null) { return null; } - TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer(); - TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer(); + final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer; --- End diff -- i mean why isn't the signature of `KvStateSerializer.serializeMap`: ``` KvStateSerializer.serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer); ``` with the following implementation: ``` ... serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer) { if (map != null) { DataOutputSerializer dos = new DataOutputSerializer(32); serializer.serialize(map, dos); return dos.getCopyOfBuffer(); } else { return null; } ``` Why deal with the map key/value entries at all outside the serializer?
---