This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 50ae15890a0 Add CreateTableConfiguration; Refactor DataSourcePreparer
and job preparer (#20824)
50ae15890a0 is described below
commit 50ae15890a00d89f3799a63279f6ceeb95d032b0
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Sep 6 14:33:33 2022 +0800
Add CreateTableConfiguration; Refactor DataSourcePreparer and job preparer
(#20824)
* Add PipelineTaskConfiguration
* Rename TaskConfiguration to MigrationTaskConfiguration
* Decouple InventoryTaskSplitter with MigrationTaskConfiguration
* Add CreateTableConfiguration; Refactor DataSourcePreparer method
parameters and job preparer
* Unit test
---
.../infra/database/type/DatabaseTypeEngine.java | 10 +++
.../api/config/CreateTableConfiguration.java} | 38 +++++-----
...uration.java => PipelineTaskConfiguration.java} | 16 +---
.../{TableName.java => IdentifierName.java} | 17 ++---
.../SchemaName.java} | 22 +++---
.../SchemaTableName.java} | 16 ++--
.../data/pipeline/api/metadata/TableName.java | 40 +---------
.../data/pipeline/core/api/PipelineJobAPI.java | 4 +-
.../DefaultPipelineDataSourceManager.java | 1 +
.../metadata/generator/PipelineDDLGenerator.java | 44 +++++------
.../core/prepare/InventoryTaskSplitter.java | 20 ++---
.../core/prepare/PipelineJobPreparerUtils.java | 20 ++++-
.../datasource/AbstractDataSourcePreparer.java | 86 +++++++++-------------
.../prepare/datasource/DataSourcePreparer.java | 3 +-
.../datasource/PrepareTargetSchemasParameter.java | 13 +---
.../datasource/PrepareTargetTablesParameter.java | 33 +--------
.../pipeline/scenario/migration/MigrationJob.java | 3 +-
.../scenario/migration/MigrationJobAPI.java | 4 +
.../scenario/migration/MigrationJobAPIImpl.java | 42 +++++++----
.../migration/MigrationJobItemContext.java | 5 +-
.../scenario/migration/MigrationJobPreparer.java | 45 +++--------
.../migration/MigrationTaskConfiguration.java} | 11 ++-
.../core/fixture/MigrationJobAPIFixture.java | 4 +-
.../datasource/MySQLDataSourcePreparer.java | 13 ++--
.../datasource/MySQLDataSourcePreparerTest.java | 1 -
.../datasource/OpenGaussDataSourcePreparer.java | 17 ++---
.../datasource/PostgreSQLDataSourcePreparer.java | 17 ++---
.../api/impl/GovernanceRepositoryAPIImplTest.java | 8 +-
.../core/prepare/InventoryTaskSplitterTest.java | 9 ++-
.../pipeline/core/task/IncrementalTaskTest.java | 4 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 4 +-
.../pipeline/core/util/PipelineContextUtil.java | 4 +-
32 files changed, 252 insertions(+), 322 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index 08872c7124a..8cf56eb08d0 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -154,4 +154,14 @@ public final class DatabaseTypeEngine {
public static String getDefaultSchemaName(final DatabaseType databaseType,
final String databaseName) {
return databaseType instanceof SchemaSupportedDatabaseType ?
((SchemaSupportedDatabaseType) databaseType).getDefaultSchema() :
databaseName.toLowerCase();
}
+
+ /**
+ * Get default schema name.
+ *
+ * @param databaseType database type
+ * @return default schema name
+ */
+ public static Optional<String> getDefaultSchemaName(final DatabaseType
databaseType) {
+ return databaseType instanceof SchemaSupportedDatabaseType ?
Optional.of(((SchemaSupportedDatabaseType) databaseType).getDefaultSchema()) :
Optional.empty();
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
similarity index 55%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
index df94812d018..fd69f7be447 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/CreateTableConfiguration.java
@@ -15,33 +15,37 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.api.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
-import java.util.List;
+import java.util.Collection;
/**
- * Prepare target schemas parameter.
+ * Create table configuration.
*/
@RequiredArgsConstructor
@Getter
-public final class PrepareTargetSchemasParameter {
+@ToString
+public final class CreateTableConfiguration {
- private final List<String> logicTableNames;
+ private final Collection<CreateTableEntry> createTableEntries;
- private final DatabaseType targetDatabaseType;
-
- private final String databaseName;
-
- private final PipelineDataSourceConfiguration dataSourceConfig;
-
- private final PipelineDataSourceManager dataSourceManager;
-
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ @RequiredArgsConstructor
+ @Getter
+ @ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"})
+ public static final class CreateTableEntry {
+
+ private final PipelineDataSourceConfiguration sourceDataSourceConfig;
+
+ private final SchemaTableName sourceName;
+
+ private final PipelineDataSourceConfiguration targetDataSourceConfig;
+
+ private final SchemaTableName targetName;
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
similarity index 68%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
index 67c7187ce24..29af6e31597 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/PipelineTaskConfiguration.java
@@ -17,20 +17,8 @@
package org.apache.shardingsphere.data.pipeline.api.config;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-
/**
- * Task configuration.
+ * Pipeline task configuration.
*/
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
-
- private final DumperConfiguration dumperConfig;
-
- private final ImporterConfiguration importerConfig;
+public interface PipelineTaskConfiguration {
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
similarity index 81%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
index e84c4847807..f67c76ab003 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/IdentifierName.java
@@ -18,27 +18,24 @@
package org.apache.shardingsphere.data.pipeline.api.metadata;
import lombok.Getter;
-import lombok.NonNull;
import java.util.Objects;
/**
- * Table name.
- * <p>It might be logic table name or actual table name.</p>
+ * Identifier name.
+ * <p>It might be schema name or table name, etc.</p>
* <p>It's case-insensitive.</p>
*/
@Getter
-public class TableName {
+public class IdentifierName {
- @NonNull
private final String original;
- @NonNull
private final String lowercase;
- public TableName(final String tableName) {
- this.original = tableName;
- this.lowercase = tableName.toLowerCase();
+ public IdentifierName(final String identifierName) {
+ this.original = identifierName;
+ this.lowercase = null != identifierName ? identifierName.toLowerCase()
: null;
}
// TODO table name case-sensitive for some database
@@ -50,7 +47,7 @@ public class TableName {
if (o == null || getClass() != o.getClass()) {
return false;
}
- final TableName tableName = (TableName) o;
+ final IdentifierName tableName = (IdentifierName) o;
return lowercase.equals(tableName.lowercase);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
similarity index 64%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
index 67c7187ce24..a9195f2020a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaName.java
@@ -15,22 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import javax.annotation.Nullable;
/**
- * Task configuration.
+ * Schema name.
+ * <p>It might be null.</p>
+ * <p>It's case-insensitive.</p>
*/
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
+public class SchemaName extends IdentifierName {
- private final DumperConfiguration dumperConfig;
-
- private final ImporterConfiguration importerConfig;
+ public SchemaName(@Nullable final String schemaName) {
+ super(schemaName);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
similarity index 75%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index 67c7187ce24..f3f7dc7fbd4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.metadata;
import lombok.Getter;
+import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
/**
- * Task configuration.
+ * Schema name and table name.
*/
-@Getter
@RequiredArgsConstructor
+@Getter
@ToString
-public final class TaskConfiguration {
+public class SchemaTableName {
- private final DumperConfiguration dumperConfig;
+ @NonNull
+ private final SchemaName schemaName;
- private final ImporterConfiguration importerConfig;
+ @NonNull
+ private final TableName tableName;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
index e84c4847807..b48332797dd 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/TableName.java
@@ -17,50 +17,16 @@
package org.apache.shardingsphere.data.pipeline.api.metadata;
-import lombok.Getter;
import lombok.NonNull;
-import java.util.Objects;
-
/**
* Table name.
* <p>It might be logic table name or actual table name.</p>
* <p>It's case-insensitive.</p>
*/
-@Getter
-public class TableName {
-
- @NonNull
- private final String original;
-
- @NonNull
- private final String lowercase;
-
- public TableName(final String tableName) {
- this.original = tableName;
- this.lowercase = tableName.toLowerCase();
- }
-
- // TODO table name case-sensitive for some database
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final TableName tableName = (TableName) o;
- return lowercase.equals(tableName.lowercase);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(lowercase);
- }
+public class TableName extends IdentifierName {
- @Override
- public String toString() {
- return original;
+ public TableName(@NonNull final String tableName) {
+ super(tableName);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index e083d70efcc..06b4c336435 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
@@ -58,7 +58,7 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI,
PipelineJobItemAPI
* @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
- TaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
+ PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
/**
* Build pipeline process context.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
index 66ad7498d3c..ce2a9cbbf33 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
@@ -56,6 +56,7 @@ public final class DefaultPipelineDataSourceManager
implements PipelineDataSourc
}
}
+ // TODO monitor each DataSource close
/**
* Close, close cached data source.
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index ecff9462e74..1df7d29a532 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -64,24 +64,24 @@ public final class PipelineDDLGenerator {
* @param databaseType database type
* @param sourceDataSource source data source
* @param schemaName schema name
- * @param logicTableName table name
- * @param actualTableName actual table name
+ * @param sourceTableName source table name
+ * @param targetTableName target table name
* @param parserEngine parser engine
- * @return DDL
+ * @return DDL SQL
* @throws SQLException SQL exception
*/
public String generateLogicDDL(final DatabaseType databaseType, final
DataSource sourceDataSource,
- final String schemaName, final String
logicTableName, final String actualTableName, final
ShardingSphereSQLParserEngine parserEngine) throws SQLException {
- log.info("generateLogicDDLSQL, databaseType={}, schemaName={},
tableName={}", databaseType.getType(), schemaName, logicTableName);
+ final String schemaName, final String
sourceTableName, final String targetTableName, final
ShardingSphereSQLParserEngine parserEngine) throws SQLException {
+ log.info("generateLogicDDLSQL, databaseType={}, schemaName={},
sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName,
sourceTableName, targetTableName);
StringBuilder result = new StringBuilder();
- for (String each :
CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource,
schemaName, actualTableName)) {
- Optional<String> queryContext = decorate(databaseType,
sourceDataSource, schemaName, logicTableName, parserEngine, each);
+ for (String each :
CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource,
schemaName, sourceTableName)) {
+ Optional<String> queryContext = decorate(databaseType,
sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(ddlSQL ->
result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
return result.toString();
}
- private Optional<String> decorate(final DatabaseType databaseType, final
DataSource dataSource, final String schemaName, final String logicTableName,
+ private Optional<String> decorate(final DatabaseType databaseType, final
DataSource dataSource, final String schemaName, final String targetTableName,
final ShardingSphereSQLParserEngine
parserEngine, final String sql) throws SQLException {
if (sql.trim().isEmpty()) {
return Optional.empty();
@@ -90,7 +90,7 @@ public final class PipelineDDLGenerator {
try (Connection connection = dataSource.getConnection()) {
databaseName = connection.getCatalog();
}
- String result = decorateActualSQL(databaseName, logicTableName,
parserEngine, sql.trim());
+ String result = decorateActualSQL(databaseName, targetTableName,
parserEngine, sql.trim());
// TODO remove it after set search_path is supported.
if ("openGauss".equals(databaseType.getType())) {
return decorateOpenGauss(databaseName, schemaName, result,
parserEngine);
@@ -98,24 +98,24 @@ public final class PipelineDDLGenerator {
return Optional.of(result);
}
- private String decorateActualSQL(final String databaseName, final String
logicTableName, final ShardingSphereSQLParserEngine parserEngine, final String
sql) {
+ private String decorateActualSQL(final String databaseName, final String
targetTableName, final ShardingSphereSQLParserEngine parserEngine, final String
sql) {
QueryContext queryContext = getQueryContext(databaseName,
parserEngine, sql);
SQLStatementContext<?> sqlStatementContext =
queryContext.getSqlStatementContext();
Map<SQLSegment, String> replaceMap = new
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
if (sqlStatementContext instanceof CreateTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, logicTableName,
sqlStatementContext);
- appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, targetTableName,
sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
}
if (sqlStatementContext instanceof CommentStatementContext) {
- appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
}
if (sqlStatementContext instanceof CreateIndexStatementContext) {
- appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
- appendFromIndexAndConstraint(replaceMap, logicTableName,
sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, targetTableName,
sqlStatementContext);
}
if (sqlStatementContext instanceof AlterTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, logicTableName,
sqlStatementContext);
- appendFromTable(replaceMap, logicTableName, (TableAvailable)
sqlStatementContext);
+ appendFromIndexAndConstraint(replaceMap, targetTableName,
sqlStatementContext);
+ appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
}
return doDecorateActualTable(replaceMap, sql);
}
@@ -125,12 +125,12 @@ public final class PipelineDDLGenerator {
return new QueryContext(sqlStatementContext, sql,
Collections.emptyList());
}
- private void appendFromIndexAndConstraint(final Map<SQLSegment, String>
replaceMap, final String logicTableName, final SQLStatementContext<?>
sqlStatementContext) {
+ private void appendFromIndexAndConstraint(final Map<SQLSegment, String>
replaceMap, final String targetTableName, final SQLStatementContext<?>
sqlStatementContext) {
if (!(sqlStatementContext instanceof TableAvailable) ||
((TableAvailable)
sqlStatementContext).getTablesContext().getTables().isEmpty()) {
return;
}
TableNameSegment tableNameSegment = ((TableAvailable)
sqlStatementContext).getTablesContext().getTables().iterator().next().getTableName();
- if
(!tableNameSegment.getIdentifier().getValue().equals(logicTableName)) {
+ if
(!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) {
if (sqlStatementContext instanceof IndexAvailable) {
for (IndexSegment each : ((IndexAvailable)
sqlStatementContext).getIndexes()) {
String logicIndexName =
IndexMetaDataUtil.getLogicIndexName(each.getIndexName().getIdentifier().getValue(),
tableNameSegment.getIdentifier().getValue());
@@ -146,10 +146,10 @@ public final class PipelineDDLGenerator {
}
}
- private void appendFromTable(final Map<SQLSegment, String> replaceMap,
final String logicTableName, final TableAvailable sqlStatementContext) {
+ private void appendFromTable(final Map<SQLSegment, String> replaceMap,
final String targetTableName, final TableAvailable sqlStatementContext) {
for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
- if
(!logicTableName.equals(each.getTableName().getIdentifier().getValue())) {
- replaceMap.put(each.getTableName(), logicTableName);
+ if
(!targetTableName.equals(each.getTableName().getIdentifier().getValue())) {
+ replaceMap.put(each.getTableName(), targetTableName);
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index f2e3a02064b..e10415451c0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
@@ -66,17 +66,19 @@ import java.util.List;
@RequiredArgsConstructor
public final class InventoryTaskSplitter {
- private final PipelineTableMetaDataLoader metaDataLoader;
+ private final PipelineDataSourceWrapper sourceDataSource;
- private final PipelineDataSourceManager dataSourceManager;
+ private final DumperConfiguration dumperConfig;
- private final ExecuteEngine importerExecuteEngine;
+ private final ImporterConfiguration importerConfig;
- private final PipelineDataSourceWrapper sourceDataSource;
+ private final InventoryIncrementalJobItemProgress initProgress;
- private final TaskConfiguration taskConfig;
+ private final PipelineTableMetaDataLoader metaDataLoader;
- private final InventoryIncrementalJobItemProgress initProgress;
+ private final PipelineDataSourceManager dataSourceManager;
+
+ private final ExecuteEngine importerExecuteEngine;
/**
* Split inventory data to multi-tasks.
@@ -88,8 +90,8 @@ public final class InventoryTaskSplitter {
List<InventoryTask> result = new LinkedList<>();
PipelineChannelCreator pipelineChannelCreator =
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
DefaultPipelineJobProgressListener jobProgressListener = new
DefaultPipelineJobProgressListener(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
- for (InventoryDumperConfiguration each :
splitDumperConfig(jobItemContext, taskConfig.getDumperConfig())) {
- result.add(new InventoryTask(each, taskConfig.getImporterConfig(),
pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader,
importerExecuteEngine,
+ for (InventoryDumperConfiguration each :
splitDumperConfig(jobItemContext, dumperConfig)) {
+ result.add(new InventoryTask(each, importerConfig,
pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader,
importerExecuteEngine,
jobProgressListener));
}
return result;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 096b8851510..0a080946ad9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.check.datasource.DataSourceCheckerFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
@@ -38,7 +39,11 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionIniti
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
import javax.sql.DataSource;
import java.sql.SQLException;
@@ -72,8 +77,9 @@ public final class PipelineJobPreparerUtils {
*
* @param databaseType database type
* @param prepareTargetSchemasParameter prepare target schemas parameter
+ * @throws SQLException if prepare target schema fail
*/
- public static void prepareTargetSchema(final String databaseType, final
PrepareTargetSchemasParameter prepareTargetSchemasParameter) {
+ public static void prepareTargetSchema(final String databaseType, final
PrepareTargetSchemasParameter prepareTargetSchemasParameter) throws
SQLException {
Optional<DataSourcePreparer> dataSourcePreparer =
DataSourcePreparerFactory.getInstance(databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
@@ -82,6 +88,18 @@ public final class PipelineJobPreparerUtils {
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
}
+ /**
+ * Get SQL parser engine.
+ *
+ * @param targetDatabaseName target database name
+ * @return SQL parser engine
+ */
+ public static ShardingSphereSQLParserEngine getSQLParserEngine(final
String targetDatabaseName) {
+ ShardingSphereMetaData metaData =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
+ ShardingSphereDatabase database =
metaData.getDatabases().get(targetDatabaseName);
+ return
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType().getType());
+ }
+
/**
* Prepare target tables.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index c0172f9c51f..9dcf18e2a8d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,29 +17,24 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
@@ -56,43 +51,38 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple
primary keys for table", "already exists"};
@Override
- public void prepareTargetSchemas(final PrepareTargetSchemasParameter
parameter) {
- Set<String> schemaNames = getSchemaNames(parameter);
- String defaultSchema =
DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(),
parameter.getDatabaseName());
- log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}",
schemaNames, defaultSchema);
- PipelineSQLBuilder pipelineSQLBuilder =
PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
- try (Connection targetConnection =
getCachedDataSource(parameter.getDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
- for (String each : schemaNames) {
- if (each.equalsIgnoreCase(defaultSchema)) {
- continue;
- }
- String sql = pipelineSQLBuilder.buildCreateSchemaSQL(each);
+ public void prepareTargetSchemas(final PrepareTargetSchemasParameter
parameter) throws SQLException {
+ DatabaseType targetDatabaseType = parameter.getTargetDatabaseType();
+ if (!targetDatabaseType.isSchemaAvailable()) {
+ log.info("prepareTargetSchemas, target database does not support
schema, ignore, targetDatabaseType={}", targetDatabaseType);
+ return;
+ }
+ CreateTableConfiguration createTableConfig =
parameter.getCreateTableConfig();
+ String defaultSchema =
DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
+ PipelineSQLBuilder pipelineSQLBuilder =
PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
+ Set<String> createdSchemaNames = new HashSet<>();
+ for (CreateTableEntry each :
createTableConfig.getCreateTableEntries()) {
+ String targetSchemaName =
each.getTargetName().getSchemaName().getOriginal();
+ if (null == targetSchemaName) {
+ continue;
+ }
+ if (targetSchemaName.equalsIgnoreCase(defaultSchema)) {
+ continue;
+ }
+ if (createdSchemaNames.contains(targetSchemaName)) {
+ continue;
+ }
+ try (Connection targetConnection =
getCachedDataSource(each.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
+ String sql =
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
log.info("prepareTargetSchemas, sql={}", sql);
try (Statement statement = targetConnection.createStatement())
{
statement.execute(sql);
+ createdSchemaNames.add(targetSchemaName);
} catch (final SQLException ignored) {
}
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("Can not get
connection.", ex);
}
- }
-
- private Set<String> getSchemaNames(final PrepareTargetSchemasParameter
parameter) {
- Set<String> result = new HashSet<>();
- for (String each : parameter.getLogicTableNames()) {
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each);
- if (null == schemaName) {
- throw new PipelineJobPrepareFailedException("Can not get
schemaName by logic table name " + each);
- }
- result.add(schemaName);
- }
- return result;
- }
-
- // TODO the invocation is disabled for now, it might be used again for
next new feature
- protected final PipelineDataSourceWrapper getSourceCachedDataSource(final
MigrationJobConfiguration jobConfig, final PipelineDataSourceManager
dataSourceManager) {
- return
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter()));
+ log.info("prepareTargetSchemas, createdSchemaNames={},
defaultSchema={}", createdSchemaNames, defaultSchema);
}
protected final PipelineDataSourceWrapper getCachedDataSource(final
PipelineDataSourceConfiguration dataSourceConfig, final
PipelineDataSourceManager dataSourceManager) {
@@ -121,18 +111,14 @@ public abstract class AbstractDataSourcePreparer
implements DataSourcePreparer {
return
PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT
EXISTS ");
}
- protected final List<String> listCreateLogicalTableSQL(final
PrepareTargetTablesParameter parameter) throws SQLException {
+ protected final String getCreateTargetTableSQL(final CreateTableEntry
createTableEntry, final PipelineDataSourceManager dataSourceManager,
+ final
ShardingSphereSQLParserEngine sqlParserEngine) throws SQLException {
+ DatabaseType databaseType =
createTableEntry.getSourceDataSourceConfig().getDatabaseType();
+ DataSource sourceDataSource =
dataSourceManager.getDataSource(createTableEntry.getSourceDataSourceConfig());
+ String schemaName =
createTableEntry.getSourceName().getSchemaName().getOriginal();
+ String sourceTableName =
createTableEntry.getSourceName().getTableName().getOriginal();
+ String targetTableName =
createTableEntry.getTargetName().getTableName().getOriginal();
PipelineDDLGenerator generator = new PipelineDDLGenerator();
- List<String> result = new LinkedList<>();
- for (JobDataNodeEntry each :
parameter.getTablesFirstDataNodes().getEntries()) {
- String dataSourceName =
each.getDataNodes().get(0).getDataSourceName();
- DataSource dataSource =
parameter.getSourceDataSourceMap().get(dataSourceName);
- DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
- String schemaName =
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String actualTableName = each.getDataNodes().get(0).getTableName();
- Preconditions.checkNotNull(actualTableName, "Could not get
actualTableName, nodeEntry={}", each);
- result.add(generator.generateLogicDDL(databaseType, dataSource,
schemaName, each.getLogicTableName(), actualTableName,
parameter.getSqlParserEngine()));
- }
- return result;
+ return generator.generateLogicDDL(databaseType, sourceDataSource,
schemaName, sourceTableName, targetTableName, sqlParserEngine);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
index e4ef48ab384..501765108aa 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
@@ -32,8 +32,9 @@ public interface DataSourcePreparer extends TypedSPI {
* Prepare target schemas.
*
* @param parameter prepare target schemas parameter
+ * @throws SQLException if prepare target schema fail
*/
- void prepareTargetSchemas(PrepareTargetSchemasParameter parameter);
+ void prepareTargetSchemas(PrepareTargetSchemasParameter parameter) throws
SQLException;
/**
* Prepare target tables.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index df94812d018..6f70d1c8b49 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -19,13 +19,10 @@ package
org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import java.util.List;
-
/**
* Prepare target schemas parameter.
*/
@@ -33,15 +30,9 @@ import java.util.List;
@Getter
public final class PrepareTargetSchemasParameter {
- private final List<String> logicTableNames;
-
private final DatabaseType targetDatabaseType;
- private final String databaseName;
-
- private final PipelineDataSourceConfiguration dataSourceConfig;
+ private final CreateTableConfiguration createTableConfig;
private final PipelineDataSourceManager dataSourceManager;
-
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index c9f1e8fc764..65fec5e55ef 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -18,46 +18,21 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
-import lombok.NonNull;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import javax.sql.DataSource;
-import java.util.Map;
-
/**
* Prepare target tables parameter.
*/
+@RequiredArgsConstructor
@Getter
public final class PrepareTargetTablesParameter {
- private final String databaseName;
-
- private final JobDataNodeLine tablesFirstDataNodes;
-
- private final PipelineDataSourceConfiguration targetDataSourceConfig;
-
- private final Map<String, DataSource> sourceDataSourceMap;
+ private final CreateTableConfiguration createTableConfig;
private final PipelineDataSourceManager dataSourceManager;
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
-
private final ShardingSphereSQLParserEngine sqlParserEngine;
-
- public PrepareTargetTablesParameter(@NonNull final String databaseName,
@NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
- @NonNull final Map<String, DataSource>
sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
- @NonNull final JobDataNodeLine
tablesFirstDataNodes, final TableNameSchemaNameMapping
tableNameSchemaNameMapping,
- @NonNull final
ShardingSphereSQLParserEngine sqlParserEngine) {
- this.databaseName = databaseName;
- this.targetDataSourceConfig = targetDataSourceConfig;
- this.sourceDataSourceMap = sourceDataSourceMap;
- this.tablesFirstDataNodes = tablesFirstDataNodes;
- this.dataSourceManager = dataSourceManager;
- this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
- this.sqlParserEngine = sqlParserEngine;
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 2dd261b8420..e1737a49eef 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -68,7 +67,7 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
InventoryIncrementalJobItemProgress initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
- TaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
+ MigrationTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
MigrationJobItemContext jobItemContext = new
MigrationJobItemContext(jobConfig, shardingItem, initProgress,
jobProcessContext, taskConfig, dataSourceManager);
if (getTasksRunnerMap().containsKey(shardingItem)) {
log.warn("tasksRunnerMap contains shardingItem {}, ignore",
shardingItem);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 1a3d902efe2..383cf7e9581 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfig
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -37,6 +38,9 @@ public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI,
@Override
MigrationJobConfiguration getJobConfiguration(String jobId);
+ @Override
+ MigrationTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
+
@Override
MigrationProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index bd898dc5d9a..4be428d86ed 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -23,9 +23,10 @@ import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
@@ -48,6 +49,9 @@ import
org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
@@ -191,22 +195,32 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}
@Override
- public TaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()),
new LogicTableName(jobConfig.getTargetTableName()));
Map<LogicTableName, String> tableNameSchemaMap =
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
Collections.singletonList(jobConfig.getTargetTableName()));
TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(tableNameSchemaMap);
- DumperConfiguration dumperConfig =
createDumperConfiguration(jobConfig.getJobId(),
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap,
tableNameSchemaNameMapping);
+ CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig);
+ DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig.getJobId(),
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap,
tableNameSchemaNameMapping);
// TODO now shardingColumnsMap always empty,
- ImporterConfiguration importerConfig =
createImporterConfiguration(jobConfig, pipelineProcessConfig,
Collections.emptyMap(), tableNameSchemaNameMapping);
- TaskConfiguration result = new TaskConfiguration(dumperConfig,
importerConfig);
- log.info("createTaskConfiguration, sourceResourceName={}, result={}",
jobConfig.getSourceResourceName(), result);
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
Collections.emptyMap(), tableNameSchemaNameMapping);
+ MigrationTaskConfiguration result = new
MigrationTaskConfiguration(createTableConfig, dumperConfig, importerConfig);
+ log.info("buildTaskConfiguration, sourceResourceName={}, result={}",
jobConfig.getSourceResourceName(), result);
return result;
}
- private static DumperConfiguration createDumperConfiguration(final String
jobId, final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ private CreateTableConfiguration buildCreateTableConfiguration(final
MigrationJobConfiguration jobConfig) {
+ String sourceSchemaName = jobConfig.getSourceSchemaName();
+ String targetSchemaName =
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()).isSchemaAvailable()
? sourceSchemaName : null;
+ CreateTableEntry createTableEntry = new CreateTableEntry(
+ jobConfig.getSource(), new SchemaTableName(new
SchemaName(sourceSchemaName), new TableName(jobConfig.getSourceTableName())),
+ jobConfig.getTarget(), new SchemaTableName(new
SchemaName(targetSchemaName), new TableName(jobConfig.getTargetTableName())));
+ return new
CreateTableConfiguration(Collections.singletonList(createTableEntry));
+ }
+
+ private DumperConfiguration buildDumperConfiguration(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
+ final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
DumperConfiguration result = new DumperConfiguration();
result.setJobId(jobId);
result.setDataSourceName(dataSourceName);
@@ -216,18 +230,17 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
return result;
}
- private static ImporterConfiguration createImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
- final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
+ private ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
+ final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
PipelineProcessContext migrationProcessContext = new
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- return new ImporterConfiguration(dataSourceConfig,
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
migrationProcessContext.getWriteRateLimitAlgorithm(),
+ return new ImporterConfiguration(jobConfig.getTarget(),
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
migrationProcessContext.getWriteRateLimitAlgorithm(),
retryTimes, concurrency);
}
- private static Map<LogicTableName, Set<String>> unmodifiable(final
Map<LogicTableName, Set<String>> shardingColumnsMap) {
+ private Map<LogicTableName, Set<String>> unmodifiable(final
Map<LogicTableName, Set<String>> shardingColumnsMap) {
Map<LogicTableName, Set<String>> result = new
HashMap<>(shardingColumnsMap.size());
for (Entry<LogicTableName, Set<String>> entry :
shardingColumnsMap.entrySet()) {
result.put(entry.getKey(),
Collections.unmodifiableSet(entry.getValue()));
@@ -343,10 +356,9 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
String targetTableName = jobConfig.getTargetTableName();
// TODO use jobConfig.targetSchemaName
String targetSchemaName = jobConfig.getSourceSchemaName();
- PipelineDataSourceConfiguration target = jobConfig.getTarget();
PipelineSQLBuilder pipelineSQLBuilder =
PipelineSQLBuilderFactory.getInstance(jobConfig.getTargetDatabaseType());
try (
- PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(),
target.getParameter()));
+ PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
String sql = pipelineSQLBuilder.buildTruncateSQL(targetSchemaName,
targetTableName);
log.info("cleanTempTableOnRollback, targetSchemaName={},
targetTableName={}, sql={}", targetSchemaName, targetTableName, sql);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index 9a72f14c16c..2904932b9e0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -23,7 +23,6 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -56,7 +55,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
private final InventoryIncrementalJobItemProgress initProgress;
- private final TaskConfiguration taskConfig;
+ private final MigrationTaskConfiguration taskConfig;
private final Collection<InventoryTask> inventoryTasks = new
LinkedList<>();
@@ -85,7 +84,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
};
public MigrationJobItemContext(final MigrationJobConfiguration jobConfig,
final int shardingItem, final InventoryIncrementalJobItemProgress initProgress,
- final MigrationProcessContext
jobProcessContext, final TaskConfiguration taskConfig, final
PipelineDataSourceManager dataSourceManager) {
+ final MigrationProcessContext
jobProcessContext, final MigrationTaskConfiguration taskConfig, final
PipelineDataSourceManager dataSourceManager) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index c02cd51ea0f..63a94777d93 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -18,11 +18,8 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -43,17 +40,11 @@ import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
-import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
/**
* Migration job preparer.
@@ -141,41 +132,27 @@ public final class MigrationJobPreparer {
private void prepareTarget(final MigrationJobItemContext jobItemContext)
throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
- TableNameSchemaNameMapping tableNameSchemaNameMapping =
jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
String targetDatabaseType = jobConfig.getTargetDatabaseType();
- if (isTargetSchemaAvailable(jobConfig) &&
StringUtils.isNotBlank(jobConfig.getSourceSchemaName())) {
- PrepareTargetSchemasParameter prepareTargetSchemasParameter = new
PrepareTargetSchemasParameter(Collections.singletonList(jobConfig.getTargetTableName()),
- DatabaseTypeFactory.getInstance(targetDatabaseType),
jobConfig.getTargetDatabaseName(),
-
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
- PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType,
prepareTargetSchemasParameter);
- }
- ShardingSphereMetaData metaData =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
- ShardingSphereDatabase sphereDatabase =
metaData.getDatabases().get(jobConfig.getTargetDatabaseName());
- ShardingSphereSQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
- JobDataNodeLine jobDataNodeLine =
JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
- PipelineDataSourceWrapper dataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
- Map<String, DataSource> sourceDataSourceMap = new HashMap<>(1, 1.0F);
- sourceDataSourceMap.put(jobConfig.getSourceResourceName(), dataSource);
- PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(jobConfig.getTargetDatabaseName(),
-
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
sourceDataSourceMap, jobItemContext.getDataSourceManager(),
- jobDataNodeLine, tableNameSchemaNameMapping, sqlParserEngine);
+ CreateTableConfiguration createTableConfig =
jobItemContext.getTaskConfig().getCreateTableConfig();
+ PrepareTargetSchemasParameter prepareTargetSchemasParameter = new
PrepareTargetSchemasParameter(
+ DatabaseTypeFactory.getInstance(targetDatabaseType),
createTableConfig, jobItemContext.getDataSourceManager());
+ PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType,
prepareTargetSchemasParameter);
+ ShardingSphereSQLParserEngine sqlParserEngine =
PipelineJobPreparerUtils.getSQLParserEngine(jobConfig.getTargetDatabaseName());
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new
PrepareTargetTablesParameter(createTableConfig,
jobItemContext.getDataSourceManager(), sqlParserEngine);
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType,
prepareTargetTablesParameter);
}
- private boolean isTargetSchemaAvailable(final MigrationJobConfiguration
jobConfig) {
- return
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()).isSchemaAvailable();
- }
-
private void initInventoryTasks(final MigrationJobItemContext
jobItemContext) {
- InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager(),
-
jobItemContext.getJobProcessContext().getImporterExecuteEngine(),
jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(),
jobItemContext.getInitProgress());
+ InventoryTaskSplitter inventoryTaskSplitter = new
InventoryTaskSplitter(
+ jobItemContext.getSourceDataSource(),
jobItemContext.getTaskConfig().getDumperConfig(),
jobItemContext.getTaskConfig().getImporterConfig(),
jobItemContext.getInitProgress(),
+ jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager(),
jobItemContext.getJobProcessContext().getImporterExecuteEngine());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
}
private void initIncrementalTasks(final MigrationJobItemContext
jobItemContext) throws SQLException {
PipelineChannelCreator pipelineChannelCreator =
jobItemContext.getJobProcessContext().getPipelineChannelCreator();
ExecuteEngine incrementalDumperExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
- TaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+ MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
JobItemIncrementalTasksProgress initIncremental =
jobItemContext.getInitProgress() == null ? null :
jobItemContext.getInitProgress().getIncremental();
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
taskConfig.getDumperConfig(), dataSourceManager));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
similarity index 69%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 67c7187ce24..52672769aab 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TaskConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -15,20 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
/**
- * Task configuration.
+ * Migration task configuration.
*/
@Getter
@RequiredArgsConstructor
@ToString
-public final class TaskConfiguration {
+public final class MigrationTaskConfiguration implements
PipelineTaskConfiguration {
+
+ private final CreateTableConfiguration createTableConfig;
private final DumperConfiguration dumperConfig;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index d68a2eb44f0..9608049f057 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
@@ -31,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
@@ -153,7 +153,7 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
}
@Override
- public TaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
return null;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index ceccccf1742..9b7f0d2cde0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
@@ -33,14 +33,13 @@ import java.sql.SQLException;
public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
- public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
+ public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) throws SQLException {
PipelineDataSourceManager dataSourceManager =
parameter.getDataSourceManager();
- try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
dataSourceManager).getConnection()) {
- for (String each : listCreateLogicalTableSQL(parameter)) {
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(each));
+ for (CreateTableEntry each :
parameter.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, parameter.getSqlParserEngine());
+ try (Connection targetConnection =
getCachedDataSource(each.getTargetDataSourceConfig(),
dataSourceManager).getConnection()) {
+ executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(createTargetTableSQL));
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("prepare target tables
failed.", ex);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 0112176b321..7884a10dbb7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -83,7 +83,6 @@ public final class MySQLDataSourcePreparerTest {
when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
when(jobConfig.getTarget().getParameter()).thenReturn("target");
-
when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
}
@Test
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index d58133902b0..78b6fef6559 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -20,13 +20,13 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
import java.util.stream.Collectors;
/**
@@ -37,15 +37,14 @@ public final class OpenGaussDataSourcePreparer extends
AbstractDataSourcePrepare
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) throws SQLException {
- List<String> createLogicalTableSQLs =
listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
- for (String createLogicalTableSQL : createLogicalTableSQLs) {
- for (String each :
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
{
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(each));
+ PipelineDataSourceManager dataSourceManager =
parameter.getDataSourceManager();
+ for (CreateTableEntry each :
parameter.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, parameter.getSqlParserEngine());
+ try (Connection targetConnection =
getCachedDataSource(each.getTargetDataSourceConfig(),
dataSourceManager).getConnection()) {
+ for (String sql :
Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
{
+ executeTargetTableSQL(targetConnection, sql);
}
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("prepare target tables
failed.", ex);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSource
[...]
index 8f75e968183..aa30e1c380c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -20,13 +20,13 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
import java.util.stream.Collectors;
/**
@@ -37,15 +37,14 @@ public final class PostgreSQLDataSourcePreparer extends
AbstractDataSourcePrepar
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) throws SQLException {
- List<String> createLogicalTableSQLs =
listCreateLogicalTableSQL(parameter);
- try (Connection targetConnection =
getCachedDataSource(parameter.getTargetDataSourceConfig(),
parameter.getDataSourceManager()).getConnection()) {
- for (String createLogicalTableSQL : createLogicalTableSQLs) {
- for (String each :
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
{
- executeTargetTableSQL(targetConnection, each);
+ PipelineDataSourceManager dataSourceManager =
parameter.getDataSourceManager();
+ for (CreateTableEntry each :
parameter.getCreateTableConfig().getCreateTableEntries()) {
+ String createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, parameter.getSqlParserEngine());
+ try (Connection targetConnection =
getCachedDataSource(each.getTargetDataSourceConfig(),
dataSourceManager).getConnection()) {
+ for (String sql :
Splitter.on(";").splitToList(createTargetTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
{
+ executeTargetTableSQL(targetConnection, sql);
}
}
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("prepare target tables
failed.", ex);
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f64fde18e31..9141e76717b 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api.impl;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -34,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.BeforeClass;
@@ -127,13 +127,13 @@ public final class GovernanceRepositoryAPIImplTest {
private MigrationJobItemContext mockJobItemContext() {
MigrationJobItemContext result =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
- TaskConfiguration taskConfig = result.getTaskConfig();
+ MigrationTaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
return result;
}
- private InventoryTask mockInventoryTask(final TaskConfiguration
taskConfig) {
+ private InventoryTask mockInventoryTask(final MigrationTaskConfiguration
taskConfig) {
InventoryDumperConfiguration dumperConfig = new
InventoryDumperConfiguration(taskConfig.getDumperConfig());
dumperConfig.setPosition(new PlaceholderPosition());
dumperConfig.setActualTableName("t_order");
@@ -147,7 +147,7 @@ public final class GovernanceRepositoryAPIImplTest {
new DefaultPipelineDataSourceManager(), dataSource,
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
}
- private IncrementalTask mockIncrementalTask(final TaskConfiguration
taskConfig) {
+ private IncrementalTask mockIncrementalTask(final
MigrationTaskConfiguration taskConfig) {
DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
dumperConfig.setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 27f35e6442b..55337499309 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.prepare;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -27,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -45,7 +45,7 @@ public final class InventoryTaskSplitterTest {
private MigrationJobItemContext jobItemContext;
- private TaskConfiguration taskConfig;
+ private MigrationTaskConfiguration taskConfig;
private PipelineDataSourceManager dataSourceManager;
@@ -59,8 +59,9 @@ public final class InventoryTaskSplitterTest {
@Before
public void setUp() {
initJobItemContext();
- inventoryTaskSplitter = new
InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager(),
-
jobItemContext.getJobProcessContext().getImporterExecuteEngine(),
jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(),
jobItemContext.getInitProgress());
+ inventoryTaskSplitter = new InventoryTaskSplitter(
+ jobItemContext.getSourceDataSource(),
jobItemContext.getTaskConfig().getDumperConfig(),
jobItemContext.getTaskConfig().getImporterConfig(),
jobItemContext.getInitProgress(),
+ jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getDataSourceManager(),
jobItemContext.getJobProcessContext().getImporterExecuteEngine());
}
private void initJobItemContext() {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index c0097e72f04..076766b8a6a 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.task;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -26,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPr
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public final class IncrementalTaskTest {
@Before
public void setUp() {
- TaskConfiguration taskConfig =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
+ MigrationTaskConfiguration taskConfig =
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(),
taskConfig.getImporterConfig(),
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 03096ef1cad..7f9f60caec0 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.task;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -30,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public final class InventoryTaskTest {
private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new
DefaultPipelineDataSourceManager();
- private TaskConfiguration taskConfig;
+ private MigrationTaskConfiguration taskConfig;
@BeforeClass
public static void beforeClass() {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 56409e6fbd1..d98bc93950c 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -32,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.Memory
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -159,7 +159,7 @@ public final class PipelineContextUtil {
PipelineProcessConfiguration processConfig =
mockPipelineProcessConfiguration();
MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
- TaskConfiguration taskConfig = new
MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem,
processConfig);
+ MigrationTaskConfiguration taskConfig = new
MigrationJobAPIImpl().buildTaskConfiguration(jobConfig, jobShardingItem,
processConfig);
return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
processContext, taskConfig, new DefaultPipelineDataSourceManager());
}