[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398562#comment-16398562 ]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174454169

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
@@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
     }
     @Override
-    public byte[] getSerializedValue(K key, N namespace) throws IOException {
-        Preconditions.checkState(namespace != null, "No namespace given.");
-        Preconditions.checkState(key != null, "No key given.");
+    public byte[] getSerializedValue(
+            byte[] serializedKeyAndNamespace,
+            TypeSerializer<K> keySerializer,
+            TypeSerializer<N> namespaceSerializer,
+            TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
-        HashMap<UK, UV> result = stateTable.get(key, namespace);
+        Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-        if (null == result) {
+        Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+                serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+        Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
+
+        if (result == null) {
             return null;
         }
-        TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
-        TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
+        final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
--- End diff --

The reason is that RocksDB returns an iterator that is populated lazily as you call next(), while serialize() of the MapSerializer expects a Map. If we went with your option, we would have to iterate over the entries twice: once to build the map, and once more to serialize it.

> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8802
>                 URL: https://issues.apache.org/jira/browse/FLINK-8802
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers
> are not duplicated, which may lead to exceptions when a serializer is
> stateful.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
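A minimal sketch of the two ideas discussed in this thread, in plain Java with no Flink dependency. Each concurrent request works on its own duplicate of a stateful serializer, and map entries are written in a single pass straight from an iterator instead of first being copied into a Map. `ReusingStringSerializer`, `QueryableStateSketch`, `serializeEntries` and `serveConcurrently` are hypothetical names. `duplicate()` only mimics the idea behind Flink's `TypeSerializer#duplicate()`, and the flag-prefixed byte format is an assumption for illustration, not the wire format Flink's queryable state actually uses.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stateful serializer: it reuses a scratch buffer, so one instance must not be shared by threads. */
class ReusingStringSerializer {
    // Mutable, non-thread-safe state: the reason sharing one instance is unsafe.
    private final StringBuilder scratch = new StringBuilder();

    /** Returns an independent copy with its own buffer (mimics the idea of TypeSerializer#duplicate()). */
    ReusingStringSerializer duplicate() {
        return new ReusingStringSerializer();
    }

    void serialize(String value, DataOutputStream out) throws IOException {
        scratch.setLength(0);
        scratch.append(value);
        out.writeUTF(scratch.toString());
    }
}

class QueryableStateSketch {
    /**
     * Writes entries while the iterator produces them, so a lazily populated
     * iterator is consumed once and never materialized into a Map. The entry
     * count is unknown up front, so each entry is preceded by a "more entries"
     * flag and the stream ends with a false flag (hypothetical format).
     */
    static byte[] serializeEntries(Iterator<Map.Entry<String, String>> entries,
                                   ReusingStringSerializer keySer,
                                   ReusingStringSerializer valueSer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        while (entries.hasNext()) {
            Map.Entry<String, String> entry = entries.next();
            out.writeBoolean(true);
            keySer.serialize(entry.getKey(), out);
            valueSer.serialize(entry.getValue(), out);
        }
        out.writeBoolean(false); // end-of-entries marker
        out.flush();
        return bytes.toByteArray();
    }

    /** Serves concurrent requests; each request duplicates the shared serializers before using them. */
    static List<byte[]> serveConcurrently(Map<String, String> state, int requests) throws Exception {
        ReusingStringSerializer sharedKeySer = new ReusingStringSerializer();
        ReusingStringSerializer sharedValueSer = new ReusingStringSerializer();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<byte[]>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                futures.add(pool.submit(() -> serializeEntries(
                        state.entrySet().iterator(),
                        sharedKeySer.duplicate(),     // per-request copy: no shared scratch buffer
                        sharedValueSer.duplicate())));
            }
            List<byte[]> results = new ArrayList<>();
            for (Future<byte[]> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

If the pool tasks were handed `sharedKeySer` and `sharedValueSer` directly instead of their duplicates, threads could interleave on the same `StringBuilder` and produce corrupted output or throw, which is the kind of failure the issue describes. Writing a per-entry flag rather than a leading size is what makes the single pass possible, since the number of entries coming from a lazy iterator is not known in advance.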