This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5dd4fa85fd0 Add CDCJobAPITest (#37378)
5dd4fa85fd0 is described below
commit 5dd4fa85fd0d828b1799fe4a9b452ce62fb6ca64
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 14 00:01:43 2025 +0800
Add CDCJobAPITest (#37378)
* Add CDCJobAPITest
* Add CDCJobAPITest
* Add CDCJobAPITest
* Add CDCJobAPITest
---
kernel/data-pipeline/scenario/cdc/core/pom.xml | 6 +
.../data/pipeline/cdc/api/CDCJobAPITest.java | 466 +++++++++++++++++++++
2 files changed, 472 insertions(+)
diff --git a/kernel/data-pipeline/scenario/cdc/core/pom.xml
b/kernel/data-pipeline/scenario/cdc/core/pom.xml
index d89ffd327fe..da5c04e33e0 100644
--- a/kernel/data-pipeline/scenario/cdc/core/pom.xml
+++ b/kernel/data-pipeline/scenario/cdc/core/pom.xml
@@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-test-infra-framework</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-test-infra-fixture-database</artifactId>
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPITest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPITest.java
new file mode 100644
index 00000000000..9bb070381a0
--- /dev/null
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPITest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.api;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemFacade;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({PipelineAPIFactory.class, PipelineJobIdUtils.class,
PipelineJobRegistry.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CDCJobAPITest {
+
+ private MockedStatic<DatabaseTypedSPILoader> databaseTypedSPILoader;
+
+ private CDCJobAPI jobAPI;
+
+ @BeforeEach
+ void setUp() throws ReflectiveOperationException {
+ databaseTypedSPILoader = mockStatic(DatabaseTypedSPILoader.class,
this::mockDatabaseTypedSPILoader);
+ jobAPI = createJobAPI();
+ }
+
+ private Object mockDatabaseTypedSPILoader(final InvocationOnMock
invocation) {
+ if ("findService".equals(invocation.getMethod().getName())) {
+ return Optional.empty();
+ }
+ Class<?> targetClass = invocation.getArgument(0);
+ if (DialectPipelineSQLBuilder.class.equals(targetClass)) {
+ return mock(DialectPipelineSQLBuilder.class);
+ }
+ if (DialectIncrementalPositionManager.class.equals(targetClass)) {
+ return mock(DialectIncrementalPositionManager.class);
+ }
+ return mock(targetClass);
+ }
+
+ private CDCJobAPI createJobAPI() throws ReflectiveOperationException {
+ when(PipelineJobIdUtils.marshal(any())).thenReturn("foo_job");
+ when(PipelineJobIdUtils.parseContextKey(anyString())).thenReturn(new
PipelineContextKey("foo_db", InstanceType.PROXY));
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(any())).thenReturn(mock(PipelineGovernanceFacade.class,
RETURNS_DEEP_STUBS));
+ YamlDataSourceConfigurationSwapper dataSourceSwapper =
mock(YamlDataSourceConfigurationSwapper.class);
+
when(dataSourceSwapper.swapToMap(any())).thenReturn(createStandardDataSourceProperties());
+ CDCJobAPI result = new CDCJobAPI();
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("dataSourceConfigSwapper"),
result, dataSourceSwapper);
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("ruleConfigSwapperEngine"),
result, mock(YamlRuleConfigurationSwapperEngine.class));
+ return result;
+ }
+
+ private Map<String, Object> createStandardDataSourceProperties() {
+ Map<String, Object> result = new LinkedHashMap<>(4, 1F);
+ result.put("url", "jdbc:h2:mem:foo_db");
+ result.put("username", "root");
+ result.put("password", "pwd");
+ result.put("dataSourceClassName",
"com.zaxxer.hikari.HikariDataSource");
+ return result;
+ }
+
+ @AfterEach
+ void tearDown() {
+ PipelineContextManager.removeContext(new PipelineContextKey("foo_db",
InstanceType.PROXY));
+ PipelineContextManager.removeContext(new
PipelineContextKey(InstanceType.PROXY));
+ if (null != databaseTypedSPILoader) {
+ databaseTypedSPILoader.close();
+ }
+ }
+
+ @Test
+ void assertCreateThrowsWhenJobShardingCountIsZero() {
+ putContext(Collections.singletonMap("foo_ds",
mock(StorageUnit.class)));
+ CDCJobConfiguration jobConfig = new CDCJobConfiguration("foo_job",
"foo_db", Collections.emptyList(), false, mock(DatabaseType.class),
+ mock(ShardingSpherePipelineDataSourceConfiguration.class), new
JobDataNodeLine(Collections.emptyList()), Collections.emptyList(),
+ false, new
CDCJobConfiguration.SinkConfiguration(CDCSinkType.SOCKET, new Properties()), 1,
0);
+ try (
+ MockedConstruction<YamlCDCJobConfigurationSwapper>
jobConfigSwapper = mockConstruction(YamlCDCJobConfigurationSwapper.class,
+ (mock, context) ->
when(mock.swapToObject(any(YamlCDCJobConfiguration.class))).thenReturn(jobConfig)))
{
+ StreamDataParameter param = new StreamDataParameter("foo_db", new
LinkedList<>(), false, Collections.emptyMap(), false);
+
assertThrows(PipelineJobCreationWithInvalidShardingCountException.class, () ->
jobAPI.create(param, CDCSinkType.SOCKET, new Properties()));
+ assertThat(jobConfigSwapper.constructed().size(), is(1));
+ }
+ }
+
+ @Test
+ void assertCreateSkipsExistingJob() throws ReflectiveOperationException {
+ putContext(Collections.singletonMap("foo_ds",
mock(StorageUnit.class)));
+ CDCJobConfiguration jobConfig = createJobConfiguration(1);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+
when(governanceFacade.getJobFacade().getConfiguration().isExisted("foo_job")).thenReturn(true);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(any())).thenReturn(governanceFacade);
+ PipelineJobConfigurationManager jobConfigManager =
mock(PipelineJobConfigurationManager.class);
+
when(jobConfigManager.convertToJobConfigurationPOJO(jobConfig)).thenReturn(createJobConfigurationPOJO());
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("jobConfigManager"),
jobAPI, jobConfigManager);
+ try (
+ MockedConstruction<YamlCDCJobConfigurationSwapper> ignored =
mockConstruction(YamlCDCJobConfigurationSwapper.class,
+ (mock, context) ->
when(mock.swapToObject(any(YamlCDCJobConfiguration.class))).thenReturn(jobConfig)))
{
+ StreamDataParameter param = new StreamDataParameter("foo_db", new
LinkedList<>(Collections.singletonList("foo_schema.foo_tbl")), true,
+ Collections.singletonMap("foo_schema.foo_tbl",
Collections.singletonList(new DataNode("foo_ds" + ".foo_tbl"))), false);
+ assertThat(jobAPI.create(param, CDCSinkType.SOCKET, new
Properties()), is("foo_job"));
+ }
+ }
+
+ @Test
+ void assertCreateFullJobSkipsInitIncrementalPosition() throws
ReflectiveOperationException {
+ putContext(Collections.singletonMap("foo_ds",
mock(StorageUnit.class)));
+ CDCJobConfiguration jobConfig = new CDCJobConfiguration("foo_job",
"foo_db", Collections.singletonList("foo_schema.foo_tbl"), true,
+ mock(DatabaseType.class),
createJobConfiguration(1).getDataSourceConfig(), new
JobDataNodeLine(Collections.singletonList(
+ new JobDataNodeEntry("foo_tbl",
Collections.singletonList(new DataNode("foo_ds" + ".foo_tbl"))))),
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("foo_tbl",
+ Collections.singletonList(new DataNode("foo_ds" +
".foo_tbl")))))),
+ false, new
CDCJobConfiguration.SinkConfiguration(CDCSinkType.SOCKET, new Properties()), 1,
0);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(any())).thenReturn(mock(PipelineGovernanceFacade.class,
RETURNS_DEEP_STUBS));
+ PipelineJobConfigurationManager jobConfigManager =
mock(PipelineJobConfigurationManager.class);
+
when(jobConfigManager.convertToJobConfigurationPOJO(jobConfig)).thenReturn(createJobConfigurationPOJO());
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("jobConfigManager"),
jobAPI, jobConfigManager);
+ try (
+ MockedConstruction<YamlCDCJobConfigurationSwapper> ignored =
mockConstruction(YamlCDCJobConfigurationSwapper.class,
+ (mock, context) ->
when(mock.swapToObject(any(YamlCDCJobConfiguration.class))).thenReturn(jobConfig));
+ MockedConstruction<IncrementalTaskPositionManager>
positionManagerConstruction =
mockConstruction(IncrementalTaskPositionManager.class)) {
+ StreamDataParameter param = new StreamDataParameter("foo_db", new
LinkedList<>(Collections.singletonList("foo_schema.foo_tbl")), true,
+ Collections.singletonMap("foo_schema.foo_tbl",
Collections.singletonList(new DataNode("foo_ds" + ".foo_tbl"))), false);
+ assertThat(jobAPI.create(param, CDCSinkType.SOCKET, new
Properties()), is("foo_job"));
+ assertTrue(positionManagerConstruction.constructed().isEmpty());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ void assertCreatePersistAndInitIncrementalPosition() throws
ReflectiveOperationException {
+ Map<String, StorageUnit> storageUnits = new LinkedHashMap<>(2, 1F);
+ storageUnits.put("foo_ds", mock(StorageUnit.class));
+ storageUnits.put("bar_ds", mock(StorageUnit.class));
+ putContext(storageUnits);
+ CDCJobConfiguration jobConfig = createJobConfiguration(2);
+ PipelineJobItemProcessGovernanceRepository processGovernanceRepository
= mock(PipelineJobItemProcessGovernanceRepository.class);
+ PipelineJobItemFacade jobItemFacade =
mock(PipelineJobItemFacade.class);
+
when(jobItemFacade.getProcess()).thenReturn(processGovernanceRepository);
+ PipelineGovernanceFacade governanceFacade =
mock(PipelineGovernanceFacade.class, RETURNS_DEEP_STUBS);
+ when(governanceFacade.getJobItemFacade()).thenReturn(jobItemFacade);
+
when(PipelineAPIFactory.getPipelineGovernanceFacade(any())).thenReturn(governanceFacade);
+ PipelineJobConfigurationManager jobConfigManager =
mock(PipelineJobConfigurationManager.class);
+
when(jobConfigManager.convertToJobConfigurationPOJO(jobConfig)).thenReturn(createJobConfigurationPOJO());
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("jobConfigManager"),
jobAPI, jobConfigManager);
+ try (
+ MockedConstruction<YamlCDCJobConfigurationSwapper>
ignoredJobConfigSwapper = mockConstruction(YamlCDCJobConfigurationSwapper.class,
+ (mock, context) ->
when(mock.swapToObject(any(YamlCDCJobConfiguration.class))).thenReturn(jobConfig));
+ MockedConstruction<PipelineJobItemManager>
ignoredJobItemManager = mockConstruction(PipelineJobItemManager.class,
+ (mock, context) -> {
+ when(mock.getProgress(anyString(),
anyInt())).thenReturn(Optional.empty());
+ when(mock.getProgress("foo_job",
0)).thenReturn(Optional.of(mock(TransmissionJobItemProgress.class)));
+ });
+ MockedConstruction<IncrementalTaskPositionManager>
ignoredPositionManagerConstruction =
mockConstruction(IncrementalTaskPositionManager.class,
+ (mock, context) -> when(mock.getPosition(any(), any(),
any())).thenReturn(new SimpleIngestPosition("binlog_position")))) {
+ StreamDataParameter param =
buildStreamDataParameter(storageUnits.keySet());
+ assertThat(jobAPI.create(param, CDCSinkType.SOCKET, new
Properties()), is("foo_job"));
+ verify(jobConfigManager).convertToJobConfigurationPOJO(jobConfig);
+ verify(processGovernanceRepository).persist(anyString(), anyInt(),
anyString());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ void assertInitIncrementalPositionThrowsPrepareException() throws
ReflectiveOperationException {
+ putContext(Collections.singletonMap("foo_ds",
mock(StorageUnit.class)));
+ CDCJobConfiguration jobConfig = createJobConfiguration(1);
+ PipelineJobConfigurationManager jobConfigManager =
mock(PipelineJobConfigurationManager.class);
+
when(jobConfigManager.convertToJobConfigurationPOJO(jobConfig)).thenReturn(new
JobConfigurationPOJO());
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("jobConfigManager"),
jobAPI, jobConfigManager);
+ try (
+ MockedConstruction<YamlCDCJobConfigurationSwapper>
ignoredJobConfigSwapper = mockConstruction(YamlCDCJobConfigurationSwapper.class,
+ (mock, context) ->
when(mock.swapToObject(any(YamlCDCJobConfiguration.class))).thenReturn(jobConfig));
+ MockedConstruction<PipelineJobItemManager>
ignoredJobItemManager = mockConstruction(PipelineJobItemManager.class,
+ (mock, context) -> when(mock.getProgress("foo_job",
0)).thenReturn(Optional.empty()));
+ MockedConstruction<IncrementalTaskPositionManager>
positionManagerConstruction = mockConstruction(
+ IncrementalTaskPositionManager.class, (mock, context)
-> when(mock.getPosition(any(), any(), any())).thenThrow(SQLException.class))) {
+ StreamDataParameter param =
buildStreamDataParameter(Collections.singleton("foo_ds"));
+ assertThrows(PrepareJobWithGetBinlogPositionException.class, () ->
jobAPI.create(param, CDCSinkType.SOCKET, new Properties()));
+ assertThat(positionManagerConstruction.constructed().size(),
is(1));
+ }
+ }
+
+ @Test
+ void assertDropCleansUpAndHandlesSQLException() throws
ReflectiveOperationException {
+ PipelineJobManager jobManager = mock(PipelineJobManager.class);
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("jobManager"),
jobAPI, jobManager);
+ Map<String, Map<String, Object>> dataSources = new LinkedHashMap<>(2,
1F);
+ dataSources.put("foo_ds", createStandardDataSourceProperties());
+ dataSources.put("bar_ds", createStandardDataSourceProperties());
+ ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new
ShardingSpherePipelineDataSourceConfiguration(buildYamlRootConfiguration(dataSources));
+ CDCJobConfiguration jobConfig = new CDCJobConfiguration("foo_job",
"foo_db", Collections.singletonList("foo_schema.foo_tbl"), false,
+ mock(DatabaseType.class), dataSourceConfig, new
JobDataNodeLine(Collections.singletonList(
+ new JobDataNodeEntry("foo_tbl",
Collections.singletonList(new DataNode("foo_ds" + ".foo_tbl"))))),
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("foo_tbl",
Collections.singletonList(new DataNode("foo_ds" + ".foo_tbl")))))), false,
+ new CDCJobConfiguration.SinkConfiguration(CDCSinkType.SOCKET,
new Properties()), 1, 0);
+ PipelineJobConfigurationManager jobConfigManager =
mock(PipelineJobConfigurationManager.class);
+
when(jobConfigManager.getJobConfiguration("foo_job")).thenReturn(jobConfig);
+
Plugins.getMemberAccessor().set(CDCJobAPI.class.getDeclaredField("jobConfigManager"),
jobAPI, jobConfigManager);
+ AtomicInteger positionManagerIndex = new AtomicInteger();
+ try (
+ MockedConstruction<IncrementalTaskPositionManager>
positionManagerConstruction = mockConstruction(
+ IncrementalTaskPositionManager.class, (mock, context)
-> {
+ if (0 == positionManagerIndex.getAndIncrement()) {
+
doNothing().when(mock).destroyPosition(eq("foo_job"),
any(StandardPipelineDataSourceConfiguration.class));
+ } else {
+
doThrow(SQLException.class).when(mock).destroyPosition(eq("foo_job"),
any(StandardPipelineDataSourceConfiguration.class));
+ }
+ })) {
+ jobAPI.drop("foo_job");
+ verify(jobManager).drop("foo_job");
+ assertThat(positionManagerConstruction.constructed().size(),
is(2));
+ }
+ }
+
+ @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
+ @Test
+ void assertGetJobItemInfosCoversPositions() throws SQLException {
+ Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(1)).thenReturn("bar_position");
+
when(connection.createStatement().executeQuery(anyString())).thenReturn(resultSet);
+ StorageUnit storageUnitWithSQL = mock(StorageUnit.class,
RETURNS_DEEP_STUBS);
+
when(storageUnitWithSQL.getDataSource().getConnection()).thenReturn(connection);
+ Map<String, StorageUnit> storageUnits = new LinkedHashMap<>(2, 1F);
+ StorageUnit storageUnitWithoutSQL = mock(StorageUnit.class);
+ storageUnits.put("foo_ds", storageUnitWithoutSQL);
+ storageUnits.put("bar_ds", storageUnitWithSQL);
+ putContext(storageUnits);
+ CDCJobConfiguration jobConfig = createJobConfiguration(1);
+ try (
+ MockedConstruction<PipelineJobConfigurationManager>
ignoredJobConfigManager =
mockConstruction(PipelineJobConfigurationManager.class,
+ (mock, context) ->
when(mock.getJobConfiguration("foo_job")).thenReturn(jobConfig));
+ MockedConstruction<TransmissionJobManager>
ignoredJobManagerConstruction = mockConstruction(TransmissionJobManager.class,
+ (mock, context) ->
when(mock.getJobItemInfos("foo_job")).thenReturn(
+ Arrays.asList(new TransmissionJobItemInfo(0,
"foo", null, 0L, 0, null),
+ buildJobItemInfo("foo_ds", null),
buildJobItemInfo("bar_ds", "binlog_001"))))) {
+ DialectPipelineSQLBuilder builderWithoutSQL =
mock(DialectPipelineSQLBuilder.class);
+
when(builderWithoutSQL.buildQueryCurrentPositionSQL()).thenReturn(Optional.empty());
+ DialectPipelineSQLBuilder builderWithSQL =
mock(DialectPipelineSQLBuilder.class);
+
when(builderWithSQL.buildQueryCurrentPositionSQL()).thenReturn(Optional.of("SELECT
1"));
+
when(DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
storageUnitWithoutSQL.getStorageType())).thenReturn(builderWithoutSQL);
+
when(DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
storageUnitWithSQL.getStorageType())).thenReturn(builderWithSQL);
+ List<CDCJobItemInfo> actual = new
ArrayList<>(jobAPI.getJobItemInfos("foo_job"));
+ assertThat(actual, hasSize(3));
+ assertThat(actual.get(0).getConfirmedPosition(), emptyString());
+ assertThat(actual.get(1).getConfirmedPosition(), emptyString());
+ assertThat(actual.get(2).getConfirmedPosition(), is("binlog_001"));
+ assertThat(actual.get(2).getCurrentPosition(), is("bar_position"));
+ }
+ }
+
+ @Test
+ void assertGetCurrentPositionThrowsException() throws SQLException {
+ StorageUnit storageUnit = mock(StorageUnit.class, RETURNS_DEEP_STUBS);
+ putContext(Collections.singletonMap("foo_ds", storageUnit));
+ CDCJobConfiguration jobConfig = createJobConfiguration(1);
+ try (
+ MockedConstruction<PipelineJobConfigurationManager>
ignoredJobConfigManager =
mockConstruction(PipelineJobConfigurationManager.class,
+ (mock, context) ->
when(mock.getJobConfiguration("foo_job")).thenReturn(jobConfig));
+ MockedConstruction<TransmissionJobManager>
ignoredJobManagerConstruction = mockConstruction(TransmissionJobManager.class,
+ (mock, context) ->
when(mock.getJobItemInfos("foo_job")).thenReturn(Collections.singletonList(buildJobItemInfo("foo_ds",
"binlog_002"))))) {
+ DialectPipelineSQLBuilder builder =
mock(DialectPipelineSQLBuilder.class);
+
when(builder.buildQueryCurrentPositionSQL()).thenReturn(Optional.of("SELECT
1"));
+
when(DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
storageUnit.getStorageType())).thenReturn(builder);
+ DataSource dataSource = storageUnit.getDataSource();
+ Connection connection = mock(Connection.class);
+ when(dataSource.getConnection()).thenReturn(connection);
+ Statement statement = mock(Statement.class);
+ when(connection.createStatement()).thenReturn(statement);
+
when(statement.executeQuery(anyString())).thenThrow(SQLException.class);
+ assertThrows(PipelineInternalException.class, () ->
jobAPI.getJobItemInfos("foo_job"));
+ }
+ }
+
+ @Test
+ void assertStartEnableDisableAndType() {
+ JobConfigurationPOJO jobConfigPOJO = createJobConfigurationPOJO();
+ jobConfigPOJO.setShardingTotalCount(1);
+
when(PipelineJobIdUtils.getElasticJobConfigurationPOJO("foo_job")).thenReturn(jobConfigPOJO);
+ JobConfigurationAPI jobConfigAPI = mock(JobConfigurationAPI.class);
+
when(PipelineAPIFactory.getJobConfigurationAPI(any())).thenReturn(jobConfigAPI);
+
when(PipelineAPIFactory.getRegistryCenter(any())).thenReturn(mock(CoordinatorRegistryCenter.class));
+ PipelineSink sink = mock(PipelineSink.class);
+ try (MockedConstruction<OneOffJobBootstrap> jobBootstrapConstruction =
mockConstruction(OneOffJobBootstrap.class)) {
+ jobAPI.start("foo_job", sink);
+
assertThat(jobConfigPOJO.getProps().getProperty("start_time_millis"),
is(jobConfigPOJO.getProps().getProperty("start_time_millis")));
+ verify(jobConfigAPI).updateJobConfiguration(jobConfigPOJO);
+ jobAPI.disable("foo_job");
+ assertNotNull(jobConfigPOJO.getProps().getProperty("stop_time"));
+ jobAPI.commit("foo_job");
+ jobAPI.rollback("foo_job");
+ assertThat(jobAPI.getType(), is("STREAMING"));
+ assertThat(jobBootstrapConstruction.constructed().size(), is(1));
+ }
+ }
+
+ private void putContext(final Map<String, StorageUnit> storageUnits) {
+ MetaDataContexts metaDataContexts = mock(MetaDataContexts.class,
RETURNS_DEEP_STUBS);
+
when(metaDataContexts.getMetaData().getDatabase("foo_db").getResourceMetaData().getStorageUnits()).thenReturn(storageUnits);
+ ContextManager contextManager = mock(ContextManager.class);
+
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
+ PipelineContextManager.putContext(new PipelineContextKey("foo_db",
InstanceType.PROXY), contextManager);
+ PipelineContextManager.putContext(new
PipelineContextKey(InstanceType.PROXY), contextManager);
+ }
+
+ private CDCJobConfiguration createJobConfiguration(final int
shardingCount) {
+ Map<String, Map<String, Object>> dataSources = new LinkedHashMap<>(2,
1F);
+ dataSources.put("foo_ds", createStandardDataSourceProperties());
+ dataSources.put("bar_ds", createStandardDataSourceProperties());
+ ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new
ShardingSpherePipelineDataSourceConfiguration(buildYamlRootConfiguration(dataSources));
+ List<JobDataNodeLine> jobDataNodeLines = IntStream.range(0,
shardingCount).mapToObj(
+ i -> new JobDataNodeLine(Collections.singletonList(new
JobDataNodeEntry("foo_tbl", Collections.singletonList(new DataNode((0 == i ?
"foo_ds" : "bar_ds") + ".foo_tbl"))))))
+ .collect(Collectors.toList());
+ return new CDCJobConfiguration("foo_job", "foo_db",
Collections.singletonList("foo_schema.foo_tbl"), false, mock(),
+ dataSourceConfig, jobDataNodeLines.get(0), jobDataNodeLines,
false, new CDCJobConfiguration.SinkConfiguration(CDCSinkType.SOCKET, new
Properties()), 1, 0);
+ }
+
+ private YamlRootConfiguration buildYamlRootConfiguration(final Map<String,
Map<String, Object>> dataSources) {
+ YamlRootConfiguration result = new YamlRootConfiguration();
+ result.setDatabaseName("foo_db");
+ result.setDataSources(dataSources);
+ result.setRules(Collections.emptyList());
+ result.setProps(new Properties());
+ return result;
+ }
+
+ private JobConfigurationPOJO createJobConfigurationPOJO() {
+ JobConfigurationPOJO result = new JobConfigurationPOJO();
+ result.setJobName("foo_job");
+ return result;
+ }
+
+ private StreamDataParameter buildStreamDataParameter(final
Collection<String> dataSourceNames) {
+ Map<String, List<DataNode>> tableAndNodes = dataSourceNames.stream()
+ .collect(Collectors.toMap(each -> "foo_schema.foo_tbl", each
-> Collections.singletonList(new DataNode(each + ".foo_tbl")), (a, b) -> b,
LinkedHashMap::new));
+ return new StreamDataParameter("foo_db", new
LinkedList<>(Collections.singletonList("foo_schema.foo_tbl")), false,
tableAndNodes, false);
+ }
+
+ private TransmissionJobItemInfo buildJobItemInfo(final String
dataSourceName, final String incrementalPosition) {
+ TransmissionJobItemProgress progress = new
TransmissionJobItemProgress();
+ progress.setDataSourceName(dataSourceName);
+ progress.setIncremental(new JobItemIncrementalTasksProgress(null ==
incrementalPosition ? null : new IncrementalTaskProgress(new
SimpleIngestPosition(incrementalPosition))));
+ return new TransmissionJobItemInfo(0, "foo_tbl", progress, 0L, 0,
null);
+ }
+
+ @RequiredArgsConstructor
+ private static final class SimpleIngestPosition implements IngestPosition {
+
+ private final String value;
+
+ @Override
+ public String toString() {
+ return value;
+ }
+ }
+}