StefanRRichter commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r293559974
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ########## @@ -156,10 +203,23 @@ public boolean isEmpty() { * @param transformation the transformation function. * @throws Exception if some exception happens in the transformation function. */ - public abstract <T> void transform( + public <T> void transform( N namespace, T value, - StateTransformationFunction<S, T> transformation) throws Exception; + StateTransformationFunction<S, T> transformation) throws Exception { + K key = keyContext.getCurrentKey(); + checkKeyNamespacePreconditions(key, namespace); + + int keyGroup = keyContext.getCurrentKeyGroupIndex(); + StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup); + + if (stateMap == null) { + stateMap = createStateMap(); Review comment: If you feel like you want to keep it, you might consider writing a helper method for the task of creating and adding maps, because this is repeated code in multiple places. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services