cjohnson-confluent commented on code in PR #28555:
URL: https://github.com/apache/flink/pull/28555#discussion_r3492910591
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,68 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link
ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition. This guards against a
regression where
+ * updateAccumulatorsAndMetrics was called after transitionState (which
fires listeners inline),
+ * causing listeners to see null IOMetrics.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws
Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1);
+ ExecutionGraph graph = facade.supply(scheduler::getExecutionGraph);
+
+ // Capture IOMetrics as seen by the listener at notification time
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new
HashMap<>();
+ facade.run(
Review Comment:
This is a vestige of my rebase from a previous release into master. I've
amended the commit so it's appropriate for master branch now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]