This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a05ba93464 [feature][core] Unified engine initialization connector logic (#8536)
a05ba93464 is described below

commit a05ba934644e22d51bded244414ff2616302d6a1
Author: Guangdong Liu <804167...@qq.com>
AuthorDate: Wed Jan 22 18:49:58 2025 +0800

    [feature][core] Unified engine initialization connector logic (#8536)
---
 .../seatunnel/api/common}/PluginIdentifier.java    |   2 +-
 .../seatunnel/api/table/factory/FactoryUtil.java   | 123 ++++++++++++---
 .../core/starter/execution/PluginUtil.java         | 171 +--------------------
 .../flink/execution/SinkExecuteProcessor.java      |  84 +++-------
 .../flink/execution/SinkExecuteProcessor.java      |  84 +++-------
 .../flink/execution/SourceExecuteProcessor.java    |  41 +++--
 .../seatunnel/core/starter/spark/SparkStarter.java |   2 +-
 .../spark/execution/SinkExecuteProcessor.java      |  57 ++++---
 .../seatunnel/core/starter/spark/SparkStarter.java |   2 +-
 .../spark/execution/SinkExecuteProcessor.java      |  59 ++++---
 .../spark/execution/SourceExecuteProcessor.java    |  37 +++--
 .../SparkAbstractPluginExecuteProcessor.java       |  21 +++
 .../seatunnel/command/ConnectorCheckCommand.java   |   2 +-
 .../resources/neo4j/fake_to_neo4j_batch_write.conf |   4 +
 .../engine/core/parse/ConnectorInstanceLoader.java |   2 +-
 .../engine/core/parse/JobConfigParser.java         | 142 -----------------
 .../core/parse/MultipleTableJobConfigParser.java   |  91 ++++-------
 .../plugin/discovery/AbstractPluginDiscovery.java  |   1 +
 .../plugin/discovery/PluginDiscovery.java          |   1 +
 .../seatunnel/SeaTunnelFactoryDiscovery.java       |   2 +-
 .../seatunnel/SeaTunnelSinkPluginDiscovery.java    |   2 +-
 .../seatunnel/SeaTunnelSourcePluginDiscovery.java  |   2 +-
 .../SeaTunnelTransformPluginDiscovery.java         |   2 +-
 .../discovery/AbstractPluginDiscoveryTest.java     |   1 +
 .../SeaTunnelSourcePluginDiscoveryTest.java        |   2 +-
 25 files changed, 314 insertions(+), 623 deletions(-)

diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java
similarity index 98%
rename from seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java
index 8540063698..4cdbfe27e4 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.plugin.discovery;
+package org.apache.seatunnel.api.common;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index e11afd1d19..30e7b00864 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
@@ -36,11 +37,13 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 import scala.Tuple2;
 
 import java.io.Serializable;
@@ -51,12 +54,17 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+
 /**
  * Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and 
{@link
  * CatalogFactory}.
  */
+@Slf4j
 public final class FactoryUtil {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
@@ -65,8 +73,13 @@ public final class FactoryUtil {
 
     public static <T, SplitT extends SourceSplit, StateT extends Serializable>
             Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> 
createAndPrepareSource(
-                    ReadonlyConfig options, ClassLoader classLoader, String 
factoryIdentifier) {
-        return restoreAndPrepareSource(options, classLoader, 
factoryIdentifier, null);
+                    ReadonlyConfig options,
+                    ClassLoader classLoader,
+                    String factoryIdentifier,
+                    Function<PluginIdentifier, SeaTunnelSource> 
fallbackCreateSource,
+                    TableSourceFactory factory) {
+        return restoreAndPrepareSource(
+                options, classLoader, factoryIdentifier, null, 
fallbackCreateSource, factory);
     }
 
     public static <T, SplitT extends SourceSplit, StateT extends Serializable>
@@ -74,22 +87,46 @@ public final class FactoryUtil {
                     ReadonlyConfig options,
                     ClassLoader classLoader,
                     String factoryIdentifier,
-                    ChangeStreamTableSourceCheckpoint checkpoint) {
+                    ChangeStreamTableSourceCheckpoint checkpoint,
+                    Function<PluginIdentifier, SeaTunnelSource> 
fallbackCreateSource,
+                    TableSourceFactory factory) {
 
         try {
-            final TableSourceFactory factory =
-                    discoverFactory(classLoader, TableSourceFactory.class, 
factoryIdentifier);
+
             SeaTunnelSource<T, SplitT, StateT> source;
-            if (factory instanceof ChangeStreamTableSourceFactory && 
checkpoint != null) {
-                ChangeStreamTableSourceFactory changeStreamTableSourceFactory =
-                        (ChangeStreamTableSourceFactory) factory;
-                ChangeStreamTableSourceState<Serializable, SourceSplit> state =
-                        
changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint);
+            final String factoryId = options.get(PLUGIN_NAME);
+
+            boolean fallback =
+                    isFallback(
+                            classLoader,
+                            TableSourceFactory.class,
+                            factoryId,
+                            (sourceFactory) -> 
sourceFactory.createSource(null));
+
+            if (fallback) {
                 source =
-                        restoreAndPrepareSource(
-                                changeStreamTableSourceFactory, options, 
classLoader, state);
+                        fallbackCreateSource.apply(
+                                PluginIdentifier.of("seatunnel", "source", 
factoryId));
+                source.prepare(options.toConfig());
+
             } else {
-                source = createAndPrepareSource(factory, options, classLoader);
+                if (factory == null) {
+                    factory =
+                            discoverFactory(
+                                    classLoader, TableSourceFactory.class, 
factoryIdentifier);
+                }
+
+                if (factory instanceof ChangeStreamTableSourceFactory && 
checkpoint != null) {
+                    ChangeStreamTableSourceFactory 
changeStreamTableSourceFactory =
+                            (ChangeStreamTableSourceFactory) factory;
+                    ChangeStreamTableSourceState<Serializable, SourceSplit> 
state =
+                            
changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint);
+                    source =
+                            restoreAndPrepareSource(
+                                    changeStreamTableSourceFactory, options, 
classLoader, state);
+                } else {
+                    source = createAndPrepareSource(factory, options, 
classLoader);
+                }
             }
             List<CatalogTable> catalogTables;
             try {
@@ -115,6 +152,7 @@ public final class FactoryUtil {
                 catalogTables.add(catalogTable);
             }
             return new Tuple2<>(source, catalogTables);
+
         } catch (Throwable t) {
             throw new FactoryException(
                     String.format(
@@ -150,17 +188,42 @@ public final class FactoryUtil {
                     CatalogTable catalogTable,
                     ReadonlyConfig config,
                     ClassLoader classLoader,
-                    String factoryIdentifier) {
+                    String factoryIdentifier,
+                    Function<PluginIdentifier, SeaTunnelSink> 
fallbackCreateSink,
+                    TableSinkFactory<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT>
+                            tableSinkFactory) {
         try {
-            TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
factory =
-                    discoverFactory(classLoader, TableSinkFactory.class, 
factoryIdentifier);
+            final String factoryId = config.get(PLUGIN_NAME);
+
+            boolean fallback =
+                    isFallback(
+                            classLoader,
+                            TableSinkFactory.class,
+                            factoryId,
+                            (sinkFactory) -> sinkFactory.createSink(null));
+
+            if (fallback) {
+                SeaTunnelSink sink =
+                        fallbackCreateSink.apply(
+                                PluginIdentifier.of("seatunnel", "sink", 
factoryId));
+                sink.prepare(config.toConfig());
+                sink.setTypeInfo(catalogTable.getSeaTunnelRowType());
+
+                return sink;
+            }
+
+            if (tableSinkFactory == null) {
+                tableSinkFactory =
+                        discoverFactory(classLoader, TableSinkFactory.class, 
factoryIdentifier);
+            }
+
             TableSinkFactoryContext context =
                     TableSinkFactoryContext.replacePlaceholderAndCreate(
                             catalogTable,
                             config,
                             classLoader,
-                            factory.excludeTablePlaceholderReplaceKeys());
-            
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
+                            
tableSinkFactory.excludeTablePlaceholderReplaceKeys());
+            
ConfigValidator.of(context.getOptions()).validate(tableSinkFactory.optionRule());
 
             LOG.info(
                     "Create sink '{}' with upstream input 
catalog-table[database: {}, schema: {}, table: {}]",
@@ -168,7 +231,7 @@ public final class FactoryUtil {
                     catalogTable.getTablePath().getDatabaseName(),
                     catalogTable.getTablePath().getSchemaName(),
                     catalogTable.getTablePath().getTableName());
-            return factory.createSink(context).createSink();
+            return tableSinkFactory.createSink(context).createSink();
         } catch (Throwable t) {
             throw new FactoryException(
                     String.format(
@@ -351,4 +414,26 @@ public final class FactoryUtil {
         
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         return factory.createTransform(context).createTransform();
     }
+
+    private static <T extends Factory> boolean isFallback(
+            ClassLoader classLoader,
+            Class<T> factoryClass,
+            String factoryId,
+            Consumer<T> virtualCreator) {
+        Optional<T> factory = discoverOptionalFactory(classLoader, 
factoryClass, factoryId);
+        if (!factory.isPresent()) {
+            return true;
+        }
+        try {
+            virtualCreator.accept(factory.get());
+        } catch (Exception e) {
+            if (e instanceof UnsupportedOperationException
+                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
+                            .equals(e.getMessage())) {
+                return true;
+            }
+            log.debug(ExceptionUtils.getMessage(e));
+        }
+        return false;
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index a8a245dd9b..b2b47854e3 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -20,40 +20,21 @@ package org.apache.seatunnel.core.starter.execution;
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryException;
-import org.apache.seatunnel.api.table.factory.FactoryUtil;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 
 import java.net.URL;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
-import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 
 /** The util used for Spark/Flink to create to SeaTunnelSource etc. */
 @SuppressWarnings("rawtypes")
@@ -61,75 +42,6 @@ public class PluginUtil {
 
     protected static final String ENGINE_TYPE = "seatunnel";
 
-    public static SourceTableInfo createSource(
-            SeaTunnelFactoryDiscovery factoryDiscovery,
-            SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig,
-            JobContext jobContext) {
-        // get current thread classloader
-        ClassLoader classLoader =
-                Thread.currentThread()
-                        .getContextClassLoader(); // try to find factory of 
this plugin
-
-        final ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
-        // try to find table source factory
-        final Optional<Factory> sourceFactory =
-                
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
-        final boolean fallback = isFallback(sourceFactory);
-        SeaTunnelSource source;
-        if (fallback) {
-            source = fallbackCreate(sourcePluginDiscovery, pluginIdentifier, 
pluginConfig);
-        } else {
-            // create source with source factory
-            TableSourceFactoryContext context =
-                    new TableSourceFactoryContext(readonlyConfig, classLoader);
-            
ConfigValidator.of(context.getOptions()).validate(sourceFactory.get().optionRule());
-            TableSource tableSource =
-                    ((TableSourceFactory) 
sourceFactory.get()).createSource(context);
-            source = tableSource.createSource();
-        }
-        source.setJobContext(jobContext);
-        ensureJobModeMatch(jobContext, source);
-        List<CatalogTable> catalogTables;
-        try {
-            catalogTables = source.getProducedCatalogTables();
-        } catch (UnsupportedOperationException e) {
-            // TODO remove it when all connector use `getProducedCatalogTables`
-            SeaTunnelDataType<?> seaTunnelDataType = source.getProducedType();
-            final String tableId =
-                    
readonlyConfig.getOptional(CommonOptions.PLUGIN_OUTPUT).orElse(DEFAULT_ID);
-            catalogTables =
-                    
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
-        }
-        return new SourceTableInfo(source, catalogTables);
-    }
-
-    private static boolean isFallback(Optional<Factory> factory) {
-        if (!factory.isPresent()) {
-            return true;
-        }
-        try {
-            ((TableSourceFactory) factory.get()).createSource(null);
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private static SeaTunnelSource fallbackCreate(
-            SeaTunnelSourcePluginDiscovery sourcePluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig) {
-        SeaTunnelSource source = 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
-        source.prepare(pluginConfig);
-        return source;
-    }
-
     public static Optional<? extends Factory> createTransformFactory(
             SeaTunnelFactoryDiscovery factoryDiscovery,
             SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
@@ -163,87 +75,6 @@ public class PluginUtil {
         }
     }
 
-    public static SeaTunnelSink createSink(
-            Optional<? extends Factory> factory,
-            Config sinkConfig,
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            JobContext jobContext,
-            List<CatalogTable> catalogTables,
-            ClassLoader classLoader) {
-        boolean fallBack = !factory.isPresent() || isFallback(factory.get());
-        if (fallBack) {
-            SeaTunnelSink sink =
-                    fallbackCreateSink(
-                            sinkPluginDiscovery,
-                            PluginIdentifier.of(
-                                    ENGINE_TYPE,
-                                    PluginType.SINK.getType(),
-                                    sinkConfig.getString(PLUGIN_NAME.key())),
-                            sinkConfig);
-            sink.setJobContext(jobContext);
-            sink.setTypeInfo(catalogTables.get(0).getSeaTunnelRowType());
-            return sink;
-        } else {
-            if (catalogTables.size() > 1) {
-                Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
-                ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(sinkConfig);
-                catalogTables.forEach(
-                        catalogTable -> {
-                            TableSinkFactoryContext context =
-                                    
TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                            catalogTable,
-                                            
ReadonlyConfig.fromConfig(sinkConfig),
-                                            classLoader,
-                                            ((TableSinkFactory) factory.get())
-                                                    
.excludeTablePlaceholderReplaceKeys());
-                            ConfigValidator.of(context.getOptions())
-                                    .validate(factory.get().optionRule());
-                            SeaTunnelSink action =
-                                    ((TableSinkFactory) factory.get())
-                                            .createSink(context)
-                                            .createSink();
-                            action.setJobContext(jobContext);
-                            sinks.put(catalogTable.getTablePath(), action);
-                        });
-                return FactoryUtil.createMultiTableSink(sinks, readonlyConfig, 
classLoader);
-            }
-            TableSinkFactoryContext context =
-                    TableSinkFactoryContext.replacePlaceholderAndCreate(
-                            catalogTables.get(0),
-                            ReadonlyConfig.fromConfig(sinkConfig),
-                            classLoader,
-                            ((TableSinkFactory) factory.get())
-                                    .excludeTablePlaceholderReplaceKeys());
-            
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
-            SeaTunnelSink sink =
-                    ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
-            sink.setJobContext(jobContext);
-            return sink;
-        }
-    }
-
-    public static boolean isFallback(Factory factory) {
-        try {
-            ((TableSinkFactory) factory).createSink(null);
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static SeaTunnelSink fallbackCreateSink(
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig) {
-        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
-        source.prepare(pluginConfig);
-        return source;
-    }
-
     public static void ensureJobModeMatch(JobContext jobContext, 
SeaTunnelSource source) {
         if (jobContext.getJobMode() == JobMode.BATCH
                 && source.getBoundedness()
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6f24e1c3fe..4466844536 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,14 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -55,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
@@ -65,8 +62,6 @@ import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
-    private static final String PLUGIN_TYPE = PluginType.SINK.getType();
-
     protected SinkExecuteProcessor(
             List<URL> jarPaths,
             Config envConfig,
@@ -101,48 +96,27 @@ public class SinkExecuteProcessor
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
         DataStreamTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+                sinkPluginDiscovery::createPluginInstance;
         for (int i = 0; i < plugins.size(); i++) {
+            Optional<? extends Factory> factory = plugins.get(i);
             Config sinkConfig = pluginConfigs.get(i);
             DataStreamTableInfo stream =
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
-            Optional<? extends Factory> factory = plugins.get(i);
-            boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
             Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
-            if (fallBack) {
-                for (CatalogTable catalogTable : stream.getCatalogTables()) {
-                    SeaTunnelSink fallBackSink =
-                            fallbackCreateSink(
-                                    sinkPluginDiscovery,
-                                    PluginIdentifier.of(
-                                            ENGINE_TYPE,
-                                            PLUGIN_TYPE,
-                                            
sinkConfig.getString(PLUGIN_NAME.key())),
-                                    sinkConfig);
-                    fallBackSink.setJobContext(jobContext);
-                    SeaTunnelRowType sourceType = 
catalogTable.getSeaTunnelRowType();
-                    fallBackSink.setTypeInfo(sourceType);
-                    handleSaveMode(fallBackSink);
-                    TableIdentifier tableId = catalogTable.getTableId();
-                    sinks.put(tableId.toTablePath(), fallBackSink);
-                }
-            } else {
-                for (CatalogTable catalogTable : stream.getCatalogTables()) {
-                    SeaTunnelSink seaTunnelSink;
-                    TableSinkFactoryContext context =
-                            
TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                    catalogTable,
-                                    ReadonlyConfig.fromConfig(sinkConfig),
-                                    classLoader,
-                                    ((TableSinkFactory) factory.get())
-                                            
.excludeTablePlaceholderReplaceKeys());
-                    
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
-                    seaTunnelSink =
-                            ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
-                    seaTunnelSink.setJobContext(jobContext);
-                    handleSaveMode(seaTunnelSink);
-                    TableIdentifier tableId = catalogTable.getTableId();
-                    sinks.put(tableId.toTablePath(), seaTunnelSink);
-                }
+            for (CatalogTable catalogTable : stream.getCatalogTables()) {
+                SeaTunnelSink sink =
+                        FactoryUtil.createAndPrepareSink(
+                                catalogTable,
+                                ReadonlyConfig.fromConfig(sinkConfig),
+                                classLoader,
+                                sinkConfig.getString(PLUGIN_NAME.key()),
+                                fallbackCreateSink,
+                                ((TableSinkFactory) (factory.orElse(null))));
+                sink.setJobContext(jobContext);
+                handleSaveMode(sink);
+                TableIdentifier tableId = catalogTable.getTableId();
+                sinks.put(tableId.toTablePath(), sink);
             }
             SeaTunnelSink sink =
                     tryGenerateMultiTableSink(
@@ -178,28 +152,6 @@ public class SinkExecuteProcessor
         return FactoryUtil.createMultiTableSink(sinks, sinkConfig, 
classLoader);
     }
 
-    public boolean isFallback(Factory factory) {
-        try {
-            ((TableSinkFactory) factory).createSink(null);
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public SeaTunnelSink fallbackCreateSink(
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig) {
-        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
-        source.prepare(pluginConfig);
-        return source;
-    }
-
     public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
         if (seaTunnelSink instanceof SupportSaveMode) {
             SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index d41bfe34ce..a82d2392e6 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,14 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -56,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
@@ -66,8 +63,6 @@ import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
 
-    private static final String PLUGIN_TYPE = PluginType.SINK.getType();
-
     protected SinkExecuteProcessor(
             List<URL> jarPaths,
             Config envConfig,
@@ -102,48 +97,27 @@ public class SinkExecuteProcessor
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
         DataStreamTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+                sinkPluginDiscovery::createPluginInstance;
         for (int i = 0; i < plugins.size(); i++) {
+            Optional<? extends Factory> factory = plugins.get(i);
             Config sinkConfig = pluginConfigs.get(i);
             DataStreamTableInfo stream =
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
-            Optional<? extends Factory> factory = plugins.get(i);
-            boolean fallBack = !factory.isPresent() || 
isFallback(factory.get());
             Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
-            if (fallBack) {
-                for (CatalogTable catalogTable : stream.getCatalogTables()) {
-                    SeaTunnelSink fallBackSink =
-                            fallbackCreateSink(
-                                    sinkPluginDiscovery,
-                                    PluginIdentifier.of(
-                                            ENGINE_TYPE,
-                                            PLUGIN_TYPE,
-                                            
sinkConfig.getString(PLUGIN_NAME.key())),
-                                    sinkConfig);
-                    fallBackSink.setJobContext(jobContext);
-                    SeaTunnelRowType sourceType = 
catalogTable.getSeaTunnelRowType();
-                    fallBackSink.setTypeInfo(sourceType);
-                    handleSaveMode(fallBackSink);
-                    TableIdentifier tableId = catalogTable.getTableId();
-                    sinks.put(tableId.toTablePath(), fallBackSink);
-                }
-            } else {
-                for (CatalogTable catalogTable : stream.getCatalogTables()) {
-                    SeaTunnelSink seaTunnelSink;
-                    TableSinkFactoryContext context =
-                            
TableSinkFactoryContext.replacePlaceholderAndCreate(
-                                    catalogTable,
-                                    ReadonlyConfig.fromConfig(sinkConfig),
-                                    classLoader,
-                                    ((TableSinkFactory) factory.get())
-                                            
.excludeTablePlaceholderReplaceKeys());
-                    
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
-                    seaTunnelSink =
-                            ((TableSinkFactory) 
factory.get()).createSink(context).createSink();
-                    seaTunnelSink.setJobContext(jobContext);
-                    handleSaveMode(seaTunnelSink);
-                    TableIdentifier tableId = catalogTable.getTableId();
-                    sinks.put(tableId.toTablePath(), seaTunnelSink);
-                }
+            for (CatalogTable catalogTable : stream.getCatalogTables()) {
+                SeaTunnelSink sink =
+                        FactoryUtil.createAndPrepareSink(
+                                catalogTable,
+                                ReadonlyConfig.fromConfig(sinkConfig),
+                                classLoader,
+                                sinkConfig.getString(PLUGIN_NAME.key()),
+                                fallbackCreateSink,
+                                ((TableSinkFactory) (factory.orElse(null))));
+                sink.setJobContext(jobContext);
+                handleSaveMode(sink);
+                TableIdentifier tableId = catalogTable.getTableId();
+                sinks.put(tableId.toTablePath(), sink);
             }
             SeaTunnelSink sink =
                     tryGenerateMultiTableSink(
@@ -184,28 +158,6 @@ public class SinkExecuteProcessor
         return FactoryUtil.createMultiTableSink(sinks, sinkConfig, 
classLoader);
     }
 
-    public boolean isFallback(Factory factory) {
-        try {
-            ((TableSinkFactory) factory).createSink(null);
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public SeaTunnelSink fallbackCreateSink(
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig) {
-        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
-        source.prepare(pluginConfig);
-        return source;
-    }
-
     public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
         if (seaTunnelSink instanceof SupportSaveMode) {
             SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 4e1de7c95d..d637f2256b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,14 +22,16 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import org.apache.seatunnel.translation.flink.source.FlinkSource;
@@ -39,15 +41,19 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import lombok.extern.slf4j.Slf4j;
+import scala.Tuple2;
 
+import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
+import static 
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
 
 @Slf4j
 @SuppressWarnings("unchecked,rawtypes")
@@ -95,11 +101,12 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
     @Override
     protected List<SourceTableInfo> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
-                new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
-
         SeaTunnelFactoryDiscovery factoryDiscovery =
                 new SeaTunnelFactoryDiscovery(TableSourceFactory.class, 
ADD_URL_TO_CLASSLOADER);
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
+                new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
+        Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
+                sourcePluginDiscovery::createPluginInstance;
 
         List<SourceTableInfo> sources = new ArrayList<>();
         Set<URL> jars = new HashSet<>();
@@ -109,14 +116,22 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                             ENGINE_TYPE, PLUGIN_TYPE, 
sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
                     
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SourceTableInfo source =
-                    PluginUtil.createSource(
-                            factoryDiscovery,
-                            sourcePluginDiscovery,
-                            pluginIdentifier,
-                            sourceConfig,
-                            jobContext);
-            sources.add(source);
+
+            Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> source =
+                    FactoryUtil.createAndPrepareSource(
+                            ReadonlyConfig.fromConfig(sourceConfig),
+                            Thread.currentThread().getContextClassLoader(),
+                            pluginIdentifier.getPluginName(),
+                            fallbackCreateSource,
+                            (TableSourceFactory)
+                                    factoryDiscovery
+                                            
.createOptionalPluginInstance(pluginIdentifier)
+                                            .orElse(null));
+
+            source._1().setJobContext(jobContext);
+            ensureJobModeMatch(jobContext, source._1());
+
+            sources.add(new SourceTableInfo(source._1(), source._2()));
         }
         jarPaths.addAll(jars);
         return sources;
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index f2c20e8408..e9ee482e9c 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
@@ -29,7 +30,6 @@ import 
org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index d4f99d65f5..6e22576c91 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,12 +31,12 @@ import 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -45,11 +47,14 @@ import org.apache.spark.sql.Row;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
 public class SinkExecuteProcessor
@@ -91,6 +96,8 @@ public class SinkExecuteProcessor
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         DatasetTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
+        Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+                sinkPluginDiscovery::createPluginInstance;
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
             DatasetTableInfo datasetTableInfo =
@@ -110,15 +117,25 @@ public class SinkExecuteProcessor
                                         
CommonOptions.PARALLELISM.defaultValue());
             }
             
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), 
parallelism);
-            Optional<? extends Factory> factory = plugins.get(i);
+            Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+            datasetTableInfo.getCatalogTables().stream()
+                    .forEach(
+                            catalogTable -> {
+                                SeaTunnelSink<Object, Object, Object, Object> 
sink =
+                                        FactoryUtil.createAndPrepareSink(
+                                                catalogTable,
+                                                
ReadonlyConfig.fromConfig(sinkConfig),
+                                                classLoader,
+                                                
sinkConfig.getString(PLUGIN_NAME.key()),
+                                                fallbackCreateSink,
+                                                null);
+                                sink.setJobContext(jobContext);
+                                
sinks.put(catalogTable.getTableId().toTablePath(), sink);
+                            });
+
             SeaTunnelSink sink =
-                    PluginUtil.createSink(
-                            factory,
-                            sinkConfig,
-                            sinkPluginDiscovery,
-                            jobContext,
-                            datasetTableInfo.getCatalogTables(),
-                            classLoader);
+                    tryGenerateMultiTableSink(
+                            sinks, ReadonlyConfig.fromConfig(sinkConfig), 
classLoader);
             // TODO modify checkpoint location
             handleSaveMode(sink);
             String applicationId =
@@ -134,28 +151,6 @@ public class SinkExecuteProcessor
         return null;
     }
 
-    public boolean isFallback(Factory factory) {
-        try {
-            ((TableSinkFactory) factory).createSink(null);
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public SeaTunnelSink fallbackCreateSink(
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig) {
-        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
-        source.prepare(pluginConfig);
-        return source;
-    }
-
     public void handleSaveMode(SeaTunnelSink sink) {
         if (sink instanceof SupportSaveMode) {
             Optional<SaveModeHandler> saveModeHandler =
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 790a20191d..dcd6a804b2 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
@@ -29,7 +30,6 @@ import 
org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index b66aaf7d86..74b0e1c1f1 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,12 +31,12 @@ import 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -44,15 +46,21 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
+@Slf4j
 public class SinkExecuteProcessor
         extends SparkAbstractPluginExecuteProcessor<Optional<? extends 
Factory>> {
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
@@ -92,6 +100,8 @@ public class SinkExecuteProcessor
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         DatasetTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
+        Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+                sinkPluginDiscovery::createPluginInstance;
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
             DatasetTableInfo datasetTableInfo =
@@ -110,15 +120,24 @@ public class SinkExecuteProcessor
                                         
CommonOptions.PARALLELISM.defaultValue());
             }
             
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), 
parallelism);
-            Optional<? extends Factory> factory = plugins.get(i);
+            Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+            datasetTableInfo.getCatalogTables().stream()
+                    .forEach(
+                            catalogTable -> {
+                                SeaTunnelSink<Object, Object, Object, Object> 
sink =
+                                        FactoryUtil.createAndPrepareSink(
+                                                catalogTable,
+                                                
ReadonlyConfig.fromConfig(sinkConfig),
+                                                classLoader,
+                                                
sinkConfig.getString(PLUGIN_NAME.key()),
+                                                fallbackCreateSink,
+                                                null);
+                                sink.setJobContext(jobContext);
+                                
sinks.put(catalogTable.getTableId().toTablePath(), sink);
+                            });
             SeaTunnelSink sink =
-                    PluginUtil.createSink(
-                            factory,
-                            sinkConfig,
-                            sinkPluginDiscovery,
-                            jobContext,
-                            datasetTableInfo.getCatalogTables(),
-                            classLoader);
+                    tryGenerateMultiTableSink(
+                            sinks, ReadonlyConfig.fromConfig(sinkConfig), 
classLoader);
             // TODO modify checkpoint location
             handleSaveMode(sink);
             String applicationId =
@@ -135,28 +154,6 @@ public class SinkExecuteProcessor
         return null;
     }
 
-    public boolean isFallback(Factory factory) {
-        try {
-            ((TableSinkFactory) factory).createSink(null);
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public SeaTunnelSink fallbackCreateSink(
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            PluginIdentifier pluginIdentifier,
-            Config pluginConfig) {
-        SeaTunnelSink source = 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
-        source.prepare(pluginConfig);
-        return source;
-    }
-
     public void handleSaveMode(SeaTunnelSink sink) {
         if (sink instanceof SupportSaveMode) {
             Optional<SaveModeHandler> saveModeHandler =
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 5f4a583d84..50b3a88814 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -23,21 +23,24 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import scala.Tuple2;
+
+import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,9 +48,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
+import static 
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
 
 @SuppressWarnings("rawtypes")
 public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
@@ -110,8 +115,9 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
     @Override
     protected List<SourceTableInfo> initializePlugins(List<? extends Config> 
pluginConfigs) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new 
SeaTunnelSourcePluginDiscovery();
-        SeaTunnelFactoryDiscovery factoryDiscovery =
-                new SeaTunnelFactoryDiscovery(TableSourceFactory.class);
+
+        Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
+                sourcePluginDiscovery::createPluginInstance;
 
         List<SourceTableInfo> sources = new ArrayList<>();
         Set<URL> jars = new HashSet<>();
@@ -121,14 +127,17 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
                             ENGINE_TYPE, PLUGIN_TYPE, 
sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
                     
sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SourceTableInfo source =
-                    PluginUtil.createSource(
-                            factoryDiscovery,
-                            sourcePluginDiscovery,
-                            pluginIdentifier,
-                            sourceConfig,
-                            jobContext);
-            sources.add(source);
+            Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> source =
+                    FactoryUtil.createAndPrepareSource(
+                            ReadonlyConfig.fromConfig(sourceConfig),
+                            Thread.currentThread().getContextClassLoader(),
+                            pluginIdentifier.getPluginName(),
+                            fallbackCreateSource,
+                            null);
+
+            source._1().setJobContext(jobContext);
+            ensureJobModeMatch(jobContext, source._1());
+            sources.add(new SourceTableInfo(source._1(), source._2()));
         }
         sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
         return sources;
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
index e85e2c56eb..d9de016446 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
@@ -21,6 +21,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -28,12 +32,16 @@ import 
org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_INPUT;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
 
+@Slf4j
 public abstract class SparkAbstractPluginExecuteProcessor<T>
         implements PluginExecuteProcessor<DatasetTableInfo, 
SparkRuntimeEnvironment> {
     protected SparkRuntimeEnvironment sparkRuntimeEnvironment;
@@ -101,6 +109,19 @@ public abstract class 
SparkAbstractPluginExecuteProcessor<T>
                         pluginInputIdentifier));
     }
 
+    // Falls back to the first sink when any sink does not implement SupportMultiTableSink.
+    protected SeaTunnelSink tryGenerateMultiTableSink(
+            Map<TablePath, SeaTunnelSink> sinks,
+            ReadonlyConfig sinkConfig,
+            ClassLoader classLoader) {
+        if (sinks.values().stream().anyMatch(sink -> !(sink instanceof 
SupportMultiTableSink))) {
+            log.info("Unsupported multi table sink api, rollback to sink 
template");
+            // Not every sink is multi-table capable: return the first one from the map's values.
+            return sinks.values().iterator().next();
+        }
+        return FactoryUtil.createMultiTableSink(sinks, sinkConfig, 
classLoader);
+    }
+
     private void registerTempView(String tableName, Dataset<Row> ds) {
         ds.createOrReplaceTempView(tableName);
     }
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
index 1a2514fdca..b7b9adbccb 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.seatunnel.command;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -25,7 +26,6 @@ import 
org.apache.seatunnel.core.starter.exception.CommandExecuteException;
 import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
 import 
org.apache.seatunnel.core.starter.seatunnel.args.ConnectorCheckCommandArgs;
 import org.apache.seatunnel.plugin.discovery.PluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
index bb22567bcf..86f52b5f54 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
@@ -54,6 +54,10 @@ sink {
 
     max_transaction_retry_time = 3
     max_connection_timeout = 1
+    queryParamPosition = {
+      string = 0
+      int = 1
+    }
 
     query = "unwind $batch as row  create(n:BatchLabel) set n.name = 
row.name,n.age = row.age"
   }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
index affc90283b..87b1a4e7cd 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java
@@ -21,12 +21,12 @@ import 
org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 2ec19cabc9..02a56b3b83 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -17,42 +17,15 @@
 
 package org.apache.seatunnel.engine.core.parse;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.CollectionConstants;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
-import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
-import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.Data;
 import lombok.NonNull;
-import scala.Serializable;
-import scala.Tuple2;
 
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
 
 @Data
 public class JobConfigParser {
@@ -73,125 +46,10 @@ public class JobConfigParser {
         this.isStartWithSavePoint = isStartWithSavePoint;
     }
 
-    public Tuple2<CatalogTable, Action> parseSource(
-            Config config, JobConfig jobConfig, String tableId, int 
parallelism) {
-        ImmutablePair<SeaTunnelSource, Set<URL>> tuple =
-                ConnectorInstanceLoader.loadSourceInstance(
-                        config, jobConfig.getJobContext(), commonPluginJars);
-        final SeaTunnelSource source = tuple.getLeft();
-        // old logic: prepare(initialization) -> set job context
-        source.prepare(config);
-        source.setJobContext(jobConfig.getJobContext());
-        PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
-        String actionName =
-                createSourceActionName(0, 
config.getString(CollectionConstants.PLUGIN_NAME));
-        SourceAction action =
-                new SourceAction(
-                        idGenerator.getNextId(),
-                        actionName,
-                        tuple.getLeft(),
-                        tuple.getRight(),
-                        new HashSet<>());
-        action.setParallelism(parallelism);
-        SeaTunnelRowType producedType = (SeaTunnelRowType) 
tuple.getLeft().getProducedType();
-        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable(tableId, 
producedType);
-        return new Tuple2<>(catalogTable, action);
-    }
-
-    public List<SinkAction<?, ?, ?, ?>> parseSinks(
-            int configIndex,
-            List<List<Tuple2<CatalogTable, Action>>> inputVertices,
-            Config sinkConfig,
-            JobConfig jobConfig) {
-        List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
-        int spareParallelism = 
inputVertices.get(0).get(0)._2().getParallelism();
-        if (inputVertices.size() > 1) {
-            // union
-            Set<Action> inputActions =
-                    inputVertices.stream()
-                            .flatMap(Collection::stream)
-                            .map(Tuple2::_2)
-                            
.collect(Collectors.toCollection(LinkedHashSet::new));
-            checkProducedTypeEquals(inputActions);
-            SinkAction<?, ?, ?, ?> sinkAction =
-                    parseSink(
-                            configIndex,
-                            sinkConfig,
-                            jobConfig,
-                            spareParallelism,
-                            inputVertices
-                                    .get(0)
-                                    .get(0)
-                                    ._1()
-                                    .getTableSchema()
-                                    .toPhysicalRowDataType(),
-                            inputActions);
-            sinkActions.add(sinkAction);
-        } else {
-            // sink template
-            for (Tuple2<CatalogTable, Action> tableTuple : 
inputVertices.get(0)) {
-                CatalogTable catalogTable = tableTuple._1();
-                Action inputAction = tableTuple._2();
-                int parallelism = inputAction.getParallelism();
-                SinkAction<?, ?, ?, ?> sinkAction =
-                        parseSink(
-                                configIndex,
-                                sinkConfig,
-                                jobConfig,
-                                parallelism,
-                                
catalogTable.getTableSchema().toPhysicalRowDataType(),
-                                Collections.singleton(inputAction));
-                sinkActions.add(sinkAction);
-            }
-        }
-        return sinkActions;
-    }
-
-    private SinkAction<?, ?, ?, ?> parseSink(
-            int configIndex,
-            Config config,
-            JobConfig jobConfig,
-            int parallelism,
-            SeaTunnelRowType rowType,
-            Set<Action> inputActions) {
-        final ImmutablePair<
-                        SeaTunnelSink<SeaTunnelRow, Serializable, 
Serializable, Serializable>,
-                        Set<URL>>
-                tuple =
-                        ConnectorInstanceLoader.loadSinkInstance(
-                                config, jobConfig.getJobContext(), 
commonPluginJars);
-        final SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> sink =
-                tuple.getLeft();
-        // old logic: prepare(initialization) -> set job context -> set row 
type (There is a logical
-        // judgment that depends on before and after, not a simple set)
-        sink.prepare(config);
-        sink.setJobContext(jobConfig.getJobContext());
-        sink.setTypeInfo(rowType);
-        if (!isStartWithSavePoint) {
-            multipleTableJobConfigParser.handleSaveMode(sink);
-        }
-        final String actionName =
-                createSinkActionName(configIndex, 
tuple.getLeft().getPluginName());
-        final SinkAction action =
-                new SinkAction<>(
-                        idGenerator.getNextId(),
-                        actionName,
-                        new ArrayList<>(inputActions),
-                        sink,
-                        tuple.getRight(),
-                        new HashSet<>());
-        action.setParallelism(parallelism);
-        return action;
-    }
-
     static String createSourceActionName(int configIndex, String pluginName) {
         return String.format("Source[%s]-%s", configIndex, pluginName);
     }
 
-    static String createSinkActionName(int configIndex, String pluginName) {
-        return String.format("Sink[%s]-%s", configIndex, pluginName);
-    }
-
     static String createSinkActionName(int configIndex, String pluginName, 
String table) {
         return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 61df9abcfe..903db02f80 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
@@ -35,10 +36,7 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import 
org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint;
-import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.Constants;
@@ -48,7 +46,6 @@ import 
org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -64,7 +61,6 @@ import 
org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
@@ -95,7 +91,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -116,7 +111,6 @@ public class MultipleTableJobConfigParser {
 
     private final ReadonlyConfig envOptions;
 
-    private final JobConfigParser fallbackParser;
     private final boolean isStartWithSavePoint;
     private final List<JobPipelineCheckpointData> pipelineCheckpoints;
 
@@ -166,8 +160,6 @@ public class MultipleTableJobConfigParser {
         this.isStartWithSavePoint = isStartWithSavePoint;
         this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
-        this.fallbackParser =
-                new JobConfigParser(idGenerator, commonPluginJars, this, 
isStartWithSavePoint);
         this.pipelineCheckpoints = pipelineCheckpoints;
     }
 
@@ -184,8 +176,6 @@ public class MultipleTableJobConfigParser {
         this.isStartWithSavePoint = isStartWithSavePoint;
         this.seaTunnelJobConfig = seaTunnelJobConfig;
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
-        this.fallbackParser =
-                new JobConfigParser(idGenerator, commonPluginJars, this, 
isStartWithSavePoint);
         this.pipelineCheckpoints = pipelineCheckpoints;
     }
 
@@ -347,31 +337,6 @@ public class MultipleTableJobConfigParser {
         log.info("add common jar in plugins :{}", commonPluginJars);
     }
 
-    private static <T extends Factory> boolean isFallback(
-            ClassLoader classLoader,
-            Class<T> factoryClass,
-            String factoryId,
-            Consumer<T> virtualCreator) {
-        Optional<T> factory =
-                FactoryUtil.discoverOptionalFactory(classLoader, factoryClass, 
factoryId);
-        if (!factory.isPresent()) {
-            return true;
-        }
-        try {
-            virtualCreator.accept(factory.get());
-        } catch (Exception e) {
-            if (e instanceof UnsupportedOperationException
-                    && "The Factory has not been implemented and the 
deprecated Plugin will be used."
-                            .equals(e.getMessage())) {
-                log.warn(
-                        "The Factory has not been implemented and the 
deprecated Plugin will be used.");
-                return true;
-            }
-            log.debug(ExceptionUtils.getMessage(e));
-        }
-        return false;
-    }
-
     private int getParallelism(ReadonlyConfig config) {
         return Math.max(
                 1,
@@ -388,18 +353,12 @@ public class MultipleTableJobConfigParser {
 
         final int parallelism = getParallelism(readonlyConfig);
 
-        boolean fallback =
-                isFallback(
-                        classLoader,
-                        TableSourceFactory.class,
-                        factoryId,
-                        (factory) -> factory.createSource(null));
-
-        if (fallback) {
-            Tuple2<CatalogTable, Action> tuple =
-                    fallbackParser.parseSource(sourceConfig, jobConfig, 
tableId, parallelism);
-            return new Tuple2<>(tableId, Collections.singletonList(tuple));
-        }
+        Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
+                pluginIdentifier -> {
+                    SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
+                            new SeaTunnelSourcePluginDiscovery();
+                    return 
sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
+                };
 
         Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2;
         if (isStartWithSavePoint && pipelineCheckpoints != null && 
!pipelineCheckpoints.isEmpty()) {
@@ -407,9 +366,16 @@ public class MultipleTableJobConfigParser {
                     getSourceCheckpoint(configIndex, factoryId);
             tuple2 =
                     FactoryUtil.restoreAndPrepareSource(
-                            readonlyConfig, classLoader, factoryId, 
checkpoint);
+                            readonlyConfig,
+                            classLoader,
+                            factoryId,
+                            checkpoint,
+                            fallbackCreateSource,
+                            null);
         } else {
-            tuple2 = FactoryUtil.createAndPrepareSource(readonlyConfig, 
classLoader, factoryId);
+            tuple2 =
+                    FactoryUtil.createAndPrepareSource(
+                            readonlyConfig, classLoader, factoryId, 
fallbackCreateSource, null);
         }
 
         Set<URL> factoryUrls = new HashSet<>();
@@ -591,16 +557,6 @@ public class MultipleTableJobConfigParser {
             }
         }
 
-        boolean fallback =
-                isFallback(
-                        classLoader,
-                        TableSinkFactory.class,
-                        factoryId,
-                        (factory) -> factory.createSink(null));
-        if (fallback) {
-            return fallbackParser.parseSinks(configIndex, inputVertices, 
sinkConfig, jobConfig);
-        }
-
         // get jar urls
         Set<URL> jarUrls = new HashSet<>();
         jarUrls.addAll(getSinkPluginJarPaths(sinkConfig));
@@ -702,9 +658,22 @@ public class MultipleTableJobConfigParser {
             String factoryId,
             int parallelism,
             int configIndex) {
+
+        Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
+                pluginIdentifier -> {
+                    SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
+                            new SeaTunnelSinkPluginDiscovery();
+                    return 
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
+                };
+
         SeaTunnelSink<?, ?, ?, ?> sink =
                 FactoryUtil.createAndPrepareSink(
-                        catalogTable, readonlyConfig, classLoader, factoryId);
+                        catalogTable,
+                        readonlyConfig,
+                        classLoader,
+                        factoryId,
+                        fallbackCreateSink,
+                        null);
         sink.setJobContext(jobConfig.getJobContext());
         SinkConfig actionConfig = new 
SinkConfig(catalogTable.getTableId().toTablePath());
         long id = idGenerator.getNextId();
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 4b62895f18..a946700c75 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
index e765822af3..0d631745eb 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.plugin.discovery;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
index 9fe8717488..a536f3a446 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.plugin.discovery.seatunnel;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index cef4f42ab5..145b0a04b4 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.plugin.discovery.seatunnel;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -24,7 +25,6 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index e1fbce74bb..a224066f2d 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.plugin.discovery.seatunnel;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -24,7 +25,6 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
index 606cd0d7ca..88b586d137 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.plugin.discovery.seatunnel;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
@@ -24,7 +25,6 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 
diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
index e4cffe8780..5e8f4001aa 100644
--- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
+++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.plugin.discovery;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.PluginType;
diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index 1bc62981e8..e8511d524b 100644
--- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -19,10 +19,10 @@ package org.apache.seatunnel.plugin.discovery.seatunnel;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;


Reply via email to