This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 83e2a1c2073 Add ConsistencyCheckTasksRunnerTest (#37094)
83e2a1c2073 is described below
commit 83e2a1c2073d969e4d2f8ec346c556d94a729e16
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 14 15:48:30 2025 +0800
Add ConsistencyCheckTasksRunnerTest (#37094)
* Add ConsistencyCheckTasksRunnerTest
* Add ConsistencyCheckTasksRunnerTest
---
.../scenario/consistency-check/pom.xml | 7 +
.../task/ConsistencyCheckTasksRunnerTest.java | 382 +++++++++++++++++++++
2 files changed, 389 insertions(+)
diff --git a/kernel/data-pipeline/scenario/consistency-check/pom.xml
b/kernel/data-pipeline/scenario/consistency-check/pom.xml
index 9a02d308dbd..4095bb3d2a9 100644
--- a/kernel/data-pipeline/scenario/consistency-check/pom.xml
+++ b/kernel/data-pipeline/scenario/consistency-check/pom.xml
@@ -32,5 +32,12 @@
<artifactId>shardingsphere-data-pipeline-core</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-test-infra-framework</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
new file mode 100644
index 00000000000..28373cf203b
--- /dev/null
+++
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
+
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
+import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemErrorMessageGovernanceRepository;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemFacade;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobCheckGovernanceRepository;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobFacade;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckProcessContext;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.internal.configuration.plugins.Plugins;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({PipelineAPIFactory.class, PipelineJobIdUtils.class})
+class ConsistencyCheckTasksRunnerTest {
+
+ private static final String CHECK_JOB_ID = "check_job_id";
+
+ private static final String PARENT_JOB_ID = "parent_job_id";
+
+ private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "Fixture");
+
+ @Test
+ void assertStartWhenStopping() {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ jobItemContext.setStopping(true);
+ assertDoesNotThrow(new
ConsistencyCheckTasksRunner(jobItemContext)::start);
+ }
+
+ @Test
+ void assertStartWhenNotStopping() throws ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenAnswer(invocation
-> CompletableFuture.completedFuture(null));
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
+ PipelineContextKey checkContextKey = new PipelineContextKey("check",
InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
+ PipelineJobItemFacade jobItemFacade =
mock(PipelineJobItemFacade.class);
+ when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+ PipelineJobItemProcessGovernanceRepository processRepository =
mock(PipelineJobItemProcessGovernanceRepository.class);
+ when(jobItemFacade.getProcess()).thenReturn(processRepository);
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
null);
+ new ConsistencyCheckTasksRunner(jobItemContext).start();
+ triggerMocked.verify(() ->
PipelineExecuteEngine.trigger(anyCollection(), any()));
+ }
+ verify(executeEngine).submit(any(PipelineLifecycleRunnable.class));
+ verify(processRepository).persist(eq(CHECK_JOB_ID), eq(0),
anyString());
+ }
+
+ @Test
+ void assertStop() throws ReflectiveOperationException {
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(createJobItemContext());
+ PipelineLifecycleRunnable checkExecutor =
mock(PipelineLifecycleRunnable.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("checkExecutor"),
runner, checkExecutor);
+ runner.stop();
+ assertTrue(runner.getJobItemContext().isStopping());
+ verify(checkExecutor).stop();
+ }
+
+ @Test
+ void assertRunBlockingPersistResultWhenNotStopping() throws
ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
+ PipelineProcessConfigurationPersistService processConfigPersistService
= mock(PipelineProcessConfigurationPersistService.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"),
runner, processConfigPersistService);
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null), new
PipelineWriteConfiguration(1, 1, null), null);
+ PipelineJobType parentJobType = mock(PipelineJobType.class);
+ when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+
when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
+ PipelineJobConfiguration parentJobConfig =
mock(PipelineJobConfiguration.class);
+ when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
+ PipelineContextKey parentContextKey = new PipelineContextKey("parent",
InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ when(processConfigPersistService.load(parentContextKey,
"CONSISTENCY_CHECK")).thenReturn(processConfig);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
+ PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
+
when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(),
jobItemContext.getJobConfig().getAlgorithmProps())).thenReturn(checkResult);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+ PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+ PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+ when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+ when(jobFacade.getCheck()).thenReturn(checkRepository);
+ try (
+ MockedConstruction<PipelineJobConfigurationManager>
mockedConstruction = mockConstruction(PipelineJobConfigurationManager.class,
+ (mock, context) ->
when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
+ MockedStatic<PipelineProcessConfigurationUtils>
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+ processConfigMocked.when(() ->
PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
+
when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig),
any(TransmissionProcessContext.class),
eq(jobItemContext.getProgressContext()))).thenReturn(checker);
+ AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
+ invokeRunBlocking(runnable);
+ verify(jobItemManager).persistProgress(jobItemContext);
+ verify(checkRepository).persistCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID, checkResult);
+
assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(),
notNullValue());
+ }
+ }
+
+ @Test
+ void assertRunBlockingSkipPersistWhenStopping() throws
ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ jobItemContext.setStopping(true);
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
+ PipelineProcessConfigurationPersistService processConfigPersistService
= mock(PipelineProcessConfigurationPersistService.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"),
runner, processConfigPersistService);
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null),
+ new PipelineWriteConfiguration(1, 1, null), null);
+ PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
+ PipelineJobType parentJobType = mock(PipelineJobType.class);
+ when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+
when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
+ PipelineJobConfiguration parentJobConfig =
mock(PipelineJobConfiguration.class);
+ when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
+ PipelineContextKey parentContextKey = new PipelineContextKey("parent",
InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ when(processConfigPersistService.load(parentContextKey,
"CONSISTENCY_CHECK")).thenReturn(processConfig);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
Collections.singletonMap("t_user", new TableDataConsistencyCheckResult(true));
+
when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(),
jobItemContext.getJobConfig().getAlgorithmProps())).thenReturn(checkResult);
+ PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
+ try (
+ MockedConstruction<PipelineJobConfigurationManager>
mockedConstruction = mockConstruction(PipelineJobConfigurationManager.class,
+ (mock, context) ->
when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
+ MockedStatic<PipelineProcessConfigurationUtils>
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+ processConfigMocked.when(() ->
PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
+
when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig),
any(TransmissionProcessContext.class),
eq(jobItemContext.getProgressContext()))).thenReturn(checker);
+ AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
+ invokeRunBlocking(runnable);
+ verify(jobItemManager).persistProgress(jobItemContext);
+ verifyNoInteractions(checkRepository);
+
assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(),
notNullValue());
+ }
+ }
+
+ @Test
+ void assertOnSuccessWhenStopping() throws ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ jobItemContext.setStopping(true);
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
+ ExecuteCallback callback = createCheckExecuteCallback(runner);
+ callback.onSuccess();
+ assertThat(jobItemContext.getStatus(), is(JobStatus.RUNNING));
+ verifyNoInteractions(jobManager);
+ verifyNoInteractions(jobItemManager);
+ }
+
+ @Test
+ void assertOnSuccessUpdateToFinished() throws ReflectiveOperationException
{
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
+ PipelineContextKey parentContextKey = new
PipelineContextKey("parent_db", InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+ PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+ when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+ PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
+ when(jobFacade.getCheck()).thenReturn(checkRepository);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
+ when(checkRepository.getCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID)).thenReturn(checkResult);
+ ExecuteCallback callback = createCheckExecuteCallback(runner);
+ callback.onSuccess();
+ assertThat(jobItemContext.getStatus(), is(JobStatus.FINISHED));
+ verify(jobItemManager).persistProgress(jobItemContext);
+ verify(jobManager).stop(CHECK_JOB_ID);
+ }
+
+ @Test
+ void assertOnSuccessUpdateToFailure() throws ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
+ PipelineContextKey parentContextKey = new
PipelineContextKey("parent_db", InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+ PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+ when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+ PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
+ when(jobFacade.getCheck()).thenReturn(checkRepository);
+ when(checkRepository.getCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID)).thenReturn(Collections.emptyMap());
+ ExecuteCallback callback = createCheckExecuteCallback(runner);
+ callback.onSuccess();
+ assertThat(jobItemContext.getStatus(),
is(JobStatus.CONSISTENCY_CHECK_FAILURE));
+ verify(jobItemManager).persistProgress(jobItemContext);
+ verify(jobManager).stop(CHECK_JOB_ID);
+ }
+
+ @Test
+ void assertOnFailureWhenCheckerCanceling() throws
ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
+ when(checker.isCanceling()).thenReturn(true);
+ setConsistencyChecker(runner, checker);
+ ExecuteCallback callback = createCheckExecuteCallback(runner);
+ callback.onFailure(new RuntimeException("cancel"));
+ verify(jobManager).stop(CHECK_JOB_ID);
+ }
+
+ @Test
+ void assertOnFailurePersistError() throws ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
+ when(checker.isCanceling()).thenReturn(false);
+ setConsistencyChecker(runner, checker);
+ PipelineContextKey checkContextKey = new
PipelineContextKey("check_db", InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
+ PipelineJobItemFacade jobItemFacade =
mock(PipelineJobItemFacade.class);
+ when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+ PipelineJobItemErrorMessageGovernanceRepository errorRepository =
mock(PipelineJobItemErrorMessageGovernanceRepository.class);
+ when(jobItemFacade.getErrorMessage()).thenReturn(errorRepository);
+ ExecuteCallback callback = createCheckExecuteCallback(runner);
+ RuntimeException failure = new RuntimeException("failure");
+ callback.onFailure(failure);
+ verify(errorRepository).update(CHECK_JOB_ID, 0, failure);
+ verify(jobManager).stop(CHECK_JOB_ID);
+ }
+
+ @Test
+ void assertDoStopWithoutChecker() throws ReflectiveOperationException {
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(createJobItemContext());
+ AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
+ invokeDoStop(runnable);
+ }
+
+ @Test
+ void assertDoStopWithChecker() throws ReflectiveOperationException {
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(createJobItemContext());
+ PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
+ setConsistencyChecker(runner, checker);
+ AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
+ invokeDoStop(runnable);
+ verify(checker).cancel();
+ }
+
+ private ConsistencyCheckJobItemContext createJobItemContext() {
+ ConsistencyCheckJobConfiguration jobConfig = new
ConsistencyCheckJobConfiguration(CHECK_JOB_ID, PARENT_JOB_ID, "FIXTURE", new
Properties(), databaseType);
+ return new ConsistencyCheckJobItemContext(jobConfig, 0,
JobStatus.RUNNING, null);
+ }
+
+ private ExecuteCallback createCheckExecuteCallback(final
ConsistencyCheckTasksRunner runner) throws ReflectiveOperationException {
+ for (Class<?> each :
ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
+ if ("CheckExecuteCallback".equals(each.getSimpleName())) {
+ Constructor<?> constructor =
each.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
+ constructor.setAccessible(true);
+ return (ExecuteCallback) constructor.newInstance(runner);
+ }
+ }
+ throw new IllegalStateException("CheckExecuteCallback not found");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void setConsistencyChecker(final ConsistencyCheckTasksRunner
runner, final PipelineDataConsistencyChecker checker) throws
ReflectiveOperationException {
+ Field field =
ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker");
+ field.setAccessible(true);
+
((java.util.concurrent.atomic.AtomicReference<PipelineDataConsistencyChecker>)
field.get(runner)).set(checker);
+ }
+
+ private AbstractPipelineLifecycleRunnable
createCheckPipelineLifecycleRunnable(final ConsistencyCheckTasksRunner runner)
throws ReflectiveOperationException {
+ for (Class<?> each :
ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
+ if ("CheckPipelineLifecycleRunnable".equals(each.getSimpleName()))
{
+ Constructor<?> constructor =
each.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
+ constructor.setAccessible(true);
+ return (AbstractPipelineLifecycleRunnable)
constructor.newInstance(runner);
+ }
+ }
+ throw new IllegalStateException("CheckPipelineLifecycleRunnable not
found");
+ }
+
+ private void invokeRunBlocking(final AbstractPipelineLifecycleRunnable
runnable) throws ReflectiveOperationException {
+ Method method = runnable.getClass().getDeclaredMethod("runBlocking");
+ method.setAccessible(true);
+ method.invoke(runnable);
+ }
+
+ private void invokeDoStop(final AbstractPipelineLifecycleRunnable
runnable) throws ReflectiveOperationException {
+ Method method = runnable.getClass().getDeclaredMethod("doStop");
+ method.setAccessible(true);
+ method.invoke(runnable);
+ }
+}