This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 216c0d715bd Clean pipeline code (#32720)
216c0d715bd is described below

commit 216c0d715bd3df57b9ce9917c1dce3a4aa8605a9
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Thu Aug 29 14:03:26 2024 +0800

    Clean pipeline code (#32720)
    
    * Clean TODO
    
    * Remove unused JobStatus enums
    
    * Clean TODO
    
    * Refactor RecordSingleTableInventoryCalculator
    
    * Remove useless unit test
    
    * Enable IncrementalTaskTest
---
 .../table/MatchingTableInventoryChecker.java        |  7 +------
 .../CRC32SingleTableInventoryCalculator.java        |  1 -
 .../RecordSingleTableInventoryCalculator.java       |  5 ++---
 .../ShardingSpherePipelineDataSourceCreator.java    |  1 -
 .../core/importer/ImporterConfiguration.java        |  2 --
 .../data/pipeline/core/job/JobStatus.java           | 19 +------------------
 .../YamlJobItemIncrementalTasksProgressSwapper.java |  1 -
 .../core/listener/PipelineElasticJobListener.java   |  2 +-
 .../core/metadata/model/PipelineTableMetaData.java  |  1 -
 .../splitter/InventoryDumperContextSplitter.java    |  1 -
 .../PipelineDataConsistencyCalculateSQLBuilder.java |  1 -
 .../YamlTransmissionJobItemProgressSwapperTest.java | 14 --------------
 .../src/test/resources/job-progress-failure.yaml    | 21 ---------------------
 .../dumper/PostgreSQLIncrementalDumper.java         |  1 -
 .../distsql/statement/pojo/SourceTargetEntry.java   |  1 -
 .../pipeline/core/task/IncrementalTaskTest.java     |  6 ++----
 16 files changed, 7 insertions(+), 77 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index f9faf20e2ce..ddf99a8eea1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -61,7 +61,7 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
     
     @Override
     public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
-        ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + 
"-matching-check-%d");
+        ThreadFactory threadFactory = 
ExecutorThreadFactoryBuilder.build(param.getJobId() + "-matching-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
             return checkSingleTableInventoryData(param, executor);
@@ -126,11 +126,6 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
         return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
     }
     
-    // TODO use digest (crc32, murmurhash)
-    private String getJobIdDigest(final String jobId) {
-        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
-    }
-    
     private <T> T waitFuture(final Future<T> future) {
         try {
             return future.get();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
index 455fca98844..e967f236345 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
@@ -113,7 +113,6 @@ public final class CRC32SingleTableInventoryCalculator 
extends AbstractSingleTab
             return result;
         }
         
-        // TODO not support now
         @Override
         public Optional<Object> getMaxUniqueKeyValue() {
             return Optional.empty();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 4a37f45b45d..010ea7ba8da 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -221,11 +221,10 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return Optional.of(new 
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
     }
     
-    private SingleTableInventoryCalculateParameter 
buildNewCalculateParameter(final SingleTableInventoryCalculateParameter param, 
final Object maxUniqueKeyValue) {
+    private SingleTableInventoryCalculateParameter 
buildNewCalculateParameter(final SingleTableInventoryCalculateParameter param, 
final Object minUniqueKeyValue) {
         SingleTableInventoryCalculateParameter result = new 
SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), 
param.getColumnNames(),
                 Collections.singletonList(param.getFirstUniqueKey()), 
QueryType.POINT_QUERY);
-        
result.setUniqueKeysValues(Collections.singletonList(maxUniqueKeyValue));
-        result.setQueryRange(param.getQueryRange());
+        
result.setUniqueKeysValues(Collections.singletonList(minUniqueKeyValue));
         result.setShardingColumnsNames(param.getShardingColumnsNames());
         result.setShardingColumnsValues(param.getShardingColumnsValues());
         return result;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index 05e7d7a008c..033a2e4af0d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -81,7 +81,6 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
     private void updateConfigurationProperties(final YamlRootConfiguration 
yamlRootConfig) {
         Properties newProps = new Properties();
         
newProps.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(),
 String.valueOf(Boolean.FALSE));
-        // TODO Another way is improving ExecuteQueryCallback.executeSQL to 
enable streaming query, then remove it
         // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, 
make sure streaming query work.
         
newProps.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 
100000);
         yamlRootConfig.setProps(newProps);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index a8576f0789b..58cfd0e3909 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -45,7 +45,6 @@ public final class ImporterConfiguration {
     
     private final PipelineDataSourceConfiguration dataSourceConfig;
     
-    // TODO columnName case-insensitive?
     private final Map<CaseInsensitiveIdentifier, Set<String>> 
shardingColumnsMap;
     
     private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
@@ -56,7 +55,6 @@ public final class ImporterConfiguration {
     
     private final int retryTimes;
     
-    // TODO Remove concurrency
     private final int concurrency;
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java
index 8aeca6cb7e9..26ea68cce90 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobStatus.java
@@ -47,7 +47,6 @@ public enum JobStatus {
      */
     EXECUTE_INCREMENTAL_TASK(true),
     
-    // TODO rename to SUCCESS
     /**
      * Job is finished.
      */
@@ -56,23 +55,7 @@ public enum JobStatus {
     /**
      * Consistency check job execute failed.
      */
-    CONSISTENCY_CHECK_FAILURE(false),
+    CONSISTENCY_CHECK_FAILURE(false);
     
-    /**
-     * Task has stopped by failing to prepare work.
-     */
-    PREPARING_FAILURE(false),
-    
-    /**
-     * Task has stopped by failing to execute inventory task.
-     */
-    EXECUTE_INVENTORY_TASK_FAILURE(false),
-    
-    /**
-     * Task has stopped by failing to execute incremental task.
-     */
-    EXECUTE_INCREMENTAL_TASK_FAILURE(false);
-    
-    // TODO Remove unused field; Remove unused enum
     private final boolean running;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
index 40cca25094c..6e45671257a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -61,7 +61,6 @@ public final class YamlJobItemIncrementalTasksProgressSwapper 
{
         if (null == yamlProgress) {
             return new JobItemIncrementalTasksProgress(null);
         }
-        // TODO consider to remove parameter databaseType
         DialectIncrementalPositionManager positionInitializer = 
DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, 
TypedSPILoader.getService(DatabaseType.class, databaseType));
         IncrementalTaskProgress taskProgress = new 
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
         taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
index 7861b576f78..9884afda10a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public final class PipelineElasticJobListener implements ElasticJobListener {
     
-    // TODO ElasticJobListenerFactory.createListener return new class 
instance, it's the reason why static variables
+    // ElasticJobListenerFactory.createListener return new class instance, 
it's why static variable is used
     private static final Map<String, Long> RUNNING_JOBS = new 
ConcurrentHashMap<>();
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index e627462ca23..f108aa3469a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -76,7 +76,6 @@ public final class PipelineTableMetaData {
      * @param columnIndex the first column is 1, the second is 2, ...
      * @return column meta data
      */
-    // TODO Remove it. Get column meta data by column name for incremental 
dumper, since columns ordering might be changed.
     public PipelineColumnMetaData getColumnMetaData(final int columnIndex) {
         return getColumnMetaData(columnNames.get(columnIndex - 1));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 651b8da2380..6be5240a655 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -126,7 +126,6 @@ public final class InventoryDumperContextSplitter {
                 return 
InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(tableRecordsCount,
 uniqueKeyValuesRange, shardingSize);
             }
             if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
-                // TODO Support string unique key table splitting. Ascii 
characters ordering are different in different versions of databases.
                 return Collections.singleton(new 
StringPrimaryKeyIngestPosition(null, null));
             }
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index 4064502548d..d03949ae0f3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -112,7 +112,6 @@ public final class 
PipelineDataConsistencyCalculateSQLBuilder {
         if (null == shardingColumnsNames || shardingColumnsNames.isEmpty()) {
             return uniqueKeys;
         }
-        // TODO Avoid new list creation
         List<String> result = new ArrayList<>(uniqueKeys.size() + 
shardingColumnsNames.size());
         result.addAll(uniqueKeys);
         result.addAll(shardingColumnsNames);
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java
index 12884e9188d..695e49d4949 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapperTest.java
@@ -53,20 +53,6 @@ class YamlTransmissionJobItemProgressSwapperTest {
         assertThat(YamlEngine.marshal(actual), 
is(YamlEngine.marshal(yamlProgress)));
     }
     
-    @Test
-    void assertSwapWithoutTransmission() {
-        YamlTransmissionJobItemProgress yamlProgress = 
YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-failure.yaml"),
 YamlTransmissionJobItemProgress.class);
-        TransmissionJobItemProgress progress = 
SWAPPER.swapToObject(yamlProgress);
-        assertNotNull(progress.getInventory());
-        assertNotNull(progress.getIncremental());
-        assertThat(progress.getDataSourceName(), is("ds_0"));
-        
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), 
is(0L));
-        YamlTransmissionJobItemProgress actual = 
SWAPPER.swapToYamlConfiguration(progress);
-        assertNotNull(actual.getInventory());
-        assertNotNull(actual.getIncremental());
-        assertThat(YamlEngine.marshal(actual), 
is(YamlEngine.marshal(yamlProgress)));
-    }
-    
     @Test
     void assertSwapWithRunningConfig() {
         YamlTransmissionJobItemProgress yamlProgress = 
YamlEngine.unmarshal(ConfigurationFileUtils.readFile("job-progress-running.yaml"),
 YamlTransmissionJobItemProgress.class);
diff --git 
a/kernel/data-pipeline/core/src/test/resources/job-progress-failure.yaml 
b/kernel/data-pipeline/core/src/test/resources/job-progress-failure.yaml
deleted file mode 100644
index e3fc461f8e0..00000000000
--- a/kernel/data-pipeline/core/src/test/resources/job-progress-failure.yaml
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-dataSourceName: ds_0
-sourceDatabaseType: H2
-status: PREPARING_FAILURE
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java
index 073370c9618..60c486304c2 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumper.java
@@ -108,7 +108,6 @@ public final class PostgreSQLIncrementalDumper extends 
AbstractPipelineLifecycle
     
     @SneakyThrows(InterruptedException.class)
     private void dump() throws SQLException {
-        // TODO use unified PgConnection
         try (
                 Connection connection = 
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) 
dumperContext.getCommonContext().getDataSourceConfig());
                 PGReplicationStream stream = 
logicalReplication.createReplicationStream(
diff --git 
a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java
 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java
index 8aef5d39d34..a5948354af0 100644
--- 
a/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java
+++ 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/statement/pojo/SourceTargetEntry.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
 @EqualsAndHashCode(of = {"source", "targetTableName"})
 public final class SourceTargetEntry {
     
-    // TODO Remove targetDatabaseName, or keep it but rebuild it
     private final String targetDatabaseName;
     
     private final DataNode source;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index dff879eb38d..ca2dd41c980 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,18 +17,17 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.task;
 
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -65,7 +64,6 @@ class IncrementalTaskTest {
     }
     
     @Test
-    @Disabled("H2 doesn't support incremental")
     void assertStart() throws ExecutionException, InterruptedException, 
TimeoutException {
         CompletableFuture.allOf(incrementalTask.start().toArray(new 
CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
         assertThat(incrementalTask.getTaskId(), is("ds_0"));

Reply via email to