This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 85ac022d1db Add CDCJobTest and remove unreach codes for CDCJob (#37416)
85ac022d1db is described below
commit 85ac022d1dbb821f285dff7781a20f94af3723ea
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 17 23:04:41 2025 +0800
Add CDCJobTest and remove unreach codes for CDCJob (#37416)
---
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 4 -
.../data/pipeline/cdc/CDCJobTest.java | 430 +++++++++++++++++++++
2 files changed, 430 insertions(+), 4 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index e6d0206e29c..12c10ca71d0 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -212,10 +212,6 @@ public final class CDCJob implements PipelineJob {
private void executeIncrementalTasks(final Collection<CDCJobItemContext>
jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (CDCJobItemContext each : jobItemContexts) {
- if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
- log.info("Job status has already EXECUTE_INCREMENTAL_TASK,
ignore.");
- return;
- }
updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK,
jobItemManager);
for (PipelineTask task : each.getIncrementalTasks()) {
if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobTest.java
new file mode 100644
index 00000000000..44878e176cb
--- /dev/null
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobTest.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc;
+
+import io.netty.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
+import
org.apache.shardingsphere.data.pipeline.core.importer.PipelineRequiredColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
+import
org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
+import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({
+ PipelineJobIdUtils.class, PipelineProcessConfigurationUtils.class,
PipelineDataSourceConfigurationFactory.class,
+ OrderedSPILoader.class, PipelineAPIFactory.class,
PipelineJobProgressPersistService.class,
+ PipelineDistributedBarrier.class, ElasticJobServiceLoader.class,
CDCResponseUtils.class
+})
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CDCJobTest {
+
+ private static final PipelineContextKey CONTEXT_KEY = new
PipelineContextKey("logic_db", InstanceType.PROXY);
+
+ @Test // Verifies that execute() completes without throwing when the job runner manager has been stopped beforehand.
+ void assertExecuteWhenStopping() {
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("param");
+ prepareJobTypeAndContext(jobConfig); // Stubs job-id parsing so "foo_job_id" resolves to the mocked job type and CONTEXT_KEY.
+ try (
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignored = mockPersistService(
+ new PipelineProcessConfiguration(new
PipelineReadConfiguration(1, 1, 1, null), new PipelineWriteConfiguration(1, 1,
null), null))) {
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(mock(PipelineGovernanceFacade.class));
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+ CDCJob job = new CDCJob(mock(PipelineSink.class));
+ job.getJobRunnerManager().stop(); // Stop the runner manager before execute() is invoked.
+ assertDoesNotThrow(() -> job.execute(shardingContext));
+ }
+ }
+
+ @Test // Verifies that execute() never touches PipelineExecuteEngine when a tasks runner is already registered for item 0.
+ void assertExecuteSkipsWhenTasksRunnerMissing() { // NOTE(review): the name says "Missing", yet a runner is registered below before execute() - confirm the intended scenario.
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("job_param");
+ prepareJobTypeAndContext(jobConfig);
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null), new
PipelineWriteConfiguration(1, 1, null), null);
+ try (MockedConstruction<PipelineProcessConfigurationPersistService>
ignored = mockPersistService(processConfig)) {
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(mock(PipelineGovernanceFacade.class,
RETURNS_DEEP_STUBS));
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+
when(OrderedSPILoader.getServices(eq(PipelineRequiredColumnsExtractor.class),
anyCollection())).thenReturn(Collections.emptyMap());
+
when(PipelineDataSourceConfigurationFactory.newInstance(anyString(),
anyString())).thenReturn(mock(PipelineDataSourceConfiguration.class));
+ CDCJob job = new CDCJob(mock(PipelineSink.class));
+ PipelineTasksRunner tasksRunner = mock(PipelineTasksRunner.class);
+
when(tasksRunner.getJobItemContext()).thenReturn(mock(org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext.class));
+ job.getJobRunnerManager().addTasksRunner(0, tasksRunner); // Pre-register a runner for sharding item 0.
+ try (MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ job.execute(shardingContext);
+ triggerMocked.verifyNoInteractions(); // No static call on PipelineExecuteEngine may have happened.
+ }
+ }
+ }
+
+ @Test // Verifies that a RuntimeException from CDCJobPreparer.initTasks propagates out of execute() and that the error is recorded, the job disabled and the job stopped.
+ void assertExecuteInitTasksFailureStopsJob() {
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("param");
+ prepareJobTypeAndContext(jobConfig);
+ CDCJobAPI jobAPI = mock(CDCJobAPI.class);
+ try (
+ MockedStatic<TypedSPILoader> typedSPILoader =
mockStatic(TypedSPILoader.class);
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignoredProcess = mockPersistService(
+ new PipelineProcessConfiguration(new
PipelineReadConfiguration(1, 1, 1, null), new PipelineWriteConfiguration(1, 1,
null), null));
+ MockedConstruction<CDCJobPreparer> ignoredPreparer =
mockConstruction(CDCJobPreparer.class, (mock, context) ->
doThrow(RuntimeException.class).when(mock).initTasks(anyCollection())); // Every constructed preparer fails in initTasks.
+ MockedStatic<PipelineJobRegistry> jobRegistryMocked =
mockStatic(PipelineJobRegistry.class)) {
+ typedSPILoader.when(() ->
TypedSPILoader.getService(TransmissionJobAPI.class,
"STREAMING")).thenReturn(jobAPI);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(governanceFacade);
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+
when(OrderedSPILoader.getServices(eq(PipelineRequiredColumnsExtractor.class),
anyCollection())).thenReturn(Collections.emptyMap());
+
when(PipelineDataSourceConfigurationFactory.newInstance(anyString(),
anyString())).thenReturn(mock(PipelineDataSourceConfiguration.class));
+ CDCJob job = new CDCJob(mock(PipelineSink.class));
+ assertThrows(RuntimeException.class, () ->
job.execute(shardingContext));
+ // Expected failure handling: error message persisted, job disabled through the job API, job stopped in the registry.
verify(governanceFacade.getJobItemFacade().getErrorMessage()).update(eq("foo_job_id"),
anyInt(), any(RuntimeException.class));
+ verify(jobAPI).disable("foo_job_id");
+ jobRegistryMocked.verify(() ->
PipelineJobRegistry.stop("foo_job_id"));
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"}) // The cast of invocation.getArgument(0) to Collection<CDCJobItemContext> below is unchecked.
+ @Test // Runs execute() with inventory tasks whose trigger callback succeeds and checks the job item ends in EXECUTE_INCREMENTAL_TASK.
+ void assertExecuteInventorySuccessAndSkipIncrementalWhenRunning() {
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("jobParam");
+ prepareJobTypeAndContext(jobConfig);
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null),
+ new PipelineWriteConfiguration(1, 1, new
AlgorithmConfiguration("RATE_LIMIT", new Properties())), new
AlgorithmConfiguration("MEMORY", new Properties())); // Unlike the other tests, a write rate limiter and a stream channel algorithm are configured.
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(mock(PipelineGovernanceFacade.class,
RETURNS_DEEP_STUBS));
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+ when(PipelineDataSourceConfigurationFactory.newInstance(anyString(),
anyString())).thenReturn(mock(PipelineDataSourceConfiguration.class));
+ YamlRuleConfiguration ruleConfig = mock(YamlRuleConfiguration.class);
+ PipelineRequiredColumnsExtractor extractor =
mock(PipelineRequiredColumnsExtractor.class);
+ Map<ShardingSphereIdentifier, Collection<String>> requiredColumns =
Collections.singletonMap(new ShardingSphereIdentifier("logic_tbl"),
Collections.singleton("id"));
+ when(extractor.getTableAndRequiredColumnsMap(eq(ruleConfig),
anyCollection())).thenReturn(requiredColumns);
+ // The SPI lookup yields one extractor here, whereas the other tests stub an empty map.
when(OrderedSPILoader.getServices(eq(PipelineRequiredColumnsExtractor.class),
anyCollection())).thenReturn(Collections.singletonMap(ruleConfig, extractor));
+ AtomicReference<CDCJobItemContext> capturedContext = new
AtomicReference<>(); // Holds the job item context handed to initTasks so it can be inspected after execute().
+ try (
+ MockedStatic<TypedSPILoader> typedSPILoader =
mockStatic(TypedSPILoader.class);
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignoredProcess = mockPersistService(processConfig);
+ MockedConstruction<CDCJobPreparer> ignoredPreparer =
mockConstruction(CDCJobPreparer.class, (mock, context) -> doAnswer(invocation
-> {
+ CDCJobItemContext jobItemContext =
((Collection<CDCJobItemContext>) invocation.getArgument(0)).iterator().next();
+ jobItemContext.getInventoryTasks().add(mockTask(new
IngestFinishedPosition(), Collections.emptyList())); // Inventory task already at a finished position, no futures.
+ jobItemContext.getInventoryTasks().add(mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)))); // Inventory task with one already-completed future.
+
jobItemContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
+ capturedContext.set(jobItemContext);
+ return null;
+ }).when(mock).initTasks(anyCollection()));
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+ typedSPILoader.when(() ->
TypedSPILoader.getService(eq(JobRateLimitAlgorithm.class), anyString(),
any(Properties.class))).thenReturn(mock(JobRateLimitAlgorithm.class));
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ ExecuteCallback callback = invocation.getArgument(1);
+ callback.onSuccess(); // Report success synchronously instead of waiting on the futures.
+ return null;
+ });
+ new CDCJob(mock(PipelineSink.class)).execute(shardingContext);
+ }
+ assertThat(capturedContext.get().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK)); // NOTE(review): the CDCJob hunk of this commit removes the early return for this status and presumably sets it itself, so this check may not discriminate the "skip" case - confirm.
+ }
+
+ @SuppressWarnings("unchecked") // The cast of invocation.getArgument(0) to Collection<CDCJobItemContext> below is unchecked.
+ @Test // Runs execute() with a stopping job item whose only inventory task yields no futures, and checks PipelineExecuteEngine.trigger is still invoked.
+ void assertExecuteInventoryFuturesEmptyAndIncrementalSuccessWhenStopping()
{
+ CDCJobConfiguration jobConfig =
mockJobConfiguration(Collections.singletonList(
+ new JobDataNodeLine(Collections.singletonList(new
JobDataNodeEntry("logic_tbl", Collections.singletonList(new
DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("param");
+ prepareJobTypeAndContext(jobConfig);
+ try (
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignoredProcess = mockPersistService(
+ new PipelineProcessConfiguration(new
PipelineReadConfiguration(1, 1, 1, null), new PipelineWriteConfiguration(1, 1,
null), null));
+ MockedConstruction<CDCJobPreparer> ignoredPreparer =
mockConstruction(CDCJobPreparer.class, (mock, context) -> doAnswer(invocation
-> {
+ CDCJobItemContext jobItemContext =
((Collection<CDCJobItemContext>) invocation.getArgument(0)).iterator().next();
+ jobItemContext.setStopping(true); // Mark the job item as stopping before any task runs.
+ jobItemContext.getInventoryTasks().add(mockTask(new
IngestFinishedPosition(), Collections.emptyList())); // Finished inventory task that yields no futures.
+ jobItemContext.getIncrementalTasks().add(mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)))); // Incremental task with one already-completed future.
+ return null;
+ }).when(mock).initTasks(anyCollection()));
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) {
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(mock(PipelineGovernanceFacade.class,
RETURNS_DEEP_STUBS));
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+
when(OrderedSPILoader.getServices(eq(PipelineRequiredColumnsExtractor.class),
anyCollection())).thenReturn(Collections.emptyMap());
+
when(PipelineDataSourceConfigurationFactory.newInstance(anyString(),
anyString())).thenReturn(mock(PipelineDataSourceConfiguration.class));
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ ExecuteCallback callback = invocation.getArgument(1);
+ callback.onSuccess(); // Report success synchronously.
+ return null;
+ });
+ CDCJob job = new CDCJob(mock(PipelineSink.class));
+ job.execute(shardingContext);
+ triggerMocked.verify(() ->
PipelineExecuteEngine.trigger(anyCollection(), any()));
+ }
+ }
+
+ @SuppressWarnings("unchecked") // The cast of invocation.getArgument(0) to Collection<CDCJobItemContext> below is unchecked.
+ @Test // With a socket sink: the first trigger succeeds and the second fails; checks the failure response is written to the channel and the job is recorded as failed, disabled and stopped.
+ void assertExecuteIncrementalFailureSendError() {
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("param");
+ prepareJobTypeAndContext(jobConfig);
+ CDCJobAPI jobAPI = mock(CDCJobAPI.class);
+ try (
+ MockedStatic<TypedSPILoader> typedSPILoader =
mockStatic(TypedSPILoader.class);
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignoredProcess = mockPersistService(
+ new PipelineProcessConfiguration(new
PipelineReadConfiguration(1, 1, 1, null), new PipelineWriteConfiguration(1, 1,
null), null));
+ MockedConstruction<CDCJobPreparer> ignoredPreparer =
mockConstruction(CDCJobPreparer.class, (mock, context) -> doAnswer(invocation
-> {
+ CDCJobItemContext jobItemContext =
((Collection<CDCJobItemContext>) invocation.getArgument(0)).iterator().next();
+ jobItemContext.getInventoryTasks().add(mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)))); // One unfinished inventory task.
+ jobItemContext.getIncrementalTasks().add(mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)))); // One unfinished incremental task.
+ return null;
+ }).when(mock).initTasks(anyCollection()));
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class);
+ MockedStatic<PipelineJobRegistry> jobRegistryMocked =
mockStatic(PipelineJobRegistry.class)) {
+ typedSPILoader.when(() ->
TypedSPILoader.getService(TransmissionJobAPI.class,
"STREAMING")).thenReturn(jobAPI);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(governanceFacade);
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+
when(OrderedSPILoader.getServices(eq(PipelineRequiredColumnsExtractor.class),
anyCollection())).thenReturn(Collections.emptyMap());
+
when(PipelineDataSourceConfigurationFactory.newInstance(anyString(),
anyString())).thenReturn(mock(PipelineDataSourceConfiguration.class));
+ PipelineCDCSocketSink sink = mock(PipelineCDCSocketSink.class);
+ Channel channel = mock(Channel.class);
+ when(sink.getChannel()).thenReturn(channel);
+ CDCResponse response = mock(CDCResponse.class);
+ when(CDCResponseUtils.failed(anyString(), anyString(),
anyString())).thenReturn(response); // Any failure response built by CDCResponseUtils is this mock.
+ AtomicInteger triggerCounter = new AtomicInteger();
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ ExecuteCallback callback = invocation.getArgument(1);
+ if (0 == triggerCounter.getAndIncrement()) { // First trigger call succeeds; every later call fails.
+ callback.onSuccess();
+ } else {
+ callback.onFailure(new RuntimeException("failure"));
+ }
+ return null;
+ });
+ CDCJob job = new CDCJob(sink);
+ job.execute(shardingContext);
+ verify(channel, atLeastOnce()).writeAndFlush(response);
+
verify(governanceFacade.getJobItemFacade().getErrorMessage()).update(eq("foo_job_id"),
anyInt(), any(RuntimeException.class));
+ verify(jobAPI).disable("foo_job_id");
+ jobRegistryMocked.verify(() ->
PipelineJobRegistry.stop("foo_job_id"));
+ }
+ }
+
+ @SuppressWarnings("unchecked") // The cast of invocation.getArgument(0) to Collection<CDCJobItemContext> below is unchecked.
+ @Test // With a plain PipelineSink mock: the first trigger succeeds and the second fails; checks the job is recorded as failed, disabled and stopped.
+ void assertExecuteIncrementalFailureWithoutSocketSink() {
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("param");
+ prepareJobTypeAndContext(jobConfig);
+ CDCJobAPI jobAPI = mock(CDCJobAPI.class);
+ try (
+ MockedStatic<TypedSPILoader> typedSPILoader =
mockStatic(TypedSPILoader.class);
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignoredProcess = mockPersistService(
+ new PipelineProcessConfiguration(new
PipelineReadConfiguration(1, 1, 1, null), new PipelineWriteConfiguration(1, 1,
null), null));
+ MockedConstruction<CDCJobPreparer> ignoredPreparer =
mockConstruction(CDCJobPreparer.class, (mock, context) -> doAnswer(invocation
-> {
+ CDCJobItemContext jobItemContext =
((Collection<CDCJobItemContext>) invocation.getArgument(0)).iterator().next();
+ jobItemContext.getInventoryTasks().add(mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)))); // One unfinished inventory task.
+ jobItemContext.getIncrementalTasks().add(mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)))); // One unfinished incremental task.
+ return null;
+ }).when(mock).initTasks(anyCollection()));
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class);
+ MockedStatic<PipelineJobRegistry> jobRegistryMocked =
mockStatic(PipelineJobRegistry.class)) {
+ typedSPILoader.when(() ->
TypedSPILoader.getService(TransmissionJobAPI.class,
"STREAMING")).thenReturn(jobAPI);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(CONTEXT_KEY)).thenReturn(governanceFacade);
+
when(PipelineDistributedBarrier.getInstance(CONTEXT_KEY)).thenReturn(mock(PipelineDistributedBarrier.class));
+
when(OrderedSPILoader.getServices(eq(PipelineRequiredColumnsExtractor.class),
anyCollection())).thenReturn(Collections.emptyMap());
+ AtomicInteger triggerCounter = new AtomicInteger();
+ triggerMocked.when(() ->
PipelineExecuteEngine.trigger(anyCollection(), any())).thenAnswer(invocation ->
{
+ ExecuteCallback callback = invocation.getArgument(1);
+ if (0 == triggerCounter.getAndIncrement()) { // First trigger call succeeds; every later call fails.
+ callback.onSuccess();
+ } else {
+ callback.onFailure(new RuntimeException("failure"));
+ }
+ return null;
+ });
+ new CDCJob(mock(PipelineSink.class)).execute(shardingContext); // The sink is a generic PipelineSink mock, not a PipelineCDCSocketSink.
+
verify(governanceFacade.getJobItemFacade().getErrorMessage()).update(eq("foo_job_id"),
anyInt(), any(RuntimeException.class));
+ verify(jobAPI).disable("foo_job_id");
+ jobRegistryMocked.verify(() ->
PipelineJobRegistry.stop("foo_job_id"));
+ }
+ }
+
+ @SuppressWarnings("unchecked") // The cast of invocation.getArgument(0) to Collection<CDCJobItemContext> below is unchecked.
+ @Test // Verifies that an incremental task already at IngestFinishedPosition is never started while PipelineExecuteEngine.trigger is still invoked.
+ void assertExecuteIncrementalSkipsFinishedTasks() {
+ CDCJobConfiguration jobConfig = mockJobConfiguration(
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("logic_tbl",
Collections.singletonList(new DataNode("ds_0.tbl_0")))))));
+ ShardingContext shardingContext = mockShardingContext("param");
+ prepareJobTypeAndContext(jobConfig);
+ PipelineProcessConfiguration processConfig = new
PipelineProcessConfiguration(new PipelineReadConfiguration(1, 1, 1, null), new
PipelineWriteConfiguration(1, 1, null), null);
+ AtomicReference<PipelineTask> finishedTaskRef = new
AtomicReference<>(); // Exposes the finished incremental task created inside the initTasks answer.
+ try (
+ MockedConstruction<PipelineProcessConfigurationPersistService>
ignoredProcess = mockPersistService(processConfig);
+ MockedConstruction<CDCJobPreparer> ignoredPreparer =
mockConstruction(CDCJobPreparer.class, (mock, context) -> doAnswer(invocation
-> {
+ CDCJobItemContext jobItemContext =
((Collection<CDCJobItemContext>) invocation.getArgument(0)).iterator().next();
+ PipelineTask finishedTask = mockTask(new
IngestFinishedPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)));
+ finishedTaskRef.set(finishedTask);
+ PipelineTask activeTask = mockTask(new
IngestPlaceholderPosition(),
Collections.singletonList(CompletableFuture.completedFuture(null)));
+ jobItemContext.getInventoryTasks().add(mockTask(new
IngestFinishedPosition(), Collections.emptyList())); // Finished inventory task that yields no futures.
+ jobItemContext.getIncrementalTasks().add(finishedTask);
+ jobItemContext.getIncrementalTasks().add(activeTask);
+ return null;
+ }).when(mock).initTasks(anyCollection()));
+ MockedStatic<PipelineExecuteEngine> triggerMocked =
mockStatic(PipelineExecuteEngine.class)) { // trigger is mocked without an answer, so no callback is invoked in this test.
+ new CDCJob(mock(PipelineSink.class)).execute(shardingContext);
+ verify(finishedTaskRef.get(), never()).start();
+ triggerMocked.verify(() ->
PipelineExecuteEngine.trigger(anyCollection(), any()));
+ }
+ }
+
+ private CDCJobConfiguration mockJobConfiguration(final
List<JobDataNodeLine> jobShardingDataNodes) { // Builds a CDCJobConfiguration mock for job "foo_job_id" backed by the given data node lines.
+ CDCJobConfiguration result = mock(CDCJobConfiguration.class);
+ when(result.getJobId()).thenReturn("foo_job_id");
+ // The sharding count mirrors the number of data node lines passed in.
when(result.getJobShardingCount()).thenReturn(jobShardingDataNodes.size());
+
when(result.getJobShardingDataNodes()).thenReturn(jobShardingDataNodes);
+ ShardingSpherePipelineDataSourceConfiguration dataSourceConfig =
mock(ShardingSpherePipelineDataSourceConfiguration.class);
+ when(dataSourceConfig.getType()).thenReturn("JDBC");
+ when(dataSourceConfig.getParameter()).thenReturn("param");
+
when(dataSourceConfig.getRootConfig()).thenReturn(createYAMLRootConfiguration());
+ when(result.getDataSourceConfig()).thenReturn(dataSourceConfig);
+ return result;
+ }
+
+ private YamlRootConfiguration createYAMLRootConfiguration() { // Creates a root configuration for database "logic_db" with a single data source "ds_0" that has no properties.
+ YamlRootConfiguration result = new YamlRootConfiguration();
+ result.setDatabaseName("logic_db");
+ result.setDataSources(Collections.singletonMap("ds_0",
Collections.emptyMap()));
+ return result;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"}) // YamlPipelineJobConfigurationSwapper is mocked and stubbed as a raw type below.
+ private void prepareJobTypeAndContext(final CDCJobConfiguration jobConfig)
{ // Stubs PipelineJobIdUtils so "foo_job_id" resolves to a "STREAMING" job type and to CONTEXT_KEY.
+ YamlPipelineJobConfigurationSwapper jobConfigSwapper =
mock(YamlPipelineJobConfigurationSwapper.class);
+ when(jobConfigSwapper.swapToObject(anyString())).thenReturn(jobConfig); // Any job parameter string is swapped to the supplied configuration.
+ PipelineJobType<?> jobType = mock(PipelineJobType.class,
RETURNS_DEEP_STUBS);
+
when(jobType.getOption().getYamlJobConfigurationSwapper()).thenReturn(jobConfigSwapper);
+ when(jobType.getType()).thenReturn("STREAMING");
+
when(PipelineJobIdUtils.parseJobType("foo_job_id")).thenReturn(jobType);
+
when(PipelineJobIdUtils.parseContextKey("foo_job_id")).thenReturn(CONTEXT_KEY);
+ }
+
+ private MockedConstruction<PipelineProcessConfigurationPersistService>
mockPersistService(final PipelineProcessConfiguration processConfig) { // Returns a construction mock that the caller must close; the tests use try-with-resources.
+ MockedConstruction<PipelineProcessConfigurationPersistService> result
= mockConstruction(
+ PipelineProcessConfigurationPersistService.class, (mock,
context) -> when(mock.load(CONTEXT_KEY,
"STREAMING")).thenReturn(processConfig)); // Every constructed persist service loads the given configuration.
+ // Default-value filling is stubbed to hand the same configuration back.
when(PipelineProcessConfigurationUtils.fillInDefaultValue(processConfig)).thenReturn(processConfig);
+ return result;
+ }
+
+ private ShardingContext mockShardingContext(final String jobParameter) { // Builds a ShardingContext mock named "foo_job_id" that carries the given job parameter.
+ ShardingContext result = mock(ShardingContext.class);
+ when(result.getJobName()).thenReturn("foo_job_id");
+ when(result.getJobParameter()).thenReturn(jobParameter);
+ return result;
+ }
+
+ private PipelineTask mockTask(final IngestPosition position, final
Collection<CompletableFuture<?>> futures) { // Builds a PipelineTask mock that reports the given position and returns the given futures from start().
+ PipelineTask result = mock(PipelineTask.class, RETURNS_DEEP_STUBS); // Deep stubs allow stubbing the chained getTaskProgress().getPosition() call.
+ when(result.getTaskProgress().getPosition()).thenReturn(position);
+ when(result.start()).thenReturn(futures);
+ return result;
+ }
+}