This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new faa9642d656 Refactor PipelineJobType.getJobInfo() (#37108)
faa9642d656 is described below

commit faa9642d656730e5858fa378bb0b0ede7d50f71b
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 12:07:48 2025 +0800

    Refactor PipelineJobType.getJobInfo() (#37108)
---
 .../data/pipeline/core/job/service/PipelineJobManager.java          | 4 +++-
 .../data/pipeline/core/job/service/TransmissionJobManager.java      | 3 ++-
 .../shardingsphere/data/pipeline/core/job/type/PipelineJobType.java | 5 +++--
 .../shardingsphere/data/pipeline/core/job/type/FixtureJobType.java  | 3 ++-
 .../org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java     | 6 ++----
 .../pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java | 3 ++-
 .../data/pipeline/scenario/migration/MigrationJobType.java          | 6 ++----
 7 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index f0f3266d9fb..dcd238cc377 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItem
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -175,7 +176,8 @@ public final class PipelineJobManager {
         try {
             return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
                     .filter(each -> !each.getJobName().startsWith("_") && jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
-                    .map(each -> jobType.getJobInfo(each.getJobName())).collect(Collectors.toList());
+                    .map(each -> jobType.getOption().isTransmissionJob() ? jobType.getJobInfo(new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName()))) : null)
+                    .collect(Collectors.toList());
         } catch (final UnsupportedOperationException ex) {
             return Collections.emptyList();
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 4e0020ea566..f47efd748e6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 
@@ -56,7 +57,7 @@ public final class TransmissionJobManager {
         long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, TransmissionJobItemProgress> jobProgress = getJobProgress(jobConfig);
         List<TransmissionJobItemInfo> result = new LinkedList<>();
-        PipelineJobInfo jobInfo = jobType.getJobInfo(jobId);
+        PipelineJobInfo jobInfo = jobType.getJobInfo(new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)));
         for (Entry<Integer, TransmissionJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             TransmissionJobItemProgress jobItemProgress = entry.getValue();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index dfe627d89cf..fedcb006f23 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
@@ -43,10 +44,10 @@ public interface PipelineJobType extends TypedSPI {
     /**
      * Get pipeline job info.
      *
-     * @param jobId job ID
+     * @param jobMetaData job meta data
      * @return pipeline job info
      */
-    PipelineJobInfo getJobInfo(String jobId);
+    PipelineJobInfo getJobInfo(PipelineJobMetaData jobMetaData);
     
     /**
      * Build pipeline data consistency checker.
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
index 1be08c3f9da..c074a4a1ec1 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.job.type;
 
 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
 
 /**
  * Fixture job type.
@@ -30,7 +31,7 @@ public final class FixtureJobType implements PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
         return null;
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 793e6e77e48..a0264488739 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.cdc;
 
 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
@@ -38,9 +37,8 @@ public final class CDCJobType implements PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(new CDCJobType()).getJobConfiguration(jobId);
+    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
+        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(new CDCJobType()).getJobConfiguration(jobMetaData.getJobId());
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
diff --git a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 1254f8497e5..08061c42174 100644
--- a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.Ya
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
 
 /**
@@ -35,7 +36,7 @@ public final class ConsistencyCheckJobType implements PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
         return null;
     }
     
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 040a844d58b..da4abe3186e 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
@@ -47,10 +46,9 @@ public final class MigrationJobType implements PipelineJobType {
     }
     
     @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+    public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
         Collection<String> sourceTables = new LinkedList<>();
-        new PipelineJobConfigurationManager(new MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+        new PipelineJobConfigurationManager(new MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobMetaData.getJobId()).getJobShardingDataNodes()
                 .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(dataNode.format()))));
         return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
     }

Reply via email to