XComp commented on code in PR #27726:
URL: https://github.com/apache/flink/pull/27726#discussion_r3498669270
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -124,6 +125,13 @@ public class JobVertex implements java.io.Serializable {
/** The group inside which the vertex subtasks share slots. */
@Nullable private CoLocationGroupImpl coLocationGroup;
+ /**
+ * The blob key of the offloaded TaskInformation for this vertex. Stored here to enable reuse
+ * across ExecutionGraph rebuilds (e.g., during adaptive scheduler restarts), preventing
+ * unbounded accumulation of orphaned blobs in permanent storage.
+ */
+ @Nullable private PermanentBlobKey taskInformationBlobKey = null;
Review Comment:
> The fix caches the PermanentBlobKey on JobVertex, which unlike ExecutionJobVertex is never recreated during adaptive scheduler restarts.

This premise doesn't hold: on each restart, `AdaptiveScheduler` recreates the
`JobVertex` instances living in the `ExecutionGraph` from the
`adjustedJobGraph`, which is a deep copy of the original immutable `JobGraph` (see
[AdaptiveScheduler:1528](https://github.com/apache/flink/blob/709f34525918ddbfb5a9f794c0b2a23b4b5a1461/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L1528)).
Any data that's added to a `JobVertex` therefore won't survive the restart.
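
To illustrate the point, here is a minimal sketch of why a cache field on the copied vertex gets lost. `SketchVertex` and `DeepCopyRestartSketch` are hypothetical stand-ins, not Flink classes, and the restart is reduced to "copy the original vertices again"; the real `AdaptiveScheduler` logic is more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for JobVertex; not the Flink class. */
final class SketchVertex {
    final String id;
    // Mutable cache field, analogous to the proposed taskInformationBlobKey.
    String cachedBlobKey;

    SketchVertex(String id) {
        this.id = id;
    }

    /** Copies all fields, including the cache field as it is on this instance. */
    SketchVertex deepCopy() {
        SketchVertex copy = new SketchVertex(id);
        copy.cachedBlobKey = cachedBlobKey;
        return copy;
    }
}

final class DeepCopyRestartSketch {

    /** Simulates one restart: the graph is rebuilt from copies of the original vertices. */
    static List<SketchVertex> restart(List<SketchVertex> original) {
        List<SketchVertex> adjusted = new ArrayList<>();
        for (SketchVertex vertex : original) {
            adjusted.add(vertex.deepCopy());
        }
        return adjusted;
    }

    /** Returns true only if a key cached after the first restart is visible after the second. */
    static boolean cacheSurvivesRestart() {
        List<SketchVertex> original = Arrays.asList(new SketchVertex("v1"));

        List<SketchVertex> first = restart(original);
        first.get(0).cachedBlobKey = "blob-1"; // written to the copy, never to the original

        List<SketchVertex> second = restart(original);
        return second.get(0).cachedBlobKey != null;
    }
}
```

In this sketch `cacheSurvivesRestart()` returns `false`: the key is only ever written to a copy, and the next restart copies the untouched original again.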
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java:
##########
@@ -460,10 +460,22 @@ public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInforma
// serialize the task information!
synchronized (stateMonitor) {
if (taskInformationOrBlobKey == null) {
+ // check if JobVertex already has a cached key from a previous ExecutionGraph
+ PermanentBlobKey cachedKey = jobVertex.getTaskInformationBlobKey();
+ if (cachedKey != null) {
+ taskInformationOrBlobKey = Either.Right(cachedKey);
+ return taskInformationOrBlobKey;
+ }
+
final BlobWriter blobWriter = graph.getBlobWriter();
final TaskInformation taskInformation = getTaskInformation();
Review Comment:
TaskInformation contains the parallelism and the job configuration. The
parallelism can change during a rescale operation, which changes the
TaskInformation for the same JobVertex.
There's also
[FLIP-530](https://cwiki.apache.org/confluence/display/FLINK/FLIP-530%3A+Dynamic+job+configuration)
for updating the job configuration, which might affect TaskInformation later on.
Either way, the TaskInformation might differ for the same JobVertex,
resulting in different BlobKeys being used.
So caching per JobVertex might not be the right thing to do in general.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -176,6 +178,34 @@ void testInitialState() throws Exception {
assertThat(scheduler.getState()).isInstanceOf(Created.class);
}
+ @Test
+ void testCloseAsyncCleansUpTaskInformationBlobKeys() throws Exception {
Review Comment:
We missed testing the restart logic. Such a test would have revealed that the
current implementation doesn't work, I guess. 🤔
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -796,6 +798,23 @@ public CompletableFuture<Void> closeAsync() {
getMainThreadExecutor());
}
+ /**
+ * Deletes cached TaskInformation blob keys from the blob store and clears them from
+ * JobVertices. This prevents orphaned blobs from accumulating in the HA blob store across JM
+ * restarts, since the in-memory cache on JobVertex is lost on restart and new blobs would be
+ * created.
+ */
+ private void cleanupTaskInformationBlobKeys() {
Review Comment:
As mentioned in [my other
comment](https://github.com/apache/flink/pull/27726/changes#r3499093909):
Reusing serialized data can be done here as well, but this could only work if
the parallelism didn't change. What I don't like, though, is that we would
assume `ExecutionJobVertex`-internal information in `AdaptiveScheduler`.
Ideally, we would have to recreate the MessageDigest used for the BlobKey to
verify whether the digest changed. Only if that's not the case could we
reuse the serialized TaskInformation.
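
A rough sketch of such a digest check, to make the idea concrete. Everything here is an assumption for illustration: `TaskInfoDigestSketch` is a hypothetical class, the string-based `serialize` stands in for the real TaskInformation serialization, and SHA-1 is picked as the digest algorithm without checking what the blob store actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch; not Flink code. */
final class TaskInfoDigestSketch {

    /** Stand-in for serializing TaskInformation (assumed to include parallelism and config). */
    static byte[] serialize(String vertexId, int parallelism, String config) {
        return (vertexId + "|" + parallelism + "|" + config).getBytes(StandardCharsets.UTF_8);
    }

    /** Computes the content digest of the serialized bytes (SHA-1 is an assumption). */
    static byte[] digest(byte[] serialized) {
        try {
            return MessageDigest.getInstance("SHA-1").digest(serialized);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    /** Reuse is only safe if a previous digest exists and matches the current content. */
    static boolean canReuse(byte[] previousDigest, byte[] currentSerialized) {
        return previousDigest != null
                && MessageDigest.isEqual(previousDigest, digest(currentSerialized));
    }
}
```

With this sketch, a vertex serialized with the same parallelism and configuration yields a matching digest, while changing only the parallelism (say, from 4 to 8) yields a different one, so the previous blob must not be reused.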
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -796,6 +798,23 @@ public CompletableFuture<Void> closeAsync() {
getMainThreadExecutor());
}
+ /**
+ * Deletes cached TaskInformation blob keys from the blob store and clears them from
+ * JobVertices. This prevents orphaned blobs from accumulating in the HA blob store across JM
+ * restarts, since the in-memory cache on JobVertex is lost on restart and new blobs would be
+ * created.
+ */
+ private void cleanupTaskInformationBlobKeys() {
Review Comment:
TaskInformation cleanup needs to happen in two situations:
* When the JobMaster terminates. This is already handled by the
`ResourceCleaner` component.
* When the previous ExecutionGraph becomes obsolete. The ExecutionGraph
itself doesn't have proper lifecycle management through `AutoCloseable`. This
is a flaw in the current design, I guess. To fix FLINK-38697, though, we could
add the intermediate cleanup to
[AdaptiveScheduler#createExecutionGraphWithAvailableResourcesAsync](https://github.com/apache/flink/blob/709f34525918ddbfb5a9f794c0b2a23b4b5a1461/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L1521),
where the previous state is transitioned to a new EG. WDYT?
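
The intermediate cleanup could look roughly like the sketch below. It only illustrates the bookkeeping: `GraphTransitionCleanupSketch` and `InMemoryBlobStore` are hypothetical, blob keys are plain strings, and deletion is synchronous, none of which reflects the actual Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these types are Flink classes. */
final class GraphTransitionCleanupSketch {

    /** In-memory stand-in for a permanent blob store. */
    static final class InMemoryBlobStore {
        private final Set<String> keys = new HashSet<>();

        void put(String key) {
            keys.add(key);
        }

        void delete(String key) {
            keys.remove(key);
        }

        boolean contains(String key) {
            return keys.contains(key);
        }

        int size() {
            return keys.size();
        }
    }

    private final InMemoryBlobStore store;
    // Blob keys owned by the execution graph that is currently in use.
    private List<String> previousGraphKeys = new ArrayList<>();

    GraphTransitionCleanupSketch(InMemoryBlobStore store) {
        this.store = store;
    }

    /** Registers the new graph's blobs, then deletes blobs only the obsolete graph used. */
    void transitionTo(List<String> newGraphKeys) {
        for (String key : newGraphKeys) {
            store.put(key);
        }
        for (String key : previousGraphKeys) {
            if (!newGraphKeys.contains(key)) {
                store.delete(key);
            }
        }
        previousGraphKeys = new ArrayList<>(newGraphKeys);
    }
}
```

The point of the sketch is that the number of stored blobs stays bounded by what the current graph needs, regardless of how many restarts happened before.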