This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

The following commit(s) were added to refs/heads/master by this push:
     new bf25028149e  Review and improve pipeline code (#30341)
bf25028149e is described below

commit bf25028149e666dff1dbbb9007fe8ff4362ca325
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Thu Feb 29 09:45:59 2024 +0800

    Review and improve pipeline code (#30341)
    
    * Merge DataRecord base fields in DataRecordGroupEngine
    
    * Persist job item progress after it completed
    
    * Improve inventory check break for migration
---
 .../ingest/record/group/DataRecordGroupEngine.java   |  8 ++++++++
 .../core/job/AbstractSeparablePipelineJob.java       |  2 ++
 .../record/group/DataRecordGroupEngineTest.java      | 20 ++++++++++++++++++++
 .../consistency/MigrationDataConsistencyChecker.java | 12 ++++++++----
 4 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 82624900af8..4e2e5b578de 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -120,6 +120,7 @@ public final class DataRecordGroupEngine {
                 () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
         if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
             DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
+            mergeBaseFields(dataRecord, mergedDataRecord);
             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
                 mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(), dataRecord.getColumn(i).isUniqueKey() ?
                         beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(),
@@ -132,6 +133,12 @@ public final class DataRecordGroupEngine {
         }
     }
     
+    private void mergeBaseFields(final DataRecord sourceRecord, final DataRecord targetRecord) {
+        targetRecord.setActualTableName(sourceRecord.getActualTableName());
+        targetRecord.setCsn(sourceRecord.getCsn());
+        targetRecord.setCommitTime(sourceRecord.getCommitTime());
+    }
+    
     private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
         // TODO Compatible with multiple unique indexes
         for (Column each : dataRecord.getColumns()) {
@@ -144,6 +151,7 @@ public final class DataRecordGroupEngine {
     
     private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
         DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
+        mergeBaseFields(curDataRecord, result);
         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
             result.addColumn(new Column(
                     curDataRecord.getColumn(i).getName(),
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 855bd8ed717..b8ae593d3e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
@@ -97,6 +98,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
             }
         } finally {
             if (started) {
+                PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
                 jobRunnerManager.getTasksRunner(shardingItem).ifPresent(PipelineTasksRunner::stop);
             }
         }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 504143998c0..9bc7940b582 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -71,6 +71,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 1, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", null, 10, true, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", null, 200, true, false));
@@ -93,6 +95,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 2, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", null, 10, true, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", null, 50, true, false));
@@ -107,6 +111,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 1, false, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", 50, 200, true, false));
@@ -121,6 +127,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", 50, 200, true, false));
@@ -135,6 +143,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", 50, 200, true, false));
@@ -149,6 +159,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(456L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 3, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 10, 10, false, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", 50, 50, false, false));
@@ -188,6 +200,8 @@ class DataRecordGroupEngineTest {
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.DELETE));
         assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getActualTableName(), is("order_0"));
+        assertThat(dataRecord.getCommitTime(), is(789L));
         assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, null, true, true));
         assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 10, null, true, false));
         assertColumnsMatched(dataRecord.getColumn(2), new Column("total_price", 50, null, true, false));
@@ -230,6 +244,8 @@ class DataRecordGroupEngineTest {
     
     private DataRecord mockInsertDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, new IngestPlaceholderPosition(), 3);
+        result.setActualTableName("order_0");
+        result.setCommitTime(123L);
         result.addColumn(new Column("id", id, true, true));
         result.addColumn(new Column("user_id", userId, true, false));
         result.addColumn(new Column("total_price", totalPrice, true, false));
@@ -250,6 +266,8 @@ class DataRecordGroupEngineTest {
     
     private DataRecord mockUpdateDataRecord(final String tableName, final Integer oldId, final int id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE, tableName, new IngestPlaceholderPosition(), 3);
+        result.setActualTableName("order_0");
+        result.setCommitTime(456L);
         result.addColumn(new Column("id", oldId, id, !Objects.deepEquals(oldId, id), true));
         result.addColumn(new Column("user_id", userId, userId, false, false));
         result.addColumn(new Column("total_price", 50, totalPrice, 50 != totalPrice, false));
@@ -262,6 +280,8 @@ class DataRecordGroupEngineTest {
     
     private DataRecord mockDeleteDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.DELETE, tableName, new IngestPlaceholderPosition(), 3);
+        result.setActualTableName("order_0");
+        result.setCommitTime(789L);
         result.addColumn(new Column("id", id, null, true, true));
         result.addColumn(new Column("user_id", userId, null, true, false));
         result.addColumn(new Column("total_price", totalPrice, null, true, false));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 2d9a57595fd..767d856c728 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -95,7 +95,9 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                 PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
                 TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
-                checkTableInventoryData(each, tableChecker, result, dataSourceManager);
+                if (checkTableInventoryDataUnmatchedAndBreak(each, tableChecker, result, dataSourceManager)) {
+                    return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue));
+                }
             }
         }
         return result.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue));
@@ -106,18 +108,20 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
     }
     
-    private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
-                                         final Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> checkResultMap, final PipelineDataSourceManager dataSourceManager) {
+    private boolean checkTableInventoryDataUnmatchedAndBreak(final JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
+                                                             final Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> checkResultMap,
+                                                             final PipelineDataSourceManager dataSourceManager) {
         for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
             for (DataNode each : entry.getDataNodes()) {
                 TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager);
                 checkResultMap.put(new CaseInsensitiveQualifiedTable(each.getSchemaName(), each.getTableName()), checkResult);
                 if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) {
                     log.info("Unmatched on table '{}', ignore left tables", DataNodeUtils.formatWithSchema(each));
-                    return;
+                    return true;
                 }
             }
         }
+        return false;
     }
     
     private TableDataConsistencyCheckResult checkSingleTableInventoryData(final String targetTableName, final DataNode dataNode,
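
For readers following the first change above: when grouped change records are merged into a single record, the metadata carried on the record itself (actual table name, CSN, commit time) is not part of the column list, so it has to be copied onto the newly built record explicitly, which is what the new mergeBaseFields method does. The sketch below only illustrates that step; it uses simplified stand-in types and a hypothetical class name, not the ShardingSphere DataRecord API, and is a minimal model rather than the project's implementation.

// Minimal sketch of the "merge base fields" idea, using simplified stand-in
// types rather than the real org.apache.shardingsphere DataRecord API.
public final class MergeBaseFieldsSketch {
    
    private static final class Record {
        
        String actualTableName;
        
        Long csn;
        
        long commitTime;
    }
    
    // Copy record-level metadata from the source record onto the merged record,
    // mirroring the intent of DataRecordGroupEngine#mergeBaseFields.
    private static void mergeBaseFields(final Record source, final Record target) {
        target.actualTableName = source.actualTableName;
        target.csn = source.csn;
        target.commitTime = source.commitTime;
    }
    
    public static void main(final String[] args) {
        Record latest = new Record();
        latest.actualTableName = "order_0";
        latest.commitTime = 456L;
        // The merged record is created from scratch, so without this copy it
        // would silently lose the actual table name and commit time of the
        // records it replaces.
        Record merged = new Record();
        mergeBaseFields(latest, merged);
        System.out.println(merged.actualTableName + ", " + merged.commitTime);
    }
}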