This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 14227d2e172 Refactor ConsistencyCheckTasksRunnerTest (#37256)
14227d2e172 is described below
commit 14227d2e172063dc28c049a8888aaabea0311a62
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 2 15:34:25 2025 +0800
Refactor ConsistencyCheckTasksRunnerTest (#37256)
* Add StatisticsManagerTest
* Add more test cases on DatabaseMetaDataManagerTest
* Refactor ConsistencyCheckTasksRunnerTest
* Refactor ConsistencyCheckTasksRunnerTest
* Refactor ConsistencyCheckTasksRunnerTest
---
.../task/ConsistencyCheckTasksRunnerTest.java | 279 +++++++++++++++------
1 file changed, 209 insertions(+), 70 deletions(-)
diff --git a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
index 669115c09a4..b84769fc28c 100644
--- a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
+++ b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunnerTest.java
@@ -60,13 +60,11 @@ import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.internal.configuration.plugins.Plugins;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -143,12 +141,13 @@ class ConsistencyCheckTasksRunnerTest {
ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
-
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
PipelineProcessConfigurationPersistService processConfigPersistService
= mock(PipelineProcessConfigurationPersistService.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"),
runner, processConfigPersistService);
PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null), new
PipelineWriteConfiguration(1, 1, null), null);
PipelineJobType<PipelineJobConfiguration> parentJobType =
mock(PipelineJobType.class);
when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
PipelineJobConfiguration parentJobConfig =
mock(PipelineJobConfiguration.class);
when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
@@ -164,17 +163,29 @@ class ConsistencyCheckTasksRunnerTest {
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
when(jobFacade.getCheck()).thenReturn(checkRepository);
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenAnswer(invocation
-> {
+ PipelineLifecycleRunnable runnable = invocation.getArgument(0);
+ runnable.start();
+ return CompletableFuture.completedFuture(null);
+ });
try (
MockedConstruction<PipelineJobConfigurationManager> ignore =
mockConstruction(PipelineJobConfigurationManager.class,
(mock, context) ->
when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
- MockedStatic<PipelineProcessConfigurationUtils>
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+ MockedStatic<PipelineProcessConfigurationUtils>
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class);
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
processConfigMocked.when(() ->
PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig),
any(TransmissionProcessContext.class),
eq(jobItemContext.getProgressContext()))).thenReturn(checker);
- AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
- invokeRunBlocking(runnable);
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
null);
+ runner.start();
verify(jobItemManager).persistProgress(jobItemContext);
verify(checkRepository).persistCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID, checkResult);
assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(),
notNullValue());
+ runner.stop();
+ verify(checker).cancel();
}
}
@@ -182,17 +193,17 @@ class ConsistencyCheckTasksRunnerTest {
@Test
void assertRunBlockingSkipPersistWhenStopping() throws
ReflectiveOperationException {
ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
- jobItemContext.setStopping(true);
ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
PipelineProcessConfigurationPersistService processConfigPersistService
= mock(PipelineProcessConfigurationPersistService.class);
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("processConfigPersistService"),
runner, processConfigPersistService);
- PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null),
- new PipelineWriteConfiguration(1, 1, null), null);
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(
+ new PipelineReadConfiguration(1, 1, 1, null), new
PipelineWriteConfiguration(1, 1, null), null);
PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
PipelineJobType<PipelineJobConfiguration> parentJobType =
mock(PipelineJobType.class);
when(parentJobType.getType()).thenReturn("CONSISTENCY_CHECK");
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
when(PipelineJobIdUtils.parseJobType(PARENT_JOB_ID)).thenReturn(parentJobType);
PipelineJobConfiguration parentJobConfig =
mock(PipelineJobConfiguration.class);
when(parentJobConfig.getJobId()).thenReturn(PARENT_JOB_ID);
@@ -200,16 +211,29 @@ class ConsistencyCheckTasksRunnerTest {
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
when(processConfigPersistService.load(parentContextKey,
"CONSISTENCY_CHECK")).thenReturn(processConfig);
Map<String, TableDataConsistencyCheckResult> checkResult =
Collections.singletonMap("t_user", new TableDataConsistencyCheckResult(true));
-
when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(),
jobItemContext.getJobConfig().getAlgorithmProps())).thenReturn(checkResult);
+
when(checker.check(jobItemContext.getJobConfig().getAlgorithmTypeName(),
jobItemContext.getJobConfig().getAlgorithmProps())).thenAnswer(invocation -> {
+ jobItemContext.setStopping(true);
+ return checkResult;
+ });
PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenAnswer(invocation
-> {
+ PipelineLifecycleRunnable runnable = invocation.getArgument(0);
+ runnable.start();
+ return CompletableFuture.completedFuture(null);
+ });
try (
MockedConstruction<PipelineJobConfigurationManager> ignored =
mockConstruction(PipelineJobConfigurationManager.class,
(mock, context) ->
when(mock.getJobConfiguration(PARENT_JOB_ID)).thenReturn(parentJobConfig));
- MockedStatic<PipelineProcessConfigurationUtils>
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class)) {
+ MockedStatic<PipelineProcessConfigurationUtils>
processConfigMocked = mockStatic(PipelineProcessConfigurationUtils.class);
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
processConfigMocked.when(() ->
PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
when(parentJobType.buildDataConsistencyChecker(eq(parentJobConfig),
any(TransmissionProcessContext.class),
eq(jobItemContext.getProgressContext()))).thenReturn(checker);
- AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
- invokeRunBlocking(runnable);
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
null);
+ runner.start();
verify(jobItemManager).persistProgress(jobItemContext);
verifyNoInteractions(checkRepository);
assertThat(jobItemContext.getProgressContext().getCheckEndTimeMillis(),
notNullValue());
@@ -220,13 +244,27 @@ class ConsistencyCheckTasksRunnerTest {
@Test
void assertOnSuccessWhenStopping() throws ReflectiveOperationException {
ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
- jobItemContext.setStopping(true);
ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
PipelineJobManager jobManager = mock(PipelineJobManager.class);
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
- ExecuteCallback callback = createCheckExecuteCallback(runner);
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
+ jobItemContext.setStopping(true);
callback.onSuccess();
assertThat(jobItemContext.getStatus(), is(JobStatus.RUNNING));
verifyNoInteractions(jobManager);
@@ -244,6 +282,7 @@ class ConsistencyCheckTasksRunnerTest {
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
PipelineContextKey parentContextKey = new
PipelineContextKey("parent_db", InstanceType.JDBC);
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
@@ -252,7 +291,20 @@ class ConsistencyCheckTasksRunnerTest {
when(jobFacade.getCheck()).thenReturn(checkRepository);
Map<String, TableDataConsistencyCheckResult> checkResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
when(checkRepository.getCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID)).thenReturn(checkResult);
- ExecuteCallback callback = createCheckExecuteCallback(runner);
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
callback.onSuccess();
assertThat(jobItemContext.getStatus(), is(JobStatus.FINISHED));
verify(jobItemManager).persistProgress(jobItemContext);
@@ -270,6 +322,7 @@ class ConsistencyCheckTasksRunnerTest {
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
PipelineContextKey parentContextKey = new
PipelineContextKey("parent_db", InstanceType.JDBC);
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
@@ -277,7 +330,68 @@ class ConsistencyCheckTasksRunnerTest {
PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
when(jobFacade.getCheck()).thenReturn(checkRepository);
when(checkRepository.getCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID)).thenReturn(Collections.emptyMap());
- ExecuteCallback callback = createCheckExecuteCallback(runner);
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
+ callback.onSuccess();
+ assertThat(jobItemContext.getStatus(),
is(JobStatus.CONSISTENCY_CHECK_FAILURE));
+ verify(jobItemManager).persistProgress(jobItemContext);
+ verify(jobManager).stop(CHECK_JOB_ID);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ void assertOnSuccessUpdateToFailureWhenResultContainsFailedItem() throws
ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
mock(PipelineJobItemManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobItemManager"),
runner, jobItemManager);
+ PipelineContextKey checkContextKey = new
PipelineContextKey("check_db_fail", InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+ PipelineContextKey parentContextKey = new
PipelineContextKey("parent_db", InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(PARENT_JOB_ID)).thenReturn(parentContextKey);
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
+ PipelineGovernanceFacade checkGovernanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(checkGovernanceFacade);
+ PipelineJobItemFacade jobItemFacade =
mock(PipelineJobItemFacade.class);
+ PipelineJobItemProcessGovernanceRepository processRepository =
mock(PipelineJobItemProcessGovernanceRepository.class);
+ when(jobItemFacade.getProcess()).thenReturn(processRepository);
+
when(checkGovernanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(parentContextKey)).thenReturn(governanceFacade);
+ PipelineJobFacade jobFacade = mock(PipelineJobFacade.class);
+ when(governanceFacade.getJobFacade()).thenReturn(jobFacade);
+ PipelineJobCheckGovernanceRepository checkRepository =
mock(PipelineJobCheckGovernanceRepository.class);
+ when(jobFacade.getCheck()).thenReturn(checkRepository);
+ Map<String, TableDataConsistencyCheckResult> checkResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(false));
+ when(checkRepository.getCheckJobResult(PARENT_JOB_ID,
CHECK_JOB_ID)).thenReturn(checkResult);
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
callback.onSuccess();
assertThat(jobItemContext.getStatus(),
is(JobStatus.CONSISTENCY_CHECK_FAILURE));
verify(jobItemManager).persistProgress(jobItemContext);
@@ -292,8 +406,22 @@ class ConsistencyCheckTasksRunnerTest {
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
when(checker.isCanceling()).thenReturn(true);
- setConsistencyChecker(runner, checker);
- ExecuteCallback callback = createCheckExecuteCallback(runner);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker"),
runner, new AtomicReference<>(checker));
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
callback.onFailure(new RuntimeException("cancel"));
verify(jobManager).stop(CHECK_JOB_ID);
}
@@ -306,16 +434,32 @@ class ConsistencyCheckTasksRunnerTest {
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
when(checker.isCanceling()).thenReturn(false);
- setConsistencyChecker(runner, checker);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker"),
runner, new AtomicReference<>(checker));
PipelineContextKey checkContextKey = new
PipelineContextKey("check_db", InstanceType.JDBC);
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
PipelineJobItemFacade jobItemFacade =
mock(PipelineJobItemFacade.class);
when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
PipelineJobItemErrorMessageGovernanceRepository errorRepository =
mock(PipelineJobItemErrorMessageGovernanceRepository.class);
+ PipelineJobItemProcessGovernanceRepository processRepository =
mock(PipelineJobItemProcessGovernanceRepository.class);
+ when(jobItemFacade.getProcess()).thenReturn(processRepository);
when(jobItemFacade.getErrorMessage()).thenReturn(errorRepository);
- ExecuteCallback callback = createCheckExecuteCallback(runner);
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
RuntimeException failure = new RuntimeException("failure");
callback.onFailure(failure);
verify(errorRepository).update(CHECK_JOB_ID, 0, failure);
@@ -323,20 +467,49 @@ class ConsistencyCheckTasksRunnerTest {
}
@Test
- void assertDoStopWithoutChecker() throws ReflectiveOperationException {
- ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(createJobItemContext());
- AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
- invokeDoStop(runnable);
+ void assertOnFailurePersistErrorWhenCheckerAbsent() throws
ReflectiveOperationException {
+ ConsistencyCheckJobItemContext jobItemContext = createJobItemContext();
+ ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(jobItemContext);
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(ConsistencyCheckTasksRunner.class.getDeclaredField("jobManager"),
runner, jobManager);
+ PipelineContextKey checkContextKey = new
PipelineContextKey("check_db_null_checker", InstanceType.JDBC);
+
when(PipelineJobIdUtils.parseContextKey(CHECK_JOB_ID)).thenReturn(checkContextKey);
+ when(PipelineJobIdUtils.parseJobType(CHECK_JOB_ID)).thenReturn(new
ConsistencyCheckJobType());
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(checkContextKey)).thenReturn(governanceFacade);
+ PipelineJobItemFacade jobItemFacade =
mock(PipelineJobItemFacade.class);
+ PipelineJobItemProcessGovernanceRepository processRepository =
mock(PipelineJobItemProcessGovernanceRepository.class);
+ when(jobItemFacade.getProcess()).thenReturn(processRepository);
+ when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+ PipelineJobItemErrorMessageGovernanceRepository errorRepository =
mock(PipelineJobItemErrorMessageGovernanceRepository.class);
+ when(jobItemFacade.getErrorMessage()).thenReturn(errorRepository);
+ AtomicReference<ExecuteCallback> callbackRef = new AtomicReference<>();
+ ConsistencyCheckProcessContext processContext =
mock(ConsistencyCheckProcessContext.class);
+ PipelineExecuteEngine executeEngine =
mock(PipelineExecuteEngine.class);
+
when(processContext.getConsistencyCheckExecuteEngine()).thenReturn(executeEngine);
+
Plugins.getMemberAccessor().set(ConsistencyCheckJobItemContext.class.getDeclaredField("processContext"),
jobItemContext, processContext);
+
when(executeEngine.submit(any(PipelineLifecycleRunnable.class))).thenReturn(CompletableFuture.completedFuture(null));
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ callbackRef.set(invocation.getArgument(1));
+ return null;
+ });
+ runner.start();
+ }
+ ExecuteCallback callback = callbackRef.get();
+ RuntimeException failure = new RuntimeException("failure");
+ callback.onFailure(failure);
+ verify(errorRepository).update(CHECK_JOB_ID, 0, failure);
+ verify(jobManager).stop(CHECK_JOB_ID);
}
@Test
- void assertDoStopWithChecker() throws ReflectiveOperationException {
+ void assertStopWithoutConsistencyChecker() throws
ReflectiveOperationException {
ConsistencyCheckTasksRunner runner = new
ConsistencyCheckTasksRunner(createJobItemContext());
- PipelineDataConsistencyChecker checker =
mock(PipelineDataConsistencyChecker.class);
- setConsistencyChecker(runner, checker);
- AbstractPipelineLifecycleRunnable runnable =
createCheckPipelineLifecycleRunnable(runner);
- invokeDoStop(runnable);
- verify(checker).cancel();
+ PipelineLifecycleRunnable checkExecutor = (PipelineLifecycleRunnable)
Plugins.getMemberAccessor().get(ConsistencyCheckTasksRunner.class.getDeclaredField("checkExecutor"),
runner);
+ markLifecycleRunnableRunning(checkExecutor);
+ runner.stop();
+ assertTrue(runner.getJobItemContext().isStopping());
}
private ConsistencyCheckJobItemContext createJobItemContext() {
@@ -344,44 +517,10 @@ class ConsistencyCheckTasksRunnerTest {
return new ConsistencyCheckJobItemContext(jobConfig, 0,
JobStatus.RUNNING, null);
}
- private ExecuteCallback createCheckExecuteCallback(final
ConsistencyCheckTasksRunner runner) throws ReflectiveOperationException {
- for (Class<?> each :
ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
- if ("CheckExecuteCallback".equals(each.getSimpleName())) {
- Constructor<?> constructor =
each.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
- constructor.setAccessible(true);
- return (ExecuteCallback) constructor.newInstance(runner);
- }
- }
- throw new IllegalStateException("CheckExecuteCallback not found");
- }
-
@SuppressWarnings("unchecked")
- private void setConsistencyChecker(final ConsistencyCheckTasksRunner
runner, final PipelineDataConsistencyChecker checker) throws
ReflectiveOperationException {
- Field field =
ConsistencyCheckTasksRunner.class.getDeclaredField("consistencyChecker");
- field.setAccessible(true);
-
((java.util.concurrent.atomic.AtomicReference<PipelineDataConsistencyChecker>)
field.get(runner)).set(checker);
- }
-
- private AbstractPipelineLifecycleRunnable
createCheckPipelineLifecycleRunnable(final ConsistencyCheckTasksRunner runner)
throws ReflectiveOperationException {
- for (Class<?> each :
ConsistencyCheckTasksRunner.class.getDeclaredClasses()) {
- if ("CheckPipelineLifecycleRunnable".equals(each.getSimpleName()))
{
- Constructor<?> constructor =
each.getDeclaredConstructor(ConsistencyCheckTasksRunner.class);
- constructor.setAccessible(true);
- return (AbstractPipelineLifecycleRunnable)
constructor.newInstance(runner);
- }
- }
- throw new IllegalStateException("CheckPipelineLifecycleRunnable not
found");
- }
-
- private void invokeRunBlocking(final AbstractPipelineLifecycleRunnable
runnable) throws ReflectiveOperationException {
- Method method = runnable.getClass().getDeclaredMethod("runBlocking");
- method.setAccessible(true);
- method.invoke(runnable);
- }
-
- private void invokeDoStop(final AbstractPipelineLifecycleRunnable
runnable) throws ReflectiveOperationException {
- Method method = runnable.getClass().getDeclaredMethod("doStop");
- method.setAccessible(true);
- method.invoke(runnable);
+ private void markLifecycleRunnableRunning(final PipelineLifecycleRunnable
runnable) throws ReflectiveOperationException {
+ AtomicReference<Boolean> running = (AtomicReference<Boolean>)
Plugins.getMemberAccessor().get(AbstractPipelineLifecycleRunnable.class.getDeclaredField("running"),
runnable);
+ running.set(true);
+
Plugins.getMemberAccessor().set(AbstractPipelineLifecycleRunnable.class.getDeclaredField("startTimeMillis"),
runnable, System.currentTimeMillis());
}
}