XComp commented on code in PR #28555:
URL: https://github.com/apache/flink/pull/28555#discussion_r3497481078
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,78 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link
ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition via {@code markFinished()},
{@code
+ * processFail()}, and {@code recoverExecution()}.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws
Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+ final JobVertexID jid3 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+ JobVertex v3 = new JobVertex("v3", jid3);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1, v3, 1);
+ DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+
+ Iterator<Execution> executionIter =
graph.getRegisteredExecutions().values().iterator();
+ Execution finishedExecution = executionIter.next();
+ Execution failedExecution = executionIter.next();
+ Execution recoveredExecution = executionIter.next();
+
+ // The listener receives an ExecutionAttemptID, not the Execution
object. Keep a stable
+ // map to resolve executions (markFinished deregisters them after the
transition).
+ Map<ExecutionAttemptID, Execution> executionsById = new HashMap<>();
+ executionsById.put(finishedExecution.getAttemptId(),
finishedExecution);
+ executionsById.put(failedExecution.getAttemptId(), failedExecution);
+ executionsById.put(recoveredExecution.getAttemptId(),
recoveredExecution);
+
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new
HashMap<>();
+ graph.registerExecutionStateUpdateListener(
+ (attemptId, previousState, newState) -> {
+ if (newState.isTerminal()) {
+ metricsSeenByListener.put(
+ attemptId,
executionsById.get(attemptId).getIOMetrics());
+ }
+ });
+
+ IOMetrics ioMetrics = new IOMetrics(10, 20, 30, 40, 100, 200, 300);
+
+ // markFinished() only transitions from a running state.
+ finishedExecution.transitionState(ExecutionState.RUNNING);
+ finishedExecution.markFinished(Collections.emptyMap(), ioMetrics);
+
+ // markFailed() with fromSchedulerNg=true to reach the FAILED
transition.
+ failedExecution.markFailed(
+ new Exception("test failure"),
+ false,
+ Collections.emptyMap(),
+ ioMetrics,
+ false,
+ true);
+
+ // recoverExecution() transitions directly to FINISHED during JM
failover recovery.
+ recoveredExecution.recoverExecution(
+ recoveredExecution.getAttemptId(),
+ new LocalTaskManagerLocation(),
+ Collections.emptyMap(),
+ ioMetrics);
Review Comment:
Good job here. One suggestion, though: Can we remove the duplicated code?
We could write a private test helper method that takes a callback as a parameter
and then add an individual test method for each callback.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,78 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link
ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition via {@code markFinished()},
{@code
+ * processFail()}, and {@code recoverExecution()}.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws
Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+ final JobVertexID jid3 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+ JobVertex v3 = new JobVertex("v3", jid3);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1, v3, 1);
+ DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+
+ Iterator<Execution> executionIter =
graph.getRegisteredExecutions().values().iterator();
+ Execution finishedExecution = executionIter.next();
+ Execution failedExecution = executionIter.next();
+ Execution recoveredExecution = executionIter.next();
+
+ // The listener receives an ExecutionAttemptID, not the Execution
object. Keep a stable
+ // map to resolve executions (markFinished deregisters them after the
transition).
+ Map<ExecutionAttemptID, Execution> executionsById = new HashMap<>();
+ executionsById.put(finishedExecution.getAttemptId(),
finishedExecution);
+ executionsById.put(failedExecution.getAttemptId(), failedExecution);
+ executionsById.put(recoveredExecution.getAttemptId(),
recoveredExecution);
+
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new
HashMap<>();
+ graph.registerExecutionStateUpdateListener(
+ (attemptId, previousState, newState) -> {
+ if (newState.isTerminal()) {
+ metricsSeenByListener.put(
+ attemptId,
executionsById.get(attemptId).getIOMetrics());
+ }
+ });
+
+ IOMetrics ioMetrics = new IOMetrics(10, 20, 30, 40, 100, 200, 300);
+
+ // markFinished() only transitions from a running state.
+ finishedExecution.transitionState(ExecutionState.RUNNING);
+ finishedExecution.markFinished(Collections.emptyMap(), ioMetrics);
+
+ // markFailed() with fromSchedulerNg=true to reach the FAILED
transition.
+ failedExecution.markFailed(
+ new Exception("test failure"),
+ false,
+ Collections.emptyMap(),
+ ioMetrics,
+ false,
+ true);
+
+ // recoverExecution() transitions directly to FINISHED during JM
failover recovery.
+ recoveredExecution.recoverExecution(
+ recoveredExecution.getAttemptId(),
+ new LocalTaskManagerLocation(),
+ Collections.emptyMap(),
+ ioMetrics);
+
+ for (Map.Entry<ExecutionAttemptID, Execution> entry :
executionsById.entrySet()) {
+ IOMetrics captured = metricsSeenByListener.get(entry.getKey());
+ assertThat(captured)
Review Comment:
Can we also assert on the updated accumulators, to cover all scenarios?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,78 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link
ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition via {@code markFinished()},
{@code
+ * processFail()}, and {@code recoverExecution()}.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws
Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+ final JobVertexID jid3 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+ JobVertex v3 = new JobVertex("v3", jid3);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1, v3, 1);
+ DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+
+ Iterator<Execution> executionIter =
graph.getRegisteredExecutions().values().iterator();
+ Execution finishedExecution = executionIter.next();
+ Execution failedExecution = executionIter.next();
+ Execution recoveredExecution = executionIter.next();
+
+ // The listener receives an ExecutionAttemptID, not the Execution
object. Keep a stable
+ // map to resolve executions (markFinished deregisters them after the
transition).
+ Map<ExecutionAttemptID, Execution> executionsById = new HashMap<>();
+ executionsById.put(finishedExecution.getAttemptId(),
finishedExecution);
+ executionsById.put(failedExecution.getAttemptId(), failedExecution);
+ executionsById.put(recoveredExecution.getAttemptId(),
recoveredExecution);
+
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new
HashMap<>();
+ graph.registerExecutionStateUpdateListener(
+ (attemptId, previousState, newState) -> {
+ if (newState.isTerminal()) {
+ metricsSeenByListener.put(
+ attemptId,
executionsById.get(attemptId).getIOMetrics());
+ }
+ });
+
+ IOMetrics ioMetrics = new IOMetrics(10, 20, 30, 40, 100, 200, 300);
+
+ // markFinished() only transitions from a running state.
+ finishedExecution.transitionState(ExecutionState.RUNNING);
+ finishedExecution.markFinished(Collections.emptyMap(), ioMetrics);
+
+ // markFailed() with fromSchedulerNg=true to reach the FAILED
transition.
+ failedExecution.markFailed(
+ new Exception("test failure"),
+ false,
+ Collections.emptyMap(),
+ ioMetrics,
+ false,
+ true);
+
+ // recoverExecution() transitions directly to FINISHED during JM
failover recovery.
+ recoveredExecution.recoverExecution(
+ recoveredExecution.getAttemptId(),
+ new LocalTaskManagerLocation(),
+ Collections.emptyMap(),
+ ioMetrics);
Review Comment:
We're also missing the code path for `completeCancelling`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -1673,6 +1709,10 @@ private boolean transitionState(
ExceptionUtils.stripCompletionException(error));
}
+ if (preCompletionAction != null) {
+ preCompletionAction.run();
+ }
+
if (targetState == INITIALIZING || targetState == RUNNING) {
initializingOrRunningFuture.complete(null);
} else if (targetState.isTerminal()) {
Review Comment:
nit: What about putting the `preCompletionAction` in this block? That sounds
like it's the more appropriate location considering the name
"preCompletionAction". WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]