This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new de114d56444 Refactor PipelineDataSourceFactory (#29503)
de114d56444 is described below
commit de114d56444dd1c2b1e122d7e79a4ea723477fe1
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 22 15:59:28 2023 +0800
Refactor PipelineDataSourceFactory (#29503)
---
.../core/datasource/PipelineDataSourceFactory.java | 48 ----------------------
.../core/datasource/PipelineDataSourceManager.java | 2 +-
.../core/datasource/PipelineDataSourceWrapper.java | 10 +++++
.../core/metadata/loader/PipelineSchemaUtils.java | 3 +-
.../datasource/PipelineDataSourceFactoryTest.java | 41 ------------------
.../scenario/migration/api/MigrationJobAPI.java | 15 ++++---
.../pipeline/cases/PipelineContainerComposer.java | 4 +-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 11 +++--
.../core/util/JobConfigurationBuilder.java | 3 +-
9 files changed, 27 insertions(+), 110 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
deleted file mode 100644
index c08946e05c7..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.datasource;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-
-import javax.sql.DataSource;
-import java.sql.SQLException;
-
-/**
- * Pipeline data source factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineDataSourceFactory {
-
- /**
- * New instance data source wrapper.
- *
- * @param pipelineDataSourceConfig pipeline data source configuration
- * @return new data source wrapper
- */
- @SneakyThrows(SQLException.class)
- public static PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
- DataSource dataSource = TypedSPILoader.getService(
- PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration());
- return new PipelineDataSourceWrapper(dataSource, pipelineDataSourceConfig.getDatabaseType());
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index 6ef42f14e6b..ad731b33cbc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -51,7 +51,7 @@ public final class PipelineDataSourceManager implements
AutoCloseable {
}
log.warn("{} is already closed, create again.", result);
}
- result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
+ result = new PipelineDataSourceWrapper(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, result);
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
index 253e9a9a16a..162ba91a4ef 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
@@ -19,9 +19,13 @@ package
org.apache.shardingsphere.data.pipeline.core.datasource;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
import java.io.PrintWriter;
@@ -45,6 +49,12 @@ public final class PipelineDataSourceWrapper implements
DataSource, AutoCloseabl
private final AtomicBoolean closed = new AtomicBoolean(false);
+ @SneakyThrows(SQLException.class)
+ public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
+ this(TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration()),
+ pipelineDataSourceConfig.getDatabaseType());
+ }
+
/**
* Whether underlying data source is closed or not.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
index 25dbf133134..f1e78d620f8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
@@ -22,7 +22,6 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import java.sql.Connection;
@@ -43,7 +42,7 @@ public final class PipelineSchemaUtils {
*/
@SneakyThrows(SQLException.class)
public static String getDefaultSchema(final PipelineDataSourceConfiguration dataSourceConfig) {
- try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
+ try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(dataSourceConfig)) {
try (Connection connection = dataSource.getConnection()) {
String result = connection.getSchema();
log.info("get default schema {}", result);
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java
deleted file mode 100644
index 829aa34d922..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.datasource;
-
-import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class PipelineDataSourceFactoryTest {
-
- @Test
- void assertNewInstance() {
- Map<String, Object> yamlDataSourceConfig = new HashMap<>(3, 1F);
- yamlDataSourceConfig.put("url", "jdbc:mysql://localhost:3306/database");
- yamlDataSourceConfig.put("username", "username");
- yamlDataSourceConfig.put("password", "password");
- PipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(yamlDataSourceConfig);
- assertThat(PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDatabaseType(), is(pipelineDataSourceConfig.getDatabaseType()));
- }
-}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index df1dc31201e..3e16d0e239f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -27,23 +27,24 @@ import
org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
+import
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -67,8 +68,6 @@ import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
-import
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
import org.apache.shardingsphere.mode.manager.ContextManager;
import java.sql.Connection;
@@ -322,7 +321,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
- PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
+ PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
for (String each : jobConfig.getTargetTableNames()) {
String targetSchemaName = mapping.getSchemaName(each);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index f0cf4df32cd..d3ba2ec2a00 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -24,7 +24,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
@@ -580,7 +580,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
YamlSingleRuleConfiguration singleRuleConfig = new
YamlSingleRuleConfiguration();
singleRuleConfig.setTables(Collections.singletonList("*.*"));
rootConfig.getRules().add(singleRuleConfig);
- return PipelineDataSourceFactory.newInstance(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
+ return new PipelineDataSourceWrapper(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
}
private YamlRootConfiguration getYamlRootConfig() {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index a3e2df5d89e..8720812aad8 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -27,16 +27,15 @@ import
org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreaming
import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -175,7 +174,7 @@ class CDCE2EIT {
}
private DataSource createStandardDataSource(final
PipelineContainerComposer containerComposer, final String storageUnitName) {
- return PipelineDataSourceFactory.newInstance(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false),
+ return new PipelineDataSourceWrapper(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false),
containerComposer.getUsername(),
containerComposer.getPassword()));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 04af7e5121e..6e88c183f8b 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -83,7 +82,7 @@ public final class JobConfigurationBuilder {
PipelineDataSourceConfiguration sourceDataSourceConfig = new
StandardPipelineDataSourceConfiguration(
ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}",
databaseNameSuffix));
try (
- PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+ PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(sourceDataSourceConfig);
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(PipelineContextUtils.getCreateOrderTableSchema());