This is an automated email from the ASF dual-hosted git repository. panjuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new fa4a9d61b73 Refactor and make RuleAlteredJobConfiguration immutable (#17682) fa4a9d61b73 is described below commit fa4a9d61b730c9b242a2f3c6d4d8ebcb40f443bc Author: Hongsheng Zhong <sand...@126.com> AuthorDate: Mon May 16 11:11:18 2022 +0800 Refactor and make RuleAlteredJobConfiguration immutable (#17682) --- ...hardingRuleAlteredJobConfigurationPreparer.java | 14 +-- .../rulealtered/RuleAlteredJobConfiguration.java | 115 ++++----------------- .../yaml/RuleAlteredJobConfigurationSwapper.java | 76 ++++++++++++++ .../YamlRuleAlteredJobConfiguration.java} | 61 ++++------- .../PipelineDataSourceConfigurationSwapper.java | 47 +++++++++ .../RuleAlteredJobConfigurationPreparer.java | 5 +- .../RuleAlteredJobConfigurationTest.java | 18 +++- .../core/api/impl/RuleAlteredJobAPIImpl.java | 14 +-- .../check/consistency/DataConsistencyChecker.java | 15 ++- .../pipeline/core/execute/PipelineJobExecutor.java | 9 +- .../data/pipeline/core/job/FinishedCheckJob.java | 6 +- .../scenario/rulealtered/RuleAlteredJob.java | 4 +- .../rulealtered/RuleAlteredJobContext.java | 1 - .../scenario/rulealtered/RuleAlteredJobWorker.java | 15 ++- .../job/environment/ScalingEnvironmentManager.java | 4 +- .../datasource/MySQLDataSourcePreparerTest.java | 10 +- .../data/pipeline/env/ITEnvironmentContext.java | 4 +- .../core/util/JobConfigurationBuilder.java | 8 +- .../scenario/rulealtered/RuleAlteredJobTest.java | 74 ------------- .../rulealtered/RuleAlteredJobWorkerTest.java | 8 +- 20 files changed, 238 insertions(+), 270 deletions(-) diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java index 9121aec9cf1..aa13d53a231 100644 --- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java +++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java @@ -24,6 +24,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; @@ -73,14 +75,14 @@ import java.util.stream.Collectors; public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer { @Override - public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) { - Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(jobConfig); - jobConfig.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes)); - jobConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet())); - jobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes)); + public void extendJobConfiguration(final YamlRuleAlteredJobConfiguration yamlJobConfig) { + Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new RuleAlteredJobConfigurationSwapper().swapToObject(yamlJobConfig)); + yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes)); + yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet())); + yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes)); } - private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final RuleAlteredJobConfiguration jobConfig) { + private static Map<String, List<DataNode>> getActualDataNodes(final RuleAlteredJobConfiguration jobConfig) { PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()); ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig; ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules()); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java index 9565b9539c8..b32411d9d19 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java @@ -17,59 +17,46 @@ package org.apache.shardingsphere.data.pipeline.api.config.rulealtered; -import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.base.Strings; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.job.JobSubType; -import org.apache.shardingsphere.data.pipeline.api.job.JobType; -import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId; -import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory; -import java.util.Collections; import java.util.List; import java.util.Map; /** - * Scaling job configuration. + * Rule altered job configuration. */ -@NoArgsConstructor -@AllArgsConstructor +@RequiredArgsConstructor @Getter -@Setter @Slf4j -// TODO share for totally new scenario -// TODO rename to Yaml, add config class public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration { - private String jobId; + private final String jobId; - private String databaseName; + private final String databaseName; - /** - * Map{altered rule yaml class name, re-shard needed table names}. - */ - private Map<String, List<String>> alteredRuleYamlClassNameTablesMap; + private final Integer activeVersion; + + private final Integer newVersion; - private Integer activeVersion; + private final String sourceDatabaseType; - private Integer newVersion; + private final String targetDatabaseType; - private YamlPipelineDataSourceConfiguration source; + private final PipelineDataSourceConfiguration source; - private YamlPipelineDataSourceConfiguration target; + private final PipelineDataSourceConfiguration target; - private int concurrency = 3; + /** + * Map{altered rule yaml class name, re-shard needed table names}. + */ + private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap; - private int retryTimes = 3; + private final String logicTables; /** * Collection of each logic table's first data node. @@ -78,41 +65,13 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati * then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}. * </p> */ - private String tablesFirstDataNodes; + private final String tablesFirstDataNodes; - private List<String> jobShardingDataNodes; - - private String logicTables; - - private String sourceDatabaseType; - - private String targetDatabaseType; - - /** - * Set source. - * - * @param source source configuration - */ - public void setSource(final YamlPipelineDataSourceConfiguration source) { - checkParameters(source); - this.source = source; - } + private final List<String> jobShardingDataNodes; - /** - * Set target. - * - * @param target target configuration - */ - public void setTarget(final YamlPipelineDataSourceConfiguration target) { - checkParameters(target); - this.target = target; - } + private final int concurrency; - private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) { - Preconditions.checkNotNull(yamlConfig); - Preconditions.checkNotNull(yamlConfig.getType()); - Preconditions.checkNotNull(yamlConfig.getParameter()); - } + private final int retryTimes; /** * Get job sharding count. @@ -132,38 +91,6 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati return Splitter.on(',').splitToList(logicTables); } - /** - * Build handle configuration. - */ - public void buildHandleConfig() { - if (null == getJobShardingDataNodes()) { - RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this); - } - if (null == jobId) { - jobId = generateJobId(); - } - if (Strings.isNullOrEmpty(getSourceDatabaseType())) { - PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()); - setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType()); - } - if (Strings.isNullOrEmpty(getTargetDatabaseType())) { - PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()); - setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType()); - } - } - - private String generateJobId() { - RuleAlteredJobId jobId = new RuleAlteredJobId(); - // TODO type, subTypes - jobId.setType(JobType.RULE_ALTERED.getValue()); - jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION); - jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue())); - jobId.setCurrentMetadataVersion(activeVersion); - jobId.setNewMetadataVersion(newVersion); - jobId.setDatabaseName(databaseName); - return jobId.marshal(); - } - @Override public String toString() { return "RuleAlteredJobConfiguration{" diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java new file mode 100644 index 00000000000..df11b1661e9 --- /dev/null +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/RuleAlteredJobConfigurationSwapper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml; + +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.PipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper; +import org.apache.shardingsphere.infra.yaml.engine.YamlEngine; + +/** + * Rule altered job configuration swapper. + */ +// TODO add RuleAlteredJobConfigurationSwapper test +public final class RuleAlteredJobConfigurationSwapper implements YamlConfigurationSwapper<YamlRuleAlteredJobConfiguration, RuleAlteredJobConfiguration> { + + private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper(); + + private final PipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new PipelineDataSourceConfigurationSwapper(); + + @Override + public YamlRuleAlteredJobConfiguration swapToYamlConfiguration(final RuleAlteredJobConfiguration data) { + YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration(); + result.setJobId(data.getJobId()); + result.setDatabaseName(data.getDatabaseName()); + result.setActiveVersion(data.getActiveVersion()); + result.setNewVersion(data.getNewVersion()); + result.setSourceDatabaseType(data.getSourceDatabaseType()); + result.setTargetDatabaseType(data.getTargetDatabaseType()); + result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource())); + result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget())); + result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap()); + result.setLogicTables(data.getLogicTables()); + result.setTablesFirstDataNodes(data.getTablesFirstDataNodes()); + result.setJobShardingDataNodes(data.getJobShardingDataNodes()); + result.setConcurrency(data.getConcurrency()); + result.setRetryTimes(data.getRetryTimes()); + return result; + } + + @Override + public RuleAlteredJobConfiguration swapToObject(final YamlRuleAlteredJobConfiguration yamlConfig) { + return new RuleAlteredJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(), + yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(), + yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(), + dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()), + yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getLogicTables(), + yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(), + yamlConfig.getConcurrency(), yamlConfig.getRetryTimes()); + } + + /** + * Swap to job configuration from text. + * + * @param jobParameter job parameter + * @return job configuration + */ + public static RuleAlteredJobConfiguration swapToObject(final String jobParameter) { + YamlRuleAlteredJobConfiguration yamlJobConfig = YamlEngine.unmarshal(jobParameter, YamlRuleAlteredJobConfiguration.class, true); + return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig); + } +} diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java similarity index 84% copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java index 9565b9539c8..a7b0cd9c7aa 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java @@ -15,17 +15,13 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.config.rulealtered; +package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml; import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; import com.google.common.base.Strings; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; @@ -33,43 +29,42 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobSubType; import org.apache.shardingsphere.data.pipeline.api.job.JobType; import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId; import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory; +import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration; import java.util.Collections; import java.util.List; import java.util.Map; /** - * Scaling job configuration. + * Rule altered job configuration for YAML. */ -@NoArgsConstructor -@AllArgsConstructor @Getter @Setter @Slf4j -// TODO share for totally new scenario -// TODO rename to Yaml, add config class -public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration { +public final class YamlRuleAlteredJobConfiguration implements YamlConfiguration { private String jobId; private String databaseName; - /** - * Map{altered rule yaml class name, re-shard needed table names}. - */ - private Map<String, List<String>> alteredRuleYamlClassNameTablesMap; - private Integer activeVersion; private Integer newVersion; + private String sourceDatabaseType; + + private String targetDatabaseType; + private YamlPipelineDataSourceConfiguration source; private YamlPipelineDataSourceConfiguration target; - private int concurrency = 3; + /** + * Map{altered rule yaml class name, re-shard needed table names}. + */ + private Map<String, List<String>> alteredRuleYamlClassNameTablesMap; - private int retryTimes = 3; + private String logicTables; /** * Collection of each logic table's first data node. @@ -82,11 +77,9 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati private List<String> jobShardingDataNodes; - private String logicTables; - - private String sourceDatabaseType; + private int concurrency = 3; - private String targetDatabaseType; + private int retryTimes = 3; /** * Set source. @@ -115,27 +108,9 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati } /** - * Get job sharding count. - * - * @return job sharding count - */ - public int getJobShardingCount() { - return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size(); - } - - /** - * Split {@linkplain #logicTables} to logic table names. - * - * @return logic table names - */ - public List<String> splitLogicTableNames() { - return Splitter.on(',').splitToList(logicTables); - } - - /** - * Build handle configuration. + * Extend configuration. */ - public void buildHandleConfig() { + public void extendConfiguration() { if (null == getJobShardingDataNodes()) { RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this); } @@ -166,7 +141,7 @@ public final class RuleAlteredJobConfiguration implements PipelineJobConfigurati @Override public String toString() { - return "RuleAlteredJobConfiguration{" + return "YamlRuleAlteredJobConfiguration{" + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\'' + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\'' diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java new file mode 100644 index 00000000000..b80203f0172 --- /dev/null +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/yaml/PipelineDataSourceConfigurationSwapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml; + +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; +import org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper; + +/** + * Pipeline data source configuration YAML swapper. + */ +public final class PipelineDataSourceConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineDataSourceConfiguration, PipelineDataSourceConfiguration> { + + @Override + public YamlPipelineDataSourceConfiguration swapToYamlConfiguration(final PipelineDataSourceConfiguration data) { + if (null == data) { + return null; + } + YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration(); + result.setType(data.getType()); + result.setParameter(data.getParameter()); + return result; + } + + @Override + public PipelineDataSourceConfiguration swapToObject(final YamlPipelineDataSourceConfiguration yamlConfig) { + if (null == yamlConfig) { + return null; + } + return PipelineDataSourceConfigurationFactory.newInstance(yamlConfig.getType(), yamlConfig.getParameter()); + } +} diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java index d189d128da5..c947e3874bd 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java @@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.spi.rulealtered; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration; import org.apache.shardingsphere.spi.type.required.RequiredSPI; @@ -30,9 +31,9 @@ public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI { /** * Extend job configuration. * - * @param jobConfig job configuration + * @param yamlJobConfig YAML job configuration */ - void extendJobConfiguration(RuleAlteredJobConfiguration jobConfig); + void extendJobConfiguration(YamlRuleAlteredJobConfiguration yamlJobConfig); /** * Create task configuration, used by underlying scheduler. diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java index 22a227032a2..2d783b94831 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java @@ -17,6 +17,8 @@ package org.apache.shardingsphere.data.pipeline.api.config.rulealtered; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.junit.Test; import java.util.Arrays; @@ -26,22 +28,28 @@ import static org.junit.Assert.assertThat; public final class RuleAlteredJobConfigurationTest { + private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper(); + @Test public void assertGetJobShardingCountByNull() { - assertThat(new RuleAlteredJobConfiguration().getJobShardingCount(), is(0)); + YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration(); + RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig); + assertThat(jobConfig.getJobShardingCount(), is(0)); } @Test public void assertGetJobShardingCount() { - RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration(); - jobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2")); + YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration(); + yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2")); + RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig); assertThat(jobConfig.getJobShardingCount(), is(2)); } @Test public void assertSplitLogicTableNames() { - RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration(); - jobConfig.setLogicTables("foo_tbl,bar_tbl"); + YamlRuleAlteredJobConfiguration yamlJobConfig = new YamlRuleAlteredJobConfiguration(); + yamlJobConfig.setLogicTables("foo_tbl,bar_tbl"); + RuleAlteredJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig); assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl"))); } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java index 8a5fd5cb12b..56426ee3ab1 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI; import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo; @@ -102,8 +103,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl @Override public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) { - jobConfig.buildHandleConfig(); - if (jobConfig.getJobShardingCount() == 0) { + if (0 == jobConfig.getJobShardingCount()) { log.warn("Invalid scaling job config!"); throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0"); } @@ -116,15 +116,15 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl return Optional.of(jobId); } repositoryAPI.persist(String.format("%s/%s", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId), RuleAlteredJob.class.getName()); - repositoryAPI.persist(jobConfigKey, createJobConfig(jobConfig)); + repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig)); return Optional.of(jobId); } - private String createJobConfig(final RuleAlteredJobConfiguration jobConfig) { + private String createJobConfigText(final RuleAlteredJobConfiguration jobConfig) { JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO(); jobConfigPOJO.setJobName(jobConfig.getJobId()); jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount()); - jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig)); + jobConfigPOJO.setJobParameter(YamlEngine.marshal(new RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig))); jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER)); return YamlEngine.marshal(jobConfigPOJO); } @@ -365,7 +365,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl return getJobConfig(getElasticJobConfigPOJO(jobId)); } - private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) { - return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true); + private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO jobConfigPOJO) { + return RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()); } } diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java index 67c712cc391..eda9c1f476c 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java @@ -27,7 +27,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; @@ -155,8 +154,10 @@ public final class DataConsistencyChecker { } private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) { - PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getSource()); - PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getTarget()); + decoratePipelineDataSourceConfiguration(calculator, jobConfig.getSource()); + PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSource(); + decoratePipelineDataSourceConfiguration(calculator, jobConfig.getTarget()); + PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget(); ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d"); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory); JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm(); @@ -202,11 +203,9 @@ public final class DataConsistencyChecker { return result; } - private PipelineDataSourceConfiguration getPipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final YamlPipelineDataSourceConfiguration dataSourceConfig) { - PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(dataSourceConfig.getType(), dataSourceConfig.getParameter()); - checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), result.getDatabaseType().getType()); - addMySQLDataSourceConfig(result); - return result; + private void decoratePipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final PipelineDataSourceConfiguration dataSourceConfig) { + checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), dataSourceConfig.getDatabaseType().getType()); + addMySQLDataSourceConfig(dataSourceConfig); } private void checkDatabaseTypeSupported(final Collection<String> supportedDatabaseTypes, final String databaseType) { diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java index a3d988f01a8..51321db57f5 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java @@ -18,9 +18,10 @@ package org.apache.shardingsphere.data.pipeline.core.execute; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory; import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI; +import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants; @@ -47,6 +48,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor { private static final Pattern CONFIG_PATTERN = Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + "/(\\d{2}[0-9a-f]+)/config"); + private static final RuleAlteredJobConfigurationSwapper JOB_CONFIG_SWAPPER = new RuleAlteredJobConfigurationSwapper(); + @Override protected void doStart() { watchGovernanceRepositoryConfiguration(); @@ -66,7 +69,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor { log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled); RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName()); // TODO refactor: dispatch to different job types - RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true); + RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()); if (deleted) { new RuleAlteredJobPreparer().cleanup(jobConfig); } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) { @@ -80,7 +83,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor { switch (event.getType()) { case ADDED: case UPDATED: - RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true); + RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()); String databaseName = jobConfig.getDatabaseName(); if (PipelineSimpleLock.getInstance().tryLock(databaseName, 1000)) { execute(jobConfigPOJO); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java index 870a0339573..724c70fca0a 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java @@ -19,9 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.job; import io.vertx.core.impl.ConcurrentHashSet; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory; import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI; +import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; @@ -32,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock; import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; -import org.apache.shardingsphere.infra.yaml.engine.YamlEngine; import java.util.List; import java.util.Map; @@ -64,7 +64,7 @@ public final class FinishedCheckJob implements SimpleJob { onCheckJobIds.add(jobId); try { // TODO refactor: dispatch to different job types - RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), RuleAlteredJobConfiguration.class, true); + RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter()); RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig); if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) { log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL."); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java index bb368119b3f..cbfa0940ef2 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java @@ -19,13 +19,13 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; -import org.apache.shardingsphere.infra.yaml.engine.YamlEngine; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent; /** @@ -42,7 +42,7 @@ public final class RuleAlteredJob implements SimpleJob { @Override public void execute(final ShardingContext shardingContext) { log.info("Execute job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem()); - RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), RuleAlteredJobConfiguration.class, true); + RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter()); RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem()); jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem())); jobContext.setJobPreparer(jobPreparer); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java index 6f1550d7bab..35059e626c7 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java @@ -86,7 +86,6 @@ public final class RuleAlteredJobContext { public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem) { ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig); this.jobConfig = jobConfig; - jobConfig.buildHandleConfig(); jobId = jobConfig.getJobId(); this.shardingItem = jobShardingItem; taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig()); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java index a9ea71faba6..694dc084a3c 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java @@ -24,6 +24,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; @@ -204,14 +206,15 @@ public final class RuleAlteredJobWorker { log.error("more than 1 rule altered"); throw new PipelineJobCreationException("more than 1 rule altered"); } - RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration(); + YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration(); result.setDatabaseName(event.getDatabaseName()); result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap); result.setActiveVersion(event.getActiveVersion()); result.setNewVersion(event.getNewVersion()); result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig)); result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig)); - return Optional.of(result); + result.extendConfiguration(); + return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(result)); } private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules, @@ -289,8 +292,8 @@ public final class RuleAlteredJobWorker { .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) { continue; } - RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(each.getJobParameter(), RuleAlteredJobConfiguration.class, true); - if (hasUncompletedJobOfSameDatabaseName(jobConfig, databaseName)) { + RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(each.getJobParameter()); + if (databaseName.equals(jobConfig.getDatabaseName())) { result = true; break; } @@ -298,10 +301,6 @@ public final class RuleAlteredJobWorker { return result; } - private boolean hasUncompletedJobOfSameDatabaseName(final RuleAlteredJobConfiguration jobConfig, final String currentDatabaseName) { - return currentDatabaseName.equals(jobConfig.getDatabaseName()); - } - /** * scaling release database level lock. * diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java index 61c31562e35..1e963116474 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java @@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory; @@ -50,7 +50,7 @@ public final class ScalingEnvironmentManager { public void cleanupTargetTables(final RuleAlteredJobConfiguration jobConfig) throws SQLException { Collection<String> tables = jobConfig.splitLogicTableNames(); log.info("cleanupTargetTables, tables={}", tables); - YamlPipelineDataSourceConfiguration target = jobConfig.getTarget(); + PipelineDataSourceConfiguration target = jobConfig.getTarget(); PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType()); ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData(jobConfig.getDatabaseName()); TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(metaData.getSchemas())); diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java index 96db014402d..140528e89bd 100644 --- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java +++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java @@ -20,9 +20,9 @@ package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException; import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter; @@ -56,10 +56,10 @@ public final class MySQLDataSourcePreparerTest { private RuleAlteredJobConfiguration jobConfig; @Mock - private YamlPipelineDataSourceConfiguration sourceYamlPipelineDataSourceConfig; + private PipelineDataSourceConfiguration sourcePipelineDataSourceConfig; @Mock - private YamlPipelineDataSourceConfiguration targetYamlPipelineDataSourceConfig; + private PipelineDataSourceConfiguration targetPipelineDataSourceConfig; @Mock private ShardingSpherePipelineDataSourceConfiguration sourceScalingDataSourceConfig; @@ -79,10 +79,10 @@ public final class MySQLDataSourcePreparerTest { when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper); when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper); when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager); - when(jobConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfig); + when(jobConfig.getSource()).thenReturn(sourcePipelineDataSourceConfig); when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC"); when(jobConfig.getSource().getParameter()).thenReturn("source"); - when(jobConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfig); + when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig); when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC"); when(jobConfig.getTarget().getParameter()).thenReturn("target"); when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig); diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java index 30a8a7ff919..4de500bd417 100644 --- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java +++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java @@ -20,7 +20,7 @@ package org.apache.shardingsphere.integration.data.pipeline.env; import com.google.gson.Gson; import lombok.Getter; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; @@ -81,7 +81,7 @@ public final class ITEnvironmentContext { } private static String createScalingConfiguration(final Map<String, YamlTableRuleConfiguration> tableRules) { - RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration(); + YamlRuleAlteredJobConfiguration jobConfig = new YamlRuleAlteredJobConfiguration(); jobConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules))); jobConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration())); return new Gson().toJson(jobConfig); diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java index 4b3fe083426..1900339730a 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java @@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; @@ -44,7 +46,7 @@ public final class JobConfigurationBuilder { * @return created job configuration */ public static RuleAlteredJobConfiguration createJobConfiguration() { - RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration(); + YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration(); result.setDatabaseName("logic_db"); result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order"))); result.setActiveVersion(0); @@ -53,10 +55,10 @@ public final class JobConfigurationBuilder { result.setSource(createYamlPipelineDataSourceConfiguration( new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml")))); result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml")))); - result.buildHandleConfig(); + result.extendConfiguration(); int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1; result.setJobId(generateJobId(activeVersion, "logic_db")); - return result; + return new RuleAlteredJobConfigurationSwapper().swapToObject(result); } private static YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final PipelineDataSourceConfiguration config) { diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java deleted file mode 100644 index 1600b0127f1..00000000000 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.scenario.rulealtered; - -import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; -import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; -import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder; -import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil; -import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil; -import org.apache.shardingsphere.elasticjob.api.ShardingContext; -import org.apache.shardingsphere.infra.yaml.engine.YamlEngine; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Map; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; - -public final class RuleAlteredJobTest { - - @BeforeClass - public static void beforeClass() { - PipelineContextUtil.mockModeConfigAndContextManager(); - } - - @Test - @SuppressWarnings("unchecked") - @SneakyThrows(ReflectiveOperationException.class) - public void assertExecute() { - RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); - initTableData(jobConfig); - ShardingContext shardingContext = new ShardingContext("1", null, 2, YamlEngine.marshal(jobConfig), 0, null); - new RuleAlteredJob().execute(shardingContext); - Map<String, RuleAlteredJobScheduler> jobSchedulerMap = ReflectionUtil.getStaticFieldValue(RuleAlteredJobSchedulerCenter.class, "JOB_SCHEDULER_MAP", Map.class); - assertNotNull(jobSchedulerMap); - assertFalse(jobSchedulerMap.isEmpty()); - } - - @SneakyThrows(SQLException.class) - private void initTableData(final RuleAlteredJobConfiguration jobConfig) { - YamlPipelineDataSourceConfiguration source = jobConfig.getSource(); - try ( - PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter())); - Connection connection = dataSource.getConnection(); - Statement statement = connection.createStatement()) { - statement.execute("DROP TABLE IF EXISTS t_order"); - statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))"); - statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')"); - } - } -} diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java index 577dee68b64..2ed00596652 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java @@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered; import org.apache.commons.io.FileUtils; import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI; @@ -56,8 +58,10 @@ public final class RuleAlteredJobWorkerTest { @Test(expected = PipelineJobCreationException.class) public void assertCreateRuleAlteredContextNoAlteredRule() { RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); - jobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap()); - RuleAlteredJobWorker.createRuleAlteredContext(jobConfig); + RuleAlteredJobConfigurationSwapper swapper = new RuleAlteredJobConfigurationSwapper(); + YamlRuleAlteredJobConfiguration yamlJobConfig = swapper.swapToYamlConfiguration(jobConfig); + yamlJobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap()); + RuleAlteredJobWorker.createRuleAlteredContext(swapper.swapToObject(yamlJobConfig)); } @Test