This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new 00a4c569f93 Improve data consistency check table without unique key with DATA_MATCH (#35298)
00a4c569f93 is described below

commit 00a4c569f9363f3b5fe154c61c242697e75248ff
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Wed Apr 30 19:17:52 2025 +0800

    Improve data consistency check table without unique key with DATA_MATCH (#35298)
    
    * Add preCheck default method for TableInventoryChecker interface
    
    * Override preCheck method for DataMatchTableInventoryChecker to check table without unique key
    
    * Integrate MigrationDataConsistencyChecker with TableInventoryChecker.preCheck
---
 .../DataMatchTableDataConsistencyChecker.java      | 11 ++++
 .../table/MatchingTableInventoryChecker.java       |  3 +
 .../table/TableInventoryChecker.java               | 11 ++++
 .../table/DataMatchTableInventoryCheckerTest.java  | 67 ++++++++++++++++++++++
 .../MigrationDataConsistencyChecker.java           | 11 +++-
 .../MigrationDataConsistencyCheckerTest.java       | 44 +++++++++-----
 6 files changed, 131 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 84236229316..59243852019 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
 import com.google.common.base.Strings;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
@@ -26,6 +28,7 @@ import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
 
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -90,6 +93,14 @@ public final class DataMatchTableDataConsistencyChecker implements TableDataCons
         this.chunkSize = chunkSize;
     }
     
+    @Override
+    public Optional<TableDataConsistencyCheckResult> preCheck() {
+        if (getParam().getUniqueKeys().isEmpty()) {
+            return Optional.of(new TableDataConsistencyCheckResult(TableDataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
+        }
+        return Optional.empty();
+    }
+    
     @Override
     protected SingleTableInventoryCalculator buildSingleTableInventoryCalculator() {
         return new RecordSingleTableInventoryCalculator(chunkSize);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 690ce1e89b0..f17f66b6914 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
@@ -51,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Slf4j
 public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
     
+    @Getter(AccessLevel.PROTECTED)
     private final TableInventoryCheckParameter param;
     
     private final AtomicBoolean canceling = new AtomicBoolean(false);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
index 5248d0aa14a..23bde5ecdd8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
@@ -20,11 +20,22 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 
+import java.util.Optional;
+
 /**
  * Table inventory checker.
  */
 public interface TableInventoryChecker extends PipelineCancellable {
     
+    /**
+     * Pre-check for table inventory data.
+     *
+     * @return check result
+     */
+    default Optional<TableDataConsistencyCheckResult> preCheck() {
+        return Optional.empty();
+    }
+    
     /**
      * Data consistency check for single table inventory data.
     *
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableInventoryCheckerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableInventoryCheckerTest.java
new file mode 100644
index 00000000000..4be11edd9ec
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableInventoryCheckerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
+
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DataMatchTableInventoryCheckerTest {
+    
+    @Test
+    void assertPreCheckWithUniqueKey() {
+        try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+            List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true));
+            TableInventoryCheckParameter param = buildTableInventoryCheckParameter(uniqueKeys);
+            TableInventoryChecker tableInventoryChecker = checker.buildTableInventoryChecker(param);
+            Optional<TableDataConsistencyCheckResult> actual = tableInventoryChecker.preCheck();
+            assertFalse(actual.isPresent());
+        }
+    }
+    
+    @Test
+    void assertPreCheckWithoutUniqueKey() {
+        try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+            TableInventoryCheckParameter param = buildTableInventoryCheckParameter(Collections.emptyList());
+            TableInventoryChecker tableInventoryChecker = checker.buildTableInventoryChecker(param);
+            Optional<TableDataConsistencyCheckResult> actual = tableInventoryChecker.preCheck();
+            assertTrue(actual.isPresent());
+            assertNotNull(actual.get().getIgnoredType());
+            assertThat(actual.get().getIgnoredType(), is(TableDataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
+        }
+    }
+    
+    private TableInventoryCheckParameter buildTableInventoryCheckParameter(final List<PipelineColumnMetaData> uniqueKeys) {
+        return new TableInventoryCheckParameter("jobId1", null, null, new QualifiedTable(null, "t_order"), new QualifiedTable(null, "t_order"),
+                Arrays.asList("order_id", "user_id", "status"), uniqueKeys, null, null);
+    }
+}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 06c9a484ef1..1d2ac52b3d8 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -113,6 +113,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager);
                 checkResultMap.put(new QualifiedTable(each.getSchemaName(), each.getTableName()), checkResult);
+                if (checkResult.isIgnored()) {
+                    progressContext.getIgnoredTableNames().add(each.format());
+                    log.info("Table '{}' is ignored, ignore type: {}", each.format(), checkResult.getIgnoredType());
+                    continue;
+                }
                 if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", each.format());
                     return true;
@@ -138,15 +143,15 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                 jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext);
         TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param);
         currentTableInventoryChecker.set(tableInventoryChecker);
-        TableDataConsistencyCheckResult result = tableInventoryChecker.checkSingleTableInventoryData();
-        currentTableInventoryChecker.set(null);
-        return result;
+        Optional<TableDataConsistencyCheckResult> preCheckResult = tableInventoryChecker.preCheck();
+        return preCheckResult.orElseGet(tableInventoryChecker::checkSingleTableInventoryData);
     }
     
     @Override
     public void cancel() {
         canceling.set(true);
         Optional.ofNullable(currentTableInventoryChecker.get()).ifPresent(TableInventoryChecker::cancel);
+        currentTableInventoryChecker.set(null);
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 35f6f358aed..7044c4f81fe 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -19,11 +19,12 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consist
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -43,6 +44,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MigrationDataConsistencyCheckerTest {
@@ -53,8 +57,25 @@ class MigrationDataConsistencyCheckerTest {
     }
     
     @Test
-    void assertCountAndDataCheck() throws SQLException {
-        MigrationJobConfiguration jobConfig = createJobConfiguration();
+    void assertFixtureCheck() throws SQLException {
+        MigrationJobConfiguration jobConfig = createJobConfiguration(true);
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = check(jobConfig, "FIXTURE");
+        TableDataConsistencyCheckResult actual = checkResultMap.get("t_order");
+        assertTrue(actual.isMatched());
+        assertFalse(actual.isIgnored());
+    }
+    
+    @Test
+    void assertDataMatchCheck() throws SQLException {
+        MigrationJobConfiguration jobConfig = createJobConfiguration(false);
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = check(jobConfig, "DATA_MATCH");
+        TableDataConsistencyCheckResult actual = checkResultMap.get("t_order");
+        assertFalse(actual.isMatched());
+        assertTrue(actual.isIgnored());
+        assertThat(actual.getIgnoredType(), is(TableDataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
+    }
+    
+    private Map<String, TableDataConsistencyCheckResult> check(final MigrationJobConfiguration jobConfig, final String algorithmType) {
         JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
         jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         jobConfigurationPOJO.setJobName(jobConfig.getJobId());
@@ -62,11 +83,8 @@ class MigrationDataConsistencyCheckerTest {
         PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, "");
-        Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new TransmissionProcessContext(jobConfig.getJobId(), null),
-                createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
-        String checkKey = "t_order";
-        assertTrue(actual.get(checkKey).isMatched());
-        assertTrue(actual.get(checkKey).isMatched());
+        return new MigrationDataConsistencyChecker(jobConfig, new TransmissionProcessContext(jobConfig.getJobId(), null),
+                createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check(algorithmType, null);
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
@@ -78,21 +96,21 @@ class MigrationDataConsistencyCheckerTest {
         return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
     }
     
-    private MigrationJobConfiguration createJobConfiguration() throws SQLException {
+    private MigrationJobConfiguration createJobConfiguration(final boolean orderHasUniqueKey) throws SQLException {
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
-        initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig());
-        initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
+        initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig(), orderHasUniqueKey);
+        initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), orderHasUniqueKey);
         return jobItemContext.getJobConfig();
     }
     
-    private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
+    private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig, final boolean orderHasUniqueKey) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
                 PipelineDataSource dataSource = dataSourceManager.getDataSource(dataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT(11))");
+            statement.execute(String.format("CREATE TABLE t_order (order_id INT %s, user_id INT(11))", orderHasUniqueKey ? "PRIMARY KEY" : ""));
             statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 1), (999, 10)");
         }
     }
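For context, a minimal sketch (not part of the commit) of how the new preCheck() hook is consumed, mirroring the MigrationDataConsistencyChecker change above. The helper name checkOneTable is illustrative, and it assumes the TableDataConsistencyChecker SPI exposes buildTableInventoryChecker as used in the diff:

    // Illustrative helper, not committed code: run the pre-check first, fall back to the full comparison.
    private TableDataConsistencyCheckResult checkOneTable(final TableDataConsistencyChecker tableChecker, final TableInventoryCheckParameter param) {
        TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param);
        // preCheck() short-circuits when this algorithm cannot check the table at all,
        // e.g. DATA_MATCH on a table without a unique key yields an ignored result (NO_UNIQUE_KEY)
        // instead of running the chunked row-by-row comparison.
        Optional<TableDataConsistencyCheckResult> preCheckResult = tableInventoryChecker.preCheck();
        return preCheckResult.orElseGet(tableInventoryChecker::checkSingleTableInventoryData);
    }

Callers can then treat an ignored result separately from a mismatch: MigrationDataConsistencyChecker records such tables via progressContext.getIgnoredTableNames() and continues with the remaining tables instead of failing the whole DATA_MATCH check.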