This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2b42028db Refactor PipelineJobManager (#37111)
6d2b42028db is described below
commit 6d2b42028db64c88ad462398b674f55382a1110d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 15 15:36:05 2025 +0800
Refactor PipelineJobManager (#37111)
---
.../data/pipeline/core/job/service/PipelineJobManager.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 4738540975c..cb706f495ed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -174,15 +175,18 @@ public final class PipelineJobManager {
*/
public List<PipelineJobInfo> getJobInfos(final PipelineContextKey
contextKey) {
try {
- return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
- .filter(each -> !each.getJobName().startsWith("_") &&
jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
- .map(each -> jobType.getOption().isTransmissionJob() ?
jobType.getJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName())))
: null)
+ return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(this::isValidJob)
+ .map(each -> jobType.getJobInfo(new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName()))))
.collect(Collectors.toList());
} catch (final UnsupportedOperationException ex) {
return Collections.emptyList();
}
}
+ private boolean isValidJob(final JobBriefInfo jobInfo) {
+ return !jobInfo.getJobName().startsWith("_") &&
jobType.getOption().isTransmissionJob() &&
jobType.getType().equals(PipelineJobIdUtils.parseJobType(jobInfo.getJobName()).getType());
+ }
+
/**
* Get pipeline job sharding info.
*