Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174444245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java --- @@ -39,29 +39,27 @@ private final InternalKvState<K, N, V> state; private final KvStateInfo<K, N, V> stateInfo; + private final boolean isSerializerStateless; + private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache; public KvStateEntry(final InternalKvState<K, N, V> state) { - this.state = Preconditions.checkNotNull(state); this.stateInfo = new KvStateInfo<>( state.getKeySerializer(), state.getNamespaceSerializer(), state.getValueSerializer() ); - - this.serializerCache = - stateInfo.duplicate() == stateInfo - ? null // if the serializers are stateless, we do not need a cache - : new ConcurrentHashMap<>(); + this.serializerCache = new ConcurrentHashMap<>(); + this.isSerializerStateless = stateInfo.duplicate() == stateInfo; --- End diff -- -> areSerializersStateless?
---