This is an automated email from the ASF dual-hosted git repository. zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new cc2bd6e1d56 Extract YamlJobProgress.YamlIncremental and YamlJobProgress.YamlInventory (#19875) cc2bd6e1d56 is described below commit cc2bd6e1d5671dd8bc73c698114d2d833f94a230 Author: Da Xiang Huang <localhos...@foxmail.com> AuthorDate: Fri Aug 5 22:13:37 2022 +0800 Extract YamlJobProgress.YamlIncremental and YamlJobProgress.YamlInventory (#19875) --- .../job/progress/JobIncrementalTaskProgress.java | 63 +++++++++++++++ ...Progress.java => JobInventoryTaskProgress.java} | 65 +++------------- .../pipeline/api/job/progress/JobProgress.java | 36 +++------ .../api/task/progress/IncrementalTaskProgress.java | 6 +- .../core/api/impl/GovernanceRepositoryAPIImpl.java | 31 ++++---- .../yaml/YamlIncrementalTaskProgress.java} | 20 ++--- .../yaml/YamlIncrementalTaskProgressSwapper.java | 68 +++++++++++++++++ .../progress/yaml/YamlInventoryTaskProgress.java} | 19 ++--- .../yaml/YamlInventoryTaskProgressSwapper.java | 89 ++++++++++++++++++++++ .../core/job/progress/yaml/YamlJobProgress.java | 26 +------ .../job/progress/yaml/YamlJobProgressSwapper.java | 78 +------------------ .../data/pipeline/core/task/IncrementalTask.java | 9 ++- ...dleRuleAlteredJobCompletionDetectAlgorithm.java | 4 +- .../core/job/progress/JobProgressTest.java | 4 +- .../progress/yaml/YamlJobProgressSwapperTest.java | 8 +- ...uleAlteredJobCompletionDetectAlgorithmTest.java | 13 ++-- .../prepare/InventoryTaskSplitterTest.java | 10 ++- .../src/test/resources/governance-repository.yaml | 10 +-- .../test/resources/job-progress-all-finished.yaml | 10 +-- .../test/resources/job-progress-no-finished.yaml | 12 +-- .../test/resources/job-progress-no-inventory.yaml | 16 ++-- .../src/test/resources/job-progress.yaml | 10 +-- 22 files changed, 329 insertions(+), 278 deletions(-) diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobIncrementalTaskProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobIncrementalTaskProgress.java new file mode 100644 index 00000000000..5b6be6dd1cc --- /dev/null +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobIncrementalTaskProgress.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.job.progress; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; +import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; + +/** + * Job incrementalTask progress. + */ +@RequiredArgsConstructor +@Getter +public class JobIncrementalTaskProgress { + + private final Map<String, IncrementalTaskProgress> incrementalTaskProgressMap; + + /** + * Get incremental position. + * + * @param dataSourceName dataSource + * @return incremental position + */ + public Optional<IngestPosition<?>> getIncrementalPosition(final String dataSourceName) { + Optional<IncrementalTaskProgress> incrementalTaskProgressItem = incrementalTaskProgressMap.entrySet().stream() + .filter(entry -> dataSourceName.equals(entry.getKey())) + .map(Map.Entry::getValue) + .findAny(); + return incrementalTaskProgressItem.map(IncrementalTaskProgress::getPosition); + } + + /** + * Get incremental latest active time milliseconds. + * + * @return latest active time, <code>0</code> is there is no activity + */ + public long getIncrementalLatestActiveTimeMillis() { + List<Long> delays = incrementalTaskProgressMap.values().stream() + .map(each -> each.getIncrementalTaskDelay().getLatestActiveTimeMillis()) + .collect(Collectors.toList()); + return delays.stream().reduce(Long::max).orElse(0L); + } +} diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobInventoryTaskProgress.java similarity index 53% copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobInventoryTaskProgress.java index 2f0be9743b1..ff75c1bff5c 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobInventoryTaskProgress.java @@ -17,49 +17,23 @@ package org.apache.shardingsphere.data.pipeline.api.job.progress; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - /** - * Job progress. + * Job inventoryTask progress. */ +@RequiredArgsConstructor @Getter -@Setter -// TODO now rename -public final class JobProgress implements PipelineJobProgress { +public class JobInventoryTaskProgress { - private JobStatus status = JobStatus.RUNNING; - - private String sourceDatabaseType; - - private boolean active; - - private Map<String, InventoryTaskProgress> inventoryTaskProgressMap; - - private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap; - - /** - * Get incremental position. - * - * @param dataSourceName data source name - * @return incremental position - */ - public Optional<IngestPosition<?>> getIncrementalPosition(final String dataSourceName) { - IncrementalTaskProgress progress = incrementalTaskProgressMap.get(dataSourceName); - return Optional.ofNullable(null != progress ? progress.getPosition() : null); - } + private final Map<String, InventoryTaskProgress> inventoryTaskProgressMap; /** * Get inventory position. @@ -71,16 +45,7 @@ public final class JobProgress implements PipelineJobProgress { Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", tableName)); return inventoryTaskProgressMap.entrySet().stream() .filter(entry -> pattern.matcher(entry.getKey()).find()) - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition())); - } - - /** - * Get data source. - * - * @return data source - */ - public String getDataSource() { - return incrementalTaskProgressMap.keySet().stream().findAny().orElse(""); + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition())); } /** @@ -94,16 +59,4 @@ public final class JobProgress implements PipelineJobProgress { .count(); return inventoryTaskProgressMap.isEmpty() ? 0 : (int) (finished * 100 / inventoryTaskProgressMap.size()); } - - /** - * Get incremental latest active time milliseconds. - * - * @return latest active time, <code>0</code> is there is no activity - */ - public long getIncrementalLatestActiveTimeMillis() { - List<Long> delays = incrementalTaskProgressMap.values().stream() - .map(each -> each.getIncrementalTaskDelay().getLatestActiveTimeMillis()) - .collect(Collectors.toList()); - return delays.stream().reduce(Long::max).orElse(0L); - } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java index 2f0be9743b1..a902fa37884 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java @@ -19,18 +19,11 @@ package org.apache.shardingsphere.data.pipeline.api.job.progress; import lombok.Getter; import lombok.Setter; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; -import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; -import java.util.regex.Pattern; -import java.util.stream.Collectors; /** * Job progress. @@ -46,19 +39,17 @@ public final class JobProgress implements PipelineJobProgress { private boolean active; - private Map<String, InventoryTaskProgress> inventoryTaskProgressMap; + private JobInventoryTaskProgress jobInventoryTask; - private Map<String, IncrementalTaskProgress> incrementalTaskProgressMap; + private JobIncrementalTaskProgress jobIncrementalTask; /** - * Get incremental position. - * - * @param dataSourceName data source name + * get incremental position. + * @param dataSourceName dataSource * @return incremental position */ public Optional<IngestPosition<?>> getIncrementalPosition(final String dataSourceName) { - IncrementalTaskProgress progress = incrementalTaskProgressMap.get(dataSourceName); - return Optional.ofNullable(null != progress ? progress.getPosition() : null); + return jobIncrementalTask.getIncrementalPosition(dataSourceName); } /** @@ -68,10 +59,7 @@ public final class JobProgress implements PipelineJobProgress { * @return inventory position */ public Map<String, IngestPosition<?>> getInventoryPosition(final String tableName) { - Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", tableName)); - return inventoryTaskProgressMap.entrySet().stream() - .filter(entry -> pattern.matcher(entry.getKey()).find()) - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition())); + return jobInventoryTask.getInventoryPosition(tableName); } /** @@ -80,7 +68,7 @@ public final class JobProgress implements PipelineJobProgress { * @return data source */ public String getDataSource() { - return incrementalTaskProgressMap.keySet().stream().findAny().orElse(""); + return jobIncrementalTask.getIncrementalTaskProgressMap().keySet().stream().findAny().orElse(""); } /** @@ -89,10 +77,7 @@ public final class JobProgress implements PipelineJobProgress { * @return finished percentage */ public int getInventoryFinishedPercentage() { - long finished = inventoryTaskProgressMap.values().stream() - .filter(each -> each.getPosition() instanceof FinishedPosition) - .count(); - return inventoryTaskProgressMap.isEmpty() ? 0 : (int) (finished * 100 / inventoryTaskProgressMap.size()); + return jobInventoryTask.getInventoryFinishedPercentage(); } /** @@ -101,9 +86,6 @@ public final class JobProgress implements PipelineJobProgress { * @return latest active time, <code>0</code> is there is no activity */ public long getIncrementalLatestActiveTimeMillis() { - List<Long> delays = incrementalTaskProgressMap.values().stream() - .map(each -> each.getIncrementalTaskDelay().getLatestActiveTimeMillis()) - .collect(Collectors.toList()); - return delays.stream().reduce(Long::max).orElse(0L); + return null == jobIncrementalTask ? 0L : jobIncrementalTask.getIncrementalLatestActiveTimeMillis(); } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java index d8eb199e8e0..938065f6e6c 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java @@ -17,9 +17,8 @@ package org.apache.shardingsphere.data.pipeline.api.task.progress; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -27,8 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio /** * Incremental task progress. */ -@NoArgsConstructor -@AllArgsConstructor +@RequiredArgsConstructor @Getter @Setter @ToString diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java index d9f035831a6..06c5a61a25a 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java @@ -21,9 +21,9 @@ import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobIncrementalTaskProgress; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobInventoryTaskProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; -import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; -import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper; import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress; @@ -35,9 +35,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -62,26 +60,23 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP JobProgress jobProgress = new JobProgress(); jobProgress.setStatus(jobContext.getStatus()); jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType()); - jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext)); - jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext)); + jobProgress.setJobIncrementalTask(getIncrementalTaskProgress(jobContext)); + jobProgress.setJobInventoryTask(getInventoryTaskProgress(jobContext)); String value = YamlEngine.marshal(SWAPPER.swapToYaml(jobProgress)); repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobContext.getJobId(), jobContext.getShardingItem()), value); } - private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final RuleAlteredJobContext jobContext) { - Map<String, IncrementalTaskProgress> result = new HashMap<>(jobContext.getIncrementalTasks().size(), 1); - for (IncrementalTask each : jobContext.getIncrementalTasks()) { - result.put(each.getTaskId(), each.getProgress()); - } - return result; + private JobIncrementalTaskProgress getIncrementalTaskProgress(final RuleAlteredJobContext jobContext) { + return new JobIncrementalTaskProgress( + jobContext.getIncrementalTasks() + .stream().collect(Collectors.toMap(IncrementalTask::getTaskId, IncrementalTask::getProgress))); } - private Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final RuleAlteredJobContext jobContext) { - Map<String, InventoryTaskProgress> result = new HashMap<>(jobContext.getInventoryTasks().size(), 1); - for (InventoryTask each : jobContext.getInventoryTasks()) { - result.put(each.getTaskId(), each.getProgress()); - } - return result; + private JobInventoryTaskProgress getInventoryTaskProgress(final RuleAlteredJobContext jobContext) { + return new JobInventoryTaskProgress( + jobContext.getInventoryTasks() + .stream() + .collect(Collectors.toMap(InventoryTask::getTaskId, InventoryTask::getProgress))); } @Override diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgress.java similarity index 62% copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgress.java index d8eb199e8e0..431b69c4e08 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgress.java @@ -15,26 +15,22 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.task.progress; +package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; -import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; +import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskDelay; /** - * Incremental task progress. + * Yaml IncrementalTaskProgress. */ -@NoArgsConstructor -@AllArgsConstructor @Getter @Setter -@ToString -public final class IncrementalTaskProgress implements TaskProgress { +public final class YamlIncrementalTaskProgress { - private volatile IngestPosition<?> position; + private String dataSourceName; - private IncrementalTaskDelay incrementalTaskDelay = new IncrementalTaskDelay(); + private String position; + + private IncrementalTaskDelay delay; } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgressSwapper.java new file mode 100644 index 00000000000..8c7376fda7d --- /dev/null +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlIncrementalTaskProgressSwapper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; + +import java.util.Collections; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobIncrementalTaskProgress; +import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory; + +/** + * YAML IncrementalTask progress swapper. + */ +public final class YamlIncrementalTaskProgressSwapper { + + /** + * Swap to YAML. + * + * @param jobIncrementalTask incrementalTask progress + * @return IncrementalTaskProgress + */ + public YamlIncrementalTaskProgress swapToYaml(final JobIncrementalTaskProgress jobIncrementalTask) { + if (null == jobIncrementalTask) { + return null; + } + return jobIncrementalTask.getIncrementalTaskProgressMap() + .entrySet() + .stream() + .map(entry -> { + YamlIncrementalTaskProgress yamlIncrementalTaskProgress = new YamlIncrementalTaskProgress(); + yamlIncrementalTaskProgress.setDataSourceName(entry.getKey()); + yamlIncrementalTaskProgress.setPosition(entry.getValue().getPosition().toString()); + yamlIncrementalTaskProgress.setDelay(entry.getValue().getIncrementalTaskDelay()); + return yamlIncrementalTaskProgress; + }).findAny().orElse(null); + } + + /** + * Swap to object. + * + * @param databaseType databaseType + * @param incremental yaml incrementalTask progress + * @return incrementalTask progress + */ + public JobIncrementalTaskProgress swapToObject(final String databaseType, final YamlIncrementalTaskProgress incremental) { + if (null == incremental) { + return null; + } + IncrementalTaskProgress result = new IncrementalTaskProgress(); + result.setPosition(PositionInitializerFactory.getInstance(databaseType).init(incremental.getPosition())); + result.setIncrementalTaskDelay(incremental.getDelay()); + return new JobIncrementalTaskProgress(Collections.singletonMap(incremental.getDataSourceName(), result)); + } +} diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgress.java similarity index 62% copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgress.java index d8eb199e8e0..9aefc9ab639 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgress.java @@ -15,26 +15,21 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.task.progress; +package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; -import lombok.AllArgsConstructor; +import java.util.HashMap; +import java.util.Map; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; -import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; /** - * Incremental task progress. + * Yaml InventoryTaskProgress. */ -@NoArgsConstructor -@AllArgsConstructor @Getter @Setter -@ToString -public final class IncrementalTaskProgress implements TaskProgress { +public final class YamlInventoryTaskProgress { - private volatile IngestPosition<?> position; + private String[] finished = new String[0]; - private IncrementalTaskDelay incrementalTaskDelay = new IncrementalTaskDelay(); + private Map<String, String> unfinished = new HashMap<>(); } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgressSwapper.java new file mode 100644 index 00000000000..d5578437f40 --- /dev/null +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryTaskProgressSwapper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; + +import com.google.common.base.Strings; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition; +import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobInventoryTaskProgress; +import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; + +/** + * YAML InventoryTask progress swapper. + */ +public final class YamlInventoryTaskProgressSwapper { + + /** + * Swap to YAML. + * + * @param inventory inventoryTask progress + * @return YAML inventoryTask progress + */ + public YamlInventoryTaskProgress swapToYaml(final JobInventoryTaskProgress inventory) { + YamlInventoryTaskProgress result = new YamlInventoryTaskProgress(); + if (inventory != null) { + result.setFinished(getFinished(inventory)); + result.setUnfinished(getUnfinished(inventory)); + } + return result; + } + + private String[] getFinished(final JobInventoryTaskProgress jobInventoryTaskProgress) { + return jobInventoryTaskProgress.getInventoryTaskProgressMap().entrySet().stream() + .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition) + .map(Map.Entry::getKey) + .toArray(String[]::new); + } + + private Map<String, String> getUnfinished(final JobInventoryTaskProgress jobInventoryTaskProgress) { + return jobInventoryTaskProgress.getInventoryTaskProgressMap().entrySet().stream() + .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition)) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition().toString())); + } + + /** + * Swap to object. + * + * @param inventory yaml inventoryTask progress + * @return inventoryTask progress + */ + public JobInventoryTaskProgress swapToObject(final YamlInventoryTaskProgress inventory) { + if (null == inventory) { + return null; + } + Map<String, InventoryTaskProgress> inventoryTaskProgressItemMap = new HashMap<>(); + inventoryTaskProgressItemMap.putAll(Arrays.stream(inventory.getFinished()) + .collect(Collectors.toMap(key -> key, value -> new InventoryTaskProgress(new FinishedPosition())))); + inventoryTaskProgressItemMap.putAll(inventory.getUnfinished().entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, getInventoryTaskProgressFunction()))); + return new JobInventoryTaskProgress(inventoryTaskProgressItemMap); + } + + private Function<Map.Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() { + return entry -> new InventoryTaskProgress( + Strings.isNullOrEmpty(entry.getValue()) ? new PlaceholderPosition() : PrimaryKeyPositionFactory.newInstance(entry.getValue())); + } +} diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java index 259aab86eb5..6f060f6ba1d 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgress.java @@ -19,10 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; import lombok.Getter; import lombok.Setter; -import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskDelay; - -import java.util.HashMap; -import java.util.Map; /** * Yaml job progress. @@ -35,25 +31,7 @@ public final class YamlJobProgress { private String sourceDatabaseType; - private YamlInventory inventory; - - private Map<String, YamlIncremental> incremental; - - @Getter - @Setter - public static final class YamlInventory { - - private String[] finished = new String[0]; - - private Map<String, String> unfinished = new HashMap<>(); - } + private YamlInventoryTaskProgress inventory; - @Getter - @Setter - public static final class YamlIncremental { - - private String position; - - private IncrementalTaskDelay delay; - } + private YamlIncrementalTaskProgress incremental; } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java index b89557e7cda..6b0387ab6d3 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapper.java @@ -17,23 +17,8 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml; -import com.google.common.base.Strings; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; -import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; -import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; -import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; -import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Function; -import java.util.stream.Collectors; /** * YAML Job progress swapper. @@ -50,41 +35,11 @@ public final class YamlJobProgressSwapper { YamlJobProgress result = new YamlJobProgress(); result.setStatus(jobProgress.getStatus().name()); result.setSourceDatabaseType(jobProgress.getSourceDatabaseType()); - result.setInventory(getYamlInventory(jobProgress.getInventoryTaskProgressMap())); - result.setIncremental(getYamlIncremental(jobProgress.getIncrementalTaskProgressMap())); + result.setInventory(new YamlInventoryTaskProgressSwapper().swapToYaml(jobProgress.getJobInventoryTask())); + result.setIncremental(new YamlIncrementalTaskProgressSwapper().swapToYaml(jobProgress.getJobIncrementalTask())); return result; } - private YamlJobProgress.YamlInventory getYamlInventory(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) { - YamlJobProgress.YamlInventory result = new YamlJobProgress.YamlInventory(); - result.setFinished(getFinished(inventoryTaskProgressMap)); - result.setUnfinished(getUnfinished(inventoryTaskProgressMap)); - return result; - } - - private String[] getFinished(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) { - return inventoryTaskProgressMap.entrySet().stream() - .filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition) - .map(Entry::getKey) - .toArray(String[]::new); - } - - private Map<String, String> getUnfinished(final Map<String, InventoryTaskProgress> inventoryTaskProgressMap) { - return inventoryTaskProgressMap.entrySet().stream() - .filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition)) - .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition().toString())); - } - - private Map<String, YamlJobProgress.YamlIncremental> getYamlIncremental(final Map<String, IncrementalTaskProgress> incrementalTaskProgressMap) { - return incrementalTaskProgressMap.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, entry -> { - YamlJobProgress.YamlIncremental result = new YamlJobProgress.YamlIncremental(); - result.setPosition(entry.getValue().getPosition().toString()); - result.setDelay(entry.getValue().getIncrementalTaskDelay()); - return result; - })); - } - /** * Swap to object. * @@ -95,33 +50,8 @@ public final class YamlJobProgressSwapper { JobProgress result = new JobProgress(); result.setStatus(JobStatus.valueOf(yamlJobProgress.getStatus())); result.setSourceDatabaseType(yamlJobProgress.getSourceDatabaseType()); - result.setInventoryTaskProgressMap(getInventoryTaskProgressMap(yamlJobProgress.getInventory())); - result.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(yamlJobProgress.getSourceDatabaseType(), yamlJobProgress.getIncremental())); + result.setJobInventoryTask(new YamlInventoryTaskProgressSwapper().swapToObject(yamlJobProgress.getInventory())); + result.setJobIncrementalTask(new YamlIncrementalTaskProgressSwapper().swapToObject(yamlJobProgress.getSourceDatabaseType(), yamlJobProgress.getIncremental())); return result; } - - private Map<String, InventoryTaskProgress> getInventoryTaskProgressMap(final YamlJobProgress.YamlInventory inventory) { - if (null == inventory) { - return new LinkedHashMap<>(); - } - Map<String, InventoryTaskProgress> result = new HashMap<>(); - result.putAll(Arrays.stream(inventory.getFinished()).collect(Collectors.toMap(each -> each, each -> new InventoryTaskProgress(new FinishedPosition())))); - result.putAll(inventory.getUnfinished().entrySet().stream().collect(Collectors.toMap(Entry::getKey, getInventoryTaskProgressFunction()))); - return result; - } - - private Function<Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() { - return entry -> new InventoryTaskProgress(Strings.isNullOrEmpty(entry.getValue()) ? new PlaceholderPosition() : PrimaryKeyPositionFactory.newInstance(entry.getValue())); - } - - private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final String databaseType, final Map<String, YamlJobProgress.YamlIncremental> incremental) { - if (null == incremental) { - return new LinkedHashMap<>(); - } - return incremental.entrySet().stream().collect(Collectors.toMap(Entry::getKey, getIncrementalTaskProgressFunction(databaseType))); - } - - private Function<Entry<String, YamlJobProgress.YamlIncremental>, IncrementalTaskProgress> getIncrementalTaskProgressFunction(final String databaseType) { - return entry -> new IncrementalTaskProgress(PositionInitializerFactory.getInstance(databaseType).init(entry.getValue().getPosition()), entry.getValue().getDelay()); - } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java index 161ed5fdfbb..55a8ab607a1 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java @@ -71,14 +71,19 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) { this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine; taskId = dumperConfig.getDataSourceName(); - progress = new IncrementalTaskProgress(); IngestPosition<?> position = dumperConfig.getPosition(); - progress.setPosition(position); + progress = createIncrementalTaskProgress(position); channel = createChannel(concurrency, pipelineChannelCreator, progress); dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader); importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, pipelineJobPersistCallback); } + private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position) { + IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(); + incrementalTaskProgress.setPosition(position); + return incrementalTaskProgress; + } + @Override protected void doStart() { progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis()); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java index fe3e1043416..dea0d03c684 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java @@ -75,12 +75,12 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom } private static boolean isAllInventoryTasksCompleted(final Collection<JobProgress> jobProgresses) { - return jobProgresses.stream().flatMap(each -> each.getInventoryTaskProgressMap().values().stream()).allMatch(each -> each.getPosition() instanceof FinishedPosition); + return jobProgresses.stream().flatMap(each -> each.getJobInventoryTask().getInventoryTaskProgressMap().values().stream()).allMatch(each -> each.getPosition() instanceof FinishedPosition); } private static Collection<Long> getIncrementalTasksIdleSeconds(final Collection<JobProgress> jobProgresses) { long currentTimeMillis = System.currentTimeMillis(); - return jobProgresses.stream().flatMap(each -> each.getIncrementalTaskProgressMap().values().stream()) + return jobProgresses.stream().flatMap(each -> each.getJobIncrementalTask().getIncrementalTaskProgressMap().values().stream()) .map(each -> { long latestActiveTimeMillis = each.getIncrementalTaskDelay().getLatestActiveTimeMillis(); return latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0; diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java index 986c0c653cf..695258f4d28 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java @@ -45,8 +45,8 @@ public final class JobProgressTest { JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")); assertThat(jobProgress.getStatus(), is(JobStatus.RUNNING)); assertThat(jobProgress.getSourceDatabaseType(), is("H2")); - assertThat(jobProgress.getInventoryTaskProgressMap().size(), is(4)); - assertThat(jobProgress.getIncrementalTaskProgressMap().size(), is(1)); + assertThat(jobProgress.getJobInventoryTask().getInventoryTaskProgressMap().size(), is(4)); + assertThat(jobProgress.getJobIncrementalTask().getIncrementalTaskProgressMap().size(), is(1)); } @Test diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java index 8914eff8557..e650552b2f6 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java @@ -26,7 +26,6 @@ import org.junit.Test; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; public final class YamlJobProgressSwapperTest { @@ -48,16 +47,15 @@ public final class YamlJobProgressSwapperTest { assertThat(actual.getInventory().getUnfinished().size(), is(2)); assertThat(actual.getInventory().getUnfinished().get("ds1.t_2"), is("i,1,2")); assertThat(actual.getInventory().getUnfinished().get("ds1.t_1"), is("")); - assertThat(actual.getIncremental().size(), is(1)); - assertTrue(actual.getIncremental().containsKey("ds0")); - assertNull(actual.getIncremental().get("position")); + assertThat(actual.getIncremental().getDataSourceName(), is("ds0")); + assertThat(actual.getIncremental().getPosition().length(), is(0)); } @Test public void assertNullIncremental() { JobProgress jobProgress = getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")); YamlJobProgress actual = SWAPPER.swapToYaml(jobProgress); - assertTrue(actual.getIncremental().isEmpty()); + assertNull(actual.getIncremental()); } @Test diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java index de613760532..141abac58e0 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java @@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered; import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter; import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobIncrementalTaskProgress; +import org.apache.shardingsphere.data.pipeline.api.job.progress.JobInventoryTaskProgress; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress; @@ -29,8 +31,6 @@ import org.junit.Test; import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.Properties; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -68,9 +68,7 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest { public void assertFalseOnUnFinishedPosition() { int jobShardingCount = 1; JobProgress jobProgress = new JobProgress(); - Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new LinkedHashMap<>(); - jobProgress.setInventoryTaskProgressMap(inventoryTaskProgressMap); - inventoryTaskProgressMap.put("foo_ds", new InventoryTaskProgress(new PlaceholderPosition())); + jobProgress.setJobInventoryTask(new JobInventoryTaskProgress(Collections.singletonMap("foo_ds", new InventoryTaskProgress(new PlaceholderPosition())))); Collection<JobProgress> jobProgresses = Collections.singleton(jobProgress); RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses); assertFalse(JobCompletionDetectAlgorithmFactory.newInstance(new AlgorithmConfiguration("IDLE", new Properties())).isAlmostCompleted(parameter)); @@ -100,10 +98,11 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest { private JobProgress createJobProgress(final long latestActiveTimeMillis) { JobProgress result = new JobProgress(); - result.setInventoryTaskProgressMap(Collections.singletonMap("foo_ds", new InventoryTaskProgress(new FinishedPosition()))); + result.setJobInventoryTask(new JobInventoryTaskProgress(Collections.singletonMap("foo_ds", new InventoryTaskProgress(new FinishedPosition())))); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(); incrementalTaskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(latestActiveTimeMillis); - result.setIncrementalTaskProgressMap(Collections.singletonMap("foo_ds", incrementalTaskProgress)); + JobIncrementalTaskProgress jobIncrementalTaskProgress = new JobIncrementalTaskProgress(Collections.singletonMap("foo_ds", incrementalTaskProgress)); + result.setJobIncrementalTask(jobIncrementalTaskProgress); return result; } } diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java index cf8fe6c47c0..ee0ea66f257 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java @@ -83,8 +83,9 @@ public final class InventoryTaskSplitterTest { initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig()); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext); assertThat(actual.size(), is(1)); - assertThat(((IntegerPrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L)); - assertThat(((IntegerPrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getEndValue(), is(0L)); + InventoryTask task = actual.get(0); + assertThat(((IntegerPrimaryKeyPosition) task.getProgress().getPosition()).getBeginValue(), is(0L)); + assertThat(((IntegerPrimaryKeyPosition) task.getProgress().getPosition()).getEndValue(), is(0L)); } @Test @@ -92,8 +93,9 @@ public final class InventoryTaskSplitterTest { initIntPrimaryEnvironment(taskConfig.getDumperConfig()); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext); assertThat(actual.size(), is(10)); - assertThat(((IntegerPrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L)); - assertThat(((IntegerPrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getEndValue(), is(100L)); + InventoryTask task = actual.get(9); + assertThat(((IntegerPrimaryKeyPosition) task.getProgress().getPosition()).getBeginValue(), is(91L)); + assertThat(((IntegerPrimaryKeyPosition) task.getProgress().getPosition()).getEndValue(), is(100L)); } @Test diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml index ea94331aaf3..06de622c13a 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/governance-repository.yaml @@ -16,11 +16,11 @@ # incremental: - ds_0: - delay: - lastEventTimestamps: 0 - latestActiveTimeMillis: 0 - position: '' + dataSourceName: ds_0 + delay: + lastEventTimestamps: 0 + latestActiveTimeMillis: 0 + position: '' inventory: unfinished: ds_0.t_order#0: '' diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml index 04ba007f592..0ae217af602 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml @@ -16,11 +16,11 @@ # incremental: - ds0: - delay: - lastEventTimestamps: 0 - latestActiveTimeMillis: 50 - position: '' + dataSourceName: ds0 + delay: + lastEventTimestamps: 0 + latestActiveTimeMillis: 50 + position: '' inventory: finished: - ds0.t_2 diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml index 4b2efffdf7c..b0994b68b59 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml @@ -15,12 +15,12 @@ # limitations under the License. # # -# incremental: -# ds0: -# delay: -# lastEventTimestamps: 0 -# latestActiveTimeMillis: 0 -# position: '' +#incremental: +# dataSourceName: ds0 +# delay: +# lastEventTimestamps: 0 +# latestActiveTimeMillis: 0 +# position: '' inventory: unfinished: ds0.t_1: i,1,2 diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml index 58a4d08e708..897258a53ce 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml @@ -15,11 +15,11 @@ # limitations under the License. # # - incremental: - ds0: - delay: - lastEventTimestamps: 0 - latestActiveTimeMillis: 0 - position: '' - sourceDatabaseType: H2 - status: RUNNING +incremental: + dataSourceName: ds0 + delay: + lastEventTimestamps: 0 + latestActiveTimeMillis: 0 + position: '' +sourceDatabaseType: H2 +status: RUNNING diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml index 2333522962f..61fe5448960 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress.yaml @@ -16,11 +16,11 @@ # incremental: - ds0: - delay: - lastEventTimestamps: 0 - latestActiveTimeMillis: 0 - position: '' + dataSourceName: ds0 + delay: + lastEventTimestamps: 0 + latestActiveTimeMillis: 0 + position: '' inventory: finished: - ds0.t_2