xintongsong commented on code in PR #581:
URL: https://github.com/apache/flink-agents/pull/581#discussion_r2986859797


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -1125,6 +1126,18 @@ private void maybeInitActionStateStore() {
         }
     }
 
+    private boolean isKeyOwnedByCurrentSubtask(Object key) {
+        int maxParallelism = getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks();
+        int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+
+        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
+        int owner =
+                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                        maxParallelism, parallelism, keyGroup);

Review Comment:
   This computation is repeated for every key in the list state, which is 
unnecessary: `maxParallelism`, `parallelism`, and the subtask index do not 
change during the lifecycle of the operator. Only the key group depends on 
the key. We should therefore compute the subtask-level part once (for 
example, the range of key groups this subtask owns) and compare each key's 
key group against it, instead of recomputing `owner` per key.
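
   A minimal sketch of one way to do this, under some assumptions. The key 
group is derived from each key, so what can be cached is the range of key 
groups owned by the current subtask. The class `KeyOwnershipSketch` and its 
helpers are hypothetical stand-ins so the example runs without Flink on the 
classpath. The range arithmetic is intended to mirror 
`KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex`, and the hash 
is a placeholder, not the murmur hash Flink applies.

```java
// Hypothetical stand-in for the operator; not the code from this PR.
final class KeyOwnershipSketch {
    private final int maxParallelism;
    private final int firstKeyGroup; // inclusive, computed once
    private final int lastKeyGroup;  // inclusive, computed once

    KeyOwnershipSketch(int maxParallelism, int parallelism, int subtaskIndex) {
        this.maxParallelism = maxParallelism;
        // Assumption: same arithmetic as Flink's
        // KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
        this.firstKeyGroup =
                (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        this.lastKeyGroup = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Placeholder hash so the sketch is self-contained; Flink uses
    // KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism) here.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Per-key owner computation, as in the diff above; kept only to
    // cross-check the cached range.
    static int ownerOfKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Per-key work is now one hash plus two integer comparisons.
    boolean isKeyOwnedByCurrentSubtask(Object key) {
        int keyGroup = assignToKeyGroup(key, maxParallelism);
        return keyGroup >= firstKeyGroup && keyGroup <= lastKeyGroup;
    }
}
```

   In the operator itself, the range could be computed lazily on first use or 
in `open()`, and the real `KeyGroupRangeAssignment` methods would replace the 
two static helpers.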


