This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d914eaf9719 Refactor InventoryIncrementalJobItemProgress (#26859)
d914eaf9719 is described below

commit d914eaf97192e0d2cbc1b80818904bb06d91530b
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jul 9 18:39:45 2023 +0800

    Refactor InventoryIncrementalJobItemProgress (#26859)
---
 .../shardingsphere/encrypt/merge/dql/EncryptMergedResult.java      | 7 +++----
 .../common/job/progress/InventoryIncrementalJobItemProgress.java   | 3 ++-
 .../yaml/YamlInventoryIncrementalJobItemProgressSwapper.java       | 6 ++++--
 .../job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java   | 2 +-
 .../job/progress/InventoryIncrementalJobItemProgressTest.java      | 2 +-
 .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java       | 2 +-
 6 files changed, 12 insertions(+), 10 deletions(-)

diff --git 
a/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
 
b/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
index f5366a687c6..9507ac3b562 100644
--- 
a/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
+++ 
b/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
@@ -69,13 +69,12 @@ public final class EncryptMergedResult implements 
MergedResult {
         if (!tableName.isPresent()) {
             return mergedResult.getValue(columnIndex, type);
         }
-        String originalColumnName = originalColumn.getName();
-        if (!encryptRule.findEncryptTable(tableName.get()).map(optional -> 
optional.isEncryptColumn(originalColumnName)).orElse(false)) {
+        if (!encryptRule.findEncryptTable(tableName.get()).map(optional -> 
optional.isEncryptColumn(originalColumn.getName())).orElse(false)) {
             return mergedResult.getValue(columnIndex, type);
         }
         Object cipherValue = mergedResult.getValue(columnIndex, Object.class);
-        EncryptColumn encryptColumn = 
encryptRule.getEncryptTable(tableName.get()).getEncryptColumn(originalColumnName);
-        return encryptColumn.getCipher().decrypt(database.getName(), 
schemaName, tableName.get(), originalColumnName, cipherValue);
+        EncryptColumn encryptColumn = 
encryptRule.getEncryptTable(tableName.get()).getEncryptColumn(originalColumn.getName());
+        return encryptColumn.getCipher().decrypt(database.getName(), 
schemaName, tableName.get(), originalColumn.getName(), cipherValue);
     }
     
     private Optional<String> findTableName(final ColumnProjection 
columnProjection, final Map<String, String> columnTableNames) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
index 807aabf726e..32dd99e121f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 /**
  * Inventory incremental job item progress.
@@ -30,7 +31,7 @@ public final class InventoryIncrementalJobItemProgress 
implements PipelineJobIte
     
     private JobStatus status = JobStatus.RUNNING;
     
-    private String sourceDatabaseType;
+    private DatabaseType sourceDatabaseType;
     
     private String dataSourceName;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 607261e324d..a7841163c7f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -19,6 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
 
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
@@ -34,7 +36,7 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
     public YamlInventoryIncrementalJobItemProgress 
swapToYamlConfiguration(final InventoryIncrementalJobItemProgress progress) {
         YamlInventoryIncrementalJobItemProgress result = new 
YamlInventoryIncrementalJobItemProgress();
         result.setStatus(progress.getStatus().name());
-        result.setSourceDatabaseType(progress.getSourceDatabaseType());
+        
result.setSourceDatabaseType(progress.getSourceDatabaseType().getType());
         result.setDataSourceName(progress.getDataSourceName());
         
result.setInventory(inventoryTasksProgressSwapper.swapToYaml(progress.getInventory()));
         
result.setIncremental(incrementalTasksProgressSwapper.swapToYaml(progress.getIncremental()));
@@ -47,7 +49,7 @@ public final class 
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
     public InventoryIncrementalJobItemProgress swapToObject(final 
YamlInventoryIncrementalJobItemProgress yamlProgress) {
         InventoryIncrementalJobItemProgress result = new 
InventoryIncrementalJobItemProgress();
         result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
-        result.setSourceDatabaseType(yamlProgress.getSourceDatabaseType());
+        
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, 
yamlProgress.getSourceDatabaseType()));
         result.setDataSourceName(yamlProgress.getDataSourceName());
         
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
         
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
 yamlProgress.getIncremental()));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index ff43cf6fa40..9267949059d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -146,7 +146,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
         InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
         jobItemProgress.setStatus(context.getStatus());
-        
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType().getType());
+        
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
         jobItemProgress.setDataSourceName(context.getDataSourceName());
         
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
         
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
index 98a830c3dde..a154af9dc7f 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -44,7 +44,7 @@ class InventoryIncrementalJobItemProgressTest {
     void assertInit() {
         InventoryIncrementalJobItemProgress actual = 
getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml"));
         assertThat(actual.getStatus(), is(JobStatus.RUNNING));
-        assertThat(actual.getSourceDatabaseType(), is("H2"));
+        assertThat(actual.getSourceDatabaseType().getType(), is("H2"));
         assertThat(actual.getInventory().getProgresses().size(), is(4));
         assertNotNull(actual.getIncremental().getIncrementalTaskProgress());
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index f351f866e85..a85e6c1696d 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -194,7 +194,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
                                                                                
               final PipelineDataSourceManager dataSourceManager,
                                                                                
               final DumperConfiguration dumperConfig) throws SQLException {
         InventoryIncrementalJobItemProgress result = new 
InventoryIncrementalJobItemProgress();
-        
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType().getType());
+        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
         result.setDataSourceName(dumperConfig.getDataSourceName());
         IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, 
dumperConfig, dataSourceManager));
         result.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));

Reply via email to