This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b22c0db386 Remove AbstractImporter sub-classes and simplify ImporterFactory (#19949)
5b22c0db386 is described below

commit 5b22c0db386d89ade4f4607f8d73a14e0fc9f036
Author: Da Xiang Huang <[email protected]>
AuthorDate: Wed Aug 10 11:36:40 2022 +0800

    Remove AbstractImporter sub-classes and simplify ImporterFactory (#19949)
---
 ...{AbstractImporter.java => DefaultImporter.java} |  8 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  5 +-
 .../data/pipeline/core/task/InventoryTask.java     |  4 +-
 .../core/job/importer/DefaultImporterCreator.java} | 32 ++++++--
 .../core/job/importer/ImporterCreator.java}        | 26 ++++---
 .../importer/ImporterCreatorFactory.java}          | 43 +++++------
 .../scaling/core/job/importer/ImporterFactory.java | 57 --------------
 .../scaling/core/spi/ScalingEntry.java             |  8 --
 ...phere.scaling.core.job.importer.ImporterCreator | 18 +++++
 .../core/spi/fixture/ScalingEntryFixture.java      |  8 +-
 ...phere.scaling.core.job.importer.ImporterCreator | 18 +++++
 .../data/pipeline/mysql/MySQLScalingEntry.java     |  8 +-
 .../data/pipeline/mysql/MySQLScalingEntryTest.java |  2 -
 .../pipeline/opengauss/OpenGaussScalingEntry.java  |  8 +-
 .../opengauss/OpenGaussScalingEntryTest.java       |  2 -
 .../postgresql/PostgreSQLScalingEntry.java         |  8 +-
 .../postgresql/PostgreSQLScalingEntryTest.java     |  2 -
 .../core/fixture/FixtureImporterCreator.java       | 22 ++++--
 .../core/fixture/H2ScalingEntryFixture.java        |  6 --
 ...tImporterTest.java => DefaultImporterTest.java} |  7 +-
 .../job/importer/ImporterCreatorFactoryTest.java   | 87 ++++++++++++++++++++++
 ...phere.scaling.core.job.importer.ImporterCreator | 18 +++++
 22 files changed, 230 insertions(+), 167 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index dbfb221d2eb..28b87b12eb2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -48,10 +48,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * Abstract importer.
+ * Default importer.
  */
 @Slf4j
-public abstract class AbstractImporter extends AbstractLifecycleExecutor implements Importer {
+public final class DefaultImporter extends AbstractLifecycleExecutor implements Importer {
     
     private static final DataRecordMerger MERGER = new DataRecordMerger();
     
@@ -66,8 +66,8 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
     
     private final PipelineJobProgressListener jobProgressListener;
     
-    protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                               final PipelineJobProgressListener jobProgressListener) {
+    public DefaultImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                           final PipelineJobProgressListener jobProgressListener) {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
         this.channel = channel;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index abafe1c9600..ce61d09e89c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -38,7 +38,7 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
+import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -97,7 +97,8 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
                                                  final PipelineJobProgressListener jobProgressListener) {
         Collection<Importer> result = new LinkedList<>();
         for (int i = 0; i < concurrency; i++) {
-            result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, jobProgressListener));
+            result.add(ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, channel,
+                    jobProgressListener));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 85a1f9cdfde..1671ba01bb8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -38,7 +38,7 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
+import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
 
 import javax.sql.DataSource;
 import java.util.List;
@@ -73,7 +73,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
         taskId = generateTaskId(inventoryDumperConfig);
         channel = createChannel(pipelineChannelCreator);
         dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
-        importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
+        importer = ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
         position = inventoryDumperConfig.getPosition();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
similarity index 52%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
index 773786ab0e9..0aaed77b0f5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
@@ -15,21 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.mysql.importer;
+package org.apache.shardingsphere.scaling.core.job.importer;
 
+import java.util.Collection;
+import java.util.LinkedList;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 
 /**
- * MySQL importer.
+ * Default importer creator.
  */
-public final class MySQLImporter extends AbstractImporter {
+public final class DefaultImporterCreator implements ImporterCreator {
     
-    public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                         final PipelineJobProgressListener jobProgressListener) {
-        super(importerConfig, dataSourceManager, channel, jobProgressListener);
+    @Override
+    public Importer createImporter(final ImporterConfiguration importerConfig,
+                                   final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                                   final PipelineJobProgressListener jobProgressListener) {
+        return new DefaultImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
+    }
+
+    @Override
+    public String getType() {
+        return "MySQL";
+    }
+
+    @Override
+    public Collection<String> getTypeAliases() {
+        Collection<String> aliases = new LinkedList<>();
+        aliases.add("PostgreSQL");
+        aliases.add("openGauss");
+        return aliases;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
similarity index 59%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
index 3fb304ee247..20a96972b3d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.opengauss.importer;
+package org.apache.shardingsphere.scaling.core.job.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
-/**
- * Importer of openGauss.
- */
-public final class OpenGaussImporter extends AbstractImporter {
+@SingletonSPI
+public interface ImporterCreator extends TypedSPI {
     
-    public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                             final PipelineJobProgressListener jobProgressListener) {
-        super(importerConfig, dataSourceManager, channel, jobProgressListener);
-    }
+    /**
+     * Create importer.
+     * @param importerConfig importerConfig
+     * @param dataSourceManager dataSourceManager
+     * @param channel channel
+     * @param jobProgressListener jobProgressListener
+     * @return importer
+     */
+    Importer createImporter(ImporterConfiguration importerConfig, PipelineDataSourceManager dataSourceManager, PipelineChannel channel,
+                            PipelineJobProgressListener jobProgressListener);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
similarity index 50%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
index 8f327d99ccf..b10924edc3e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
@@ -15,38 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.spi;
+package org.apache.shardingsphere.scaling.core.job.importer;
 
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
 /**
- * Scaling entry.
+ * Importer factory.
  */
-@SingletonSPI
-public interface ScalingEntry extends TypedSPI {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ImporterCreatorFactory {
     
-    /**
-     * Get inventory dumper type.
-     *
-     * @return inventory dumper type
-     */
-    Class<? extends InventoryDumper> getInventoryDumperClass();
-    
-    /**
-     * Get incremental dumper type.
-     *
-     * @return incremental dumper type
-     */
-    Class<? extends IncrementalDumper> getIncrementalDumperClass();
+    static {
+        ShardingSphereServiceLoader.register(ImporterCreator.class);
+    }
     
     /**
-     * Get importer type.
-     *
-     * @return importer type
+     * Get ImporterCreator.
+     * @param databaseType databaseType
+     * @return ImporterCreator
      */
-    Class<? extends Importer> getImporterClass();
+    public static ImporterCreator getInstance(final String databaseType) {
+        return TypedSPIRegistry.getRegisteredService(ImporterCreator.class, databaseType);
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
deleted file mode 100644
index fbe1bfeb305..00000000000
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.importer;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
-
-import java.lang.reflect.Constructor;
-
-/**
- * Importer factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ImporterFactory {
-    
-    /**
-     * Create importer.
-     *
-     * @param importerConfig importer configuration
-     * @param dataSourceManager data source manager
-     * @param channel channel
-     * @param jobProgressListener job progress listener
-     * @return importer
-     */
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static Importer createImporter(final ImporterConfiguration 
importerConfig, final PipelineDataSourceManager dataSourceManager, final 
PipelineChannel channel,
-                                          final PipelineJobProgressListener 
jobProgressListener) {
-        String databaseType = 
importerConfig.getDataSourceConfig().getDatabaseType().getType();
-        ScalingEntry scalingEntry = 
ScalingEntryFactory.getInstance(databaseType);
-        Constructor<? extends Importer> constructor = 
scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, 
PipelineDataSourceManager.class, PipelineChannel.class,
-                PipelineJobProgressListener.class);
-        return constructor.newInstance(importerConfig, dataSourceManager, 
channel, jobProgressListener);
-    }
-}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 8f327d99ccf..5c1dfd02e6a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.spi;
 
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
@@ -42,11 +41,4 @@ public interface ScalingEntry extends TypedSPI {
      * @return incremental dumper type
      */
     Class<? extends IncrementalDumper> getIncrementalDumperClass();
-    
-    /**
-     * Get importer type.
-     *
-     * @return importer type
-     */
-    Class<? extends Importer> getImporterClass();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
new file mode 100644
index 00000000000..f7c30c08c48
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
index 0e9792ae259..800563bb8d8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.spi.fixture;
 
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -33,12 +32,7 @@ public final class ScalingEntryFixture implements 
ScalingEntry {
     public Class<? extends IncrementalDumper> getIncrementalDumperClass() {
         return IncrementalDumper.class;
     }
-    
-    @Override
-    public Class<? extends Importer> getImporterClass() {
-        return Importer.class;
-    }
-    
+
     @Override
     public String getType() {
         return "FIXTURE";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
new file mode 100644
index 00000000000..f7c30c08c48
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
index ce69a1a824b..c0c5b2d3878 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
-import org.apache.shardingsphere.data.pipeline.mysql.importer.MySQLImporter;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -36,12 +35,7 @@ public final class MySQLScalingEntry implements ScalingEntry 
{
     public Class<MySQLIncrementalDumper> getIncrementalDumperClass() {
         return MySQLIncrementalDumper.class;
     }
-    
-    @Override
-    public Class<MySQLImporter> getImporterClass() {
-        return MySQLImporter.class;
-    }
-    
+
     @Override
     public String getType() {
         return "MySQL";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
index 326f443dc7d..5ce566cb32e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
-import org.apache.shardingsphere.data.pipeline.mysql.importer.MySQLImporter;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,7 +33,6 @@ public final class MySQLScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry scalingEntry = ScalingEntryFactory.getInstance("MySQL");
         assertThat(scalingEntry, instanceOf(MySQLScalingEntry.class));
-        assertThat(scalingEntry.getImporterClass(), 
equalTo(MySQLImporter.class));
         assertThat(scalingEntry.getInventoryDumperClass(), 
equalTo(MySQLInventoryDumper.class));
         assertThat(scalingEntry.getIncrementalDumperClass(), 
equalTo(MySQLIncrementalDumper.class));
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
index 2f0d3e16487..294364f41f0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss;
 
-import 
org.apache.shardingsphere.data.pipeline.opengauss.importer.OpenGaussImporter;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -36,12 +35,7 @@ public final class OpenGaussScalingEntry implements 
ScalingEntry {
     public Class<OpenGaussWalDumper> getIncrementalDumperClass() {
         return OpenGaussWalDumper.class;
     }
-    
-    @Override
-    public Class<OpenGaussImporter> getImporterClass() {
-        return OpenGaussImporter.class;
-    }
-    
+
     @Override
     public String getType() {
         return "openGauss";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
index 6ba5321dfb5..8c648087a4f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss;
 
-import 
org.apache.shardingsphere.data.pipeline.opengauss.importer.OpenGaussImporter;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,7 +33,6 @@ public final class OpenGaussScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry actual = ScalingEntryFactory.getInstance("openGauss");
         assertThat(actual, instanceOf(OpenGaussScalingEntry.class));
-        assertThat(actual.getImporterClass(), 
equalTo(OpenGaussImporter.class));
         assertThat(actual.getInventoryDumperClass(), 
equalTo(PostgreSQLInventoryDumper.class));
         assertThat(actual.getIncrementalDumperClass(), 
equalTo(OpenGaussWalDumper.class));
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
index 90e5c714226..89b3f415732 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql;
 
-import 
org.apache.shardingsphere.data.pipeline.postgresql.importer.PostgreSQLImporter;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -36,12 +35,7 @@ public final class PostgreSQLScalingEntry implements 
ScalingEntry {
     public Class<PostgreSQLWalDumper> getIncrementalDumperClass() {
         return PostgreSQLWalDumper.class;
     }
-    
-    @Override
-    public Class<PostgreSQLImporter> getImporterClass() {
-        return PostgreSQLImporter.class;
-    }
-    
+
     @Override
     public String getType() {
         return "PostgreSQL";
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
index f1fbec9d60c..700e139d702 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql;
 
-import 
org.apache.shardingsphere.data.pipeline.postgresql.importer.PostgreSQLImporter;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,7 +33,6 @@ public final class PostgreSQLScalingEntryTest {
     public void assertGetScalingEntryByDatabaseType() {
         ScalingEntry scalingEntry = 
ScalingEntryFactory.getInstance("PostgreSQL");
         assertThat(scalingEntry, instanceOf(PostgreSQLScalingEntry.class));
-        assertThat(scalingEntry.getImporterClass(), 
equalTo(PostgreSQLImporter.class));
         assertThat(scalingEntry.getInventoryDumperClass(), 
equalTo(PostgreSQLInventoryDumper.class));
         assertThat(scalingEntry.getIncrementalDumperClass(), 
equalTo(PostgreSQLWalDumper.class));
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
similarity index 60%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
index ff63036612b..0413a02845a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -15,21 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.postgresql.importer;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator;
 
 /**
- * PostgreSQL importer.
+ * Fixture importer creator.
  */
-public final class PostgreSQLImporter extends AbstractImporter {
+public final class FixtureImporterCreator implements ImporterCreator {
     
-    public PostgreSQLImporter(final ImporterConfiguration importerConfig, 
final PipelineDataSourceManager dataSourceManager, final PipelineChannel 
channel,
-                              final PipelineJobProgressListener 
jobProgressListener) {
-        super(importerConfig, dataSourceManager, channel, jobProgressListener);
+    @Override
+    public Importer createImporter(final ImporterConfiguration importerConfig,
+                                   final PipelineDataSourceManager 
dataSourceManager, final PipelineChannel channel,
+                                   final PipelineJobProgressListener 
jobProgressListener) {
+        return new FixtureImporter(importerConfig, dataSourceManager, channel, 
jobProgressListener);
+    }
+
+    @Override
+    public String getType() {
+        return "H2";
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
index 459da7dac8d..d599be9af00 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,11 +33,6 @@ public final class H2ScalingEntryFixture implements 
ScalingEntry {
         return FixtureIncrementalDumper.class;
     }
     
-    @Override
-    public Class<? extends Importer> getImporterClass() {
-        return FixtureImporter.class;
-    }
-    
     @Override
     public String getType() {
         return "H2";
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
similarity index 97%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index ff0b7280b7d..49ab326e7a9 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -56,7 +56,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class AbstractImporterTest {
+public final class DefaultImporterTest {
     
     private static final String TABLE_NAME = "test_table";
     
@@ -78,12 +78,11 @@ public final class AbstractImporterTest {
     @Mock
     private PreparedStatement preparedStatement;
     
-    private AbstractImporter jdbcImporter;
+    private DefaultImporter jdbcImporter;
     
     @Before
     public void setUp() throws SQLException {
-        jdbcImporter = new AbstractImporter(mockImporterConfiguration(), 
dataSourceManager, channel, new FixturePipelineJobProgressListener()) {
-        };
+        jdbcImporter = new DefaultImporter(mockImporterConfiguration(), 
dataSourceManager, channel, new FixturePipelineJobProgressListener());
         
when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
         when(dataSource.getConnection()).thenReturn(connection);
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
new file mode 100644
index 00000000000..2f54c27594d
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.cor.job.importer;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
+import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import 
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Importer creator factory test.
+ *
+ * <p>Checks that {@link ImporterCreatorFactory} returns, for each database
+ * type, a creator that builds the expected {@link Importer} implementation.
+ * The Mockito runner is needed so that the {@code @Mock} fields are
+ * initialized instead of being passed to the creators as {@code null}.</p>
+ *
+ * <p>NOTE(review): the package segment {@code cor} looks like a typo for
+ * {@code core}; confirm and relocate the file if so.</p>
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class ImporterCreatorFactoryTest {
+    
+    @Mock
+    private PipelineDataSourceManager dataSourceManager;
+    
+    @Mock
+    private PipelineChannel channel;
+    
+    private final PipelineDataSourceConfiguration dataSourceConfig = new 
StandardPipelineDataSourceConfiguration(
+            
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root",
 "root", "root");
+    
+    @Test
+    public void assertCreateImporterForMysql() {
+        Importer importer = ImporterCreatorFactory.getInstance("MySQL")
+                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
+                        new FixturePipelineJobProgressListener());
+        assertThat(importer, instanceOf(DefaultImporter.class));
+    }
+    
+    @Test
+    public void assertCreateImporterForPostgreSQL() {
+        Importer importer = ImporterCreatorFactory.getInstance("PostgreSQL")
+                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
+                        new FixturePipelineJobProgressListener());
+        assertThat(importer, instanceOf(DefaultImporter.class));
+    }
+    
+    @Test
+    public void assertCreateImporterForOpenGauss() {
+        Importer importer = ImporterCreatorFactory.getInstance("openGauss")
+                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
+                        new FixturePipelineJobProgressListener());
+        assertThat(importer, instanceOf(DefaultImporter.class));
+    }
+    
+    @Test
+    public void assertCreateImporterForH2() {
+        Importer importer = ImporterCreatorFactory.getInstance("H2")
+                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
+                        new FixturePipelineJobProgressListener());
+        assertThat(importer, instanceOf(FixtureImporter.class));
+    }
+    
+    private ImporterConfiguration mockImporterConfiguration() {
+        Map<LogicTableName, Set<String>> shardingColumnsMap = 
Collections.singletonMap(new LogicTableName("test_table"), 
Collections.singleton("user"));
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+    }
+}
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
new file mode 100644
index 00000000000..7ba874250f7
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporterCreator

Reply via email to