XComp commented on code in PR #28555:
URL: https://github.com/apache/flink/pull/28555#discussion_r3492210636
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,68 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition. This guards against a regression where
+ * updateAccumulatorsAndMetrics was called after transitionState (which fires listeners inline),
+ * causing listeners to see null IOMetrics.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1);
+ ExecutionGraph graph = facade.supply(scheduler::getExecutionGraph);
+
+ // Capture IOMetrics as seen by the listener at notification time
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new HashMap<>();
+ facade.run(
Review Comment:
Where is the `facade` variable coming from? This is causing compilation errors.
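The regression described in the test's Javadoc can be reproduced without the scheduler or the `facade` helper. Below is a minimal, self-contained sketch under stated assumptions: `ToyExecution`, `OrderingDemo`, and the `Long` stand-in for `IOMetrics` are hypothetical and not Flink APIs; the only behavior modeled is that listeners run synchronously inside `transitionState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for an execution whose listeners fire inline. */
final class ToyExecution {
    enum State { RUNNING, FINISHED }

    private State state = State.RUNNING;
    private Long ioMetrics; // stand-in for IOMetrics; null until stored
    private final List<Consumer<ToyExecution>> listeners = new ArrayList<>();

    void addListener(Consumer<ToyExecution> listener) {
        listeners.add(listener);
    }

    Long getIOMetrics() {
        return ioMetrics;
    }

    private boolean transitionState(State expected, State target) {
        if (state != expected) {
            return false;
        }
        state = target;
        // Listeners are notified synchronously, before transitionState returns.
        listeners.forEach(l -> l.accept(this));
        return true;
    }

    /** Problematic ordering: metrics are stored only after listeners have already run. */
    void markFinishedMetricsAfter(long metrics) {
        if (transitionState(State.RUNNING, State.FINISHED)) {
            ioMetrics = metrics;
        }
    }

    /** Ordering used by the PR: metrics are stored first, so listeners can read them. */
    void markFinishedMetricsBefore(long metrics) {
        ioMetrics = metrics;
        transitionState(State.RUNNING, State.FINISHED);
    }
}

final class OrderingDemo {
    /** Returns what a listener observed via getIOMetrics() during the FINISHED notification. */
    static Long observedByListener(boolean storeMetricsFirst) {
        ToyExecution execution = new ToyExecution();
        Long[] seen = new Long[1];
        execution.addListener(e -> seen[0] = e.getIOMetrics());
        if (storeMetricsFirst) {
            execution.markFinishedMetricsBefore(42L);
        } else {
            execution.markFinishedMetricsAfter(42L);
        }
        return seen[0];
    }
}
```

In this model, `observedByListener(false)` returns `null` while `observedByListener(true)` returns `42`, which is the property the test asserts on the real `ExecutionGraph`.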
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -1166,10 +1166,14 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met
if (current == INITIALIZING || current == RUNNING || current == DEPLOYING) {
+ // Store metrics before the state transition so that listeners
+ // notified inside transitionState() can read them via getIOMetrics().
+ // This matches the ordering already used by completeCancelling().
+ updateAccumulatorsAndMetrics(userAccumulators, metrics);
+
if (transitionState(current, FINISHED)) {
Review Comment:
Have you considered moving the metrics update into a pre-completion callback that's passed into the `transitionState` method and called as part of the state transition, before completing [here](https://github.com/apache/flink/blob/d8461ed0c6cb6246dae1daa3d6d5edfc73815ef9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1685)? We could introduce a `transitionToFinalState` method to make the contract cleaner. WDYT?
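A rough sketch of the suggested contract, using a simplified model rather than Flink's actual `Execution`: `CallbackExecution` and the `transitionToFinalState(expected, target, beforeNotify)` signature are hypothetical. The callback runs after the state changes but before listeners are notified, and only when the transition succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: a final-state transition that runs a callback before notifying listeners. */
final class CallbackExecution {
    enum State { RUNNING, FINISHED, FAILED }

    private State state = State.RUNNING;
    private Long ioMetrics; // stand-in for IOMetrics
    private final List<Consumer<CallbackExecution>> listeners = new ArrayList<>();

    void addListener(Consumer<CallbackExecution> listener) {
        listeners.add(listener);
    }

    Long getIOMetrics() {
        return ioMetrics;
    }

    State getState() {
        return state;
    }

    /**
     * Hypothetical transitionToFinalState: switches the state, runs beforeNotify, then notifies
     * listeners. If the expected state does not match, nothing runs and false is returned.
     */
    boolean transitionToFinalState(State expected, State target, Runnable beforeNotify) {
        if (state != expected) {
            return false;
        }
        state = target;
        beforeNotify.run(); // e.g. store accumulators and metrics
        listeners.forEach(l -> l.accept(this));
        return true;
    }

    boolean markFinished(long metrics) {
        return transitionToFinalState(State.RUNNING, State.FINISHED, () -> ioMetrics = metrics);
    }
}
```

One property of this shape, compared with calling `updateAccumulatorsAndMetrics` ahead of `transitionState`, is that metrics are not stored when the transition does not happen because the execution is no longer in the expected state.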
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
Review Comment:
`recoverExecution` suffers from the same issue, so we might want to solve it there as well, hm?
--
This is an automated message from the Apache Git Service.