zentol commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1117002036


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -157,13 +165,21 @@ public Map<AllocationID, Integer> calculateScore(
                                     .getMaxParallelism(),
                             parallelism.get(evi.getJobVertexId()),
                             evi.getSubtaskIndex());
+            // Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+            // accommodate for averaging non-zero states
+            Optional<Long> kgSizeMaybe =
+                    stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+            if (!kgSizeMaybe.isPresent()) {
+                continue;
+            }

Review Comment:
   > if we place the task on a different TM then it will have to download all its SST files (or am I missing something?)

   That part is clear. If incremental state is also covered (even if an operator didn't add any state to it in the latest checkpoint), then we're good.
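
   To make the concern concrete, here is a minimal sketch of the clamping behaviour in the hunk above. It is not the PR's code: `KeyGroupScoreSketch`, `clampEstimate`, `addScore`, and the `String` allocation IDs are hypothetical names, and weighting the score as "key groups times per-key-group size" is an assumption made for illustration. Only the `Math.max(e, 1L)` clamp and the skip on a missing estimate are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; not part of StateLocalitySlotAssigner.
class KeyGroupScoreSketch {

    // Same clamp as in the diff: a zero estimate still counts as 1,
    // so a vertex whose estimate is 0 still contributes to the score.
    static Optional<Long> clampEstimate(Optional<Long> estimate) {
        return estimate.map(e -> Math.max(e, 1L));
    }

    // Assumed scoring rule: add (key groups * clamped size) to the allocation's score.
    // A missing estimate is skipped, mirroring the `continue` in the diff.
    static void addScore(
            Map<String, Long> scores,
            String allocationId,
            int keyGroups,
            Optional<Long> estimate) {
        Optional<Long> kgSize = clampEstimate(estimate);
        if (!kgSize.isPresent()) {
            return;
        }
        scores.merge(allocationId, keyGroups * kgSize.get(), Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> scores = new HashMap<>();
        addScore(scores, "alloc-a", 4, Optional.of(0L));    // estimate 0 is clamped to 1
        addScore(scores, "alloc-b", 4, Optional.of(10L));   // non-zero estimate used as-is
        addScore(scores, "alloc-c", 4, Optional.empty());   // no estimate: no entry at all
        System.out.println(scores);
    }
}
```

   In this sketch an estimate of 0 still yields a non-zero score, while a missing estimate yields no entry. Whether state that did not change in the latest incremental checkpoint falls into the first case or the second is the open question in the comment above.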