This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1cb8458a4 Refactor PipelineJobType.getJobObjective() (#37112)
5d1cb8458a4 is described below
commit 5d1cb8458a4e1aaa1f97b029f4155a7eb42c4199
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 16:38:32 2025 +0800
Refactor PipelineJobType.getJobObjective() (#37112)
---
.../pipeline/core/job/service/PipelineJobManager.java | 5 ++++-
.../core/job/service/TransmissionJobManager.java | 11 ++++++-----
.../data/pipeline/core/job/type/PipelineJobType.java | 19 ++++++++++---------
.../data/pipeline/core/pojo/PipelineJobInfo.java | 4 +---
...PipelineJobInfo.java => PipelineJobObjective.java} | 6 ++----
.../data/pipeline/core/job/type/FixtureJobType.java | 8 ++++----
.../shardingsphere/data/pipeline/cdc/CDCJobType.java | 11 ++++-------
.../handler/query/ShowStreamingListExecutor.java | 2 +-
.../consistencycheck/ConsistencyCheckJobType.java | 8 ++++----
.../pipeline/scenario/migration/MigrationJobType.java | 16 ++++++----------
.../handler/query/ShowMigrationListExecutor.java | 2 +-
11 files changed, 43 insertions(+), 49 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index cb706f495ed..8ca96badf6d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
@Slf4j
public final class PipelineJobManager {
+ @SuppressWarnings("rawtypes")
private final PipelineJobType jobType;
/**
@@ -173,10 +174,12 @@ public final class PipelineJobManager {
* @param contextKey context key
* @return jobs info
*/
+ @SuppressWarnings("unchecked")
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey
contextKey) {
try {
return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(this::isValidJob)
- .map(each -> jobType.getJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName()))))
+ .map(each -> new PipelineJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName())),
+ jobType.getJobObjective(new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(each.getJobName()))))
.collect(Collectors.toList());
} catch (final UnsupportedOperationException ex) {
return Collections.emptyList();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index d9f8277573a..086b2e86fa6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -24,8 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfig
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -44,6 +43,7 @@ import java.util.stream.IntStream;
@RequiredArgsConstructor
public final class TransmissionJobManager {
+ @SuppressWarnings("rawtypes")
private final PipelineJobType jobType;
/**
@@ -52,22 +52,23 @@ public final class TransmissionJobManager {
* @param jobId job ID
* @return job item infos
*/
+ @SuppressWarnings("unchecked")
public Collection<TransmissionJobItemInfo> getJobItemInfos(final String
jobId) {
PipelineJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<TransmissionJobItemInfo> result = new LinkedList<>();
- PipelineJobInfo jobInfo = jobType.getJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)));
+ PipelineJobObjective jobObjective = jobType.getJobObjective(jobConfig);
for (Entry<Integer, TransmissionJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TransmissionJobItemProgress jobItemProgress = entry.getValue();
String errorMessage =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId,
shardingItem);
if (null == jobItemProgress) {
- result.add(new TransmissionJobItemInfo(shardingItem,
jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage));
+ result.add(new TransmissionJobItemInfo(shardingItem,
jobObjective.getTableName(), null, startTimeMillis, 0, errorMessage));
continue;
}
int inventoryFinishedPercentage =
getInventoryFinishedPercentage(jobItemProgress);
- result.add(new TransmissionJobItemInfo(shardingItem,
jobInfo.getTableName(), jobItemProgress, startTimeMillis,
inventoryFinishedPercentage, errorMessage));
+ result.add(new TransmissionJobItemInfo(shardingItem,
jobObjective.getTableName(), jobItemProgress, startTimeMillis,
inventoryFinishedPercentage, errorMessage));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index fedcb006f23..fc115063047 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -22,17 +22,18 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
/**
* Pipeline job type.
+ *
+ * @param <T> type of pipeline job configuration
*/
@SingletonSPI
@JsonIgnoreType
-public interface PipelineJobType extends TypedSPI {
+public interface PipelineJobType<T extends PipelineJobConfiguration> extends
TypedSPI {
/**
* Get pipeline job option.
@@ -42,23 +43,23 @@ public interface PipelineJobType extends TypedSPI {
PipelineJobOption getOption();
/**
- * Get pipeline job info.
+ * Get pipeline job objective.
*
- * @param jobMetaData job meta data
- * @return pipeline job info
+ * @param jobConfig pipeline job configuration
+ * @return pipeline pipeline job objective
*/
- PipelineJobInfo getJobInfo(PipelineJobMetaData jobMetaData);
+ PipelineJobObjective getJobObjective(T jobConfig);
/**
* Build pipeline data consistency checker.
*
- * @param jobConfig job configuration
+ * @param jobConfig pipeline job configuration
* @param processContext process context
* @param progressContext consistency check job item progress context
* @return all logic tables check result
* @throws UnsupportedOperationException unsupported operation exception
*/
- default PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig,
+ default PipelineDataConsistencyChecker buildDataConsistencyChecker(final T
jobConfig,
final
TransmissionProcessContext processContext, final
ConsistencyCheckJobItemProgressContext progressContext) {
throw new UnsupportedOperationException("Build data consistency
checker is not supported.");
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
index ea24e17b86c..7ed3c5d1fc3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
@@ -29,7 +29,5 @@ public final class PipelineJobInfo {
private final PipelineJobMetaData jobMetaData;
- private final String databaseName;
-
- private final String tableName;
+ private final PipelineJobObjective jobObjective;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobObjective.java
similarity index 90%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobObjective.java
index ea24e17b86c..5d1cdd83ee4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobInfo.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/pojo/PipelineJobObjective.java
@@ -21,13 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * Pipeline job info.
+ * Pipeline job objective.
*/
@RequiredArgsConstructor
@Getter
-public final class PipelineJobInfo {
-
- private final PipelineJobMetaData jobMetaData;
+public final class PipelineJobObjective {
private final String databaseName;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
index c074a4a1ec1..efaad7bf5f9 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
@@ -17,13 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.job.type;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
/**
* Fixture job type.
*/
-public final class FixtureJobType implements PipelineJobType {
+public final class FixtureJobType implements
PipelineJobType<PipelineJobConfiguration> {
@Override
public PipelineJobOption getOption() {
@@ -31,7 +31,7 @@ public final class FixtureJobType implements PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
+ public PipelineJobObjective getJobObjective(final PipelineJobConfiguration
jobConfig) {
return null;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 78782ff27e3..2ce1ea04835 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -20,16 +20,14 @@ package org.apache.shardingsphere.data.pipeline.cdc;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
/**
* CDC job type.
*/
-public final class CDCJobType implements PipelineJobType {
+public final class CDCJobType implements PipelineJobType<CDCJobConfiguration> {
@Override
public PipelineJobOption getOption() {
@@ -37,9 +35,8 @@ public final class CDCJobType implements PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
- CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(getOption()).getJobConfiguration(jobMetaData.getJobId());
- return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
+ public PipelineJobObjective getJobObjective(final CDCJobConfiguration
jobConfig) {
+ return new PipelineJobObjective(jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
b/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 7f283e75145..b23bf5de7da 100644
---
a/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++
b/kernel/data-pipeline/scenario/cdc/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -45,7 +45,7 @@ public final class ShowStreamingListExecutor implements
DistSQLQueryExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingListStatement sqlStatement, final ContextManager contextManager) {
return pipelineJobManager.getJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
- each.getDatabaseName(), each.getTableName(),
each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive(),
+ each.getJobObjective().getDatabaseName(),
each.getJobObjective().getTableName(), each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive(),
each.getJobMetaData().getCreateTime(),
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
}
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 08061c42174..f33c104727b 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -17,17 +17,17 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
/**
* Consistency check job type.
*/
-public final class ConsistencyCheckJobType implements PipelineJobType {
+public final class ConsistencyCheckJobType implements
PipelineJobType<PipelineJobConfiguration> {
@Override
public PipelineJobOption getOption() {
@@ -36,7 +36,7 @@ public final class ConsistencyCheckJobType implements
PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
+ public PipelineJobObjective getJobObjective(final PipelineJobConfiguration
jobConfig) {
return null;
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 966a299718c..6119d3cde43 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -20,13 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobObjective;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
@@ -37,7 +34,7 @@ import java.util.LinkedList;
/**
* Migration job type.
*/
-public final class MigrationJobType implements PipelineJobType {
+public final class MigrationJobType implements
PipelineJobType<MigrationJobConfiguration> {
@Override
public PipelineJobOption getOption() {
@@ -46,17 +43,16 @@ public final class MigrationJobType implements
PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(getOption()).getJobConfiguration(jobMetaData.getJobId());
+ public PipelineJobObjective getJobObjective(final
MigrationJobConfiguration jobConfig) {
Collection<String> sourceTables = new LinkedList<>();
jobConfig.getJobShardingDataNodes().forEach(each ->
each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode ->
sourceTables.add(dataNode.format()))));
- return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
+ return new PipelineJobObjective(null, String.join(",", sourceTables));
}
@Override
- public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig,
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
MigrationJobConfiguration jobConfig,
final
TransmissionProcessContext processContext, final
ConsistencyCheckJobItemProgressContext progressContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
jobConfig, processContext, progressContext);
+ return new MigrationDataConsistencyChecker(jobConfig, processContext,
progressContext);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 76371697fb0..d75b42abd7a 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -53,7 +53,7 @@ public final class ShowMigrationListExecutor implements
DistSQLQueryExecutor<Sho
private LocalDataQueryResultRow getRow(final PipelineContextKey
contextKey, final PipelineJobInfo jobInfo) {
boolean active = jobInfo.getJobMetaData().isActive();
String jobShardingNodes = active ? getJobShardingNodes(contextKey,
jobInfo.getJobMetaData().getJobId()) : "";
- return new
LocalDataQueryResultRow(jobInfo.getJobMetaData().getJobId(),
jobInfo.getTableName(), active, jobInfo.getJobMetaData().getCreateTime(),
+ return new
LocalDataQueryResultRow(jobInfo.getJobMetaData().getJobId(),
jobInfo.getJobObjective().getTableName(), active,
jobInfo.getJobMetaData().getCreateTime(),
jobInfo.getJobMetaData().getStopTime(),
jobInfo.getJobMetaData().getJobItemCount(), jobShardingNodes);
}