This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0d7d25f0b59 Fix sonar issue of CDCJobAPI (#25659)
0d7d25f0b59 is described below
commit 0d7d25f0b59133eb9e4282366c8a0ea1e16cefdb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 14 20:47:45 2023 +0800
Fix sonar issue of CDCJobAPI (#25659)
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 158 +++++++++++----------
.../cdc/config/job/CDCJobConfiguration.java | 2 +-
2 files changed, 85 insertions(+), 75 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 5ffd47a2619..97931179bdf 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -119,66 +119,45 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
* @return job id
*/
public String createJob(final StreamDataParameter param, final CDCSinkType
sinkType, final Properties sinkProps) {
- YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
- yamlJobConfig.setDatabaseName(param.getDatabaseName());
- yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
- yamlJobConfig.setFull(param.isFull());
- yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
- YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
- sinkConfig.setSinkType(sinkType.name());
- sinkConfig.setProps(sinkProps);
- yamlJobConfig.setSinkConfig(sinkConfig);
PipelineContextKey contextKey =
PipelineContextKey.buildForProxy(param.getDatabaseName());
- ShardingSphereDatabase database =
PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
-
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
- List<JobDataNodeLine> jobDataNodeLines =
JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
-
yamlJobConfig.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- JobDataNodeLine tableFirstDataNodes = new
JobDataNodeLine(param.getDataNodesMap().entrySet().stream().map(each -> new
JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1)))
- .collect(Collectors.toList()));
- yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
+ YamlCDCJobConfiguration yamlJobConfig =
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
- String jobId = jobConfig.getJobId();
- ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobId));
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
- String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
+ ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
+ GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ String jobConfigKey =
PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
- return jobId;
- }
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
getJobClassName());
- JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
- jobConfigPOJO.setDisabled(true);
- repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
- if (!param.isFull()) {
- initIncrementalPosition(jobConfig);
+ } else {
+
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getJobClassName());
+ JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
+ jobConfigPOJO.setDisabled(true);
+ repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfigPOJO));
+ if (!param.isFull()) {
+ initIncrementalPosition(jobConfig);
+ }
}
- return jobId;
+ return jobConfig.getJobId();
}
- private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
- String jobId = jobConfig.getJobId();
- try {
- for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
- if (getJobItemProgress(jobId, i).isPresent()) {
- continue;
- }
- TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
- DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
- InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
-
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
-
jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
- jobItemProgress.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId,
i,
-
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
- }
- } catch (final SQLException ex) {
- throw new
PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
- } finally {
- dataSourceManager.close();
- }
+ private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final
StreamDataParameter param, final CDCSinkType sinkType, final Properties
sinkProps, final PipelineContextKey contextKey) {
+ YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
+ result.setDatabaseName(param.getDatabaseName());
+ result.setSchemaTableNames(param.getSchemaTableNames());
+ result.setFull(param.isFull());
+ result.setDecodeWithTX(param.isDecodeWithTX());
+ YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
+ sinkConfig.setSinkType(sinkType.name());
+ sinkConfig.setProps(sinkProps);
+ result.setSinkConfig(sinkConfig);
+ ShardingSphereDatabase database =
PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
+
result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
+ List<JobDataNodeLine> jobDataNodeLines =
JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
+
result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
+ JobDataNodeLine tableFirstDataNodes = new
JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
+ .map(each -> new JobDataNodeEntry(each.getKey(),
each.getValue().subList(0, 1))).collect(Collectors.toList()));
+ result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
+ return result;
}
private ShardingSpherePipelineDataSourceConfiguration
getDataSourceConfiguration(final ShardingSphereDatabase database) {
@@ -194,35 +173,41 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
return new
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
}
- @Override
- public void extendYamlJobConfiguration(final PipelineContextKey
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
- YamlCDCJobConfiguration config = (YamlCDCJobConfiguration)
yamlJobConfig;
- if (null == yamlJobConfig.getJobId()) {
- config.setJobId(generateJobId(contextKey, config));
- }
- if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
- PipelineDataSourceConfiguration sourceDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
- config.getDataSourceConfiguration().getParameter());
-
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ String jobId = jobConfig.getJobId();
+ try {
+ for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
+ if (getJobItemProgress(jobId, i).isPresent()) {
+ continue;
+ }
+ DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, i,
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+ InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, dataSourceManager,
dumperConfig);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
+ jobId, i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ }
+ } catch (final SQLException ex) {
+ throw new
PrepareJobWithGetBinlogPositionException(jobConfig.getJobId(), ex);
+ } finally {
+ dataSourceManager.close();
}
}
- private String generateJobId(final PipelineContextKey contextKey, final
YamlCDCJobConfiguration config) {
- CDCJobId jobId = new CDCJobId(contextKey,
config.getSchemaTableNames(), config.isFull(),
config.getSinkConfig().getSinkType());
- return marshalJobId(jobId);
- }
-
- @Override
- protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
- CDCJobId jobId = (CDCJobId) pipelineJobId;
- String text =
Joiner.on('|').join(jobId.getContextKey().getDatabaseName(),
jobId.getSchemaTableNames(), jobId.isFull());
- return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
+ private static InventoryIncrementalJobItemProgress
getInventoryIncrementalJobItemProgress(final CDCJobConfiguration jobConfig,
+
final PipelineDataSourceManager dataSourceManager,
+
final DumperConfiguration dumperConfig) throws SQLException {
+ InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
+ result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+ result.setDataSourceName(dumperConfig.getDataSourceName());
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
+ result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
+ return result;
}
/**
* Start job.
*
- * @param jobId job id
+ * @param jobId job id
* @param importerConnector importer connector
*/
public void startJob(final String jobId, final ImporterConnector
importerConnector) {
@@ -238,7 +223,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
/**
* Update job configuration disabled.
*
- * @param jobId job id
+ * @param jobId job id
* @param disabled disabled
*/
public void updateJobConfigurationDisabled(final String jobId, final
boolean disabled) {
@@ -253,6 +238,31 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
}
+ @Override
+ public void extendYamlJobConfiguration(final PipelineContextKey
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
+ YamlCDCJobConfiguration config = (YamlCDCJobConfiguration)
yamlJobConfig;
+ if (null == yamlJobConfig.getJobId()) {
+ config.setJobId(generateJobId(contextKey, config));
+ }
+ if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+ PipelineDataSourceConfiguration sourceDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
+ config.getDataSourceConfiguration().getParameter());
+
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+ }
+ }
+
+ private String generateJobId(final PipelineContextKey contextKey, final
YamlCDCJobConfiguration config) {
+ CDCJobId jobId = new CDCJobId(contextKey,
config.getSchemaTableNames(), config.isFull(),
config.getSinkConfig().getSinkType());
+ return marshalJobId(jobId);
+ }
+
+ @Override
+ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+ CDCJobId jobId = (CDCJobId) pipelineJobId;
+ String text =
Joiner.on('|').join(jobId.getContextKey().getDatabaseName(),
jobId.getSchemaTableNames(), jobId.isFull());
+ return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
+ }
+
@Override
public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 8df7a2783b6..fa0b79c341c 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -30,8 +30,8 @@ import java.util.Properties;
/**
* CDC job configuration.
*/
-@Getter
@RequiredArgsConstructor
+@Getter
public final class CDCJobConfiguration implements PipelineJobConfiguration {
private final String jobId;