[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398532#comment-16398532 ]
ASF GitHub Bot commented on FLINK-8802: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174444107 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}. 
+ * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace the state is associated to + * @param <V> The type of values kept internally in state + */ +@Internal +public class KvStateEntry<K, N, V> { + + private final InternalKvState<K, N, V> state; + private final KvStateInfo<K, N, V> stateInfo; + + private final boolean isSerializerStateless; + + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; + + public KvStateEntry(final InternalKvState<K, N, V> state) { + this.state = Preconditions.checkNotNull(state); + this.stateInfo = new KvStateInfo<>( + state.getKeySerializer(), + state.getNamespaceSerializer(), + state.getValueSerializer() + ); + this.serializerCache = new ConcurrentHashMap<>(); + this.isSerializerStateless = stateInfo.duplicate() == stateInfo; + } + + public InternalKvState<K, N, V> getState() { + return state; + } + + public KvStateInfo<K, N, V> getInfoForCurrentThread() { + return isSerializerStateless + ? stateInfo + : serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate()); + } + + public void clear() { + if (serializerCache != null) { + serializerCache.clear(); + } + } + + @VisibleForTesting + public int getCacheSize() { + return serializerCache == null ? -1 : serializerCache.size(); --- End diff -- Unnecessary null check: `serializerCache` is a final field that the constructor always initializes, so it can never be null here (the same applies to the null check in `clear()`). > Concurrent serialization without duplicating serializers in state server. > ------------------------------------------------------------------------- > > Key: FLINK-8802 > URL: https://issues.apache.org/jira/browse/FLINK-8802 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Blocker > Fix For: 1.5.0 > > > `getSerializedValue()` may be called by multiple threads, but the serializers > are not duplicated per thread, which may lead to exceptions when a serializer is > stateful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)