Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174473498

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java ---
    @@ -70,10 +88,18 @@
          * <p>If no value is associated with key and namespace, <code>null</code>
          * is returned.
          *
    +     * <p><b>TO IMPLEMENTERS:</b> This method is called by multiple threads. Anything
    +     * stateful (e.g. serializers) should be either duplicated or protected from undesired
    +     * consequences of concurrent invocations.
    +     *
          * @param serializedKeyAndNamespace Serialized key and namespace
          * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
          *
          * @throws Exception Exceptions during serialization are forwarded
          */
    -    byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
    +    byte[] getSerializedValue(
    --- End diff --

    I would suggest storing the serializer in a thread-local variable. The current solution is a bit confusing because this interface suddenly exposes serializers, and callers have to provide the serializer in the `getSerializedValue` method. In my opinion, the interface does not make much sense this way. Furthermore, the serializers are copied externally into something that looks like a custom-built thread local.

    I suggest keeping the serializers in thread locals in the base class and bringing this interface back to its original form. There is also only one thread pool, dedicated to queryable state, that would hold the serializers, and even the current solution has a dedicated cleanup method. In that place, we can just clean up the thread locals.
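
The thread-local suggestion above could be sketched roughly as follows. This is a self-contained illustration under stated assumptions, not Flink code: `SketchSerializer`, `StringSketchSerializer`, `AbstractKvStateSketch`, `MapKvStateSketch`, `lookup` and `cleanUpCurrentThread` are all hypothetical names, and keys are treated as plain UTF-8 strings for brevity. The point is only that the base class owns one serializer copy per thread, so `getSerializedValue` can keep its original single-argument form.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a stateful, non-thread-safe serializer. */
interface SketchSerializer<T> {
    byte[] serialize(T value);

    /** Returns an independent copy that is safe to use from another thread. */
    SketchSerializer<T> duplicate();
}

/** Trivial serializer used only to make the sketch runnable. */
final class StringSketchSerializer implements SketchSerializer<String> {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SketchSerializer<String> duplicate() {
        return new StringSketchSerializer();
    }
}

/** Hypothetical base class: serializers live in a thread local, not in the interface. */
abstract class AbstractKvStateSketch<V> {
    // Each calling thread lazily receives its own duplicate of the prototype.
    private final ThreadLocal<SketchSerializer<V>> threadLocalSerializer;

    AbstractKvStateSketch(SketchSerializer<V> prototype) {
        this.threadLocalSerializer = ThreadLocal.withInitial(prototype::duplicate);
    }

    /** Assumed lookup hook; real state backends would deserialize key and namespace. */
    protected abstract V lookup(String keyAndNamespace);

    /** Original single-argument shape: callers never see or pass serializers. */
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) {
        V value = lookup(new String(serializedKeyAndNamespace, StandardCharsets.UTF_8));
        return value == null ? null : threadLocalSerializer.get().serialize(value);
    }

    /** Drops the calling thread's copy; must run on each pool thread to clear them all. */
    public void cleanUpCurrentThread() {
        threadLocalSerializer.remove();
    }
}

/** Minimal concrete state backed by a map, for demonstration only. */
final class MapKvStateSketch extends AbstractKvStateSketch<String> {
    private final Map<String, String> values = new ConcurrentHashMap<>();

    MapKvStateSketch() {
        super(new StringSketchSerializer());
    }

    void put(String key, String value) {
        values.put(key, value);
    }

    @Override
    protected String lookup(String keyAndNamespace) {
        return values.get(keyAndNamespace);
    }
}
```

One caveat of this sketch: `ThreadLocal.remove()` only clears the value of the thread that calls it, so a cleanup hook like the one above would have to be executed on every thread of the dedicated pool (or the pool would have to be shut down) for all copies to be released.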
---