This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ca9fecc0e Review and improve pipeline code (#32768)
24ca9fecc0e is described below

commit 24ca9fecc0ec48c47f13cd63411fd3cb3238cf23
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Sep 2 15:46:34 2024 +0800

    Review and improve pipeline code (#32768)
    
    * Make timestamp comparison tolerant of differing millisecond precision
    
    * Transfer partial proxy props into ShardingSphereDataSource for dynamic 
change
    
    * Improve InventoryRecordsCountCalculator.getEstimatedCount for branch 
database
    
    * Add MigrationJobConfiguration.getJobDataNodeLine
    
    * Add CDCJobId.sinkType
    
    * Replace getProcessedRecordsCount with getInventoryRecordsCount as the 
initial records count for data consistency check
    
    * Update unit test
---
 .../infra/util/DatabaseTypeUtils.java              | 35 ++++++++++------------
 .../DataConsistencyCheckUtils.java                 |  5 ++++
 .../ShardingSpherePipelineDataSourceCreator.java   |  7 +++++
 .../InventoryRecordsCountCalculator.java           | 11 +++----
 .../ConsistencyCheckDataBuilder.java               |  2 +-
 .../DataConsistencyCheckUtilsTest.java             | 10 +++++++
 .../shardingsphere/data/pipeline/cdc/CDCJobId.java |  3 ++
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  2 +-
 .../data/pipeline/cdc/CDCJobIdTest.java            |  3 +-
 .../migration/MigrationJobExecutorCallback.java    |  2 +-
 .../MigrationDataConsistencyChecker.java           |  2 +-
 .../config/MigrationJobConfiguration.java          | 12 +++++++-
 12 files changed, 64 insertions(+), 30 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/util/DatabaseTypeUtils.java
similarity index 54%
copy from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
copy to 
infra/common/src/main/java/org/apache/shardingsphere/infra/util/DatabaseTypeUtils.java
index 9ad7b418237..a6cc70e5c92 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/util/DatabaseTypeUtils.java
@@ -15,28 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc;
+package org.apache.shardingsphere.infra.util;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-
-import java.util.List;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 /**
- * CDC job id.
+ * Database type utility class.
  */
-@RequiredArgsConstructor
-@Getter
-public final class CDCJobId implements PipelineJobId {
-    
-    private final PipelineJobType jobType = new CDCJobType();
-    
-    private final PipelineContextKey contextKey;
-    
-    private final List<String> schemaTableNames;
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DatabaseTypeUtils {
     
-    private final boolean full;
+    /**
+     * Get trunk database type.
+     *
+     * @param databaseType database type
+     * @return trunk database type
+     */
+    public static DatabaseType getTrunkDatabaseType(final DatabaseType 
databaseType) {
+        return databaseType.getTrunkDatabaseType().orElse(databaseType);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
index d2f985dbe6c..2936e34cfa1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
@@ -30,6 +30,7 @@ import java.math.RoundingMode;
 import java.sql.Array;
 import java.sql.SQLException;
 import java.sql.SQLXML;
+import java.sql.Timestamp;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
@@ -85,6 +86,10 @@ public final class DataConsistencyCheckUtils {
         if (thisColumnValue instanceof SQLXML && thatColumnValue instanceof 
SQLXML) {
             return ((SQLXML) thisColumnValue).getString().equals(((SQLXML) 
thatColumnValue).getString());
         }
+        // TODO Use different match strategy for heterogeneous database or not
+        if (thisColumnValue instanceof Timestamp && thatColumnValue instanceof 
Timestamp) {
+            return ((Timestamp) thisColumnValue).getTime() / 1000L * 1000L == 
((Timestamp) thatColumnValue).getTime() / 1000L * 1000L;
+        }
         if (thisColumnValue instanceof Array && thatColumnValue instanceof 
Array) {
             return Objects.deepEquals(((Array) thisColumnValue).getArray(), 
((Array) thatColumnValue).getArray());
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index a1abbc4a274..8be926d1002 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -42,6 +42,7 @@ import 
org.apache.shardingsphere.single.yaml.config.pojo.YamlSingleRuleConfigura
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -83,6 +84,12 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
     
     private void updateConfigurationProperties(final YamlRootConfiguration 
yamlRootConfig) {
         Properties newProps = new Properties();
+        for (String each : 
Arrays.asList(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), 
ConfigurationPropertyKey.SYSTEM_LOG_LEVEL.getKey(), 
ConfigurationPropertyKey.SQL_SHOW.getKey())) {
+            Object value = yamlRootConfig.getProps().get(each);
+            if (null != value) {
+                newProps.put(each, value);
+            }
+        }
         
newProps.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(),
 String.valueOf(Boolean.FALSE));
         // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, 
make sure streaming query work.
         
newProps.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 
100000);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
index 5d48ea0ca0f..51f1809ca48 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryRecordsCountCalculator.java
@@ -20,14 +20,15 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.database.mariadb.type.MariaDBDatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.DatabaseTypeUtils;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -57,7 +58,7 @@ public final class InventoryRecordsCountCalculator {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
         Optional<String> sql = 
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
-            if (sql.isPresent()) {
+            if (sql.isPresent() && 
dumperContext.getCommonContext().getDataSourceConfig() instanceof 
StandardPipelineDataSourceConfiguration) {
                 DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, 
dataSource.getDatabaseType().getType());
                 long result = getEstimatedCount(databaseType, dataSource, 
sql.get());
                 return result > 0L ? result : getCount(dataSource, 
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
@@ -73,7 +74,7 @@ public final class InventoryRecordsCountCalculator {
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(estimatedCountSQL)) {
-            if (databaseType instanceof MySQLDatabaseType || databaseType 
instanceof MariaDBDatabaseType) {
+            if (DatabaseTypeUtils.getTrunkDatabaseType(databaseType) 
instanceof MySQLDatabaseType) {
                 preparedStatement.setString(1, connection.getCatalog());
             }
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
index 8174bc01255..0219a17bd25 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
@@ -103,7 +103,7 @@ public final class ConsistencyCheckDataBuilder {
             return new Date(((Date) value).getTime() - 1L);
         }
         if (value instanceof Timestamp) {
-            return new Timestamp(((Timestamp) value).getTime() - 1L);
+            return new Timestamp(((Timestamp) value).getTime() - 1000L);
         }
         if (value instanceof int[]) {
             int[] result = ((int[]) value).clone();
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
index 7e2de6daa18..a494d544d84 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtilsTest.java
@@ -21,7 +21,9 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class DataConsistencyCheckUtilsTest {
@@ -42,4 +44,12 @@ class DataConsistencyCheckUtilsTest {
         BigDecimal another = BigDecimal.valueOf(33220L, 2);
         assertTrue(DataConsistencyCheckUtils.isBigDecimalEquals(one, another));
     }
+    
+    @Test
+    void assertTimestampEquals() {
+        EqualsBuilder equalsBuilder = new EqualsBuilder();
+        long time = System.currentTimeMillis();
+        assertTrue(DataConsistencyCheckUtils.isMatched(equalsBuilder, new 
Timestamp(time), new Timestamp(time / 10L * 10L + 1L)));
+        assertFalse(DataConsistencyCheckUtils.isMatched(equalsBuilder, new 
Timestamp(time), new Timestamp(time + 1000L)));
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 9ad7b418237..c86734b2c2c 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
@@ -39,4 +40,6 @@ public final class CDCJobId implements PipelineJobId {
     private final List<String> schemaTableNames;
     
     private final boolean full;
+    
+    private final CDCSinkType sinkType;
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 2e9d6187cf8..483a07ced48 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -146,7 +146,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
         List<String> schemaTableNames = param.getSchemaTableNames();
         Collections.sort(schemaTableNames);
-        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, 
schemaTableNames, param.isFull())));
+        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, 
schemaTableNames, param.isFull(), sinkType)));
         result.setDatabaseName(param.getDatabaseName());
         result.setSchemaTableNames(schemaTableNames);
         result.setFull(param.isFull());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index eeff0294f48..ba4fcc73857 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc;
 
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -31,7 +32,7 @@ class CDCJobIdTest {
     
     @Test
     void assertParseJobType() {
-        String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new 
PipelineContextKey("sharding_db", InstanceType.PROXY), 
Collections.singletonList("foo"), true));
+        String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new 
PipelineContextKey("sharding_db", InstanceType.PROXY), 
Collections.singletonList("foo"), true, CDCSinkType.SOCKET));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), 
instanceOf(CDCJobType.class));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index 0d0b136c2fd..2fde01e7632 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -63,7 +63,7 @@ public final class MigrationJobExecutorCallback implements 
DistributedPipelineJo
     }
     
     private MigrationTaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
-        IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+        IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem));
         Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
         Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 767d856c728..2ed17f99198 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -105,7 +105,7 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
     
     private long getRecordsCount() {
         Map<Integer, TransmissionJobItemProgress> jobProgress = new 
TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig);
-        return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum();
+        return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum();
     }
     
     private boolean checkTableInventoryDataUnmatchedAndBreak(final 
JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index f8eae8eb2d5..0a467418ba3 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -21,8 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.List;
@@ -67,4 +67,14 @@ public final class MigrationJobConfiguration implements 
PipelineJobConfiguration
     public int getJobShardingCount() {
         return jobShardingDataNodes.size();
     }
+    
+    /**
+     * Get job data node line.
+     *
+     * @param jobShardingItem job sharding item
+     * @return job data node line
+     */
+    public JobDataNodeLine getJobDataNodeLine(final int jobShardingItem) {
+        return jobShardingDataNodes.get(jobShardingItem);
+    }
 }

Reply via email to