This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e2a1c2073 Add ConsistencyCheckTasksRunnerTest (#37094)
83e2a1c2073 is described below

commit 83e2a1c2073d969e4d2f8ec346c556d94a729e16
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 14 15:48:30 2025 +0800

    Add ConsistencyCheckTasksRunnerTest (#37094)
    
    * Add ConsistencyCheckTasksRunnerTest
    
    * Add ConsistencyCheckTasksRunnerTest
---
 .../scenario/consistency-check/pom.xml             |   7 +
 .../task/ConsistencyCheckTasksRunnerTest.java      | 382 +++++++++++++++++++++
 2 files changed, 389 insertions(+)

diff --git a/kernel/data-pipeline/scenario/consistency-check/pom.xml 
b/kernel/data-pipeline/scenario/consistency-check/pom.xml
index 9a02d308dbd..4095bb3d2a9 100644
--- a/kernel/data-pipeline/scenario/consistency-check/pom.xml
+++ b/kernel/data-pipeline/scenario/consistency-check/pom.xml
@@ -32,5 +32,12 @@
             <artifactId>shardingsphere-data-pipeline-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-test-infra-framework</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
new file mode 100644
index 00000000000..28373cf203b
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
+
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import 
org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.execute.PipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemErrorMessageGovernanceRepository;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemFacade;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobCheckGovernanceRepository;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobFacade;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckProcessContext;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.internal.configuration.plugins.Plugins;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({PipelineAPIFactory.class, PipelineJobIdUtils.class})
+class ConsistencyCheckTasksRunnerTest {
+    
+    // First argument handed to ConsistencyCheckJobConfiguration in createJobItemContext(); presumably the check job's own ID.
+    private static final String CHECK_JOB_ID = "check_job_id";
+    
+    // Second argument handed to the same configuration; presumably the ID of the job being checked (TODO confirm against the constructor).
+    private static final String PARENT_JOB_ID = "parent_job_id";
+    
+    // Database type looked up through TypedSPILoader under the name "Fixture".
+    private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "Fixture");
+    
+    @Test
+    void assertStartWhenStopping() {
+        ConsistencyCheckJobItemContext stoppingContext = createJobItemContext();
+        stoppingContext.setStopping(true);
+        // Build the runner up front so only start() runs under the assertion.
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(stoppingContext);
+        assertDoesNotThrow(runner::start);
+    }
+    
+    @Test
+    void assertStartWhenNotStopping() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        // Execute engine mock that reports every submitted runnable as already completed.
+        PipelineExecuteEngine executeEngine = mock(PipelineExecuteEngine.class);
+        when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenAnswer(invocation -> CompletableFuture.completedFuture(null));
+        ConsistencyCheckProcessContext processContext = mock(ConsistencyCheckProcessContext.class);
+        when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+        Field processContextField = ConsistencyCheckJobItemContext.class.getDeclaredField("processContext");
+        Plugins.getMemberAccessor().set(processContextField, jobItemContext, processContext);
+        // Static lookups keyed by the check job ID.
+        when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new ConsistencyCheckJobType());
+        PipelineContextKey checkContextKey = new PipelineContextKey("check", InstanceType.JDBC);
+        when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+        // Governance facade chain that ends at the job item process repository.
+        PipelineJobItemProcessGovernanceRepository processRepository = mock(PipelineJobItemProcessGovernanceRepository.class);
+        PipelineJobItemFacade jobItemFacade = mock(PipelineJobItemFacade.class);
+        when(jobItemFacade.getProcess()).thenReturn(processRepository);
+        PipelineGovernanceFacade governanceFacade = mock(PipelineGovernanceFacade.class);
+        when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+        when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
+        try (MockedStatic<PipelineExecuteEngine> mockedEngine = mockStatic(PipelineExecuteEngine.class)) {
+            mockedEngine.when(() -> PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation -> null);
+            ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(jobItemContext);
+            runner.start();
+            mockedEngine.verify(() -> PipelineExecuteEngine.trigger(anyCollection(), any()));
+        }
+        verify(executeEngine).submit(any(PipelineLifecycleRunnable.class));
+        verify(processRepository).persist(eq(CHECK_JOB_ID), eq(0), anyString());
+    }
+    
+    @Test
+    void assertStop() throws ReflectiveOperationException {
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(createJobItemContext());
+        // Plant a mock executor in the runner's private field so stop() has something to delegate to.
+        PipelineLifecycleRunnable mockedExecutor = mock(PipelineLifecycleRunnable.class);
+        Field checkExecutorField = ConsistencyCheckTasksRunner.class.getDeclaredField("checkExecutor");
+        Plugins.getMemberAccessor().set(checkExecutorField, runner, mockedExecutor);
+        runner.stop();
+        assertTrue(runner.getJobItemContext().isStopping());
+        verify(mockedExecutor).stop();
+    }
+    
+    @Test
+    void assertRunBlockingPersistResultWhenNotStopping() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(jobItemContext);
+        // Replace the runner's private collaborators with mocks.
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = mock(PipelineJobItemManager.class);
+        Field jobItemManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager");
+        Plugins.getMemberAccessor().set(jobItemManagerField, runner, jobItemManager);
+        PipelineProcessConfigurationPersistService processConfigPersistService = mock(PipelineProcessConfigurationPersistService.class);
+        Field persistServiceField = ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService");
+        Plugins.getMemberAccessor().set(persistServiceField, runner, processConfigPersistService);
+        PipelineReadConfiguration readConfig = new PipelineReadConfiguration(1, 1, 1, null);
+        PipelineWriteConfiguration writeConfig = new PipelineWriteConfiguration(1, 1, null);
+        PipelineProcessConfiguration processConfig = new PipelineProcessConfiguration(readConfig, writeConfig, null);
+        // Parent job type, configuration and context key, all looked up by PARENT_JOB_ID.
+        PipelineJobType parentJobType = mock(PipelineJobType.class);
+        when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+        when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
+        PipelineJobConfiguration parentJobConfig = mock(PipelineJobConfiguration.class);
+        when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
+        PipelineContextKey parentContextKey = new PipelineContextKey("parent", InstanceType.JDBC);
+        when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        when(processConfigPersistService.load(parentContextKey, "CONSISTENCY_CHECK")).thenReturn(processConfig);
+        // Checker stub returning one matched table result.
+        Map<String, TableDataConsistencyCheckResult> expectedResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
+        PipelineDataConsistencyChecker checker = mock(PipelineDataConsistencyChecker.class);
+        when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(), jobItemContext.getJobConfig().getAlgorithmProps())).thenReturn(expectedResult);
+        // Governance facade chain that ends at the check repository receiving the result.
+        PipelineGovernanceFacade governanceFacade = mock(PipelineGovernanceFacade.class);
+        PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+        PipelineJobCheckGovernanceRepository checkRepository = mock(PipelineJobCheckGovernanceRepository.class);
+        when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+        when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+        when(jobFacade.getCheck()).thenReturn(checkRepository);
+        try (
+                MockedConstruction<PipelineJobConfigurationManager> ignoredConstruction = mockConstruction(PipelineJobConfigurationManager.class,
+                        (manager, context) -> when(manager.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
+                MockedStatic<PipelineProcessConfigurationUtils> mockedConfigUtils = mockStatic(PipelineProcessConfigurationUtils.class)) {
+            mockedConfigUtils.when(() -> PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
+            when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig), any(TransmissionProcessContext.class), eq(jobItemContext.getProgressContext()))).thenReturn(checker);
+            invokeRunBlocking(createCheckPipelineLifecycleRunnable(runner));
+            verify(jobItemManager).persistProgress(jobItemContext);
+            verify(checkRepository).persistCheckJobResult(PARENT_JOB_ID, CHECK_JOB_ID, expectedResult);
+            assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(), notNullValue());
+        }
+    }
+    
+    @Test
+    void assertRunBlockingSkipPersistWhenStopping() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        jobItemContext.setStopping(true);
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = mock(PipelineJobItemManager.class);
+        Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"), runner, jobItemManager);
+        PipelineProcessConfigurationPersistService processConfigPersistService = mock(PipelineProcessConfigurationPersistService.class);
+        Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"), runner, processConfigPersistService);
+        PipelineProcessConfiguration processConfig = new PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null),
+                new PipelineWriteConfiguration(1, 1, null), null);
+        PipelineDataConsistencyChecker checker = mock(PipelineDataConsistencyChecker.class);
+        PipelineJobType parentJobType = mock(PipelineJobType.class);
+        when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+        when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
+        PipelineJobConfiguration parentJobConfig = mock(PipelineJobConfiguration.class);
+        when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
+        PipelineContextKey parentContextKey = new PipelineContextKey("parent", InstanceType.JDBC);
+        when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        when(processConfigPersistService.load(parentContextKey, "CONSISTENCY_CHECK")).thenReturn(processConfig);
+        Map<String, TableDataConsistencyCheckResult> checkResult = Collections.singletonMap("t_user", new TableDataConsistencyCheckResult(true));
+        when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(), jobItemContext.getJobConfig().getAlgorithmProps())).thenReturn(checkResult);
+        // Make the check repository reachable through the governance facade, exactly as in the not-stopping test.
+        // Without this wiring nothing could ever call the mock, so verifyNoInteractions below could never fail.
+        PipelineGovernanceFacade governanceFacade = mock(PipelineGovernanceFacade.class);
+        PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+        PipelineJobCheckGovernanceRepository checkRepository = mock(PipelineJobCheckGovernanceRepository.class);
+        when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+        when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+        when(jobFacade.getCheck()).thenReturn(checkRepository);
+        try (
+                MockedConstruction<PipelineJobConfigurationManager> mockedConstruction = mockConstruction(PipelineJobConfigurationManager.class,
+                        (mock, context) -> when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
+                MockedStatic<PipelineProcessConfigurationUtils> processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+            processConfigMocked.when(() -> PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
+            when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig), any(TransmissionProcessContext.class), eq(jobItemContext.getProgressContext()))).thenReturn(checker);
+            AbstractPipelineLifecycleRunnable runnable = createCheckPipelineLifecycleRunnable(runner);
+            invokeRunBlocking(runnable);
+            verify(jobItemManager).persistProgress(jobItemContext);
+            // The job item is stopping, so the check result must not reach the repository.
+            verifyNoInteractions(checkRepository);
+            assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(), notNullValue());
+        }
+    }
+    
+    @Test
+    void assertOnSuccessWhenStopping() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        jobItemContext.setStopping(true);
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        Field jobManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager");
+        Plugins.getMemberAccessor().set(jobManagerField, runner, jobManager);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = mock(PipelineJobItemManager.class);
+        Field jobItemManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager");
+        Plugins.getMemberAccessor().set(jobItemManagerField, runner, jobItemManager);
+        createCheckExecuteCallback(runner).onSuccess();
+        // Status stays at the initial RUNNING value and neither manager is touched.
+        assertThat(jobItemContext.getStatus(), is(JobStatus.RUNNING));
+        verifyNoInteractions(jobManager);
+        verifyNoInteractions(jobItemManager);
+    }
+    
+    @Test
+    void assertOnSuccessUpdateToFinished() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        Field jobManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager");
+        Plugins.getMemberAccessor().set(jobManagerField, runner, jobManager);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = mock(PipelineJobItemManager.class);
+        Field jobItemManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager");
+        Plugins.getMemberAccessor().set(jobItemManagerField, runner, jobItemManager);
+        // The check repository, reached through the parent job's governance facade, returns one matched table result.
+        PipelineContextKey parentContextKey = new PipelineContextKey("parent_db", InstanceType.JDBC);
+        when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        PipelineGovernanceFacade governanceFacade = mock(PipelineGovernanceFacade.class);
+        when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+        PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+        when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+        PipelineJobCheckGovernanceRepository checkRepository = mock(PipelineJobCheckGovernanceRepository.class);
+        when(jobFacade.getCheck()).thenReturn(checkRepository);
+        Map<String, TableDataConsistencyCheckResult> storedResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
+        when(checkRepository.getCheckJobResult(PARENT_JOB_ID, CHECK_JOB_ID)).thenReturn(storedResult);
+        createCheckExecuteCallback(runner).onSuccess();
+        assertThat(jobItemContext.getStatus(), is(JobStatus.FINISHED));
+        verify(jobItemManager).persistProgress(jobItemContext);
+        verify(jobManager).stop(CHECK_JOB_ID);
+    }
+    
+    @Test
+    void assertOnSuccessUpdateToFailure() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        Field jobManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager");
+        Plugins.getMemberAccessor().set(jobManagerField, runner, jobManager);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = mock(PipelineJobItemManager.class);
+        Field jobItemManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager");
+        Plugins.getMemberAccessor().set(jobItemManagerField, runner, jobItemManager);
+        // The check repository, reached through the parent job's governance facade, returns an empty result map.
+        PipelineContextKey parentContextKey = new PipelineContextKey("parent_db", InstanceType.JDBC);
+        when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+        PipelineGovernanceFacade governanceFacade = mock(PipelineGovernanceFacade.class);
+        when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+        PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+        when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+        PipelineJobCheckGovernanceRepository checkRepository = mock(PipelineJobCheckGovernanceRepository.class);
+        when(jobFacade.getCheck()).thenReturn(checkRepository);
+        when(checkRepository.getCheckJobResult(PARENT_JOB_ID, CHECK_JOB_ID)).thenReturn(Collections.emptyMap());
+        createCheckExecuteCallback(runner).onSuccess();
+        assertThat(jobItemContext.getStatus(), is(JobStatus.CONSISTENCY_CHECK_FAILURE));
+        verify(jobItemManager).persistProgress(jobItemContext);
+        verify(jobManager).stop(CHECK_JOB_ID);
+    }
+    
+    @Test
+    void assertOnFailureWhenCheckerCanceling() throws ReflectiveOperationException {
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(createJobItemContext());
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        Field jobManagerField = ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager");
+        Plugins.getMemberAccessor().set(jobManagerField, runner, jobManager);
+        // Checker that reports itself as canceling.
+        PipelineDataConsistencyChecker cancelingChecker = mock(PipelineDataConsistencyChecker.class);
+        when(cancelingChecker.isCanceling()).thenReturn(true);
+        setConsistencyChecker(runner, cancelingChecker);
+        createCheckExecuteCallback(runner).onFailure(new RuntimeException("cancel"));
+        verify(jobManager).stop(CHECK_JOB_ID);
+    }
+    
+    @Test
+    void assertOnFailurePersistError() throws ReflectiveOperationException {
+        ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+        ConsistencyCheckTasksRunner runner = new 
ConsistencyCheckTasksRunner(jobItemContext);
+        PipelineJobManager jobManager = mock(PipelineJobManager.class);
+        
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
 runner, jobManager);
+        PipelineDataConsistencyChecker checker = 
mock(PipelineDataConsistencyChecker.class);
+        when(checker.isCanceling()).thenReturn(false);
+        setConsistencyChecker(runner, checker);
+        PipelineContextKey checkContextKey = new 
PipelineContextKey("check_db", InstanceType.JDBC);
+        
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+        PipelineGovernanceFacade governanceFacade = 
mock(PipelineGovernanceFacade.class);
+        
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
+        PipelineJobItemFacade jobItemFacade = 
mock(PipelineJobItemFacade.class);
+        when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+        PipelineJobItemErrorMessageGovernanceRepository errorRepository = 
mock(PipelineJobItemErrorMessageGovernanceRepository.class);
+        when(jobItemFacade.getErrorMessage()).thenReturn(errorRepository);
+        ExecuteCallback callback = createCheckExecuteCallback(runner);
+        RuntimeException failure = new RuntimeException("failure");
+        callback.onFailure(failure);
+        verify(errorRepository).update(CHECK_JOB_ID, 0, failure);
+        verify(jobManager).stop(CHECK_JOB_ID);
+    }
+    
+    @Test
+    void assertDoStopWithoutChecker() throws ReflectiveOperationException {
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(createJobItemContext());
+        AbstractPipelineLifecycleRunnable runnable = createCheckPipelineLifecycleRunnable(runner);
+        // No checker has been set on the runner; state explicitly that doStop() is expected to complete without throwing.
+        assertDoesNotThrow(() -> invokeDoStop(runnable));
+    }
+    
+    @Test
+    void assertDoStopWithChecker() throws ReflectiveOperationException {
+        PipelineDataConsistencyChecker checker = mock(PipelineDataConsistencyChecker.class);
+        ConsistencyCheckTasksRunner runner = new ConsistencyCheckTasksRunner(createJobItemContext());
+        setConsistencyChecker(runner, checker);
+        // With a checker set, doStop() is expected to cancel it.
+        invokeDoStop(createCheckPipelineLifecycleRunnable(runner));
+        verify(checker).cancel();
+    }
+    
+    // Builds a job item context for sharding item 0 in RUNNING status, passing null as the last constructor argument.
+    private ConsistencyCheckJobItemContext createJobItemContext() {
+        Properties algorithmProps = new Properties();
+        ConsistencyCheckJobConfiguration checkJobConfig = new ConsistencyCheckJobConfiguration(CHECK_JOB_ID, PARENT_JOB_ID, "FIXTURE", algorithmProps, databaseType);
+        ConsistencyCheckJobItemContext result = new ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING, null);
+        return result;
+    }
+    
+    // Reflectively instantiates the runner's nested class named CheckExecuteCallback, passing the runner to its constructor.
+    private ExecuteCallback createCheckExecuteCallback(final ConsistencyCheckTasksRunner runner) throws ReflectiveOperationException {
+        for (Class<?> nested : ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
+            if (!"CheckExecuteCallback".equals(nested.getSimpleName())) {
+                continue;
+            }
+            Constructor<?> callbackConstructor = nested.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
+            callbackConstructor.setAccessible(true);
+            return (ExecuteCallback) callbackConstructor.newInstance(runner);
+        }
+        throw new IllegalStateException("CheckExecuteCallback not found");
+    }
+    
+    // Stores the given checker in the AtomicReference held by the runner's private consistencyChecker field.
+    @SuppressWarnings("unchecked")
+    private void setConsistencyChecker(final ConsistencyCheckTasksRunner runner, final PipelineDataConsistencyChecker checker) throws ReflectiveOperationException {
+        Field checkerField = ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker");
+        checkerField.setAccessible(true);
+        java.util.concurrent.atomic.AtomicReference<PipelineDataConsistencyChecker> checkerReference =
+                (java.util.concurrent.atomic.AtomicReference<PipelineDataConsistencyChecker>) checkerField.get(runner);
+        checkerReference.set(checker);
+    }
+    
+    // Reflectively instantiates the runner's nested class named CheckPipelineLifecycleRunnable, passing the runner to its constructor.
+    private AbstractPipelineLifecycleRunnable createCheckPipelineLifecycleRunnable(final ConsistencyCheckTasksRunner runner) throws ReflectiveOperationException {
+        for (Class<?> nested : ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
+            if (!"CheckPipelineLifecycleRunnable".equals(nested.getSimpleName())) {
+                continue;
+            }
+            Constructor<?> runnableConstructor = nested.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
+            runnableConstructor.setAccessible(true);
+            return (AbstractPipelineLifecycleRunnable) runnableConstructor.newInstance(runner);
+        }
+        throw new IllegalStateException("CheckPipelineLifecycleRunnable not found");
+    }
+    
+    // Calls the no-arg runBlocking method declared on the runnable's own class, bypassing access checks.
+    private void invokeRunBlocking(final AbstractPipelineLifecycleRunnable runnable) throws ReflectiveOperationException {
+        Method runBlocking = runnable.getClass().getDeclaredMethod("runBlocking");
+        runBlocking.setAccessible(true);
+        runBlocking.invoke(runnable);
+    }
+    
+    // Calls the no-arg doStop method declared on the runnable's own class, bypassing access checks.
+    private void invokeDoStop(final AbstractPipelineLifecycleRunnable runnable) throws ReflectiveOperationException {
+        Method doStop = runnable.getClass().getDeclaredMethod("doStop");
+        doStop.setAccessible(true);
+        doStop.invoke(runnable);
+    }
+}

Reply via email to