This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new faa9642d656 Refactor PipelineJobType.getJobInfo() (#37108)
faa9642d656 is described below
commit faa9642d656730e5858fa378bb0b0ede7d50f71b
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 12:07:48 2025 +0800
Refactor PipelineJobType.getJobInfo() (#37108)
---
.../data/pipeline/core/job/service/PipelineJobManager.java | 4 +++-
.../data/pipeline/core/job/service/TransmissionJobManager.java | 3 ++-
.../shardingsphere/data/pipeline/core/job/type/PipelineJobType.java | 5 +++--
.../shardingsphere/data/pipeline/core/job/type/FixtureJobType.java | 3 ++-
.../org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java | 6 ++----
.../pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java | 3 ++-
.../data/pipeline/scenario/migration/MigrationJobType.java | 6 ++----
7 files changed, 16 insertions(+), 14 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index f0f3266d9fb..dcd238cc377 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItem
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -175,7 +176,8 @@ public final class PipelineJobManager {
try {
return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
.filter(each -> !each.getJobName().startsWith("_") &&
jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
- .map(each ->
jobType.getJobInfo(each.getJobName())).collect(Collectors.toList());
+ .map(each -> jobType.getOption().isTransmissionJob() ?
jobType.getJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName())))
: null)
+ .collect(Collectors.toList());
} catch (final UnsupportedOperationException ex) {
return Collections.emptyList();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 4e0020ea566..f47efd748e6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -56,7 +57,7 @@ public final class TransmissionJobManager {
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<TransmissionJobItemInfo> result = new LinkedList<>();
- PipelineJobInfo jobInfo = jobType.getJobInfo(jobId);
+ PipelineJobInfo jobInfo = jobType.getJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)));
for (Entry<Integer, TransmissionJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TransmissionJobItemProgress jobItemProgress = entry.getValue();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index dfe627d89cf..fedcb006f23 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -43,10 +44,10 @@ public interface PipelineJobType extends TypedSPI {
/**
* Get pipeline job info.
*
- * @param jobId job ID
+ * @param jobMetaData job meta data
* @return pipeline job info
*/
- PipelineJobInfo getJobInfo(String jobId);
+ PipelineJobInfo getJobInfo(PipelineJobMetaData jobMetaData);
/**
* Build pipeline data consistency checker.
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
index 1be08c3f9da..c074a4a1ec1 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/type/FixtureJobType.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.type;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
/**
* Fixture job type.
@@ -30,7 +31,7 @@ public final class FixtureJobType implements PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final String jobId) {
+ public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
return null;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
index 793e6e77e48..a0264488739 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.cdc;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
@@ -38,9 +37,8 @@ public final class CDCJobType implements PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(new CDCJobType()).getJobConfiguration(jobId);
+ public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(new
CDCJobType()).getJobConfiguration(jobMetaData.getJobId());
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
}
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 1254f8497e5..08061c42174 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.Ya
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.swapper.YamlConsistencyCheckJobConfigurationSwapper;
/**
@@ -35,7 +36,7 @@ public final class ConsistencyCheckJobType implements
PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final String jobId) {
+ public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
return null;
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 040a844d58b..da4abe3186e 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -21,7 +21,6 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlTransmissionJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
@@ -47,10 +46,9 @@ public final class MigrationJobType implements
PipelineJobType {
}
@Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ public PipelineJobInfo getJobInfo(final PipelineJobMetaData jobMetaData) {
Collection<String> sourceTables = new LinkedList<>();
- new PipelineJobConfigurationManager(new
MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+ new PipelineJobConfigurationManager(new
MigrationJobType()).<MigrationJobConfiguration>getJobConfiguration(jobMetaData.getJobId()).getJobShardingDataNodes()
.forEach(each -> each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode -> sourceTables.add(dataNode.format()))));
return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
}