This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d1cb8458a4 Refactor PipelineJobType.getJobObjective() (#37112)
5d1cb8458a4 is described below

commit 5d1cb8458a4e1aaa1f97b029f4155a7eb42c4199
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 16:38:32 2025 +0800

    Refactor PipelineJobType.getJobObjective() (#37112)
---
 .../pipeline/core/job/service/PipelineJobManager.java |  5 ++++-
 .../core/job/service/TransmissionJobManager.java      | 11 ++++++-----
 .../data/pipeline/core/job/type/PipelineJobType.java  | 19 ++++++++++---------
 .../data/pipeline/core/pojo/PipelineJobInfo.java      |  4 +---
 ...PipelineJobInfo.java => PipelineJobObjective.java} |  6 ++----
 .../data/pipeline/core/job/type/FixtureJobType.java   |  8 ++++----
 .../shardingsphere/data/pipeline/cdc/CDCJobType.java  | 11 ++++-------
 .../handler/query/ShowStreamingListExecutor.java      |  2 +-
 .../consistencycheck/ConsistencyCheckJobType.java     |  8 ++++----
 .../pipeline/scenario/migration/MigrationJobType.java | 16 ++++++----------
 .../handler/query/ShowMigrationListExecutor.java      |  2 +-
 11 files changed, 43 insertions(+), 49 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index cb706f495ed..8ca96badf6d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class PipelineJobManager {
     
+    @SuppressWarnings("rawtypes")
     private final PipelineJobType jobType;
     
     /**
@@ -173,10 +174,12 @@ public final class PipelineJobManager {
      * @param contextKey context key
      * @return jobs info
      */
+    @SuppressWarnings("unchecked")
     public List<PipelineJobInfo> getJobInfos(final PipelineContextKey 
contextKey) {
         try {
             return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(this::isValidJob)
-                    .map(each -> jobType.getJobInfo(new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName()))))
+                    .map(each -> new PipelineJobInfo(new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName())),
+                            jobType.getJobObjective(new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(each.getJobName()))))
                     .collect(Collectors.toList());
         } catch (final UnsupportedOperationException ex) {
             return Collections.emptyList();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index d9f8277573a..086b2e86fa6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -24,8 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfig
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
 import 
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 
@@ -44,6 +43,7 @@ import java.util.stream.IntStream;
 @RequiredArgsConstructor
 public final class TransmissionJobManager {
     
+    @SuppressWarnings("rawtypes")
     private final PipelineJobType jobType;
     
     /**
@@ -52,22 +52,23 @@ public final class TransmissionJobManager {
      * @param jobId job ID
      * @return job item infos
      */
+    @SuppressWarnings("unchecked")
     public Collection<TransmissionJobItemInfo> getJobItemInfos(final String 
jobId) {
         PipelineJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, TransmissionJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
         List<TransmissionJobItemInfo> result = new LinkedList<>();
-        PipelineJobInfo jobInfo = jobType.getJobInfo(new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)));
+        PipelineJobObjective jobObjective = jobType.getJobObjective(jobConfig);
         for (Entry<Integer, TransmissionJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             TransmissionJobItemProgress jobItemProgress = entry.getValue();
             String errorMessage = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId,
 shardingItem);
             if (null == jobItemProgress) {
-                result.add(new TransmissionJobItemInfo(shardingItem, 
jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage));
+                result.add(new TransmissionJobItemInfo(shardingItem, 
jobObjective.getTableName(), null, startTimeMillis, 0, errorMessage));
                 continue;
             }
             int inventoryFinishedPercentage = 
getInventoryFinishedPercentage(jobItemProgress);
-            result.add(new TransmissionJobItemInfo(shardingItem, 
jobInfo.getTableName(), jobItemProgress, startTimeMillis, 
inventoryFinishedPercentage, errorMessage));
+            result.add(new TransmissionJobItemInfo(shardingItem, 
jobObjective.getTableName(), jobItemProgress, startTimeMillis, 
inventoryFinishedPercentage, errorMessage));
         }
         return result;
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index fedcb006f23..fc115063047 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -22,17 +22,18 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
 /**
  * Pipeline job type.
+ * 
+ * @param <T> type of pipeline job configuration
  */
 @SingletonSPI
 @JsonIgnoreType
-public interface PipelineJobType extends TypedSPI {
+public interface PipelineJobType<T extends PipelineJobConfiguration> extends 
TypedSPI {
     
     /**
      * Get pipeline job option.
@@ -42,23 +43,23 @@ public interface PipelineJobType extends TypedSPI {
     PipelineJobOption getOption();
     
     /**
-     * Get pipeline job info.
+     * Get pipeline job objective.
      *
-     * @param jobMetaData job meta data
-     * @return pipeline job info
+     * @param jobConfig pipeline job configuration
+     * @return pipeline job objective
      */
-    PipelineJobInfo getJobInfo(PipelineJobMetaData jobMetaData);
+    PipelineJobObjective getJobObjective(T jobConfig);
     
     /**
      * Build pipeline data consistency checker.
      *
-     * @param jobConfig job configuration
+     * @param jobConfig pipeline job configuration
      * @param processContext process context
      * @param progressContext consistency check job item progress context
      * @return all logic tables check result
      * @throws UnsupportedOperationException unsupported operation exception
      */
-    default PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig,
+    default PipelineDataConsistencyChecker buildDataConsistencyChecker(final T 
jobConfig,
                                                                        final 
TransmissionProcessContext processContext, final 
ConsistencyCheckJobItemProgressContext progressContext) {
         throw new UnsupportedOperationException("Build data consistency 
checker is not supported.");
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
index ea24e17b86c..7ed3c5d1fc3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
@@ -29,7 +29,5 @@ public final class PipelineJobInfo {
     
     private final PipelineJobMetaData jobMetaData;
     
-    private final String databaseName;
-    
-    private final String tableName;
+    private final PipelineJobObjective jobObjective;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobObjective.java
similarity index 90%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobObjective.java
index ea24e17b86c..5d1cdd83ee4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobObjective.java
@@ -21,13 +21,11 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 /**
- * Pipeline job info.
+ * Pipeline job objective.
  */
 @RequiredArgsConstructor
 @Getter
-public final class PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
+public final class PipelineJobObjective {
     
     private final String databaseName;
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
index c074a4a1ec1..efaad7bf5f9 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
@@ -17,13 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.type;
 
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
 
 /**
  * Fixture job type.
  */
-public final class FixtureJobType implements PipelineJobType {
+public final class FixtureJobType implements 
PipelineJobType<PipelineJobConfiguration> {
     
     @Override
     public PipelineJobOption getOption() {
@@ -31,7 +31,7 @@ public final class FixtureJobType implements PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
+    public PipelineJobObjective getJobObjective(final PipelineJobConfiguration 
jobConfig) {
         return null;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 78782ff27e3..2ce1ea04835 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -20,16 +20,14 @@ package org.apache.shardingsphere.data.pipeline.cdc;
 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
 
 /**
  * CDC job type.
  */
-public final class CDCJobType implements PipelineJobType {
+public final class CDCJobType implements PipelineJobType<CDCJobConfiguration> {
     
     @Override
     public PipelineJobOption getOption() {
@@ -37,9 +35,8 @@ public final class CDCJobType implements PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
-        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(getOption()).getJobConfiguration(jobMetaData.getJobId());
-        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
+    public PipelineJobObjective getJobObjective(final CDCJobConfiguration 
jobConfig) {
+        return new PipelineJobObjective(jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
 
b/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 7f283e75145..b23bf5de7da 100644
--- 
a/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ 
b/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -45,7 +45,7 @@ public final class ShowStreamingListExecutor implements 
DistSQLQueryExecutor<Sho
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingListStatement sqlStatement, final ContextManager contextManager) {
         return pipelineJobManager.getJobInfos(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
-                each.getDatabaseName(), each.getTableName(), 
each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive(),
+                each.getJobObjective().getDatabaseName(), 
each.getJobObjective().getTableName(), each.getJobMetaData().getJobItemCount(), 
each.getJobMetaData().isActive(),
                 each.getJobMetaData().getCreateTime(), 
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 08061c42174..f33c104727b 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -17,17 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
 
 /**
  * Consistency check job type.
  */
-public final class ConsistencyCheckJobType implements PipelineJobType {
+public final class ConsistencyCheckJobType implements 
PipelineJobType<PipelineJobConfiguration> {
     
     @Override
     public PipelineJobOption getOption() {
@@ -36,7 +36,7 @@ public final class ConsistencyCheckJobType implements 
PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
+    public PipelineJobObjective getJobObjective(final PipelineJobConfiguration 
jobConfig) {
         return null;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 966a299718c..6119d3cde43 100644
--- 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -20,13 +20,10 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
@@ -37,7 +34,7 @@ import java.util.LinkedList;
 /**
  * Migration job type.
  */
-public final class MigrationJobType implements PipelineJobType {
+public final class MigrationJobType implements 
PipelineJobType<MigrationJobConfiguration> {
     
     @Override
     public PipelineJobOption getOption() {
@@ -46,17 +43,16 @@ public final class MigrationJobType implements 
PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
-        MigrationJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(getOption()).getJobConfiguration(jobMetaData.getJobId());
+    public PipelineJobObjective getJobObjective(final 
MigrationJobConfiguration jobConfig) {
         Collection<String> sourceTables = new LinkedList<>();
         jobConfig.getJobShardingDataNodes().forEach(each -> 
each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> 
sourceTables.add(dataNode.format()))));
-        return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
+        return new PipelineJobObjective(null, String.join(",", sourceTables));
     }
     
     @Override
-    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig,
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
MigrationJobConfiguration jobConfig,
                                                                       final 
TransmissionProcessContext processContext, final 
ConsistencyCheckJobItemProgressContext progressContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
jobConfig, processContext, progressContext);
+        return new MigrationDataConsistencyChecker(jobConfig, processContext, 
progressContext);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
 
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 76371697fb0..d75b42abd7a 100644
--- 
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ 
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -53,7 +53,7 @@ public final class ShowMigrationListExecutor implements 
DistSQLQueryExecutor<Sho
     private LocalDataQueryResultRow getRow(final PipelineContextKey 
contextKey, final PipelineJobInfo jobInfo) {
         boolean active = jobInfo.getJobMetaData().isActive();
         String jobShardingNodes = active ? getJobShardingNodes(contextKey, 
jobInfo.getJobMetaData().getJobId()) : "";
-        return new 
LocalDataQueryResultRow(jobInfo.getJobMetaData().getJobId(), 
jobInfo.getTableName(), active, jobInfo.getJobMetaData().getCreateTime(),
+        return new 
LocalDataQueryResultRow(jobInfo.getJobMetaData().getJobId(), 
jobInfo.getJobObjective().getTableName(), active, 
jobInfo.getJobMetaData().getCreateTime(),
                 jobInfo.getJobMetaData().getStopTime(), 
jobInfo.getJobMetaData().getJobItemCount(), jobShardingNodes);
     }
     

Reply via email to