This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 51718e1072d Review and improve pipeline code (#28372)
51718e1072d is described below
commit 51718e1072db7442e33851283cc1ec9af73009f4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Sep 6 16:56:06 2023 +0800
Review and improve pipeline code (#28372)
---
.../data/pipeline/api/metadata/SchemaTableName.java | 10 ++++++++++
.../check/consistency/MigrationDataConsistencyChecker.java | 8 ++++----
.../e2e/data/pipeline/cases/PipelineContainerComposer.java | 2 +-
.../test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java | 6 ++++++
.../scenario/migration/api/impl/MigrationJobAPITest.java | 2 +-
.../check/consistency/MigrationDataConsistencyCheckerTest.java | 2 +-
6 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index ab4496aa6c5..985ab7607a2 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -42,4 +42,14 @@ public class SchemaTableName {
this.schemaName = new SchemaName(schemaName);
this.tableName = new TableName(tableName);
}
+
+    /**
+     * Marshal to text.
+     *
+     * @return text
+     */
+    public String marshal() {
+        String schemaName = this.schemaName.getOriginal();
+        return null == schemaName ? tableName.getOriginal() : schemaName + "." + tableName.getOriginal();
+    }
}
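The new marshal() renders the qualified table name, falling back to the bare table name when there is no schema. A minimal usage sketch (illustrative only; it assumes SchemaName accepts a null original and that getOriginal() returns the constructor arguments unchanged):

    // Hypothetical usage of the new SchemaTableName.marshal()
    SchemaTableName qualified = new SchemaTableName("public", "t_order");
    System.out.println(qualified.marshal());      // prints "public.t_order"
    SchemaTableName unqualified = new SchemaTableName(null, "t_order");
    System.out.println(unqualified.marshal());    // prints "t_order"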
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index f4fef5957a4..13dde2fb842 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -86,14 +86,14 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
         progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
-        Map<DataNode, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
+        Map<SchemaTableName, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
         TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
         try (PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager()) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
                 checkTableInventoryData(each, tableChecker, result, dataSourceManager);
             }
         }
-        return result.entrySet().stream().collect(Collectors.toMap(entry -> DataNodeUtils.formatWithSchema(entry.getKey()), Entry::getValue));
+        return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().marshal(), Entry::getValue));
     }
     
     private long getRecordsCount() {
@@ -102,11 +102,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     }
     
     private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
-                                         final Map<DataNode, TableDataConsistencyCheckResult> checkResultMap, final PipelineDataSourceManager dataSourceManager) {
+                                         final Map<SchemaTableName, TableDataConsistencyCheckResult> checkResultMap, final PipelineDataSourceManager dataSourceManager) {
         for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager);
-                checkResultMap.put(each, checkResult);
+                checkResultMap.put(new SchemaTableName(each.getSchemaName(), each.getTableName()), checkResult);
                 if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", DataNodeUtils.formatWithSchema(each));
                     return;
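With this change the intermediate check results are keyed by SchemaTableName instead of DataNode, and the exported string keys come from SchemaTableName.marshal() rather than DataNodeUtils.formatWithSchema(). A simplified sketch of the new keying pattern (variable names are hypothetical, types as in the hunk above):

    // Intermediate map keyed by schema-qualified table identity rather than by data node.
    Map<SchemaTableName, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
    result.put(new SchemaTableName(dataNode.getSchemaName(), dataNode.getTableName()), checkResult);
    // Exported keys become "schema.table", or just "table" when the schema is null.
    Map<String, TableDataConsistencyCheckResult> exported = result.entrySet().stream()
            .collect(Collectors.toMap(entry -> entry.getKey().marshal(), Entry::getValue));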
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index ac21dc787b2..420b42823e2 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -372,7 +372,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
      * @throws SQLException SQL exception
      */
    public void proxyExecuteWithLog(final String sql, final int sleepSeconds) throws SQLException {
-        log.info("proxy execute :{}", sql);
+        log.info("proxy execute: {}", sql);
         List<String> sqlList = Splitter.on(";").trimResults().omitEmptyStrings().splitToList(sql);
         try (Connection connection = proxyDataSource.getConnection()) {
             for (String each : sqlList) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index d03bb3b7a86..a79edb85aef 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -44,6 +45,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
@Slf4j
@@ -70,6 +72,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     private final int loopCount;
     
+    @SneakyThrows(InterruptedException.class)
     @Override
     public void run() {
         List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount);
@@ -77,16 +80,19 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
         for (Object[] each : orderInsertData) {
             primaryKeys.add(each[0]);
             insertOrder(each);
+            TimeUnit.MILLISECONDS.sleep(100L);
         }
         ThreadLocalRandom random = ThreadLocalRandom.current();
         for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
             // TODO 0000-00-00 00:00:00 now will cause consistency check failed of MySQL.
             // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
             updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
+            TimeUnit.MILLISECONDS.sleep(500L);
         }
         for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
             setNullToAllFields(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
             deleteOrderById(primaryKeys.remove(random.nextInt(0, primaryKeys.size())));
+            TimeUnit.MILLISECONDS.sleep(500L);
         }
         log.info("increment task runnable execute successfully.");
     }
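The pacing added above relies on Lombok's @SneakyThrows, so the Runnable can sleep between DML statements without declaring or wrapping the checked InterruptedException. A self-contained sketch of that pattern (class name hypothetical):

    import java.util.concurrent.TimeUnit;
    
    import lombok.SneakyThrows;
    
    public final class PacedTask implements Runnable {
        
        @SneakyThrows(InterruptedException.class)
        @Override
        public void run() {
            // Lombok rethrows the checked InterruptedException without a throws clause.
            TimeUnit.MILLISECONDS.sleep(100L);
        }
    }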
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 239ff019d3e..d052d9609ae 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -179,7 +179,7 @@ class MigrationJobAPITest {
         Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildPipelineDataConsistencyChecker(
                 jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0)).check("FIXTURE", null);
         assertThat(checkResultMap.size(), is(1));
-        String checkKey = "ds_0.t_order";
+        String checkKey = "t_order";
         assertTrue(checkResultMap.get(checkKey).getCountCheckResult().isMatched());
         assertThat(checkResultMap.get(checkKey).getCountCheckResult().getTargetRecordsCount(), is(2L));
         assertTrue(checkResultMap.get(checkKey).getContentCheckResult().isMatched());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 5821727cf9c..f4e849174e1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -65,7 +65,7 @@ class MigrationDataConsistencyCheckerTest {
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
                 createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
-        String checkKey = "ds_0.t_order";
+        String checkKey = "t_order";
         assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
         assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());