This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d914eaf9719 Refactor InventoryIncrementalJobItemProgress (#26859)
d914eaf9719 is described below
commit d914eaf97192e0d2cbc1b80818904bb06d91530b
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jul 9 18:39:45 2023 +0800
Refactor InventoryIncrementalJobItemProgress (#26859)
---
.../shardingsphere/encrypt/merge/dql/EncryptMergedResult.java | 7 +++----
.../common/job/progress/InventoryIncrementalJobItemProgress.java | 3 ++-
.../yaml/YamlInventoryIncrementalJobItemProgressSwapper.java | 6 ++++--
.../job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java | 2 +-
.../job/progress/InventoryIncrementalJobItemProgressTest.java | 2 +-
.../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +-
6 files changed, 12 insertions(+), 10 deletions(-)
diff --git
a/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
b/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
index f5366a687c6..9507ac3b562 100644
---
a/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
+++
b/features/encrypt/core/src/main/java/org/apache/shardingsphere/encrypt/merge/dql/EncryptMergedResult.java
@@ -69,13 +69,12 @@ public final class EncryptMergedResult implements
MergedResult {
if (!tableName.isPresent()) {
return mergedResult.getValue(columnIndex, type);
}
- String originalColumnName = originalColumn.getName();
- if (!encryptRule.findEncryptTable(tableName.get()).map(optional ->
optional.isEncryptColumn(originalColumnName)).orElse(false)) {
+ if (!encryptRule.findEncryptTable(tableName.get()).map(optional ->
optional.isEncryptColumn(originalColumn.getName())).orElse(false)) {
return mergedResult.getValue(columnIndex, type);
}
Object cipherValue = mergedResult.getValue(columnIndex, Object.class);
- EncryptColumn encryptColumn =
encryptRule.getEncryptTable(tableName.get()).getEncryptColumn(originalColumnName);
- return encryptColumn.getCipher().decrypt(database.getName(),
schemaName, tableName.get(), originalColumnName, cipherValue);
+ EncryptColumn encryptColumn =
encryptRule.getEncryptTable(tableName.get()).getEncryptColumn(originalColumn.getName());
+ return encryptColumn.getCipher().decrypt(database.getName(),
schemaName, tableName.get(), originalColumn.getName(), cipherValue);
}
private Optional<String> findTableName(final ColumnProjection
columnProjection, final Map<String, String> columnTableNames) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
index 807aabf726e..32dd99e121f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgress.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
/**
* Inventory incremental job item progress.
@@ -30,7 +31,7 @@ public final class InventoryIncrementalJobItemProgress
implements PipelineJobIte
private JobStatus status = JobStatus.RUNNING;
- private String sourceDatabaseType;
+ private DatabaseType sourceDatabaseType;
private String dataSourceName;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 607261e324d..a7841163c7f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
@@ -34,7 +36,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
public YamlInventoryIncrementalJobItemProgress
swapToYamlConfiguration(final InventoryIncrementalJobItemProgress progress) {
YamlInventoryIncrementalJobItemProgress result = new
YamlInventoryIncrementalJobItemProgress();
result.setStatus(progress.getStatus().name());
- result.setSourceDatabaseType(progress.getSourceDatabaseType());
+
result.setSourceDatabaseType(progress.getSourceDatabaseType().getType());
result.setDataSourceName(progress.getDataSourceName());
result.setInventory(inventoryTasksProgressSwapper.swapToYaml(progress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToYaml(progress.getIncremental()));
@@ -47,7 +49,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
public InventoryIncrementalJobItemProgress swapToObject(final
YamlInventoryIncrementalJobItemProgress yamlProgress) {
InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
- result.setSourceDatabaseType(yamlProgress.getSourceDatabaseType());
+
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class,
yamlProgress.getSourceDatabaseType()));
result.setDataSourceName(yamlProgress.getDataSourceName());
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index ff43cf6fa40..9267949059d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -146,7 +146,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
InventoryIncrementalJobItemContext context =
(InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(context.getStatus());
-
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType().getType());
+
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
jobItemProgress.setDataSourceName(context.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
index 98a830c3dde..a154af9dc7f 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -44,7 +44,7 @@ class InventoryIncrementalJobItemProgressTest {
void assertInit() {
InventoryIncrementalJobItemProgress actual =
getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml"));
assertThat(actual.getStatus(), is(JobStatus.RUNNING));
- assertThat(actual.getSourceDatabaseType(), is("H2"));
+ assertThat(actual.getSourceDatabaseType().getType(), is("H2"));
assertThat(actual.getInventory().getProgresses().size(), is(4));
assertNotNull(actual.getIncremental().getIncrementalTaskProgress());
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index f351f866e85..a85e6c1696d 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -194,7 +194,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
final PipelineDataSourceManager dataSourceManager,
final DumperConfiguration dumperConfig) throws SQLException {
InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
-
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType().getType());
+ result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(dumperConfig.getDataSourceName());
IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));