This is an automated email from the ASF dual-hosted git repository. menghaoran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 216c0d715bd Clean pipeline code (#32720) 216c0d715bd is described below commit 216c0d715bd3df57b9ce9917c1dce3a4aa8605a9 Author: Hongsheng Zhong <zhonghongsh...@apache.org> AuthorDate: Thu Aug 29 14:03:26 2024 +0800 Clean pipeline code (#32720) * Clean TODO * Remove unused JobStatus enums * Clean TODO * Refactor RecordSingleTableInventoryCalculator * Remove useless unit test * Enable IncrementalTaskTest --- .../table/MatchingTableInventoryChecker.java | 7 +------ .../CRC32SingleTableInventoryCalculator.java | 1 - .../RecordSingleTableInventoryCalculator.java | 5 ++--- .../ShardingSpherePipelineDataSourceCreator.java | 1 - .../core/importer/ImporterConfiguration.java | 2 -- .../data/pipeline/core/job/JobStatus.java | 19 +------------------ .../YamlJobItemIncrementalTasksProgressSwapper.java | 1 - .../core/listener/PipelineElasticJobListener.java | 2 +- .../core/metadata/model/PipelineTableMetaData.java | 1 - .../splitter/InventoryDumperContextSplitter.java | 1 - .../PipelineDataConsistencyCalculateSQLBuilder.java | 1 - .../YamlTransmissionJobItemProgressSwapperTest.java | 14 -------------- .../src/test/resources/job-progress-failure.yaml | 21 --------------------- .../dumper/PostgreSQLIncrementalDumper.java | 1 - .../distsql/statement/pojo/SourceTargetEntry.java | 1 - .../pipeline/core/task/IncrementalTaskTest.java | 6 ++---- 16 files changed, 7 insertions(+), 77 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java index f9faf20e2ce..ddf99a8eea1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java @@ -61,7 +61,7 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe @Override public TableDataConsistencyCheckResult checkSingleTableInventoryData() { - ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + "-matching-check-%d"); + ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build(param.getJobId() + "-matching-check-%d"); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory); try { return checkSingleTableInventoryData(param, executor); @@ -126,11 +126,6 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult); } - // TODO use digest (crc32, murmurhash) - private String getJobIdDigest(final String jobId) { - return jobId.length() <= 6 ? jobId : jobId.substring(0, 6); - } - private <T> T waitFuture(final Future<T> future) { try { return future.get(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java index 455fca98844..e967f236345 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java @@ -113,7 +113,6 @@ public final class CRC32SingleTableInventoryCalculator extends AbstractSingleTab return result; } - // TODO not support now @Override public Optional<Object> getMaxUniqueKeyValue() { return Optional.empty(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java index 4a37f45b45d..010ea7ba8da 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java @@ -221,11 +221,10 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin return Optional.of(new RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records)); } - private SingleTableInventoryCalculateParameter buildNewCalculateParameter(final SingleTableInventoryCalculateParameter param, final Object maxUniqueKeyValue) { + private SingleTableInventoryCalculateParameter buildNewCalculateParameter(final SingleTableInventoryCalculateParameter param, final Object minUniqueKeyValue) { SingleTableInventoryCalculateParameter result = new SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), param.getColumnNames(), Collections.singletonList(param.getFirstUniqueKey()), QueryType.POINT_QUERY); - result.setUniqueKeysValues(Collections.singletonList(maxUniqueKeyValue)); - result.setQueryRange(param.getQueryRange()); + result.setUniqueKeysValues(Collections.singletonList(minUniqueKeyValue)); result.setShardingColumnsNames(param.getShardingColumnsNames()); result.setShardingColumnsValues(param.getShardingColumnsValues()); return result; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java index 05e7d7a008c..033a2e4af0d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java @@ -81,7 +81,6 @@ public final class ShardingSpherePipelineDataSourceCreator implements PipelineDa private void updateConfigurationProperties(final YamlRootConfiguration yamlRootConfig) { Properties newProps = new Properties(); newProps.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(), String.valueOf(Boolean.FALSE)); - // TODO Another way is improving ExecuteQueryCallback.executeSQL to enable streaming query, then remove it // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, make sure streaming query work. newProps.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000); yamlRootConfig.setProps(newProps); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java index a8576f0789b..58cfd0e3909 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java @@ -45,7 +45,6 @@ public final class ImporterConfiguration { private final PipelineDataSourceConfiguration dataSourceConfig; - // TODO columnName case-insensitive? private final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap; private final TableAndSchemaNameMapper tableAndSchemaNameMapper; @@ -56,7 +55,6 @@ public final class ImporterConfiguration { private final int retryTimes; - // TODO Remove concurrency private final int concurrency; /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java index 8aeca6cb7e9..26ea68cce90 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java @@ -47,7 +47,6 @@ public enum JobStatus { */ EXECUTE_INCREMENTAL_TASK(true), - // TODO rename to SUCCESS /** * Job is finished. */ @@ -56,23 +55,7 @@ public enum JobStatus { /** * Consistency check job execute failed. */ - CONSISTENCY_CHECK_FAILURE(false), + CONSISTENCY_CHECK_FAILURE(false); - /** - * Task has stopped by failing to prepare work. - */ - PREPARING_FAILURE(false), - - /** - * Task has stopped by failing to execute inventory task. - */ - EXECUTE_INVENTORY_TASK_FAILURE(false), - - /** - * Task has stopped by failing to execute incremental task. - */ - EXECUTE_INCREMENTAL_TASK_FAILURE(false); - - // TODO Remove unused field; Remove unused enum private final boolean running; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java index 40cca25094c..6e45671257a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java @@ -61,7 +61,6 @@ public final class YamlJobItemIncrementalTasksProgressSwapper { if (null == yamlProgress) { return new JobItemIncrementalTasksProgress(null); } - // TODO consider to remove parameter databaseType DialectIncrementalPositionManager positionInitializer = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, TypedSPILoader.getService(DatabaseType.class, databaseType)); IncrementalTaskProgress taskProgress = new IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition())); taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay()); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java index 7861b576f78..9884afda10a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java @@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public final class PipelineElasticJobListener implements ElasticJobListener { - // TODO ElasticJobListenerFactory.createListener return new class instance, it's the reason why static variables + // ElasticJobListenerFactory.createListener return new class instance, it's why static variable is used private static final Map<String, Long> RUNNING_JOBS = new ConcurrentHashMap<>(); @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java index e627462ca23..f108aa3469a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java @@ -76,7 +76,6 @@ public final class PipelineTableMetaData { * @param columnIndex the first column is 1, the second is 2, ... * @return column meta data */ - // TODO Remove it. Get column meta data by column name for incremental dumper, since columns ordering might be changed. public PipelineColumnMetaData getColumnMetaData(final int columnIndex) { return getColumnMetaData(columnNames.get(columnIndex - 1)); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java index 651b8da2380..6be5240a655 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java @@ -126,7 +126,6 @@ public final class InventoryDumperContextSplitter { return InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(tableRecordsCount, uniqueKeyValuesRange, shardingSize); } if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) { - // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases. return Collections.singleton(new StringPrimaryKeyIngestPosition(null, null)); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java index 4064502548d..d03949ae0f3 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java @@ -112,7 +112,6 @@ public final class PipelineDataConsistencyCalculateSQLBuilder { if (null == shardingColumnsNames || shardingColumnsNames.isEmpty()) { return uniqueKeys; } - // TODO Avoid new list creation List<String> result = new ArrayList<>(uniqueKeys.size() + shardingColumnsNames.size()); result.addAll(uniqueKeys); result.addAll(shardingColumnsNames); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java index 12884e9188d..695e49d4949 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java @@ -53,20 +53,6 @@ class YamlTransmissionJobItemProgressSwapperTest { assertThat(YamlEngine.marshal(actual), is(YamlEngine.marshal(yamlProgress))); } - @Test - void assertSwapWithoutTransmission() { - YamlTransmissionJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-failure.yaml"), YamlTransmissionJobItemProgress.class); - TransmissionJobItemProgress progress = SWAPPER.swapToObject(yamlProgress); - assertNotNull(progress.getInventory()); - assertNotNull(progress.getIncremental()); - assertThat(progress.getDataSourceName(), is("ds_0")); - assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L)); - YamlTransmissionJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress); - assertNotNull(actual.getInventory()); - assertNotNull(actual.getIncremental()); - assertThat(YamlEngine.marshal(actual), is(YamlEngine.marshal(yamlProgress))); - } - @Test void assertSwapWithRunningConfig() { YamlTransmissionJobItemProgress yamlProgress = YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-running.yaml"), YamlTransmissionJobItemProgress.class); diff --git a/kernel/data-pipeline/core/src/test/resources/job-progress-failure.yaml b/kernel/data-pipeline/core/src/test/resources/job-progress-failure.yaml deleted file mode 100644 index e3fc461f8e0..00000000000 --- a/kernel/data-pipeline/core/src/test/resources/job-progress-failure.yaml +++ /dev/null @@ -1,21 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# - -dataSourceName: ds_0 -sourceDatabaseType: H2 -status: PREPARING_FAILURE diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java index 073370c9618..60c486304c2 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java @@ -108,7 +108,6 @@ public final class PostgreSQLIncrementalDumper extends AbstractPipelineLifecycle @SneakyThrows(InterruptedException.class) private void dump() throws SQLException { - // TODO use unified PgConnection try ( Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()); PGReplicationStream stream = logicalReplication.createReplicationStream( diff --git a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java index 8aef5d39d34..a5948354af0 100644 --- a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java +++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java @@ -30,7 +30,6 @@ import org.apache.shardingsphere.infra.datanode.DataNode; @EqualsAndHashCode(of = {"source", "targetTableName"}) public final class SourceTargetEntry { - // TODO Remove targetDatabaseName, or keep it but rebuild it private final String targetDatabaseName; private final DataNode source; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java index dff879eb38d..ca2dd41c980 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java @@ -17,18 +17,17 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.task; +import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition; -import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; -import org.apache.shardingsphere.data.pipeline.core.importer.Importer; import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; +import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder; import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -65,7 +64,6 @@ class IncrementalTaskTest { } @Test - @Disabled("H2 doesn't support incremental") void assertStart() throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture.allOf(incrementalTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS); assertThat(incrementalTask.getTaskId(), is("ds_0"));