[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398259#comment-16398259 ]
ASF GitHub Bot commented on FLINK-8802: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174382685 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java --- @@ -78,13 +75,22 @@ public KvStateServerHandler( final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>(); try { - final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId()); + final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId()); if (kvState == null) { responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + // here we remove any type check... + // Ideally we want to keep that the info match the state. --- End diff -- you can retain type safety: Call from the handler: ``` byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace); ``` Added method: ``` private static <K, N, V> byte[] getSerializedValue(KvStateEntry<K, N, V> entry, byte[] serializedKeyAndNamespace) throws Exception { InternalKvState<K, N, V> state = entry.getState(); KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread(); return state.getSerializedValue( serializedKeyAndNamespace, infoForCurrentThread.getKeySerializer(), infoForCurrentThread.getNamespaceSerializer(), infoForCurrentThread.getStateValueSerializer() ); } ``` > Concurrent serialization without duplicating serializers in state server. > ------------------------------------------------------------------------- > > Key: FLINK-8802 > URL: https://issues.apache.org/jira/browse/FLINK-8802 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Blocker > Fix For: 1.5.0 > > > The `getSerializedValue()` may be called by multiple threads but serializers > are not duplicated, which may lead to exceptions thrown when a serializer is > stateful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)