featzhang commented on code in PR #27719:
URL: https://github.com/apache/flink/pull/27719#discussion_r2870181324


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -67,17 +67,23 @@ class ExecutionGraphRestartTest {
     private static final int NUM_TASKS = 31;
 
     @RegisterExtension
-    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorExtension();
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
+            TestingUtils.jmAsyncThreadExecutorExtension();
 
-    private static final ComponentMainThreadExecutor mainThreadExecutor =
-            ComponentMainThreadExecutorServiceAdapter.forMainThread();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> JM_MAIN_THREAD_EXECUTOR_EXTENSION =
+            TestingUtils.jmMainThreadExecutorExtension();
+
+    private ComponentMainThreadExecutor mainThreadExecutor;
 
     private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
 
     @BeforeEach
     void setUp() {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+        mainThreadExecutor =

Review Comment:
   It might be cleaner to use the main-thread executor abstraction directly instead of wrapping `JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor()` via `forSingleThreadExecutor`.
   Please confirm that the wrapper preserves strict main-thread semantics and does not introduce an extra execution layer.
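
   To make the "extra execution layer" question concrete, here is a minimal sketch using only `java.util.concurrent`. `SingleThreadAdapter` is a hypothetical stand-in, not Flink's adapter: it captures the single worker thread once and then delegates `execute` unchanged, so a task is queued exactly once and a confinement check can compare thread identity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadAdapterSketch {

    /** Hypothetical stand-in for a main-thread executor adapter; not Flink's implementation. */
    static final class SingleThreadAdapter implements Executor {
        private final ExecutorService delegate;
        private final Thread mainThread;

        SingleThreadAdapter(ExecutorService delegate) {
            this.delegate = delegate;
            // Capture the delegate's only worker thread once, up front.
            this.mainThread = CompletableFuture.supplyAsync(Thread::currentThread, delegate).join();
        }

        @Override
        public void execute(Runnable command) {
            // Pure delegation: the command is queued once, on the same worker thread.
            delegate.execute(command);
        }

        void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread) {
                throw new AssertionError("not running in the main thread");
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService jmMainThread = Executors.newSingleThreadExecutor();
        try {
            SingleThreadAdapter adapter = new SingleThreadAdapter(jmMainThread);
            // join() rethrows (wrapped) if the confinement check fails on the worker thread.
            CompletableFuture.runAsync(adapter::assertRunningInMainThread, adapter).join();
            System.out.println("confined to one thread");
        } finally {
            jmMainThread.shutdownNow();
        }
    }
}
```

   If the real wrapper behaves like this stand-in (an assumption worth checking against the Flink source), it adds a delegation call but no extra thread hop.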



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -349,43 +382,49 @@ void testFailExecutionAfterCancel() throws Exception {
                     new DefaultSchedulerBuilder(
                                     createJobGraphToCancel(),
                                     mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
+                                    EXECUTOR_EXTENSION.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
                             .setRestartBackoffTimeStrategy(
                                     new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                             .setDelayExecutor(taskRestartExecutor)
                             .build();
-            ExecutionGraph eg = scheduler.getExecutionGraph();
+            mainThreadExecutor.execute(
+                    () -> {
+                        ExecutionGraph eg = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
+                        startScheduling(scheduler);
 
-            offerSlots(slotPool, 1);
+                        offerSlots(slotPool, 1);
 
-            // Fail right after cancel (for example with concurrent slot release)
-            scheduler.cancel();
+                        // Fail right after cancel (for example with concurrent slot release)
+                        scheduler.cancel();
 
-            for (ExecutionVertex v : eg.getAllExecutionVertices()) {
-                v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
-            }
+                        for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+                            v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+                        }
 
-            FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
-                    .eventuallySucceeds()
-                    .isEqualTo(JobStatus.CANCELED);
+                        FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
+                                .eventuallySucceeds()
+                                .isEqualTo(JobStatus.CANCELED);

Review Comment:
   As a structural improvement, it might be cleaner to enforce main-thread confinement in the scheduler setup itself rather than wrapping every test body in `mainThreadExecutor.execute(...)`.
   This would reduce boilerplate and improve readability.
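
   One lighter-weight variant of this idea, sketched with plain JDK types: a shared helper that runs a test body on the main-thread executor and waits for it. This is a different technique from moving confinement into the scheduler setup. `runInMainThread` and `ThrowingRunnable` are hypothetical names, and the single-thread executor stands in for `mainThreadExecutor`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadHelperSketch {

    /** Hypothetical functional interface so test bodies may throw checked exceptions. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Runs the body on the given executor and blocks until it has finished. */
    static void runInMainThread(Executor mainThreadExecutor, ThrowingRunnable body) {
        try {
            CompletableFuture.runAsync(
                            () -> {
                                try {
                                    body.run();
                                } catch (Exception e) {
                                    throw new CompletionException(e);
                                }
                            },
                            mainThreadExecutor)
                    .join();
        } catch (CompletionException e) {
            // Surface assertion failures directly so the test framework reports them.
            if (e.getCause() instanceof Error) {
                throw (Error) e.getCause();
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "jm-main-thread-stand-in"));
        try {
            String[] seen = new String[1];
            runInMainThread(mainThread, () -> seen[0] = Thread.currentThread().getName());
            System.out.println(seen[0]); // prints "jm-main-thread-stand-in"
        } finally {
            mainThread.shutdownNow();
        }
    }
}
```

   Each test would then call the helper once instead of repeating the `execute(() -> { ... })` wrapper, and the helper only needs an `Executor`.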



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -105,20 +111,24 @@ void testCancelAllPendingRequestWhileCanceling() throws Exception {
             JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
-                                    graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
+                                    graph, mainThreadExecutor, EXECUTOR_EXTENSION.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
                             .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
-            offerSlots(slotPool, NUM_TASKS);
+            mainThreadExecutor.execute(

Review Comment:
   Wrapping the entire test logic in `mainThreadExecutor.execute(...)` improves thread confinement, but the test never waits for the submitted task to complete.
   If the executor runs the task asynchronously, the test method can return before the assertions run, and an assertion failure thrown on the executor thread may not fail the test.
   Would it be safer to block on a submitted future (e.g. `submit(...).get()`) to ensure deterministic execution?
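
   A minimal sketch of the blocking variant, assuming the body is submitted to the underlying `ExecutorService` (a plain single-thread executor stands in for it here, and `runAndWait` is a hypothetical helper name). `Future.get` blocks until the body has run and rethrows its failure wrapped in an `ExecutionException`, which the helper unwraps.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitSketch {

    /** Runs the body on the executor, waits for it, and rethrows whatever it threw. */
    static void runAndWait(ExecutorService executor, Runnable body) throws Exception {
        try {
            // The timeout keeps a stuck body from hanging the test forever.
            executor.submit(body).get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Error) {
                throw (Error) cause; // e.g. AssertionError from a failed assertion
            }
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            boolean failed = false;
            try {
                // With execute(...), this error would stay on the executor thread.
                runAndWait(mainThread, () -> { throw new AssertionError("state mismatch"); });
            } catch (AssertionError expected) {
                failed = true;
            }
            System.out.println("failure propagated: " + failed); // prints "failure propagated: true"
        } finally {
            mainThread.shutdownNow();
        }
    }
}
```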



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -159,38 +173,43 @@ void testCancelWhileRestarting() throws Exception {
                     new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
+                                    EXECUTOR_EXTENSION.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
                             .setRestartBackoffTimeStrategy(
                                     new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                             .setDelayExecutor(taskRestartExecutor)
                             .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
+            mainThreadExecutor.execute(
+                    () -> {
+                        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
-            final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
+                        startScheduling(scheduler);
 
-            // Release the TaskManager and wait for the job to restart
-            slotPool.releaseTaskManager(taskManagerResourceId, new Exception("Test Exception"));
-            assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+                        final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
 
-            // Canceling needs to abort the restart
-            scheduler.cancel();
+                        // Release the TaskManager and wait for the job to restart
+                        slotPool.releaseTaskManager(
+                                taskManagerResourceId, new Exception("Test Exception"));
+                        assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
 
-            assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+                        // Canceling needs to abort the restart
+                        scheduler.cancel();
 
-            taskRestartExecutor.triggerScheduledTasks();
+                        assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
 
-            assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
-            for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-                assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
-            }
+                        taskRestartExecutor.triggerScheduledTasks();
+
+                        assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+                        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+                            assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+                        }
+                    });
         }
     }
 
-    private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
+    private ResourceID offerSlots(SlotPool slotPool, int numSlots) {

Review Comment:
   `offerSlots` is now non-static because it uses the `mainThreadExecutor` instance field.
   Please confirm that this change does not affect other test utility patterns or assumptions.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -159,38 +173,43 @@ void testCancelWhileRestarting() throws Exception {
                     new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
-                                    EXECUTOR_RESOURCE.getExecutor())
+                                    EXECUTOR_EXTENSION.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
                             .setRestartBackoffTimeStrategy(
                                     new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
                             .setDelayExecutor(taskRestartExecutor)
                             .build();
-            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
-            startScheduling(scheduler);
+            mainThreadExecutor.execute(
+                    () -> {
+                        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
-            final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
+                        startScheduling(scheduler);
 
-            // Release the TaskManager and wait for the job to restart
-            slotPool.releaseTaskManager(taskManagerResourceId, new Exception("Test Exception"));
-            assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+                        final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
 
-            // Canceling needs to abort the restart
-            scheduler.cancel();
+                        // Release the TaskManager and wait for the job to restart
+                        slotPool.releaseTaskManager(
+                                taskManagerResourceId, new Exception("Test Exception"));
+                        assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);

Review Comment:
   `assertThatFuture(executionGraph.getTerminationFuture())`
   
   Some state transitions (e.g. to `RESTARTING`) may be triggered asynchronously.
   It may be safer to assert the state via a future-based or eventually-style assertion to avoid timing sensitivity.
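
   For an intermediate state such as `RESTARTING`, a termination future is unlikely to help, since it should only complete on a terminal status. A polling, eventually-style check is one option. The sketch below uses only the JDK: `assertEventually` is a hypothetical helper, and the `AtomicReference` stands in for `executionGraph.getState()`.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class EventuallySketch {

    /** Polls the supplier until it returns the expected value or the timeout elapses. */
    static <T> void assertEventually(Supplier<T> actual, T expected, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        T last = actual.get();
        while (!expected.equals(last)) {
            if (System.nanoTime() - deadline > 0) {
                throw new AssertionError("expected " + expected + " but was " + last);
            }
            Thread.sleep(10); // short back-off between polls
            last = actual.get();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the job status; the transition happens on another thread.
        AtomicReference<String> state = new AtomicReference<>("RUNNING");
        ScheduledExecutorService other = Executors.newSingleThreadScheduledExecutor();
        try {
            other.schedule(() -> state.set("RESTARTING"), 50, TimeUnit.MILLISECONDS);
            assertEventually(state::get, "RESTARTING", Duration.ofSeconds(5));
            System.out.println("reached RESTARTING");
        } finally {
            other.shutdownNow();
        }
    }
}
```

   A direct `assertThat(...).isEqualTo(...)` in the same position would pass or fail depending on whether the transition had already happened.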


