This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

The following commit(s) were added to refs/heads/master by this push:
     new fe845fd5eee Refactor PipelineJobInfo (#29105)
fe845fd5eee is described below

commit fe845fd5eee7e67e12ffbce56f342e3f5166f334
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Tue Nov 21 17:17:42 2023 +0800

    Refactor PipelineJobInfo (#29105)
    
    * Refactor PipelineJobInfo
    
    * Refactor PipelineJobInfo
---
 .../data/pipeline/common/pojo/PipelineJobInfo.java | 21 ++++++-----
 .../common/pojo/TableBasedPipelineJobInfo.java     | 42 ----------------------
 .../service/InventoryIncrementalJobManager.java    |  4 +--
 .../handler/query/ShowStreamingListExecutor.java   |  3 +-
 .../handler/query/ShowMigrationListExecutor.java   |  3 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  6 ++--
 .../migration/api/impl/MigrationJobAPI.java        |  6 ++--
 7 files changed, 23 insertions(+), 62 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java
index 8f028d3b234..44ba79c5206 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java
@@ -17,15 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.common.pojo;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 /**
- * Pipeline job meta data.
+ * Pipeline job info.
  */
-public interface PipelineJobInfo {
+@RequiredArgsConstructor
+@Getter
+public final class PipelineJobInfo {
+    
+    private final PipelineJobMetaData jobMetaData;
+    
+    private final String databaseName;
     
-    /**
-     * Get job meta data.
-     * 
-     * @return job meta data
-     */
-    PipelineJobMetaData getJobMetaData();
+    // TODO Rename
+    private final String table;
 }
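
A note for readers skimming the patch: the former interface/implementation pair is collapsed into a single Lombok-generated POJO, so callers construct it directly and read its three fields through generated getters. A minimal usage sketch under that assumption (the null PipelineJobMetaData is a placeholder; building a real one is outside the scope of this commit):

import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;

public final class PipelineJobInfoUsageSketch {
    
    public static void main(final String[] args) {
        // Placeholder only; real callers build this from the ElasticJob configuration POJO.
        PipelineJobMetaData jobMetaData = null;
        // @RequiredArgsConstructor generates a constructor over all three final fields.
        PipelineJobInfo jobInfo = new PipelineJobInfo(jobMetaData, "sharding_db", "t_order");
        // @Getter generates getJobMetaData(), getDatabaseName() and getTable(); no downcast is needed.
        System.out.println(jobInfo.getDatabaseName() + ": " + jobInfo.getTable());
    }
}
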
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java
deleted file mode 100644
index bcf5fa013cf..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.pojo;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Table based pipeline job info.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
-    
-    private final PipelineJobMetaData jobMetaData;
-    
-    private final String databaseName;
-    
-    // TODO Rename
-    private final String table;
-    
-    public TableBasedPipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) {
-        this.jobMetaData = jobMetaData;
-        databaseName = null;
-        this.table = table;
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 7e41b403aac..dbdf9aadcb0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -80,7 +80,7 @@ public final class InventoryIncrementalJobManager {
         long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
-        TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
+        PipelineJobInfo jobInfo = jobAPI.getJobInfo(jobId);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index c5c64bb35ff..4abb07b7ce5 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -41,7 +40,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor<Sho
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
         return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
-                ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(),
+                each.getDatabaseName(), each.getTable(),
                 each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index ce1877a5719..3f1ec3deb4c 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +39,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor<Sho
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationListStatement sqlStatement) {
         return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
-                ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(),
+                each.getTable(), each.getJobMetaData().getJobItemCount(),
                 each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList());
     }
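
With a single concrete type, both ShowStreamingListExecutor and ShowMigrationListExecutor reduce to the same shape: stream the job infos and map each one directly into a result row, taking the shared attributes from getJobMetaData() and the database/table strings from the info itself. A condensed sketch of that mapping with fewer columns than the real executors (the LocalDataQueryResultRow import path is quoted from memory and should be treated as an assumption):

import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;

public final class JobInfoRowMappingSketch {
    
    // Maps job infos to DistSQL result rows without the former TableBasedPipelineJobInfo downcast.
    public static Collection<LocalDataQueryResultRow> toRows(final Collection<PipelineJobInfo> jobInfos) {
        return jobInfos.stream()
                .map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), each.getDatabaseName(), each.getTable()))
                .collect(Collectors.toList());
    }
}
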
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index c6cd6652687..405d0f5a5ad 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -50,8 +50,8 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
@@ -290,10 +290,10 @@ public final class CDCJobAPI implements InventoryIncrementalJobAPI {
     }
     
     @Override
-    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
-        return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
+        return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 6ffa6371715..d80211588ae 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -39,8 +39,8 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipeli
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
@@ -209,12 +209,12 @@ public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
     }
     
     @Override
-    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         List<String> sourceTables = new LinkedList<>();
         new PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
                 .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
-        return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables));
+        return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
     }
     
     @Override

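One behavioural detail from the last hunk is worth calling out: MigrationJobAPI now passes null for databaseName, since its source tables are already schema-qualified via DataNodeUtils.formatWithSchema(...). Consumers that render the database name for migration jobs should therefore guard against null; a minimal sketch of such a guard (the Optional wrapper shown here is illustrative, not something this commit adds):

import java.util.Optional;

import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;

public final class NullSafeDatabaseNameSketch {
    
    // Migration jobs construct PipelineJobInfo with a null database name, so read it defensively.
    public static String databaseNameOrEmpty(final PipelineJobInfo jobInfo) {
        return Optional.ofNullable(jobInfo.getDatabaseName()).orElse("");
    }
}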