zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1609398720
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),

Review Comment:
   Is it possible to start recovering immediately once all recorded finished partitions are recovered?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        FailureHandlingResult wrappedResult = failureHandlingResult;
+        if (failureHandlingResult.canRestart()) {
+            Set<ExecutionVertexID> originalNeedToRestartVertices =
+                    failureHandlingResult.getVerticesToRestart();
+
+            Set<JobVertexID> extraNeedToRestartJobVertices =
+                    originalNeedToRestartVertices.stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .filter(requiredRestartJobVertices::contains)
+                            .collect(Collectors.toSet());
+
+            requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+            Set<ExecutionVertexID> needToRestartVertices =
+                    extraNeedToRestartJobVertices.stream()
+                            .flatMap(
+                                    jobVertexId -> {
+                                        ExecutionJobVertex jobVertex =
+                                                getExecutionJobVertex(jobVertexId);
+                                        return Arrays.stream(jobVertex.getTaskVertices())
+                                                .map(ExecutionVertex::getID);
+                                    })
+                            .collect(Collectors.toSet());
+            needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+            wrappedResult =
+                    FailureHandlingResult.restartable(
+                            failureHandlingResult.getFailedExecution().orElse(null),
+                            failureHandlingResult.getError(),
+                            failureHandlingResult.getTimestamp(),
+                            failureHandlingResult.getFailureLabels(),
+                            needToRestartVertices,
+                            failureHandlingResult.getRestartDelayMS(),
+                            failureHandlingResult.isGlobalFailure(),
+                            failureHandlingResult.isRootCause());
+        }
+
+        super.maybeRestartTasks(wrappedResult);
+    }
+
+    @VisibleForTesting
+    boolean isRecovering() {
+        return jobRecoveryHandler.isRecovering();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
+        boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+        if (success
+                && taskExecutionState.getExecutionState() == ExecutionState.FINISHED
+                && !isRecovering()) {
+            final ExecutionVertexID executionVertexId =
+                    taskExecutionState.getID().getExecutionVertexId();
+            jobRecoveryHandler.notifyExecutionFinished(executionVertexId, taskExecutionState);
+        }
+        return success;
+    }
+
+    @Override
+    protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices) {
+        super.resetForNewExecutions(vertices);
+        if (!isRecovering()) {
+            jobRecoveryHandler.notifyExecutionVertexReset(vertices);
+        }
+    }
+
+    private void initializeJobVertex(
+            ExecutionJobVertex jobVertex,
+            int parallelism,
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
+            long createTimestamp)
+            throws JobException {
+        if (!jobVertex.isParallelismDecided()) {
+            changeJobVertexParallelism(jobVertex, parallelism);
+        } else {
+            checkState(parallelism == jobVertex.getParallelism());
+        }
+        checkState(canInitialize(jobVertex));
+        getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp, jobVertexInputInfos);
+        if (!isRecovering()) {
+            jobRecoveryHandler.notifyExecutionJobVertexInitialization(
+                    jobVertex.getJobVertex().getID(), parallelism, jobVertexInputInfos);
+        }
+    }
+
+    private void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset)
+            throws Exception {
+        for (ExecutionVertexID executionVertexID : verticesToReset) {
+            notifyCoordinatorsAboutTaskFailure(
+                    getExecutionVertex(executionVertexID).getCurrentExecutionAttempt(), null);
+        }
+        resetForNewExecutions(verticesToReset);
+        restoreState(verticesToReset, false);
+    }
+    private void onRecoveringFinished(Set<JobVertexID> requiredRestartJobVertices) {

Review Comment:
   Is it possible to avoid depending on the handler to offer this `requiredRestartJobVertices`, and instead continue the scheduling based on the current topology status?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java:
##########
@@ -146,6 +146,29 @@ private void maybeScheduleVertices(final Set<ExecutionVertexID> vertices) {
         scheduledVertices.addAll(verticesToSchedule);
     }

+    @Override
+    public void scheduleVerticesAfterRecovering() {
+        final Set<ExecutionVertexID> verticesToSchedule = new HashSet<>();
+
+        Set<ExecutionVertexID> nextVertices =
+                newVertices.stream()

Review Comment:
   What if the graph was created in a static way?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryHandler.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Interface for handling batch job recovery. */
+public interface BatchJobRecoveryHandler {
+
+    /**
+     * Stops the job recovery handler and optionally clears up.
+     *
+     * @param clearUp whether to clear up.
+     */
+    void stop(boolean clearUp);
+
+    /**
+     * Starts the recovery process and sets up listeners for recovery completion or failure.
+     *
+     * @param recoverFinishedListener a listener called in case recovery finished.
+     * @param recoverFailedListener a runnable called in case recovery fails.
+     */
+    void startRecovering(
+            Consumer<Set<JobVertexID>> recoverFinishedListener, Runnable recoverFailedListener);
+
+    /** Determines whether the job needs to undergo recovery. */
+    boolean needRecover();
+
+    /** Determines whether the job is recovering. */
+    boolean isRecovering();
+
+    /**
+     * Notifies that a set of execution vertices have been reset.
+     *
+     * @param vertices a collection of execution vertex IDs that have been reset.
+     */
+    void notifyExecutionVertexReset(Collection<ExecutionVertexID> vertices);
+
+    /**
+     * Notifies the initialization of a job vertex.
+     *
+     * @param jobVertexId the ID of the job vertex being initialized.
+     * @param parallelism the parallelism degree of the job vertex.
+     * @param jobVertexInputInfos a map of intermediate dataset IDs to job vertex input info.
+     */
+    void notifyExecutionJobVertexInitialization(
+            JobVertexID jobVertexId,
+            int parallelism,
+            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos);
+
+    /** Notifies that an execution has finished. */
+    void notifyExecutionFinished(
+            ExecutionVertexID executionVertexId, TaskExecutionStateTransition taskExecutionState);
+
+    /** Initializes the recovery handler with the necessary components. */
+    void initialize(
+            Logger log,
+            ExecutionGraph executionGraph,
+            ShuffleMaster<?> shuffleMaster,

Review Comment:
   It's better to introduce a context to host all the methods needed for recovery. This would help the needed methods to be well documented, instead of using `Consumer/QuadConsumerWithException/Runnable/...`. (A rough sketch of such a context follows the comments below.)

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -479,7 +640,13 @@ public void initializeVerticesIfPossible() {
                         // ExecutionGraph#initializeJobVertex(ExecutionJobVertex, long) to initialize.
                         // TODO: In the future, if we want to load balance for job vertices whose
                         // parallelism has already been decided, we need to refactor the logic here.

Review Comment:
   The comment is outdated.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {

Review Comment:
   More comments are needed to describe what this method does and what it is for.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyBatchJobRecoveryHandler.java:
##########
@@ -47,7 +47,7 @@
 /** A dummy implementation of the {@link BatchJobRecoveryHandler}. */
 public class DummyBatchJobRecoveryHandler implements BatchJobRecoveryHandler {
     @Override
-    public void stopJobEventManager(boolean clearEvents) {}
+    public void stop(boolean clearUp) {}

Review Comment:
   clearUp -> cleanUp
   They have different meanings.
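A rough sketch of the context suggested above, for illustration only. The interface name `BatchJobRecoveryContext` and the exact method set are hypothetical, inferred from the method references currently passed to `initialize(...)` in the diff; in particular, the `failJob(Throwable)` signature is simplified:

```java
package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.ShuffleMaster;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical context hosting all scheduler-side operations needed during batch job
 * recovery. Each operation becomes a documented method instead of an anonymous
 * {@code Consumer}/{@code QuadConsumerWithException}/{@code Runnable} parameter.
 */
public interface BatchJobRecoveryContext {

    /** Returns the execution graph of the job being recovered. */
    ExecutionGraph getExecutionGraph();

    /** Returns the shuffle master used to check which result partitions still exist. */
    ShuffleMaster<?> getShuffleMaster();

    /** Returns the executor that runs all graph mutations in the JobMaster main thread. */
    ComponentMainThreadExecutor getMainThreadExecutor();

    /** Fails the job, e.g. when recovery turns out to be impossible (signature simplified). */
    void failJob(Throwable cause);

    /** Resets the given execution vertices so that they can be re-run during recovery. */
    void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset) throws Exception;

    /** Updates metrics with the sizes of the recovered result partitions. */
    void updateResultPartitionBytesMetrics(
            Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes);

    /** Initializes a job vertex with the recovered parallelism and input infos. */
    void initializeJobVertex(
            ExecutionJobVertex jobVertex,
            int parallelism,
            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos,
            long createTimestamp)
            throws JobException;

    /** Propagates newly initialized job vertices to the scheduling topology. */
    void updateTopology(List<ExecutionJobVertex> newlyInitializedJobVertices);
}
```

With such a context, `initialize(...)` could shrink to a single `BatchJobRecoveryContext` parameter, and every recovery dependency would be documented in one place.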
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(
+                log,
+                getExecutionGraph(),
+                shuffleMaster,
+                getMainThreadExecutor(),
+                failoverStrategy,
+                this::failJob,
+                this::resetVerticesInRecovering,
+                this::updateResultPartitionBytesMetrics,
+                this::initializeJobVertex,
+                this::updateTopology);
+
+        if (jobRecoveryHandler.needRecover()) {
+            getMainThreadExecutor()
+                    .schedule(
+                            () ->
+                                    jobRecoveryHandler.startRecovering(
+                                            this::onRecoveringFinished, this::onRecoveringFailed),
+                            previousWorkerRecoveryTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+        } else {
+            tryComputeSourceParallelismThenRunAsync(
+                    (Void value, Throwable throwable) -> {
+                        if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                            initializeVerticesIfPossible();
+                            super.startSchedulingInternal();
+                        }
+                    });
+        }
+    }
+
+    @Override
+    protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+        FailureHandlingResult wrappedResult = failureHandlingResult;
+        if (failureHandlingResult.canRestart()) {
+            Set<ExecutionVertexID> originalNeedToRestartVertices =
+                    failureHandlingResult.getVerticesToRestart();
+
+            Set<JobVertexID> extraNeedToRestartJobVertices =
+                    originalNeedToRestartVertices.stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .filter(requiredRestartJobVertices::contains)
+                            .collect(Collectors.toSet());
+
+            requiredRestartJobVertices.removeAll(extraNeedToRestartJobVertices);
+
+            Set<ExecutionVertexID> needToRestartVertices =
+                    extraNeedToRestartJobVertices.stream()
+                            .flatMap(
+                                    jobVertexId -> {
+                                        ExecutionJobVertex jobVertex =
+                                                getExecutionJobVertex(jobVertexId);
+                                        return Arrays.stream(jobVertex.getTaskVertices())
+                                                .map(ExecutionVertex::getID);
+                                    })
+                            .collect(Collectors.toSet());
+            needToRestartVertices.addAll(originalNeedToRestartVertices);
+
+            wrappedResult =
+                    FailureHandlingResult.restartable(
+                            failureHandlingResult.getFailedExecution().orElse(null),
+                            failureHandlingResult.getError(),
+                            failureHandlingResult.getTimestamp(),
+                            failureHandlingResult.getFailureLabels(),
+                            needToRestartVertices,
+                            failureHandlingResult.getRestartDelayMS(),
+                            failureHandlingResult.isGlobalFailure(),
+                            failureHandlingResult.isRootCause());
+        }
+
+        super.maybeRestartTasks(wrappedResult);
+    }
+
+    @VisibleForTesting
+    boolean isRecovering() {
+        return jobRecoveryHandler.isRecovering();
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {
+        boolean success = super.updateTaskExecutionState(taskExecutionState);
+
+        if (success
+                && taskExecutionState.getExecutionState() == ExecutionState.FINISHED
+                && !isRecovering()) {
+            final ExecutionVertexID executionVertexId =
+                    taskExecutionState.getID().getExecutionVertexId();
+            jobRecoveryHandler.notifyExecutionFinished(executionVertexId, taskExecutionState);

Review Comment:
   How about doing it in `onTaskFinished()`?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryHandler.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
+import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.QuadConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/** Interface for handling batch job recovery. */
+public interface BatchJobRecoveryHandler {
+
+    /**
+     * Stops the job recovery handler and optionally clears up.
+     *
+     * @param clearUp whether to clear up.
+     */
+    void stop(boolean clearUp);

Review Comment:
   It's better to reorganize the order of these methods to make the code easier to read.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java:
##########
@@ -55,4 +55,9 @@ public interface SchedulingStrategy {
      * @param resultPartitionId The id of the result partition
      */
     void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId);
+
+    /** Called when the batch job recovery is finished. */
+    default void scheduleVerticesAfterRecovering() {

Review Comment:
   I prefer to name it `scheduleAllVerticesIfPossible`, and to comment that it schedules all vertices except for finished vertices and vertices whose inputs are not ready. Job recovery just leverages this capability to resume job scheduling.
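A minimal sketch of the renamed hook, assuming the no-op default stays in `SchedulingStrategy` (the Javadoc wording is illustrative):

```java
/**
 * Schedules all vertices that can currently be scheduled, i.e. all vertices except
 * already-finished ones and those whose inputs are not yet ready. This is not
 * recovery-specific; batch job recovery merely leverages it to resume scheduling.
 */
default void scheduleAllVerticesIfPossible() {
    // No-op by default, so scheduling strategies that do not support batch job
    // recovery need not implement it.
}
```

`VertexwiseSchedulingStrategy` would then override it by considering all scheduling vertices rather than only `newVertices`, which would also cover graphs created in a static way (see the earlier comment on `VertexwiseSchedulingStrategy`).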
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##########
@@ -224,18 +250,153 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
     protected void startSchedulingInternal() {
         speculativeExecutionHandler.init(
                 getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);
+        jobRecoveryHandler.initialize(

Review Comment:
   Is it possible to organize the logic like this:
   ```
   if (!maybeRecoverFromPreviousJobAttempt()) {
       ... start new scheduling ...
   }
   ```
   And maybe combine `startRecovering()` and `initialize()` into one method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org