This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 24ca9fecc0e Review and improve pipeline code (#32768)
24ca9fecc0e is described below
commit 24ca9fecc0ec48c47f13cd63411fd3cb3238cf23
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Sep 2 15:46:34 2024 +0800
Review and improve pipeline code (#32768)
* Compatible with timestamp millisecond precision
* Transfer partial proxy props into ShardingSphereDataSource for dynamic
change
* Improve InventoryRecordsCountCalculator.getEstimatedCount for branch
database
* Add MigrationJobConfiguration.getJobDataNodeLine
* Add CDCJobId.sinkType
* Replace getProcessedRecordsCount with getInventoryRecordsCount as data
consistency check initial records count
* Update unit test
---
.../infra/util/DatabaseTypeUtils.java | 35 ++++++++++------------
.../DataConsistencyCheckUtils.java | 5 ++++
.../ShardingSpherePipelineDataSourceCreator.java | 7 +++++
.../InventoryRecordsCountCalculator.java | 11 +++----
.../ConsistencyCheckDataBuilder.java | 2 +-
.../DataConsistencyCheckUtilsTest.java | 10 +++++++
.../shardingsphere/data/pipeline/cdc/CDCJobId.java | 3 ++
.../data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
.../data/pipeline/cdc/CDCJobIdTest.java | 3 +-
.../migration/MigrationJobExecutorCallback.java | 2 +-
.../MigrationDataConsistencyChecker.java | 2 +-
.../config/MigrationJobConfiguration.java | 12 +++++++-
12 files changed, 64 insertions(+), 30 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/util/DatabaseTypeUtils.java
similarity index 54%
copy from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
copy to
infra/common/src/main/java/org/apache/shardingsphere/infra/util/DatabaseTypeUtils.java
index 9ad7b418237..a6cc70e5c92 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/util/DatabaseTypeUtils.java
@@ -15,28 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc;
+package org.apache.shardingsphere.infra.util;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-
-import java.util.List;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
/**
- * CDC job id.
+ * Database type utility class.
*/
-@RequiredArgsConstructor
-@Getter
-public final class CDCJobId implements PipelineJobId {
-
- private final PipelineJobType jobType = new CDCJobType();
-
- private final PipelineContextKey contextKey;
-
- private final List<String> schemaTableNames;
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DatabaseTypeUtils {
- private final boolean full;
+ /**
+ * Get trunk database type.
+ *
+ * @param databaseType database type
+ * @return trunk database type
+ */
+ public static DatabaseType getTrunkDatabaseType(final DatabaseType
databaseType) {
+ return databaseType.getTrunkDatabaseType().orElse(databaseType);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
index d2f985dbe6c..2936e34cfa1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
@@ -30,6 +30,7 @@ import java.math.RoundingMode;
import java.sql.Array;
import java.sql.SQLException;
import java.sql.SQLXML;
+import java.sql.Timestamp;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@@ -85,6 +86,10 @@ public final class DataConsistencyCheckUtils {
if (thisColumnValue instanceof SQLXML && thatColumnValue instanceof
SQLXML) {
return ((SQLXML) thisColumnValue).getString().equals(((SQLXML)
thatColumnValue).getString());
}
+ // TODO Use different match strategy for heterogeneous database or not
+ if (thisColumnValue instanceof Timestamp && thatColumnValue instanceof
Timestamp) {
+ return ((Timestamp) thisColumnValue).getTime() / 1000L * 1000L ==
((Timestamp) thatColumnValue).getTime() / 1000L * 1000L;
+ }
if (thisColumnValue instanceof Array && thatColumnValue instanceof
Array) {
return Objects.deepEquals(((Array) thisColumnValue).getArray(),
((Array) thatColumnValue).getArray());
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index a1abbc4a274..8be926d1002 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -42,6 +42,7 @@ import
org.apache.shardingsphere.single.yaml.config.pojo.YamlSingleRuleConfigura
import javax.sql.DataSource;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -83,6 +84,12 @@ public final class ShardingSpherePipelineDataSourceCreator
implements PipelineDa
private void updateConfigurationProperties(final YamlRootConfiguration
yamlRootConfig) {
Properties newProps = new Properties();
+ for (String each :
Arrays.asList(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(),
ConfigurationPropertyKey.SYSTEM_LOG_LEVEL.getKey(),
ConfigurationPropertyKey.SQL_SHOW.getKey())) {
+ Object value = yamlRootConfig.getProps().get(each);
+ if (null != value) {
+ newProps.put(each, value);
+ }
+ }
newProps.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(),
String.valueOf(Boolean.FALSE));
// Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY,
make sure streaming query work.
newProps.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(),
100000);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
index 5d48ea0ca0f..51f1809ca48 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
@@ -20,14 +20,15 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.database.mariadb.type.MariaDBDatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.DatabaseTypeUtils;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -57,7 +58,7 @@ public final class InventoryRecordsCountCalculator {
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
try {
- if (sql.isPresent()) {
+ if (sql.isPresent() &&
dumperContext.getCommonContext().getDataSourceConfig() instanceof
StandardPipelineDataSourceConfiguration) {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class,
dataSource.getDatabaseType().getType());
long result = getEstimatedCount(databaseType, dataSource,
sql.get());
return result > 0L ? result : getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
@@ -73,7 +74,7 @@ public final class InventoryRecordsCountCalculator {
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(estimatedCountSQL)) {
- if (databaseType instanceof MySQLDatabaseType || databaseType
instanceof MariaDBDatabaseType) {
+ if (DatabaseTypeUtils.getTrunkDatabaseType(databaseType)
instanceof MySQLDatabaseType) {
preparedStatement.setString(1, connection.getCatalog());
}
try (ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
index 8174bc01255..0219a17bd25 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
@@ -103,7 +103,7 @@ public final class ConsistencyCheckDataBuilder {
return new Date(((Date) value).getTime() - 1L);
}
if (value instanceof Timestamp) {
- return new Timestamp(((Timestamp) value).getTime() - 1L);
+ return new Timestamp(((Timestamp) value).getTime() - 1000L);
}
if (value instanceof int[]) {
int[] result = ((int[]) value).clone();
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
index 7e2de6daa18..a494d544d84 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
@@ -21,7 +21,9 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
+import java.sql.Timestamp;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class DataConsistencyCheckUtilsTest {
@@ -42,4 +44,12 @@ class DataConsistencyCheckUtilsTest {
BigDecimal another = BigDecimal.valueOf(33220L, 2);
assertTrue(DataConsistencyCheckUtils.isBigDecimalEquals(one, another));
}
+
+ @Test
+ void assertTimestampEquals() {
+ EqualsBuilder equalsBuilder = new EqualsBuilder();
+ long time = System.currentTimeMillis();
+ assertTrue(DataConsistencyCheckUtils.isMatched(equalsBuilder, new
Timestamp(time), new Timestamp(time / 10L * 10L + 1L)));
+ assertFalse(DataConsistencyCheckUtils.isMatched(equalsBuilder, new
Timestamp(time), new Timestamp(time + 1000L)));
+ }
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 9ad7b418237..c86734b2c2c 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
@@ -39,4 +40,6 @@ public final class CDCJobId implements PipelineJobId {
private final List<String> schemaTableNames;
private final boolean full;
+
+ private final CDCSinkType sinkType;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 2e9d6187cf8..483a07ced48 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -146,7 +146,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
List<String> schemaTableNames = param.getSchemaTableNames();
Collections.sort(schemaTableNames);
- result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey,
schemaTableNames, param.isFull())));
+ result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey,
schemaTableNames, param.isFull(), sinkType)));
result.setDatabaseName(param.getDatabaseName());
result.setSchemaTableNames(schemaTableNames);
result.setFull(param.isFull());
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index eeff0294f48..ba4fcc73857 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.cdc;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -31,7 +32,7 @@ class CDCJobIdTest {
@Test
void assertParseJobType() {
- String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new
PipelineContextKey("sharding_db", InstanceType.PROXY),
Collections.singletonList("foo"), true));
+ String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new
PipelineContextKey("sharding_db", InstanceType.PROXY),
Collections.singletonList("foo"), true, CDCSinkType.SOCKET));
assertThat(PipelineJobIdUtils.parseJobType(jobId),
instanceOf(CDCJobType.class));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index 0d0b136c2fd..2fde01e7632 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -63,7 +63,7 @@ public final class MigrationJobExecutorCallback implements
DistributedPipelineJo
}
private MigrationTaskConfiguration buildTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
- IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+ IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem));
Collection<CreateTableConfiguration> createTableConfigs =
buildCreateTableConfigurations(jobConfig,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
Set<CaseInsensitiveIdentifier> targetTableNames =
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 767d856c728..2ed17f99198 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -105,7 +105,7 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
private long getRecordsCount() {
Map<Integer, TransmissionJobItemProgress> jobProgress = new
TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig);
- return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
+ return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum();
}
private boolean checkTableInventoryDataUnmatchedAndBreak(final
JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index f8eae8eb2d5..0a467418ba3 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -21,8 +21,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.List;
@@ -67,4 +67,14 @@ public final class MigrationJobConfiguration implements
PipelineJobConfiguration
public int getJobShardingCount() {
return jobShardingDataNodes.size();
}
+
+ /**
+ * Get job data node line.
+ *
+ * @param jobShardingItem job sharding item
+ * @return job data node line
+ */
+ public JobDataNodeLine getJobDataNodeLine(final int jobShardingItem) {
+ return jobShardingDataNodes.get(jobShardingItem);
+ }
}