zentol commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117002036


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -157,13 +165,21 @@ public Map<AllocationID, Integer> calculateScore(
                                     .getMaxParallelism(),
                             parallelism.get(evi.getJobVertexId()),
                             evi.getSubtaskIndex());
+            // Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+            // accommodate for averaging non-zero states
+            Optional<Long> kgSizeMaybe =
+                    stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+            if (!kgSizeMaybe.isPresent()) {
+                continue;
+            }
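
For readers following the thread, here is a minimal, self-contained sketch of the scoring step in this hunk. Everything in it is hypothetical: plain `String` IDs stand in for `JobVertexID`/`AllocationID`, `PrevPlacement` is an invented record, and `keyGroupRange` is assumed to mirror the even, contiguous split that `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex` performs. Because the hunk is cut off before the accumulation, the sketch also assumes an allocation's score is the per-key-group estimate multiplied by the number of key groups the subtask previously owned there.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of the per-allocation scoring step. */
public class KeyGroupScoreSketch {

    /** A subtask's previous placement (invented for illustration, not a Flink type). */
    record PrevPlacement(String vertexId, int subtaskIndex, String allocationId) {}

    /** Inclusive key-group range [start, end] for one subtask, assuming an even contiguous split. */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    static Map<String, Long> calculateScore(
            Iterable<PrevPlacement> placements,
            Map<String, Long> stateSizeEstimates, // estimated size per key group, per vertex
            int maxParallelism,
            Map<String, Integer> parallelism) {
        Map<String, Long> score = new HashMap<>();
        for (PrevPlacement p : placements) {
            // A 0 estimate is bumped to 1 so that vertices with (nearly) empty state
            // still add a small locality preference.
            Optional<Long> kgSizeMaybe =
                    Optional.ofNullable(stateSizeEstimates.get(p.vertexId()))
                            .map(e -> Math.max(e, 1L));
            if (!kgSizeMaybe.isPresent()) {
                continue; // no estimate for this vertex: it does not affect the score
            }
            int[] range = keyGroupRange(maxParallelism, parallelism.get(p.vertexId()), p.subtaskIndex());
            long numKeyGroups = range[1] - range[0] + 1L;
            score.merge(p.allocationId(), kgSizeMaybe.get() * numKeyGroups, Long::sum);
        }
        return score;
    }

    public static void main(String[] args) {
        List<PrevPlacement> placements = List.of(
                new PrevPlacement("v1", 0, "A"),
                new PrevPlacement("v1", 1, "B"),
                new PrevPlacement("v2", 0, "A"),  // v2 estimate is 0, counted as 1
                new PrevPlacement("v3", 0, "B")); // v3 has no estimate, skipped
        Map<String, Long> estimates = Map.of("v1", 10L, "v2", 0L);
        Map<String, Integer> parallelism = Map.of("v1", 2, "v2", 1, "v3", 1);
        Map<String, Long> score = calculateScore(placements, estimates, 128, parallelism);
        System.out.println(score.get("A")); // 768 = 10 * 64 + 1 * 128
        System.out.println(score.get("B")); // 640 = 10 * 64
    }
}
```

The `Math.max(e, 1L)` bump means a vertex with a zero estimate still nudges its subtasks toward their previous slot, while a vertex with no estimate at all is ignored.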

Review Comment:
   > if we place the task on a different TM then it will have to download all its SST files (or am I missing something?)
   
   That part is clear.
   
   If incremental state is also covered (even when an operator didn't add any state to it in the latest checkpoint), then we're good.
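
To make the condition in this comment concrete, the following hypothetical model contrasts the bytes an incremental checkpoint uploads with the bytes a different TM must download on restore. The `SstFile` and `SubtaskCheckpoint` records are invented for illustration and are not Flink types. If a size estimate were built only from the uploaded delta, an operator that added no state in the latest checkpoint would estimate 0 (then bumped to 1 by `Math.max(e, 1L)`), even though relocating it still costs every referenced SST file. The sketch does not show which of the two Flink's estimator actually uses; that remains the open question above.

```java
import java.util.List;

/** Hypothetical model: incremental upload size vs. full restore size. */
public class IncrementalStateSizeSketch {

    /** Hypothetical SST file handle: name plus size in bytes. */
    record SstFile(String name, long sizeBytes) {}

    /**
     * Hypothetical view of one subtask's incremental checkpoint: every file the
     * checkpoint references, and the subset that was uploaded for the first time.
     */
    record SubtaskCheckpoint(List<SstFile> referencedFiles, List<SstFile> newlyUploadedFiles) {}

    static long sum(List<SstFile> files) {
        return files.stream().mapToLong(SstFile::sizeBytes).sum();
    }

    /** Only what this checkpoint added; 0 if the operator wrote no new state. */
    static long incrementalSize(SubtaskCheckpoint c) {
        return sum(c.newlyUploadedFiles());
    }

    /** What a fresh TaskManager must download to restore: all referenced files. */
    static long restoreSize(SubtaskCheckpoint c) {
        return sum(c.referencedFiles());
    }

    public static void main(String[] args) {
        SstFile a = new SstFile("000012.sst", 100L);
        SstFile b = new SstFile("000015.sst", 50L);
        // No new state since the previous checkpoint: both files are still referenced, none re-uploaded.
        SubtaskCheckpoint cp = new SubtaskCheckpoint(List.of(a, b), List.of());
        System.out.println(incrementalSize(cp)); // 0
        System.out.println(restoreSize(cp));     // 150
    }
}
```

For locality scoring, the restore size is the quantity that matches the cost described in the quoted question.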



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
