This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 14227d2e172 Refactor ConsistencyCheckTasksRunnerTest (#37256)
14227d2e172 is described below

commit 14227d2e172063dc28c049a8888aaabea0311a62
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 2 15:34:25 2025 +0800

    Refactor ConsistencyCheckTasksRunnerTest (#37256)
    
    * Add StatisticsManagerTest
    
    * Add more test cases on DatabaseMetaDataManagerTest
    
    * Refactor ConsistencyCheckTasksRunnerTest
    
    * Refactor ConsistencyCheckTasksRunnerTest
    
    * Refactor ConsistencyCheckTasksRunnerTest
---
 .../task/ConsistencyCheckTasksRunnerTest.java      | 279 +++++++++++++++------
 1 file changed, 209 insertions(+), 70 deletions(-)

diff --git a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
index 669115c09a4..b84769fc28c 100644
--- a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
+++ b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
@@ -60,13 +60,11 @@ import org.mockito.MockedConstruction;
 import org.mockito.MockedStatic;
 import org.mockito.internal.configuration.plugins.Plugins;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -143,12 +141,13 @@ class ConsistencyCheckTasksRunnerTest {
         ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
         ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(jobItemContext);
         PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
mock(PipelineJobItemManager.class);
-        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
         PipelineProcessConfigurationPersistService processConfigPersistService 
= mock(PipelineProcessConfigurationPersistService.class);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"),
 runner, processConfigPersistService);
         PipelineProcessConfiguration processConfig = new 
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null), new 
PipelineWriteConfiguration(1, 1, null), null);
         PipelineJobType<PipelineJobConfiguration> parentJobType = 
mock(PipelineJobType.class);
         when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
         
when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
         PipelineJobConfiguration parentJobConfig = 
mock(PipelineJobConfiguration.class);
         when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
@@ -164,17 +163,29 @@ class ConsistencyCheckTasksRunnerTest {
         
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
         when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
         when(jobFacade.getCheck()).thenReturn(checkRepository);
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenAnswer(invocation
 -> {
+            PipelineLifecycleRunnable runnable = invocation.getArgument(0);
+            runnable.start();
+            return CompletableFuture.completedFuture(null);
+        });
         try (
                 MockedConstruction<PipelineJobConfigurationManager> ignore = 
mockConstruction(PipelineJobConfigurationManager.class,
                         (mock, context) -> 
when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
-                MockedStatic<PipelineProcessConfigurationUtils> 
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+                MockedStatic<PipelineProcessConfigurationUtils> 
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class);
+                MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
             processConfigMocked.when(() -> 
PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
             
when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig), 
any(TransmissionProcessContext.class), 
eq(jobItemContext.getProgressContext()))).thenReturn(checker);
-            AbstractPipelineLifecycleRunnable runnable = 
createCheckPipelineLifecycleRunnable(runner);
-            invokeRunBlocking(runnable);
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
null);
+            runner.start();
             verify(jobItemManager).persistProgress(jobItemContext);
             verify(checkRepository).persistCheckJobResult(PARENT_JOB_ID, 
CHECK_JOB_ID, checkResult);
             
assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(), 
notNullValue());
+            runner.stop();
+            verify(checker).cancel();
         }
     }
     
@@ -182,17 +193,17 @@ class ConsistencyCheckTasksRunnerTest {
     @Test
     void assertRunBlockingSkipPersistWhenStopping() throws 
ReflectiveOperationException {
         ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
-        jobItemContext.setStopping(true);
         ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(jobItemContext);
         PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
mock(PipelineJobItemManager.class);
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
         PipelineProcessConfigurationPersistService processConfigPersistService 
= mock(PipelineProcessConfigurationPersistService.class);
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"),
 runner, processConfigPersistService);
-        PipelineProcessConfiguration processConfig = new 
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null),
-                new PipelineWriteConfiguration(1, 1, null), null);
+        PipelineProcessConfiguration processConfig = new 
PipelineProcessConfiguration(
+                new PipelineReadConfiguration(1, 1, 1, null), new 
PipelineWriteConfiguration(1, 1, null), null);
         PipelineDataConsistencyChecker checker = 
mock(PipelineDataConsistencyChecker.class);
         PipelineJobType<PipelineJobConfiguration> parentJobType = 
mock(PipelineJobType.class);
         when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
         
when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
         PipelineJobConfiguration parentJobConfig = 
mock(PipelineJobConfiguration.class);
         when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
@@ -200,16 +211,29 @@ class ConsistencyCheckTasksRunnerTest {
         
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
         when(processConfigPersistService.load(parentContextKey, 
"CONSISTENCY_CHECK")).thenReturn(processConfig);
         Map<String, TableDataConsistencyCheckResult> checkResult = 
Collections.singletonMap("t_user", new TableDataConsistencyCheckResult(true));
-        
when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(), 
jobItemContext.getJobConfig().getAlgorithmProps())).thenReturn(checkResult);
+        
when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(), 
jobItemContext.getJobConfig().getAlgorithmProps())).thenAnswer(invocation -> {
+            jobItemContext.setStopping(true);
+            return checkResult;
+        });
         PipelineJobCheckGovernanceRepository checkRepository = 
mock(PipelineJobCheckGovernanceRepository.class);
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenAnswer(invocation
 -> {
+            PipelineLifecycleRunnable runnable = invocation.getArgument(0);
+            runnable.start();
+            return CompletableFuture.completedFuture(null);
+        });
         try (
                 MockedConstruction<PipelineJobConfigurationManager> ignored = 
mockConstruction(PipelineJobConfigurationManager.class,
                         (mock, context) -> 
when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
-                MockedStatic<PipelineProcessConfigurationUtils> 
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+                MockedStatic<PipelineProcessConfigurationUtils> 
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class);
+                MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
             processConfigMocked.when(() -> 
PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
             
when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig), 
any(TransmissionProcessContext.class), 
eq(jobItemContext.getProgressContext()))).thenReturn(checker);
-            AbstractPipelineLifecycleRunnable runnable = 
createCheckPipelineLifecycleRunnable(runner);
-            invokeRunBlocking(runnable);
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
null);
+            runner.start();
             verify(jobItemManager).persistProgress(jobItemContext);
             verifyNoInteractions(checkRepository);
             
assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(), 
notNullValue());
@@ -220,13 +244,27 @@ class ConsistencyCheckTasksRunnerTest {
     @Test
     void assertOnSuccessWhenStopping() throws ReflectiveOperationException {
         ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
-        jobItemContext.setStopping(true);
         ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(jobItemContext);
         PipelineJobManager jobManager = mock(PipelineJobManager.class);
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
 runner, jobManager);
         PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
mock(PipelineJobItemManager.class);
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
-        ExecuteCallback callback = createCheckExecuteCallback(runner);
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
+        jobItemContext.setStopping(true);
         callback.onSuccess();
         assertThat(jobItemContext.getStatus(), is(JobStatus.RUNNING));
         verifyNoInteractions(jobManager);
@@ -244,6 +282,7 @@ class ConsistencyCheckTasksRunnerTest {
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
         PipelineContextKey parentContextKey = new 
PipelineContextKey("parent_db", InstanceType.JDBC);
         
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
         PipelineGovernanceFacade governanceFacade = 
mock(PipelineGovernanceFacade.class);
         
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
         PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
@@ -252,7 +291,20 @@ class ConsistencyCheckTasksRunnerTest {
         when(jobFacade.getCheck()).thenReturn(checkRepository);
         Map<String, TableDataConsistencyCheckResult> checkResult = 
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
         when(checkRepository.getCheckJobResult(PARENT_JOB_ID, 
CHECK_JOB_ID)).thenReturn(checkResult);
-        ExecuteCallback callback = createCheckExecuteCallback(runner);
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
         callback.onSuccess();
         assertThat(jobItemContext.getStatus(), is(JobStatus.FINISHED));
         verify(jobItemManager).persistProgress(jobItemContext);
@@ -270,6 +322,7 @@ class ConsistencyCheckTasksRunnerTest {
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
         PipelineContextKey parentContextKey = new 
PipelineContextKey("parent_db", InstanceType.JDBC);
         
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
         PipelineGovernanceFacade governanceFacade = 
mock(PipelineGovernanceFacade.class);
         
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
         PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
@@ -277,7 +330,68 @@ class ConsistencyCheckTasksRunnerTest {
         PipelineJobCheckGovernanceRepository checkRepository = 
mock(PipelineJobCheckGovernanceRepository.class);
         when(jobFacade.getCheck()).thenReturn(checkRepository);
         when(checkRepository.getCheckJobResult(PARENT_JOB_ID, 
CHECK_JOB_ID)).thenReturn(Collections.emptyMap());
-        ExecuteCallback callback = createCheckExecuteCallback(runner);
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
+        callback.onSuccess();
+        assertThat(jobItemContext.getStatus(), 
is(JobStatus.CONSISTENCY_CHECK_FAILURE));
+        verify(jobItemManager).persistProgress(jobItemContext);
+        verify(jobManager).stop(CHECK_JOB_ID);
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    void assertOnSuccessUpdateToFailureWhenResultContainsFailedItem() throws 
ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
 runner, jobManager);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
mock(PipelineJobItemManager.class);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
 runner, jobItemManager);
+        PipelineContextKey checkContextKey = new 
PipelineContextKey("check_db_fail", InstanceType.JDBC);
+        
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+        PipelineContextKey parentContextKey = new 
PipelineContextKey("parent_db", InstanceType.JDBC);
+        
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
+        PipelineGovernanceFacade checkGovernanceFacade = 
mock(PipelineGovernanceFacade.class);
+        
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(checkGovernanceFacade);
+        PipelineJobItemFacade jobItemFacade = 
mock(PipelineJobItemFacade.class);
+        PipelineJobItemProcessGovernanceRepository processRepository = 
mock(PipelineJobItemProcessGovernanceRepository.class);
+        when(jobItemFacade.getProcess()).thenReturn(processRepository);
+        
when(checkGovernanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+        PipelineGovernanceFacade governanceFacade = 
mock(PipelineGovernanceFacade.class);
+        
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+        PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+        when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+        PipelineJobCheckGovernanceRepository checkRepository = 
mock(PipelineJobCheckGovernanceRepository.class);
+        when(jobFacade.getCheck()).thenReturn(checkRepository);
+        Map<String, TableDataConsistencyCheckResult> checkResult = 
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(false));
+        when(checkRepository.getCheckJobResult(PARENT_JOB_ID, 
CHECK_JOB_ID)).thenReturn(checkResult);
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
         callback.onSuccess();
         assertThat(jobItemContext.getStatus(), 
is(JobStatus.CONSISTENCY_CHECK_FAILURE));
         verify(jobItemManager).persistProgress(jobItemContext);
@@ -292,8 +406,22 @@ class ConsistencyCheckTasksRunnerTest {
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
 runner, jobManager);
         PipelineDataConsistencyChecker checker = 
mock(PipelineDataConsistencyChecker.class);
         when(checker.isCanceling()).thenReturn(true);
-        setConsistencyChecker(runner, checker);
-        ExecuteCallback callback = createCheckExecuteCallback(runner);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker"),
 runner, new AtomicReference<>(checker));
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
         callback.onFailure(new RuntimeException("cancel"));
         verify(jobManager).stop(CHECK_JOB_ID);
     }
@@ -306,16 +434,32 @@ class ConsistencyCheckTasksRunnerTest {
         
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
 runner, jobManager);
         PipelineDataConsistencyChecker checker = 
mock(PipelineDataConsistencyChecker.class);
         when(checker.isCanceling()).thenReturn(false);
-        setConsistencyChecker(runner, checker);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker"),
 runner, new AtomicReference<>(checker));
         PipelineContextKey checkContextKey = new 
PipelineContextKey("check_db", InstanceType.JDBC);
         
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
         PipelineGovernanceFacade governanceFacade = 
mock(PipelineGovernanceFacade.class);
         
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
         PipelineJobItemFacade jobItemFacade = 
mock(PipelineJobItemFacade.class);
         when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
         PipelineJobItemErrorMessageGovernanceRepository errorRepository = 
mock(PipelineJobItemErrorMessageGovernanceRepository.class);
+        PipelineJobItemProcessGovernanceRepository processRepository = 
mock(PipelineJobItemProcessGovernanceRepository.class);
+        when(jobItemFacade.getProcess()).thenReturn(processRepository);
         when(jobItemFacade.getErrorMessage()).thenReturn(errorRepository);
-        ExecuteCallback callback = createCheckExecuteCallback(runner);
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
         RuntimeException failure = new RuntimeException("failure");
         callback.onFailure(failure);
         verify(errorRepository).update(CHECK_JOB_ID, 0, failure);
@@ -323,20 +467,49 @@ class ConsistencyCheckTasksRunnerTest {
     }
     
     @Test
-    void assertDoStopWithoutChecker() throws ReflectiveOperationException {
-        ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(createJobItemContext());
-        AbstractPipelineLifecycleRunnable runnable = 
createCheckPipelineLifecycleRunnable(runner);
-        invokeDoStop(runnable);
+    void assertOnFailurePersistErrorWhenCheckerAbsent() throws 
ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
 runner, jobManager);
+        PipelineContextKey checkContextKey = new 
PipelineContextKey("check_db_null_checker", InstanceType.JDBC);
+        
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new 
ConsistencyCheckJobType());
+        PipelineGovernanceFacade governanceFacade = 
mock(PipelineGovernanceFacade.class);
+        
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
+        PipelineJobItemFacade jobItemFacade = 
mock(PipelineJobItemFacade.class);
+        PipelineJobItemProcessGovernanceRepository processRepository = 
mock(PipelineJobItemProcessGovernanceRepository.class);
+        when(jobItemFacade.getProcess()).thenReturn(processRepository);
+        when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+        PipelineJobItemErrorMessageGovernanceRepository errorRepository = 
mock(PipelineJobItemErrorMessageGovernanceRepository.class);
+        when(jobItemFacade.getErrorMessage()).thenReturn(errorRepository);
+        AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+        ConsistencyCheckProcessContext processContext = 
mock(ConsistencyCheckProcessContext.class);
+        PipelineExecuteEngine executeEngine = 
mock(PipelineExecuteEngine.class);
+        
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
 jobItemContext, processContext);
+        
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+        try (MockedStatic<PipelineExecuteEngine> triggerMocked = 
mockStatic(PipelineExecuteEngine.class)) {
+            triggerMocked.when(() -> 
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> 
{
+                callbackRef.set(invocation.getArgument(1));
+                return null;
+            });
+            runner.start();
+        }
+        ExecuteCallback callback = callbackRef.get();
+        RuntimeException failure = new RuntimeException("failure");
+        callback.onFailure(failure);
+        verify(errorRepository).update(CHECK_JOB_ID, 0, failure);
+        verify(jobManager).stop(CHECK_JOB_ID);
     }
     
     @Test
-    void assertDoStopWithChecker() throws ReflectiveOperationException {
+    void assertStopWithoutConsistencyChecker() throws 
ReflectiveOperationException {
         ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(createJobItemContext());
-        PipelineDataConsistencyChecker checker = 
mock(PipelineDataConsistencyChecker.class);
-        setConsistencyChecker(runner, checker);
-        AbstractPipelineLifecycleRunnable runnable = 
createCheckPipelineLifecycleRunnable(runner);
-        invokeDoStop(runnable);
-        verify(checker).cancel();
+        PipelineLifecycleRunnable checkExecutor = (PipelineLifecycleRunnable) 
Plugins.getMemberAccessor().get(ConsistencyCheckTasksRunner.class.getDeclaredField("checkExecutor"),
 runner);
+        markLifecycleRunnableRunning(checkExecutor);
+        runner.stop();
+        assertTrue(runner.getJobItemContext().isStopping());
     }
     
     private ConsistencyCheckJobItemContext createJobItemContext() {
@@ -344,44 +517,10 @@ class ConsistencyCheckTasksRunnerTest {
         return new ConsistencyCheckJobItemContext(jobConfig, 0, 
JobStatus.RUNNING, null);
     }
     
-    private ExecuteCallback createCheckExecuteCallback(final 
ConsistencyCheckTasksRunner runner) throws ReflectiveOperationException {
-        for (Class<?> each : 
ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
-            if ("CheckExecuteCallback".equals(each.getSimpleName())) {
-                Constructor<?> constructor = 
each.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
-                constructor.setAccessible(true);
-                return (ExecuteCallback) constructor.newInstance(runner);
-            }
-        }
-        throw new IllegalStateException("CheckExecuteCallback not found");
-    }
-    
     @SuppressWarnings("unchecked")
-    private void setConsistencyChecker(final ConsistencyCheckTasksRunner 
runner, final PipelineDataConsistencyChecker checker) throws 
ReflectiveOperationException {
-        Field field = 
ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker");
-        field.setAccessible(true);
-        
((java.util.concurrent.atomic.AtomicReference<PipelineDataConsistencyChecker>) 
field.get(runner)).set(checker);
-    }
-    
-    private AbstractPipelineLifecycleRunnable 
createCheckPipelineLifecycleRunnable(final ConsistencyCheckTasksRunner runner) 
throws ReflectiveOperationException {
-        for (Class<?> each : 
ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
-            if ("CheckPipelineLifecycleRunnable".equals(each.getSimpleName())) 
{
-                Constructor<?> constructor = 
each.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
-                constructor.setAccessible(true);
-                return (AbstractPipelineLifecycleRunnable) 
constructor.newInstance(runner);
-            }
-        }
-        throw new IllegalStateException("CheckPipelineLifecycleRunnable not 
found");
-    }
-    
-    private void invokeRunBlocking(final AbstractPipelineLifecycleRunnable 
runnable) throws ReflectiveOperationException {
-        Method method = runnable.getClass().getDeclaredMethod("runBlocking");
-        method.setAccessible(true);
-        method.invoke(runnable);
-    }
-    
-    private void invokeDoStop(final AbstractPipelineLifecycleRunnable 
runnable) throws ReflectiveOperationException {
-        Method method = runnable.getClass().getDeclaredMethod("doStop");
-        method.setAccessible(true);
-        method.invoke(runnable);
+    private void markLifecycleRunnableRunning(final PipelineLifecycleRunnable 
runnable) throws ReflectiveOperationException {
+        AtomicReference<Boolean> running = (AtomicReference<Boolean>) 
Plugins.getMemberAccessor().get(AbstractPipelineLifecycleRunnable.class.getDeclaredField("running"),
 runnable);
+        running.set(true);
+        
Plugins.getMemberAccessor().set(AbstractPipelineLifecycleRunnable.class.getDeclaredField("startTimeMillis"),
 runnable, System.currentTimeMillis());
     }
 }

Reply via email to