zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1609398720


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),

Review Comment:
   Is it possible to start the recovery immediately once all recorded finished partitions have been recovered?
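   
   For illustration, one hedged way to do that would be to race a completion signal from the handler against the existing timeout; `allRecordedPartitionsRestoredFuture()` is hypothetical and not part of this PR:
   ```
   CompletableFuture<Void> timeout = new CompletableFuture<>();
   getMainThreadExecutor()
           .schedule(
                   () -> timeout.complete(null),
                   previousWorkerRecoveryTimeout.toMillis(),
                   TimeUnit.MILLISECONDS);

   // start recovering as soon as all recorded finished partitions are available again,
   // or when the worker recovery timeout fires, whichever happens first
   CompletableFuture.anyOf(jobRecoveryHandler.allRecordedPartitionsRestoredFuture(), timeout)
           .thenRunAsync(
                   () ->
                           jobRecoveryHandler.startRecovering(
                                   this::onRecoveringFinished, this::onRecoveringFailed),
                   getMainThreadExecutor());
   ```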



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        FailureHandlingResult wrappedResult = failureHandlingResult;
+        if (failureHandlingResult.canRestart()) {
+            Set<ExecutionVertexID> originalNeedToRestartVertices =
+                    failureHandlingResult.getVerticesToRestart();
+
+            Set<JobVertexID> extraNeedToRestartJobVertices =
+                    originalNeedToRestartVertices.stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .filter(requiredRestartJobVertices::contains)
+                            .collect(Collectors.toSet());
+
+            requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+            Set<ExecutionVertexID> needToRestartVertices =
+                    extraNeedToRestartJobVertices.stream()
+                            .flatMap(
+                                    jobVertexId -> {
+                                        ExecutionJobVertex jobVertex =
+                                                getExecutionJobVertex(jobVertexId);
+                                        return Arrays.stream(jobVertex.getTaskVertices())
+                                                .map(ExecutionVertex::getID);
+                                    })
+                            .collect(Collectors.toSet());
+            needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+            wrappedResult =
+                    FailureHandlingResult.restartable(
+                            failureHandlingResult.getFailedExecution().orElse(null),
+                            failureHandlingResult.getError(),
+                            failureHandlingResult.getTimestamp(),
+                            failureHandlingResult.getFailureLabels(),
+                            needToRestartVertices,
+                            failureHandlingResult.getRestartDelayMS(),
+                            failureHandlingResult.isGlobalFailure(),
+                            failureHandlingResult.isRootCause());
+        }
+
+        super.maybeRestartTasks(wrappedResult);
+    }
+
+    @VisibleForTesting
+    boolean isRecovering() {
+        return jobRecoveryHandler.isRecovering();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
+        boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+        if (success
+                && taskExecutionState.getExecutionState() == ExecutionState.FINISHED
+                && !isRecovering()) {
+            final ExecutionVertexID executionVertexId =
+                    taskExecutionState.getID().getExecutionVertexId();
+            jobRecoveryHandler.notifyExecutionFinished(executionVertexId, taskExecutionState);
+        }
+        return success;
+    }
+
+    @Override
+    protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices) {
+        super.resetForNewExecutions(vertices);
+        if (!isRecovering()) {
+            jobRecoveryHandler.notifyExecutionVertexReset(vertices);
+        }
+    }
+
+    private void initializeJobVertex(
+            ExecutionJobVertex jobVertex,
+            int parallelism,
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
+            long createTimestamp)
+            throws JobException {
+        if (!jobVertex.isParallelismDecided()) {
+            changeJobVertexParallelism(jobVertex, parallelism);
+        } else {
+            checkState(parallelism == jobVertex.getParallelism());
+        }
+        checkState(canInitialize(jobVertex));
+        getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp, jobVertexInputInfos);
+        if (!isRecovering()) {
+            jobRecoveryHandler.notifyExecutionJobVertexInitialization(
+                    jobVertex.getJobVertex().getID(), parallelism, jobVertexInputInfos);
+        }
+    }
+
+    private void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset)
+            throws Exception {
+        for (ExecutionVertexID executionVertexID : verticesToReset) {
+            notifyCoordinatorsAboutTaskFailure(
+                    getExecutionVertex(executionVertexID).getCurrentExecutionAttempt(), null);
+        }
+        resetForNewExecutions(verticesToReset);
+        restoreState(verticesToReset, false);
+    }
 
+    private void onRecoveringFinished(Set<JobVertexID> requiredRestartJobVertices) {

Review Comment:
   Is it possible to avoid depending on the handler to offer this `requiredRestartJobVertices`, and instead continue scheduling based on the current topology status?
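   
   For example, a hedged sketch of that alternative (it assumes the scheduling strategy is reachable from the scheduler here; the field name is hypothetical):
   ```
   private void onRecoveringFinished() {
       // no vertex set handed over by the recovery handler; scheduling simply resumes
       // from the current topology and execution vertex states
       initializeVerticesIfPossible();
       schedulingStrategy.scheduleVerticesAfterRecovering();
   }
   ```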



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java:
##########
@@ -146,6 +146,29 @@ private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) {
         scheduledVertices.addAll(verticesToSchedule);
     }
 
+    @Override
+    public void scheduleVerticesAfterRecovering() {
+        final Set<ExecutionVertexID> verticesToSchedule = new HashSet<>();
+
+        Set<ExecutionVertexID> nextVertices =
+                newVertices.stream()

Review Comment:
   What if the graph was created in a static way?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryHandler.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Interface for handling batch job recovery. */
+public interface BatchJobRecoveryHandler {
+
+    /**
+     * Stops the job recovery handler and optionally clears up.
+     *
+     * @param clearUp whether to clear up.
+     */
+    void stop(boolean clearUp);
+
+    /**
+     * Starts the recovery process and sets up listeners for recovery completion or failure.
+     *
+     * @param recoverFinishedListener a listener called in case recovery finished.
+     * @param recoverFailedListener a runnable called in case recovery fails.
+     */
+    void startRecovering(
+            Consumer<Set<JobVertexID>> recoverFinishedListener, Runnable recoverFailedListener);
+
+    /** Determines whether the job needs to undergo recovery. */
+    boolean needRecover();
+
+    /** Determines whether the job is recovering. */
+    boolean isRecovering();
+
+    /**
+     * Notifies that a set of execution vertices have been reset.
+     *
+     * @param vertices a collection of execution vertex IDs that have been reset.
+     */
+    void notifyExecutionVertexReset(Collection<ExecutionVertexID> vertices);
+
+    /**
+     * Notifies the initialization of a job vertex.
+     *
+     * @param jobVertexId the ID of the job vertex being initialized.
+     * @param parallelism the parallelism degree of the job vertex.
+     * @param jobVertexInputInfos a map of intermediate dataset IDs to job vertex input info.
+     */
+    void notifyExecutionJobVertexInitialization(
+            JobVertexID jobVertexId,
+            int parallelism,
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos);
+
+    /** Notifies that an execution has finished. */
+    void notifyExecutionFinished(
+            ExecutionVertexID executionVertexId, TaskExecutionStateTransition taskExecutionState);
+
+    /** Initializes the recovery handler with the necessary components. */
+    void initialize(
+            Logger log,
+            ExecutionGraph executionGraph,
+            ShuffleMaster<?> shuffleMaster,

Review Comment:
   It's better to introduce a context to host all the methods needed for recovery.
   This can help the needed methods to be well documented, instead of using `Consumer/QuadConsumerWithException/Runnable/...`.
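   
   For illustration, a rough sketch of such a context; the interface name and method signatures are hypothetical and only mirror the callbacks currently passed to `initialize()`:
   ```
   /** Hypothetical sketch of a context hosting everything the recovery handler needs. */
   public interface BatchJobRecoveryContext {

       /** The execution graph of the job to recover. */
       ExecutionGraph getExecutionGraph();

       /** The shuffle master, e.g. to check which partitions are still available. */
       ShuffleMaster<?> getShuffleMaster();

       /** The scheduler's main thread executor. */
       ComponentMainThreadExecutor getMainThreadExecutor();

       /** The failover strategy used to determine vertices that need restarting. */
       FailoverStrategy getFailoverStrategy();

       /** Fails the job; replaces the failJob consumer. */
       void failJob(Throwable cause);

       /** Resets the given vertices during recovery; replaces resetVerticesInRecovering. */
       void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset) throws Exception;

       /** Updates partition bytes metrics; replaces updateResultPartitionBytesMetrics. */
       void updateResultPartitionBytesMetrics(
               Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes);

       /** Initializes a job vertex; replaces the initializeJobVertex callback. */
       void initializeJobVertex(
               ExecutionJobVertex jobVertex,
               int parallelism,
               Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
               long createTimestamp)
               throws JobException;

       /** Updates the scheduling topology after new vertices were initialized. */
       void updateTopology(List<ExecutionJobVertex> newlyInitializedJobVertices);
   }
   ```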



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -479,7 +640,13 @@ public void initializeVerticesIfPossible() {
                     // ExecutionGraph#initializeJobVertex(ExecutionJobVertex, long) to initialize.
                     // TODO: In the future, if we want to load balance for job vertices whose
                     // parallelism has already been decided, we need to refactor the logic here.

Review Comment:
   The comment is outdated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {

Review Comment:
   More comments are needed to describe what this method does and what it is for.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyBatchJobRecoveryHandler.java:
##########
@@ -47,7 +47,7 @@
 /** A dummy implementation of the {@link BatchJobRecoveryHandler}. */
 public class DummyBatchJobRecoveryHandler implements BatchJobRecoveryHandler {
     @Override
-    public void stopJobEventManager(boolean clearEvents) {}
+    public void stop(boolean clearUp) {}

Review Comment:
   clearUp -> cleanUp
   
   They have different meanings.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        FailureHandlingResult wrappedResult = failureHandlingResult;
+        if (failureHandlingResult.canRestart()) {
+            Set<ExecutionVertexID> originalNeedToRestartVertices =
+                    failureHandlingResult.getVerticesToRestart();
+
+            Set<JobVertexID> extraNeedToRestartJobVertices =
+                    originalNeedToRestartVertices.stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .filter(requiredRestartJobVertices::contains)
+                            .collect(Collectors.toSet());
+
+            requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+            Set<ExecutionVertexID> needToRestartVertices =
+                    extraNeedToRestartJobVertices.stream()
+                            .flatMap(
+                                    jobVertexId -> {
+                                        ExecutionJobVertex jobVertex =
+                                                getExecutionJobVertex(jobVertexId);
+                                        return Arrays.stream(jobVertex.getTaskVertices())
+                                                .map(ExecutionVertex::getID);
+                                    })
+                            .collect(Collectors.toSet());
+            needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+            wrappedResult =
+                    FailureHandlingResult.restartable(
+                            failureHandlingResult.getFailedExecution().orElse(null),
+                            failureHandlingResult.getError(),
+                            failureHandlingResult.getTimestamp(),
+                            failureHandlingResult.getFailureLabels(),
+                            needToRestartVertices,
+                            failureHandlingResult.getRestartDelayMS(),
+                            failureHandlingResult.isGlobalFailure(),
+                            failureHandlingResult.isRootCause());
+        }
+
+        super.maybeRestartTasks(wrappedResult);
+    }
+
+    @VisibleForTesting
+    boolean isRecovering() {
+        return jobRecoveryHandler.isRecovering();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
+        boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+        if (success
+                && taskExecutionState.getExecutionState() == ExecutionState.FINISHED
+                && !isRecovering()) {
+            final ExecutionVertexID executionVertexId =
+                    taskExecutionState.getID().getExecutionVertexId();
+            jobRecoveryHandler.notifyExecutionFinished(executionVertexId, taskExecutionState);

Review Comment:
   How about doing it in `onTaskFinished()`?
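   
   A hedged sketch of that variant; it assumes `onTaskFinished(Execution, IOMetrics)` is the hook meant here, and that the handler could take the `Execution` instead of a `TaskExecutionStateTransition`:
   ```
   @Override
   protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) {
       if (!isRecovering()) {
           // hypothetical handler signature change: pass the finished execution directly
           jobRecoveryHandler.notifyExecutionFinished(execution.getVertex().getID(), execution);
       }
       super.onTaskFinished(execution, ioMetrics);
   }
   ```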



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryHandler.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Interface for handling batch job recovery. */
+public interface BatchJobRecoveryHandler {
+
+    /**
+     * Stops the job recovery handler and optionally clears up.
+     *
+     * @param clearUp whether to clear up.
+     */
+    void stop(boolean clearUp);

Review Comment:
   It's better to reorganize the order of these methods to make the code easier to read.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java:
##########
@@ -55,4 +55,9 @@ public interface SchedulingStrategy {
      * @param resultPartitionId The id of the result partition
      */
     void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId);
+
+    /** Called when the batch job recovery is finished. */
+    default void scheduleVerticesAfterRecovering() {

Review Comment:
   I prefer to name it `scheduleAllVerticesIfPossible`, and add a comment that it schedules all vertices except finished vertices and vertices whose inputs are not ready.
   Job recovery just leverages this capability to resume job scheduling.
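   
   A sketch of the suggested rename and javadoc (the body is assumed to stay as in the PR's `scheduleVerticesAfterRecovering()`):
   ```
   /**
    * Schedules all vertices that can currently be scheduled, i.e. all vertices except
    * those that have already finished and those whose inputs are not yet ready.
    * Batch job recovery just leverages this capability to resume job scheduling.
    */
   default void scheduleAllVerticesIfPossible() {
       // same body as the current scheduleVerticesAfterRecovering()
   }
   ```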



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(

Review Comment:
   Is it possible to organize the logic like this:
   ```
   if (!maybeRecoverFromPreviousJobAttempt()) {
      ... start new scheduling ...
   }
   ```
   
   And maybe combine `startRecovering()` and `initialize()` into one method.
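   
   For illustration, a hedged sketch of what such a method could look like; `maybeRecoverFromPreviousJobAttempt` is hypothetical and would fold `needRecover()` and the delayed `startRecovering()` call into one place, with handler initialization moved into `startRecovering()` or the handler's constructor:
   ```
   private boolean maybeRecoverFromPreviousJobAttempt() {
       if (!jobRecoveryHandler.needRecover()) {
           return false;
       }
       getMainThreadExecutor()
               .schedule(
                       () ->
                               jobRecoveryHandler.startRecovering(
                                       this::onRecoveringFinished, this::onRecoveringFailed),
                       previousWorkerRecoveryTimeout.toMillis(),
                       TimeUnit.MILLISECONDS);
       return true;
   }
   ```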



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
