This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7380cc2e97a Add more test cases on YamlCDCJobConfigurationSwapperTest
(#37379)
7380cc2e97a is described below
commit 7380cc2e97a4f92225e5af54069898849f6672bb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 14 12:49:41 2025 +0800
Add more test cases on YamlCDCJobConfigurationSwapperTest (#37379)
---
.../YamlCDCJobConfigurationSwapperTest.java | 114 ++++++++++++++++++++-
1 file changed, 109 insertions(+), 5 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
index 288f7351037..7bc492f09f3 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/config/yaml/swapper/YamlCDCJobConfigurationSwapperTest.java
@@ -17,34 +17,64 @@
package org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper;
+import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration.SinkConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class YamlCDCJobConfigurationSwapperTest {
- private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+ private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "H2");
+
+ private final YamlCDCJobConfigurationSwapper swapper = new
YamlCDCJobConfigurationSwapper();
@Test
void assertSwapToObject() {
- YamlCDCJobConfiguration yamlJobConfig =
createYamlCDCJobConfiguration();
- CDCJobConfiguration actual = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
+ YamlCDCJobConfiguration yamlJobConfig =
createYamlCDCJobConfigurationWithDataNodes();
+ CDCJobConfiguration actual = swapper.swapToObject(yamlJobConfig);
assertThat(actual.getJobId(),
is("j0302p00007a8bf46da145dc155ba25c710b550220"));
assertThat(actual.getDatabaseName(), is("test_db"));
assertThat(actual.getSchemaTableNames(),
is(Arrays.asList("test.t_order", "t_order_item")));
assertTrue(actual.isFull());
+ assertThat(actual.getTablesFirstDataNodes().marshal(),
is("foo_tbl:foo_ds.foo_tbl"));
+ assertThat(actual.getJobShardingDataNodes(), hasSize(1));
+ assertThat(actual.getJobShardingDataNodes().get(0).marshal(),
is("bar_tbl:bar_ds.bar_tbl"));
+ assertThat(actual.getSinkConfig().getSinkType(),
is(CDCSinkType.SOCKET));
+ assertThat(actual.getSinkConfig().getProps().getProperty("foo_key"),
is("foo_value"));
+ assertThat(actual.getConcurrency(), is(2));
+ assertThat(actual.getRetryTimes(), is(3));
+ assertThat(actual.getDataSourceConfig().getType(),
is("ShardingSphereJDBC"));
+ assertTrue(actual.isDecodeWithTX());
}
private YamlCDCJobConfiguration createYamlCDCJobConfiguration() {
@@ -60,14 +90,88 @@ class YamlCDCJobConfigurationSwapperTest {
return result;
}
+ private YamlCDCJobConfiguration
createYamlCDCJobConfigurationWithDataNodes() {
+ YamlCDCJobConfiguration result = createYamlCDCJobConfiguration();
+
result.setDataSourceConfiguration(createYamlPipelineDataSourceConfiguration());
+ result.setTablesFirstDataNodes(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("foo_tbl",
Collections.singletonList(new DataNode("foo_ds.foo_tbl"))))).marshal());
+ List<JobDataNodeLine> jobShardingDataNodes = Collections.singletonList(
+ new JobDataNodeLine(Collections.singletonList(new
JobDataNodeEntry("bar_tbl", Collections.singletonList(new
DataNode("bar_ds.bar_tbl"))))));
+
result.setJobShardingDataNodes(Collections.singletonList(jobShardingDataNodes.get(0).marshal()));
+ result.setDecodeWithTX(true);
+ Properties sinkProps = new Properties();
+ sinkProps.setProperty("foo_key", "foo_value");
+ result.getSinkConfig().setProps(sinkProps);
+ result.setConcurrency(2);
+ result.setRetryTimes(3);
+ return result;
+ }
+
+ private YamlPipelineDataSourceConfiguration
createYamlPipelineDataSourceConfiguration() {
+ YamlRootConfiguration rootConfig = new YamlRootConfiguration();
+ rootConfig.setDatabaseName("foo_db");
+ Map<String, Object> dataSource = new LinkedHashMap<>(4, 1F);
+ dataSource.put("url", "jdbc:h2:mem:foo_db;MODE=MySQL");
+ dataSource.put("username", "root");
+ dataSource.put("password", "root");
+ Map<String, Map<String, Object>> dataSources = new LinkedHashMap<>(1,
1F);
+ dataSources.put("foo_ds", dataSource);
+ rootConfig.setDataSources(dataSources);
+ rootConfig.setRules(new LinkedList<>());
+ rootConfig.setProps(new Properties());
+ YamlPipelineDataSourceConfiguration result = new
YamlPipelineDataSourceConfiguration();
+ result.setType("ShardingSphereJDBC");
+ result.setParameter(YamlEngine.marshal(rootConfig));
+ return result;
+ }
+
@Test
- void assertSwapToYamlConfig() {
+ void assertSwapToYamlConfiguration() {
CDCJobConfiguration jobConfig = new
CDCJobConfiguration("j0302p00007a8bf46da145dc155ba25c710b550220", "test_db",
Arrays.asList("t_order", "t_order_item"), true, databaseType,
null, null, null, true, new
SinkConfiguration(CDCSinkType.SOCKET, new Properties()), 1, 1);
- YamlCDCJobConfiguration actual = new
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(jobConfig);
+ YamlCDCJobConfiguration actual =
swapper.swapToYamlConfiguration(jobConfig);
assertThat(actual.getJobId(),
is("j0302p00007a8bf46da145dc155ba25c710b550220"));
assertThat(actual.getDatabaseName(), is("test_db"));
assertThat(actual.getSchemaTableNames(), is(Arrays.asList("t_order",
"t_order_item")));
assertTrue(actual.isFull());
+ assertNull(actual.getTablesFirstDataNodes());
+ assertNull(actual.getJobShardingDataNodes());
+ assertThat(actual.getSinkConfig().getSinkType(),
is(CDCSinkType.SOCKET.name()));
+ assertThat(actual.getConcurrency(), is(1));
+ assertThat(actual.getRetryTimes(), is(1));
+ }
+
+ @Test
+ void assertSwapToYamlConfigurationWithDataNodes() {
+ CDCJobConfiguration jobConfig = new
CDCJobConfiguration("j0302p00007a8bf46da145dc155ba25c710b550220", "test_db",
Arrays.asList("t_order", "t_order_item"),
+ true, databaseType, null, new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("foo_tbl",
Collections.singletonList(new DataNode("foo_ds.foo_tbl"))))),
+ Collections.singletonList(new
JobDataNodeLine(Collections.singletonList(new JobDataNodeEntry("bar_tbl",
Collections.singletonList(new DataNode("bar_ds.bar_tbl")))))),
+ false, new SinkConfiguration(CDCSinkType.SOCKET,
PropertiesBuilder.build(new Property("foo_key", "foo_value"))), 2, 4);
+ YamlCDCJobConfiguration actual =
swapper.swapToYamlConfiguration(jobConfig);
+ assertThat(actual.getTablesFirstDataNodes(),
is("foo_tbl:foo_ds.foo_tbl"));
+ assertThat(actual.getJobShardingDataNodes(),
is(Collections.singletonList("bar_tbl:bar_ds.bar_tbl")));
+ assertThat(actual.getSinkConfig().getProps().getProperty("foo_key"),
is("foo_value"));
+ assertThat(actual.getConcurrency(), is(2));
+ assertThat(actual.getRetryTimes(), is(4));
+ }
+
+ @Test
+ void assertSwapToObjectFromJobParam() {
+ YamlCDCJobConfiguration yamlConfig = createYamlCDCJobConfiguration();
+
yamlConfig.setDataSourceConfiguration(createYamlPipelineDataSourceConfiguration());
+ yamlConfig.getSinkConfig().setProps(PropertiesBuilder.build(new
Property("foo_key", "foo_value")));
+ yamlConfig.setDecodeWithTX(false);
+ String jobParam = YamlEngine.marshal(yamlConfig);
+ CDCJobConfiguration actual = swapper.swapToObject(jobParam);
+ assertThat(actual.getJobId(),
is("j0302p00007a8bf46da145dc155ba25c710b550220"));
+ assertThat(actual.getJobShardingDataNodes(), empty());
+ assertNull(actual.getTablesFirstDataNodes());
+ assertThat(actual.getDataSourceConfig(),
instanceOf(ShardingSpherePipelineDataSourceConfiguration.class));
+ assertThat(actual.getSinkConfig().getProps().getProperty("foo_key"),
is("foo_value"));
+ assertFalse(actual.isDecodeWithTX());
+ }
+
+ @Test
+ void assertSwapToObjectFromJobParamWithNullJobParam() {
+ assertNull(swapper.swapToObject((String) null));
}
}