[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398259#comment-16398259 ]

ASF GitHub Bot commented on FLINK-8802:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174382685
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
                final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
                try {
    -                   final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +                   final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
                        if (kvState == null) {
                                responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
                        } else {
                                byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -                           byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +                           // here we remove any type check...
    +                           // Ideally we want to keep that the info match the state.
    --- End diff --
    
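    You can retain type safety by moving the call into a generic helper method, which lets the compiler capture the wildcard types: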
    
    Call from the handler:
    ```
    byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
    ```
    
    Added method:
    ```
    private static <K, N, V> byte[] getSerializedValue(
            KvStateEntry<K, N, V> entry,
            byte[] serializedKeyAndNamespace) throws Exception {
        InternalKvState<K, N, V> state = entry.getState();
        KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();

        return state.getSerializedValue(
            serializedKeyAndNamespace,
            infoForCurrentThread.getKeySerializer(),
            infoForCurrentThread.getNamespaceSerializer(),
            infoForCurrentThread.getStateValueSerializer()
        );
    }
    ```
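
    For reference, here is a minimal, self-contained sketch of why the helper works. It does not use Flink classes: `Entry`, `serialize` and `handle` are hypothetical stand-ins for `KvStateEntry`, the added method and the handler. The point is only that a generic method binds one type variable to the wildcard, so the value and its serializer are known to match.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.function.Function;

    public class WildcardCaptureSketch {

        /** Hypothetical stand-in for an entry that pairs a value with a matching serializer. */
        static final class Entry<V> {
            private final V value;
            private final Function<V, byte[]> serializer;

            Entry(V value, Function<V, byte[]> serializer) {
                this.value = value;
                this.serializer = serializer;
            }

            V getValue() {
                return value;
            }

            Function<V, byte[]> getSerializer() {
                return serializer;
            }
        }

        /** Generic helper: the compiler binds V to the captured wildcard type. */
        static <V> byte[] serialize(Entry<V> entry) {
            // Both accessors are typed with the same V, so no cast is needed.
            return entry.getSerializer().apply(entry.getValue());
        }

        /** The caller only knows Entry<?>, as the handler does after the registry lookup. */
        static byte[] handle(Entry<?> entry) {
            // Writing entry.getSerializer().apply(entry.getValue()) here would not compile,
            // because each call on 'entry' yields a distinct capture of '?'.
            return serialize(entry);
        }

        public static void main(String[] args) {
            Entry<?> entry = new Entry<String>("flink", s -> s.getBytes(StandardCharsets.UTF_8));
            System.out.println(handle(entry).length);
        }
    }
    ```

    The same reasoning should apply with three type parameters, as in `KvStateEntry<K, N, V>`.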


> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8802
>                 URL: https://issues.apache.org/jira/browse/FLINK-8802
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers 
> are not duplicated, which may lead to exceptions when a serializer is 
> stateful.
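
One common way to avoid sharing a stateful serializer across threads is to give each thread its own duplicate. The sketch below illustrates that idea only and is not the change made in the pull request: `StatefulSerializer`, its `duplicate()` method and the `ThreadLocal` holder are hypothetical stand-ins, loosely modelled on the `duplicate()` method that Flink's `TypeSerializer` provides.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

public class PerThreadSerializerSketch {

    /** Hypothetical stateful serializer: it reuses an internal buffer, so it is not thread-safe. */
    static final class StatefulSerializer {
        private final StringBuilder scratch = new StringBuilder();

        byte[] serialize(String value) {
            scratch.setLength(0);
            scratch.append("v:").append(value);
            return scratch.toString().getBytes(StandardCharsets.UTF_8);
        }

        /** Returns an independent copy with its own buffer. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    private static final StatefulSerializer PROTOTYPE = new StatefulSerializer();

    // Each thread lazily obtains its own duplicate; the prototype itself never serializes.
    private static final ThreadLocal<StatefulSerializer> PER_THREAD =
            ThreadLocal.withInitial(PROTOTYPE::duplicate);

    static byte[] serialize(String value) {
        return PER_THREAD.get().serialize(value);
    }

    /** Runs several threads and reports whether every result matched its input. */
    static boolean allCorrect(int threads, int perThread) {
        AtomicBoolean ok = new AtomicBoolean(true);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    String in = id + "-" + i;
                    String out = new String(serialize(in), StandardCharsets.UTF_8);
                    if (!out.equals("v:" + in)) {
                        ok.set(false);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println(allCorrect(4, 1000));
    }
}
```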



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
