XComp commented on code in PR #27726:
URL: https://github.com/apache/flink/pull/27726#discussion_r3498669270


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -124,6 +125,13 @@ public class JobVertex implements java.io.Serializable {
     /** The group inside which the vertex subtasks share slots. */
     @Nullable private CoLocationGroupImpl coLocationGroup;
 
+    /**
+     * The blob key of the offloaded TaskInformation for this vertex. Stored here to enable reuse
+     * across ExecutionGraph rebuilds (e.g., during adaptive scheduler restarts), preventing
+     * unbounded accumulation of orphaned blobs in permanent storage.
+     */
+    @Nullable private PermanentBlobKey taskInformationBlobKey = null;

Review Comment:
   > The fix caches the PermanentBlobKey on JobVertex, which unlike ExecutionJobVertex is never recreated during adaptive scheduler restarts. 
   
   This premise doesn't hold: `AdaptiveScheduler` recreates the `JobVertex` instances living in the `ExecutionGraph` on each restart from the `adjustedJobGraph` (see [AdaptiveScheduler:1528](https://github.com/apache/flink/blob/709f34525918ddbfb5a9f794c0b2a23b4b5a1461/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L1528)), which is a deep copy of the original immutable `JobGraph`. Any data that's added to the `JobVertex` won't survive the restart.
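   The sketch below shows this effect. It is a minimal model that uses hypothetical `Vertex` and `createForAttempt` names rather than Flink's actual classes. Even a deep copy that copies every field cannot carry the cache forward. The key was only ever written to the previous copy, never to the original that each new copy is taken from.

```java
import java.util.Objects;

/**
 * Hypothetical, heavily simplified model (not Flink's JobVertex or JobGraph) showing why a
 * cache field written on a copied vertex is gone once the next copy is taken from the original.
 */
public class DeepCopyCacheDemo {

    static final class Vertex {
        final String name;
        // Mutable cache slot, playing the role of the proposed taskInformationBlobKey field.
        String cachedBlobKey;

        Vertex(String name) {
            this.name = Objects.requireNonNull(name);
        }

        // Deep copy that copies every field, including the cache slot.
        Vertex deepCopy() {
            Vertex copy = new Vertex(name);
            copy.cachedBlobKey = cachedBlobKey;
            return copy;
        }
    }

    /** One (re)start: a fresh vertex is derived from the unchanged original. */
    static Vertex createForAttempt(Vertex original) {
        return original.deepCopy();
    }

    public static void main(String[] args) {
        Vertex original = new Vertex("map"); // lives in the original, immutable JobGraph

        Vertex firstAttempt = createForAttempt(original);
        firstAttempt.cachedBlobKey = "blob-1"; // the cache is written to the copy only

        Vertex secondAttempt = createForAttempt(original); // rebuilt after a restart
        System.out.println("first attempt:  " + firstAttempt.cachedBlobKey);
        System.out.println("second attempt: " + secondAttempt.cachedBlobKey);
    }
}
```

   Running `main` prints `blob-1` for the first attempt and `null` for the second.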



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java:
##########
@@ -460,10 +460,22 @@ public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInforma
         // serialize the task information!
         synchronized (stateMonitor) {
             if (taskInformationOrBlobKey == null) {
+                // check if JobVertex already has a cached key from a previous ExecutionGraph
+                PermanentBlobKey cachedKey = jobVertex.getTaskInformationBlobKey();
+                if (cachedKey != null) {
+                    taskInformationOrBlobKey = Either.Right(cachedKey);
+                    return taskInformationOrBlobKey;
+                }
+
                 final BlobWriter blobWriter = graph.getBlobWriter();
                 final TaskInformation taskInformation = getTaskInformation();

Review Comment:
   TaskInformation contains the parallelism and the job configuration. The parallelism can change during a rescale operation, which changes the TaskInformation for the same JobVertex.
   
   There's also [FLIP-530](https://cwiki.apache.org/confluence/display/FLINK/FLIP-530%3A+Dynamic+job+configuration) for updating the job configuration, which might affect the TaskInformation later on. In any case, the TaskInformation might differ for the same JobVertex, resulting in different BlobKeys being used.
   
   So, caching might not be the right thing to do in general.
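   The following sketch shows why a cached key goes stale on rescale. It assumes that the key's digest is computed over the serialized TaskInformation; a later comment refers to the MessageDigest used for the BlobKey. `TaskInfo`, its string serialization, and `contentDigest` are hypothetical simplifications. The sketch models only a content digest, not any other component a real `PermanentBlobKey` may contain.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Hypothetical sketch: a key derived from a digest of the serialized content changes as soon as
 * any serialized field changes, e.g. the parallelism after a rescale.
 */
public class ContentKeyDemo {

    /** Hypothetical stand-in for TaskInformation, reduced to the fields discussed here. */
    static final class TaskInfo {
        final String vertexName;
        final int parallelism;
        final String jobConfiguration;

        TaskInfo(String vertexName, int parallelism, String jobConfiguration) {
            this.vertexName = vertexName;
            this.parallelism = parallelism;
            this.jobConfiguration = jobConfiguration;
        }

        /** Simplified serialization; the real TaskInformation is serialized differently. */
        byte[] serialize() {
            return (vertexName + '|' + parallelism + '|' + jobConfiguration)
                    .getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hex-encoded SHA-1 digest of the serialized bytes. */
    static String contentDigest(TaskInfo info) {
        try {
            byte[] hash = MessageDigest.getInstance("SHA-1").digest(info.serialize());
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-1.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TaskInfo beforeRescale = new TaskInfo("map", 4, "{}");
        TaskInfo sameAgain = new TaskInfo("map", 4, "{}");
        TaskInfo afterRescale = new TaskInfo("map", 8, "{}");

        // Same content, same digest: reuse would be safe.
        System.out.println(contentDigest(beforeRescale).equals(contentDigest(sameAgain))); // true
        // Different parallelism, different digest: a cached key would point at stale content.
        System.out.println(contentDigest(beforeRescale).equals(contentDigest(afterRescale))); // false
    }
}
```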



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -176,6 +178,34 @@ void testInitialState() throws Exception {
         assertThat(scheduler.getState()).isInstanceOf(Created.class);
     }
 
+    @Test
+    void testCloseAsyncCleansUpTaskInformationBlobKeys() throws Exception {

Review Comment:
   We didn't test the restart logic; such a test would have shown that the current implementation doesn't work, I guess. 🤔 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -796,6 +798,23 @@ public CompletableFuture<Void> closeAsync() {
                 getMainThreadExecutor());
     }
 
+    /**
+     * Deletes cached TaskInformation blob keys from the blob store and clears them from
+     * JobVertices. This prevents orphaned blobs from accumulating in the HA blob store across JM
+     * restarts, since the in-memory cache on JobVertex is lost on restart and new blobs would be
+     */
+    private void cleanupTaskInformationBlobKeys() {

Review Comment:
   As mentioned in [my other comment](https://github.com/apache/flink/pull/27726/changes#r3499093909), reusing serialized data could be done here as well, but only if the parallelism didn't change. What I don't like, though, is that `AdaptiveScheduler` would rely on `ExecutionJobVertex`-internal information.
   Ideally, we would recompute the MessageDigest used for the BlobKey to check whether the digest changed. Only if it didn't change could we reuse the serialized TaskInformation.
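   The sketch below illustrates "reuse only if the digest is unchanged". It makes several simplifying assumptions: the blob store is a hypothetical in-memory map handing out numeric ids, and `Upload` and `uploadOrReuse` are illustrative names, not Flink APIs. The freshly serialized bytes are digested and compared with the digest recorded for the previous upload. On a match the old blob is reused. Otherwise the new bytes are uploaded and the obsolete blob is deleted.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: reuse an uploaded blob only if its content digest is unchanged. */
public class DigestReuseDemo {

    /** Hypothetical in-memory blob store handing out opaque numeric ids. */
    static final class InMemoryBlobStore {
        private final Map<Long, byte[]> blobs = new HashMap<>();
        private long nextId = 0;

        long put(byte[] content) {
            long id = nextId++;
            blobs.put(id, content.clone());
            return id;
        }

        void delete(long id) {
            blobs.remove(id);
        }

        int size() {
            return blobs.size();
        }
    }

    /** Result of a previous upload: the blob id plus the digest of the uploaded bytes. */
    static final class Upload {
        final long blobId;
        final byte[] digest;

        Upload(long blobId, byte[] digest) {
            this.blobId = blobId;
            this.digest = digest;
        }
    }

    static byte[] sha1(byte[] content) {
        try {
            return MessageDigest.getInstance("SHA-1").digest(content);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-1 is mandatory on every Java platform
        }
    }

    /** Reuses the previous upload if the digest is identical; otherwise replaces the blob. */
    static Upload uploadOrReuse(InMemoryBlobStore store, Upload previous, byte[] serialized) {
        byte[] digest = sha1(serialized);
        if (previous != null && MessageDigest.isEqual(previous.digest, digest)) {
            return previous; // content unchanged: keep using the existing blob
        }
        Upload fresh = new Upload(store.put(serialized), digest);
        if (previous != null) {
            store.delete(previous.blobId); // delete only after the replacement exists
        }
        return fresh;
    }

    public static void main(String[] args) {
        InMemoryBlobStore store = new InMemoryBlobStore();
        byte[] p4 = "map|parallelism=4".getBytes(StandardCharsets.UTF_8);
        byte[] p8 = "map|parallelism=8".getBytes(StandardCharsets.UTF_8);

        Upload first = uploadOrReuse(store, null, p4);
        Upload restarted = uploadOrReuse(store, first, p4.clone()); // unchanged content
        Upload rescaled = uploadOrReuse(store, restarted, p8);      // parallelism changed

        System.out.println(first == restarted);              // true: blob reused
        System.out.println(rescaled.blobId != first.blobId); // true: new blob uploaded
        System.out.println(store.size());                    // 1: old blob deleted
    }
}
```

   This approach still serializes the TaskInformation on every attempt. It only avoids a redundant upload and the orphaned blob that would otherwise be left behind. Keeping such a check next to the code that serializes the TaskInformation would presumably avoid exposing `ExecutionJobVertex` internals to `AdaptiveScheduler`.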



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -796,6 +798,23 @@ public CompletableFuture<Void> closeAsync() {
                 getMainThreadExecutor());
     }
 
+    /**
+     * Deletes cached TaskInformation blob keys from the blob store and clears them from
+     * JobVertices. This prevents orphaned blobs from accumulating in the HA blob store across JM
+     * restarts, since the in-memory cache on JobVertex is lost on restart and new blobs would be
+     */
+    private void cleanupTaskInformationBlobKeys() {

Review Comment:
   TaskInformation cleanup needs to happen in two situations:
   * When the JobMaster terminates. This is already handled by the `ResourceCleaner` component.
   * When the previous ExecutionGraph becomes obsolete. The ExecutionGraph itself doesn't have proper lifecycle management through `AutoCloseable`. This is a flaw in the current design, I guess. To fix FLINK-38697, we could add the intermediate cleanup to [AdaptiveScheduler#createExecutionGraphWithAvailableResourcesAsync](https://github.com/apache/flink/blob/709f34525918ddbfb5a9f794c0b2a23b4b5a1461/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L1521), though. That is where the previous state is transitioned to a new EG. WDYT?
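   The following sketch covers the second case. It assumes that the blobs uploaded by a graph are tracked somewhere, here in a hypothetical `ExecutionGraphBlobTracker`. String keys and an in-memory `BlobStore` stand in for the real blob server. At the transition, everything the previous graph uploaded is deleted before the new graph starts uploading. The sketch also assumes that no blob is shared between graphs. Final cleanup on JobMaster termination stays with `ResourceCleaner` and is not modeled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of cleaning up TaskInformation blobs when the previous ExecutionGraph
 * becomes obsolete. Names are illustrative; this is not how AdaptiveScheduler is structured.
 */
public class GraphTransitionCleanupDemo {

    /** Hypothetical blob store reduced to the set of stored keys. */
    static final class BlobStore {
        final Set<String> keys = new HashSet<>();

        void put(String key) {
            keys.add(key);
        }

        void delete(String key) {
            keys.remove(key);
        }
    }

    /** Tracks which blobs the current ExecutionGraph has uploaded. */
    static final class ExecutionGraphBlobTracker {
        private final BlobStore store;
        private final List<String> keysOfCurrentGraph = new ArrayList<>();

        ExecutionGraphBlobTracker(BlobStore store) {
            this.store = store;
        }

        /** Called whenever the current graph offloads a TaskInformation blob. */
        void upload(String key) {
            store.put(key);
            keysOfCurrentGraph.add(key);
        }

        /** Called when switching to a new graph: the previous graph's blobs are deleted. */
        void onNewExecutionGraph() {
            for (String key : keysOfCurrentGraph) {
                store.delete(key);
            }
            keysOfCurrentGraph.clear();
        }
    }

    public static void main(String[] args) {
        BlobStore store = new BlobStore();
        ExecutionGraphBlobTracker tracker = new ExecutionGraphBlobTracker(store);

        tracker.upload("eg1-map");
        tracker.upload("eg1-sink");
        tracker.onNewExecutionGraph(); // restart: the first graph is now obsolete
        tracker.upload("eg2-map");

        System.out.println(store.keys); // [eg2-map]
    }
}
```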



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
