StefanRRichter commented on a change in pull request #8611: 
[FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293559974
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##########
 @@ -156,10 +203,23 @@ public boolean isEmpty() {
         * @param transformation the transformation function.
         * @throws Exception if some exception happens in the transformation 
function.
         */
-       public abstract <T> void transform(
+       public <T> void transform(
                        N namespace,
                        T value,
-                       StateTransformationFunction<S, T> transformation) 
throws Exception;
+                       StateTransformationFunction<S, T> transformation) 
throws Exception {
+               K key = keyContext.getCurrentKey();
+               checkKeyNamespacePreconditions(key, namespace);
+
+               int keyGroup = keyContext.getCurrentKeyGroupIndex();
+               StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
+
+               if (stateMap == null) {
+                       stateMap = createStateMap();
 
 Review comment:
   If you feel like you want to keep it, you might consider writing a helper 
method for the task of creating and adding maps, because this is repeated code 
in multiple places.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to