This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 00a4c569f93 Improve data consistency check table without unique key with DATA_MATCH (#35298)
00a4c569f93 is described below

commit 00a4c569f9363f3b5fe154c61c242697e75248ff
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Wed Apr 30 19:17:52 2025 +0800

    Improve data consistency check table without unique key with DATA_MATCH (#35298)
    
    * Add preCheck default method for TableInventoryChecker interface
    
    * Override preCheck method for DataMatchTableInventoryChecker to check table without unique key
    
    * Integrate MigrationDataConsistencyChecker with TableInventoryChecker.preCheck
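    
    A minimal sketch of the intended call flow (names taken from the diff below; the
    surrounding variables come from MigrationDataConsistencyChecker and are shown
    here only for illustration):
    
        // Build the per-table checker, then let preCheck() short-circuit tables that the
        // DATA_MATCH algorithm cannot compare row by row (no unique key available).
        TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param);
        Optional<TableDataConsistencyCheckResult> preCheckResult = tableInventoryChecker.preCheck();
        TableDataConsistencyCheckResult checkResult = preCheckResult.orElseGet(tableInventoryChecker::checkSingleTableInventoryData);
        // An ignored result (NO_UNIQUE_KEY) is recorded in the progress context and skipped
        // instead of being reported as a mismatch.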
---
 .../DataMatchTableDataConsistencyChecker.java      | 11 ++++
 .../table/MatchingTableInventoryChecker.java       |  3 +
 .../table/TableInventoryChecker.java               | 11 ++++
 .../table/DataMatchTableInventoryCheckerTest.java  | 67 ++++++++++++++++++++++
 .../MigrationDataConsistencyChecker.java           | 11 +++-
 .../MigrationDataConsistencyCheckerTest.java       | 44 +++++++++-----
 6 files changed, 131 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 84236229316..59243852019 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
 import com.google.common.base.Strings;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
@@ -26,6 +28,7 @@ import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
 
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -90,6 +93,14 @@ public final class DataMatchTableDataConsistencyChecker implements TableDataCons
             this.chunkSize = chunkSize;
         }
         
+        @Override
+        public Optional<TableDataConsistencyCheckResult> preCheck() {
+            if (getParam().getUniqueKeys().isEmpty()) {
+                return Optional.of(new TableDataConsistencyCheckResult(TableDataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
+            }
+            return Optional.empty();
+        }
+        
         @Override
         protected SingleTableInventoryCalculator buildSingleTableInventoryCalculator() {
             return new RecordSingleTableInventoryCalculator(chunkSize);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 690ce1e89b0..f17f66b6914 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
@@ -51,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Slf4j
 public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
     
+    @Getter(AccessLevel.PROTECTED)
     private final TableInventoryCheckParameter param;
     
     private final AtomicBoolean canceling = new AtomicBoolean(false);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
index 5248d0aa14a..23bde5ecdd8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryChecker.java
@@ -20,11 +20,22 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 
+import java.util.Optional;
+
 /**
  * Table inventory checker.
  */
 public interface TableInventoryChecker extends PipelineCancellable {
     
+    /**
+     * Pre-check for table inventory data.
+     *
+     * @return check result
+     */
+    default Optional<TableDataConsistencyCheckResult> preCheck() {
+        return Optional.empty();
+    }
+    
     /**
      * Data consistency check for single table inventory data.
      *
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableInventoryCheckerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableInventoryCheckerTest.java
new file mode 100644
index 00000000000..4be11edd9ec
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableInventoryCheckerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
+
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DataMatchTableInventoryCheckerTest {
+    
+    @Test
+    void assertPreCheckWithUniqueKey() {
+        try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+            List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true));
+            TableInventoryCheckParameter param = buildTableInventoryCheckParameter(uniqueKeys);
+            TableInventoryChecker tableInventoryChecker = checker.buildTableInventoryChecker(param);
+            Optional<TableDataConsistencyCheckResult> actual = tableInventoryChecker.preCheck();
+            assertFalse(actual.isPresent());
+        }
+    }
+    
+    @Test
+    void assertPreCheckWithoutUniqueKey() {
+        try (DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker()) {
+            TableInventoryCheckParameter param = buildTableInventoryCheckParameter(Collections.emptyList());
+            TableInventoryChecker tableInventoryChecker = checker.buildTableInventoryChecker(param);
+            Optional<TableDataConsistencyCheckResult> actual = tableInventoryChecker.preCheck();
+            assertTrue(actual.isPresent());
+            assertNotNull(actual.get().getIgnoredType());
+            assertThat(actual.get().getIgnoredType(), is(TableDataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
+        }
+    }
+    
+    private TableInventoryCheckParameter buildTableInventoryCheckParameter(final List<PipelineColumnMetaData> uniqueKeys) {
+        return new TableInventoryCheckParameter("jobId1", null, null, new QualifiedTable(null, "t_order"), new QualifiedTable(null, "t_order"),
+                Arrays.asList("order_id", "user_id", "status"), uniqueKeys, null, null);
+    }
+}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 06c9a484ef1..1d2ac52b3d8 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -113,6 +113,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager);
                 checkResultMap.put(new QualifiedTable(each.getSchemaName(), each.getTableName()), checkResult);
+                if (checkResult.isIgnored()) {
+                    progressContext.getIgnoredTableNames().add(each.format());
+                    log.info("Table '{}' is ignored, ignore type: {}", 
checkResult.getIgnoredType(), each.format());
+                    continue;
+                }
                 if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", each.format());
                     return true;
@@ -138,15 +143,15 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                 jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext);
         TableInventoryChecker tableInventoryChecker = tableChecker.buildTableInventoryChecker(param);
         currentTableInventoryChecker.set(tableInventoryChecker);
-        TableDataConsistencyCheckResult result = tableInventoryChecker.checkSingleTableInventoryData();
-        currentTableInventoryChecker.set(null);
-        return result;
+        Optional<TableDataConsistencyCheckResult> preCheckResult = tableInventoryChecker.preCheck();
+        return preCheckResult.orElseGet(tableInventoryChecker::checkSingleTableInventoryData);
     }
     
     @Override
     public void cancel() {
         canceling.set(true);
         Optional.ofNullable(currentTableInventoryChecker.get()).ifPresent(TableInventoryChecker::cancel);
+        currentTableInventoryChecker.set(null);
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 35f6f358aed..7044c4f81fe 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -19,11 +19,12 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consist
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -43,6 +44,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MigrationDataConsistencyCheckerTest {
@@ -53,8 +57,25 @@ class MigrationDataConsistencyCheckerTest {
     }
     
     @Test
-    void assertCountAndDataCheck() throws SQLException {
-        MigrationJobConfiguration jobConfig = createJobConfiguration();
+    void assertFixtureCheck() throws SQLException {
+        MigrationJobConfiguration jobConfig = createJobConfiguration(true);
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = check(jobConfig, "FIXTURE");
+        TableDataConsistencyCheckResult actual = checkResultMap.get("t_order");
+        assertTrue(actual.isMatched());
+        assertFalse(actual.isIgnored());
+    }
+    
+    @Test
+    void assertDataMatchCheck() throws SQLException {
+        MigrationJobConfiguration jobConfig = createJobConfiguration(false);
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = check(jobConfig, "DATA_MATCH");
+        TableDataConsistencyCheckResult actual = checkResultMap.get("t_order");
+        assertFalse(actual.isMatched());
+        assertTrue(actual.isIgnored());
+        assertThat(actual.getIgnoredType(), is(TableDataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
+    }
+    
+    private Map<String, TableDataConsistencyCheckResult> check(final MigrationJobConfiguration jobConfig, final String algorithmType) {
         JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
         jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         jobConfigurationPOJO.setJobName(jobConfig.getJobId());
@@ -62,11 +83,8 @@ class MigrationDataConsistencyCheckerTest {
         PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, "");
-        Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new TransmissionProcessContext(jobConfig.getJobId(), null),
-                createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
-        String checkKey = "t_order";
-        assertTrue(actual.get(checkKey).isMatched());
-        assertTrue(actual.get(checkKey).isMatched());
+        return new MigrationDataConsistencyChecker(jobConfig, new TransmissionProcessContext(jobConfig.getJobId(), null),
+                createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check(algorithmType, null);
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
@@ -78,21 +96,21 @@ class MigrationDataConsistencyCheckerTest {
         return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
     }
     
-    private MigrationJobConfiguration createJobConfiguration() throws SQLException {
+    private MigrationJobConfiguration createJobConfiguration(final boolean orderHasUniqueKey) throws SQLException {
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
-        initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig());
-        initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
+        initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig(), orderHasUniqueKey);
+        initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), orderHasUniqueKey);
         return jobItemContext.getJobConfig();
     }
     
-    private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
+    private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig, final boolean orderHasUniqueKey) throws SQLException {
         try (
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
                 PipelineDataSource dataSource = dataSourceManager.getDataSource(dataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, 
user_id INT(11))");
+            statement.execute(String.format("CREATE TABLE t_order (order_id 
INT %s, user_id INT(11))", orderHasUniqueKey ? "PRIMARY KEY" : ""));
             statement.execute("INSERT INTO t_order (order_id, user_id) VALUES 
(1, 1), (999, 10)");
         }
     }
