[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405713#comment-16405713 ]
ASF GitHub Bot commented on FLINK-8802: --------------------------------------- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r175640731 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalQueryableKvState.java --- @@ -16,46 +16,42 @@ * limitations under the License. */ -package org.apache.flink.runtime.query; +package org.apache.flink.runtime.state.internal; -import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.query.KvStateInfo; import org.apache.flink.util.Preconditions; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** - * An entry holding the {@link InternalKvState} along with its {@link KvStateInfo}. + * An abstract base class to be subclassed by states that are expected to be queryable. + * Its main task is to keep a "thread-local" copy of the different serializers (if needed). * * @param <K> The type of key the state is associated to * @param <N> The type of the namespace the state is associated to * @param <V> The type of values kept internally in state */ -@Internal -public class KvStateEntry<K, N, V> { +public abstract class InternalQueryableKvState<K, N, V> implements InternalKvState<N> { - private final InternalKvState<K, N, V> state; private final KvStateInfo<K, N, V> stateInfo; - private final boolean areSerializersStateless; - private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache = new ConcurrentHashMap<>(4); --- End diff -- nit: I just wonder why this didn't use the ThreadLocal<KvStateInfo<K, N, V>> provided by the JDK... > Concurrent serialization without duplicating serializers in state server. 
> ------------------------------------------------------------------------- > > Key: FLINK-8802 > URL: https://issues.apache.org/jira/browse/FLINK-8802 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Blocker > Fix For: 1.5.0 > > > The `getSerializedValue()` method may be called by multiple threads, but the serializers > are not duplicated, which may lead to exceptions being thrown when a serializer is > stateful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)