[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398266#comment-16398266 ]

ASF GitHub Bot commented on FLINK-8802:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174383696
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---
    @@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
        }
     
        @Override
    -   public byte[] getSerializedValue(K key, N namespace) throws IOException {
    -           Preconditions.checkState(namespace != null, "No namespace given.");
    -           Preconditions.checkState(key != null, "No key given.");
    +   public byte[] getSerializedValue(
    +                   byte[] serializedKeyAndNamespace,
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
     
    -           HashMap<UK, UV> result = stateTable.get(key, namespace);
    +           Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
     
    -           if (null == result) {
    +           Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
    +                           serializedKeyAndNamespace, keySerializer, namespaceSerializer);
    +
    +           Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
    +
    +           if (result == null) {
                        return null;
                }
     
    -           TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
    -           TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
    +           final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
    --- End diff --
    
    This cast shouldn't be necessary. Why can't we just pass the map serializer into `serializeMap`?


> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8802
>                 URL: https://issues.apache.org/jira/browse/FLINK-8802
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers 
> are not duplicated, which may lead to exceptions when a serializer is 
> stateful.
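
The problem described above can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: `ScratchSerializer` and `serializeConcurrently` are hypothetical names made up for illustration, and `ScratchSerializer.duplicate()` only mirrors the role that `TypeSerializer#duplicate()` plays in Flink. The assumption is that a stateful serializer keeps a reusable internal buffer, so each thread needs its own copy before serializing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DuplicateSketch {

    /** Hypothetical stateful serializer: it reuses one scratch buffer, so it is not thread-safe. */
    static final class ScratchSerializer {
        private final ByteBuffer scratch = ByteBuffer.allocate(64);

        /** Serializes short strings (at most 64 UTF-8 bytes) through the shared scratch buffer. */
        byte[] serialize(String value) {
            scratch.clear();
            scratch.put(value.getBytes(StandardCharsets.UTF_8));
            scratch.flip();
            byte[] out = new byte[scratch.remaining()];
            scratch.get(out);
            return out;
        }

        /** Returns a fresh instance with its own buffer (stand-in for TypeSerializer#duplicate()). */
        ScratchSerializer duplicate() {
            return new ScratchSerializer();
        }
    }

    /** Serializes every value on a pool thread, handing each task a private duplicate. */
    static List<String> serializeConcurrently(ScratchSerializer shared, List<String> values)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<byte[]>> futures = new ArrayList<>();
            for (String value : values) {
                // Duplicate before handing off, so no two tasks touch the same buffer.
                ScratchSerializer local = shared.duplicate();
                Callable<byte[]> task = () -> local.serialize(value);
                futures.add(pool.submit(task));
            }
            List<String> result = new ArrayList<>();
            for (Future<byte[]> future : futures) {
                result.add(new String(future.get(), StandardCharsets.UTF_8));
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> input = Arrays.asList("a", "bb", "ccc");
        System.out.println(serializeConcurrently(new ScratchSerializer(), input));
    }
}
```

If the tasks called `shared.serialize(...)` directly, concurrent calls could interleave on the same buffer and produce corrupted output or buffer exceptions, which is the failure mode the issue describes.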



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
