This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 99fa19d2ea [Refactor][core] Unify transformFactory creation logic (#8574)
99fa19d2ea is described below

commit 99fa19d2eab95b68dd79e27866233bd950efb166
Author: Guangdong Liu <804167...@qq.com>
AuthorDate: Thu Feb 6 11:36:28 2025 +0800

    [Refactor][core] Unify transformFactory creation logic (#8574)
---
 .../seatunnel/api/table/factory/FactoryUtil.java   | 41 +++++++++-
 .../seatunnel/common/constants}/EngineType.java    |  2 +-
 .../seatunnel/core/starter/enums/PluginType.java   | 35 ---------
 .../core/starter/execution/PluginUtil.java         | 87 ----------------------
 .../seatunnel/core/starter/flink/FlinkStarter.java |  2 +-
 .../core/starter/flink/SeaTunnelFlink.java         |  2 +-
 .../flink/execution/SinkExecuteProcessor.java      | 38 ++++++++--
 .../seatunnel/core/starter/flink/FlinkStarter.java |  2 +-
 .../core/starter/flink/SeaTunnelFlink.java         |  2 +-
 .../FlinkAbstractPluginExecuteProcessor.java       |  3 +-
 .../flink/execution/SinkExecuteProcessor.java      | 39 ++++++++--
 .../flink/execution/SourceExecuteProcessor.java    | 12 +--
 .../flink/execution/TransformExecuteProcessor.java | 37 ++++++---
 .../core/starter/spark/SeaTunnelSpark.java         |  2 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |  4 +-
 .../spark/execution/SinkExecuteProcessor.java      | 34 +++++----
 .../core/starter/spark/SeaTunnelSpark.java         |  2 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |  4 +-
 .../spark/execution/SinkExecuteProcessor.java      | 34 +++++----
 .../spark/execution/SourceExecuteProcessor.java    | 11 ++-
 .../SparkAbstractPluginExecuteProcessor.java       |  2 +-
 .../spark/execution/SparkRuntimeEnvironment.java   |  2 +-
 .../spark/execution/TransformExecuteProcessor.java | 36 ++++++---
 .../core/starter/seatunnel/SeaTunnelClient.java    |  2 +-
 .../core/starter/seatunnel/SeaTunnelServer.java    |  2 +-
 .../core/parse/MultipleTableJobConfigParser.java   |  3 +-
 26 files changed, 225 insertions(+), 215 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 30e7b00864..131d50852e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
@@ -37,6 +38,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -106,7 +110,10 @@ public final class FactoryUtil {
             if (fallback) {
                 source =
                         fallbackCreateSource.apply(
-                                PluginIdentifier.of("seatunnel", "source", 
factoryId));
+                                PluginIdentifier.of(
+                                        EngineType.SEATUNNEL.getEngine(),
+                                        PluginType.SOURCE.getType(),
+                                        factoryId));
                 source.prepare(options.toConfig());
 
             } else {
@@ -205,7 +212,10 @@ public final class FactoryUtil {
             if (fallback) {
                 SeaTunnelSink sink =
                         fallbackCreateSink.apply(
-                                PluginIdentifier.of("seatunnel", "sink", 
factoryId));
+                                PluginIdentifier.of(
+                                        EngineType.SEATUNNEL.getEngine(),
+                                        PluginType.SINK.getType(),
+                                        factoryId));
                 sink.prepare(config.toConfig());
                 sink.setTypeInfo(catalogTable.getSeaTunnelRowType());
 
@@ -273,6 +283,23 @@ public final class FactoryUtil {
         return 
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
     }
 
+    public static <T extends Factory> Optional<T> discoverOptionalFactory(
+            ClassLoader classLoader,
+            Class<T> factoryClass,
+            String factoryIdentifier,
+            Function<String, T> discoverOptionalFactoryFunction) {
+
+        if (discoverOptionalFactoryFunction != null) {
+            T apply = discoverOptionalFactoryFunction.apply(factoryIdentifier);
+            if (apply != null) {
+                return Optional.of(apply);
+            } else {
+                return Optional.empty();
+            }
+        }
+        return discoverOptionalFactory(classLoader, factoryClass, 
factoryIdentifier);
+    }
+
     public static <T extends Factory> Optional<T> discoverOptionalFactory(
             ClassLoader classLoader, Class<T> factoryClass, String 
factoryIdentifier) {
         final List<T> foundFactories = discoverFactories(classLoader, 
factoryClass);
@@ -436,4 +463,14 @@ public final class FactoryUtil {
         }
         return false;
     }
+
+    public static void ensureJobModeMatch(JobContext jobContext, 
SeaTunnelSource source) {
+        if (jobContext.getJobMode() == JobMode.BATCH
+                && source.getBoundedness()
+                        == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "'%s' source don't support off-line job.", 
source.getPluginName()));
+        }
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java
similarity index 97%
rename from 
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
rename to 
seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java
index 355fa838d4..6f5c79c1ed 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.enums;
+package org.apache.seatunnel.common.constants;
 
 /** Engine type enum */
 public enum EngineType {
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java
deleted file mode 100644
index 5fdec8b71b..0000000000
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.enums;
-
-/** Plugin type enum */
-public enum PluginType {
-    SOURCE("source"),
-    TRANSFORM("transform"),
-    SINK("sink");
-
-    private final String type;
-
-    PluginType(String type) {
-        this.type = type;
-    }
-
-    public String getType() {
-        return type;
-    }
-}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
deleted file mode 100644
index b2b47854e3..0000000000
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.execution;
-
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PluginIdentifier;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.FactoryException;
-import org.apache.seatunnel.common.constants.JobMode;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
-import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-
-import java.net.URL;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
-
-/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
-@SuppressWarnings("rawtypes")
-public class PluginUtil {
-
-    protected static final String ENGINE_TYPE = "seatunnel";
-
-    public static Optional<? extends Factory> createTransformFactory(
-            SeaTunnelFactoryDiscovery factoryDiscovery,
-            SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
-            Config transformConfig,
-            List<URL> pluginJars) {
-        PluginIdentifier pluginIdentifier =
-                PluginIdentifier.of(
-                        ENGINE_TYPE, "transform", 
transformConfig.getString(PLUGIN_NAME.key()));
-        pluginJars.addAll(
-                
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-        try {
-            return 
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
-        } catch (FactoryException e) {
-            return Optional.empty();
-        }
-    }
-
-    public static Optional<? extends Factory> createSinkFactory(
-            SeaTunnelFactoryDiscovery factoryDiscovery,
-            SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
-            Config sinkConfig,
-            List<URL> pluginJars) {
-        PluginIdentifier pluginIdentifier =
-                PluginIdentifier.of(ENGINE_TYPE, "sink", 
sinkConfig.getString(PLUGIN_NAME.key()));
-        pluginJars.addAll(
-                
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-        try {
-            return 
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
-        } catch (FactoryException e) {
-            return Optional.empty();
-        }
-    }
-
-    public static void ensureJobModeMatch(JobContext jobContext, 
SeaTunnelSource source) {
-        if (jobContext.getJobMode() == JobMode.BATCH
-                && source.getBoundedness()
-                        == 
org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "'%s' source don't support off-line job.", 
source.getPluginName()));
-        }
-    }
-}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index e9d0ba7df2..7d106abbce 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.core.starter.flink;
 
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.enums.MasterType;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
index 8d1b434801..4356e85bfb 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.core.starter.flink;
 
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 4466844536..b700af5b46 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
@@ -35,9 +36,10 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -56,6 +58,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 @Slf4j
@@ -77,14 +80,34 @@ public class SinkExecuteProcessor
                 new SeaTunnelFactoryDiscovery(TableSinkFactory.class, 
ADD_URL_TO_CLASSLOADER);
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+        Function<String, TableSinkFactory> discoverOptionalFactoryFunction =
+                pluginName ->
+                        (TableSinkFactory)
+                                factoryDiscovery
+                                        .createOptionalPluginInstance(
+                                                PluginIdentifier.of(
+                                                        
EngineType.SEATUNNEL.getEngine(),
+                                                        
PluginType.SINK.getType(),
+                                                        pluginName))
+                                        .orElse(null);
+
         return pluginConfigs.stream()
                 .map(
-                        sinkConfig ->
-                                PluginUtil.createSinkFactory(
-                                        factoryDiscovery,
-                                        sinkPluginDiscovery,
-                                        sinkConfig,
-                                        jarPaths))
+                        sinkConfig -> {
+                            jarPaths.addAll(
+                                    sinkPluginDiscovery.getPluginJarPaths(
+                                            Lists.newArrayList(
+                                                    PluginIdentifier.of(
+                                                            
EngineType.SEATUNNEL.getEngine(),
+                                                            
PluginType.SINK.getType(),
+                                                            
sinkConfig.getString(
+                                                                    
PLUGIN_NAME.key())))));
+                            return discoverOptionalFactory(
+                                    classLoader,
+                                    TableSinkFactory.class,
+                                    sinkConfig.getString(PLUGIN_NAME.key()),
+                                    discoverOptionalFactoryFunction);
+                        })
                 .distinct()
                 .collect(Collectors.toList());
     }
@@ -95,7 +118,6 @@ public class SinkExecuteProcessor
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
         DataStreamTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
-        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
                 sinkPluginDiscovery::createPluginInstance;
         for (int i = 0; i < plugins.size(); i++) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 06cfd5f449..54d1984bf4 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.core.starter.flink;
 
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.enums.MasterType;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
index 1595da686a..dbae7e5fe9 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.core.starter.flink;
 
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 00614b1d88..aef7d57416 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -36,8 +36,6 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.PLUGIN_INPUT;
 public abstract class FlinkAbstractPluginExecuteProcessor<T>
         implements PluginExecuteProcessor<DataStreamTableInfo, 
FlinkRuntimeEnvironment> {
 
-    protected static final String ENGINE_TYPE = "seatunnel";
-
     protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER 
=
             (classLoader, url) -> {
                 if 
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
@@ -57,6 +55,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
     protected JobContext jobContext;
     protected final List<T> plugins;
     protected final Config envConfig;
+    protected final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
 
     protected FlinkAbstractPluginExecuteProcessor(
             List<URL> jarPaths,
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a82d2392e6..d6d8518a28 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
@@ -35,9 +36,10 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSink;
@@ -57,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
 
 @Slf4j
 @SuppressWarnings("unchecked,rawtypes")
@@ -74,18 +77,39 @@ public class SinkExecuteProcessor
     @Override
     protected List<Optional<? extends Factory>> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+
         SeaTunnelFactoryDiscovery factoryDiscovery =
                 new SeaTunnelFactoryDiscovery(TableSinkFactory.class, 
ADD_URL_TO_CLASSLOADER);
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
+        Function<String, TableSinkFactory> discoverOptionalFactoryFunction =
+                pluginName ->
+                        (TableSinkFactory)
+                                factoryDiscovery
+                                        .createOptionalPluginInstance(
+                                                PluginIdentifier.of(
+                                                        
EngineType.SEATUNNEL.getEngine(),
+                                                        
PluginType.SINK.getType(),
+                                                        pluginName))
+                                        .orElse(null);
+
         return pluginConfigs.stream()
                 .map(
-                        sinkConfig ->
-                                PluginUtil.createSinkFactory(
-                                        factoryDiscovery,
-                                        sinkPluginDiscovery,
-                                        sinkConfig,
-                                        jarPaths))
+                        sinkConfig -> {
+                            jarPaths.addAll(
+                                    sinkPluginDiscovery.getPluginJarPaths(
+                                            Lists.newArrayList(
+                                                    PluginIdentifier.of(
+                                                            
EngineType.SEATUNNEL.getEngine(),
+                                                            
PluginType.SINK.getType(),
+                                                            
sinkConfig.getString(
+                                                                    
PLUGIN_NAME.key())))));
+                            return discoverOptionalFactory(
+                                    classLoader,
+                                    TableSinkFactory.class,
+                                    sinkConfig.getString(PLUGIN_NAME.key()),
+                                    discoverOptionalFactoryFunction);
+                        })
                 .distinct()
                 .collect(Collectors.toList());
     }
@@ -96,7 +120,6 @@ public class SinkExecuteProcessor
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
                 new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
         DataStreamTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
-        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
                 sinkPluginDiscovery::createPluginInstance;
         for (int i = 0; i < plugins.size(); i++) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index d637f2256b..deafcc6cb2 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -30,7 +30,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -53,12 +54,11 @@ import java.util.function.Function;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
-import static 
org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;
 
 @Slf4j
 @SuppressWarnings("unchecked,rawtypes")
 public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
-    private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
 
     public SourceExecuteProcessor(
             List<URL> jarPaths,
@@ -113,14 +113,16 @@ public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier =
                     PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key()));
+                            EngineType.SEATUNNEL.getEngine(),
+                            PluginType.SOURCE.getType(),
+                            sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
                     sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
 
             Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source =
                     FactoryUtil.createAndPrepareSource(
                             ReadonlyConfig.fromConfig(sourceConfig),
-                            Thread.currentThread().getContextClassLoader(),
+                            classLoader,
                             pluginIdentifier.getPluginName(),
                             fallbackCreateSource,
                             (TableSourceFactory)
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 615876c417..fa1380cd00 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,9 +17,11 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
@@ -28,8 +30,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 
@@ -49,6 +52,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
 
 @SuppressWarnings("unchecked,rawtypes")
@@ -66,23 +70,32 @@ public class TransformExecuteProcessor
     @Override
     protected List<TableTransformFactory> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs) {
-
-        SeaTunnelFactoryDiscovery factoryDiscovery =
-                new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER);
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
                 new SeaTunnelTransformPluginDiscovery();
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER);
         return pluginConfigs.stream()
                 .map(
-                        transformConfig ->
-                                PluginUtil.createTransformFactory(
-                                        factoryDiscovery,
-                                        transformPluginDiscovery,
-                                        transformConfig,
-                                        jarPaths))
+                        transformConfig -> {
+                            jarPaths.addAll(
+                                    transformPluginDiscovery.getPluginJarPaths(
+                                            Lists.newArrayList(
+                                                    PluginIdentifier.of(
+                                                            EngineType.SEATUNNEL.getEngine(),
+                                                            PluginType.TRANSFORM.getType(),
+                                                            transformConfig.getString(
+                                                                    PLUGIN_NAME.key())))));
+                            return Optional.of(
+                                    (TableTransformFactory)
+                                            factoryDiscovery.createPluginInstance(
+                                                    PluginIdentifier.of(
+                                                            EngineType.SEATUNNEL.getEngine(),
+                                                            PluginType.TRANSFORM.getType(),
+                                                            transformConfig.getString(
+                                                                    PLUGIN_NAME.key()))));
+                        })
                 .distinct()
-                .filter(Optional::isPresent)
                 .map(Optional::get)
-                .map(e -> (TableTransformFactory) e)
                 .collect(Collectors.toList());
     }
 
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
index ca7b2ed4be..fd7dea9287 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.core.starter.spark;
 
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index e9ee482e9c..3325ae38a5 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
-import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 6e22576c91..5a9ff3d2e4 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
@@ -33,10 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -56,10 +57,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
 
 public class SinkExecuteProcessor
         extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
-    private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
     protected SinkExecuteProcessor(
             SparkRuntimeEnvironment sparkRuntimeEnvironment,
@@ -71,19 +72,27 @@ public class SinkExecuteProcessor
     @Override
     protected List<Optional<? extends Factory>> initializePlugins(
             List<? extends Config> pluginConfigs) {
-        SeaTunnelFactoryDiscovery factoryDiscovery =
-                new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
+        SeaTunnelFactoryDiscovery sinkPluginDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
         List<Optional<? extends Factory>> sinks =
                 pluginConfigs.stream()
                         .map(
-                                sinkConfig ->
-                                        PluginUtil.createSinkFactory(
-                                                factoryDiscovery,
-                                                sinkPluginDiscovery,
-                                                sinkConfig,
-                                                pluginJars))
+                                sinkConfig -> {
+                                    pluginJars.addAll(
+                                            sinkPluginDiscovery.getPluginJarPaths(
+                                                    Lists.newArrayList(
+                                                            PluginIdentifier.of(
+                                                                    EngineType.SEATUNNEL
+                                                                            .getEngine(),
+                                                                    PluginType.SINK.getType(),
+                                                                    sinkConfig.getString(
+                                                                            PLUGIN_NAME.key())))));
+                                    return discoverOptionalFactory(
+                                            classLoader,
+                                            TableSinkFactory.class,
+                                            sinkConfig.getString(PLUGIN_NAME.key()));
+                                })
                         .distinct()
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -94,7 +103,6 @@ public class SinkExecuteProcessor
     public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams)
             throws TaskExecuteException {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
-        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
         Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
                 sinkPluginDiscovery::createPluginInstance;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
index 9b3fde6fd1..e984e19182 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.core.starter.spark;
 
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index dcd6a804b2..3411b785d1 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.Starter;
-import org.apache.seatunnel.core.starter.enums.EngineType;
-import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 74b0e1c1f1..19a07878ed 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.CommonOptions;
@@ -33,10 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -59,11 +60,11 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;
 
 @Slf4j
 public class SinkExecuteProcessor
         extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
-    private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
     protected SinkExecuteProcessor(
             SparkRuntimeEnvironment sparkRuntimeEnvironment,
@@ -75,19 +76,27 @@ public class SinkExecuteProcessor
     @Override
     protected List<Optional<? extends Factory>> initializePlugins(
             List<? extends Config> pluginConfigs) {
-        SeaTunnelFactoryDiscovery factoryDiscovery =
-                new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
-        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
+        SeaTunnelFactoryDiscovery sinkPluginDiscovery =
+                new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
         List<Optional<? extends Factory>> sinks =
                 pluginConfigs.stream()
                         .map(
-                                sinkConfig ->
-                                        PluginUtil.createSinkFactory(
-                                                factoryDiscovery,
-                                                sinkPluginDiscovery,
-                                                sinkConfig,
-                                                new ArrayList<>()))
+                                sinkConfig -> {
+                                    pluginJars.addAll(
+                                            sinkPluginDiscovery.getPluginJarPaths(
+                                                    Lists.newArrayList(
+                                                            PluginIdentifier.of(
+                                                                    EngineType.SEATUNNEL
+                                                                            .getEngine(),
+                                                                    PluginType.SINK.getType(),
+                                                                    sinkConfig.getString(
+                                                                            PLUGIN_NAME.key())))));
+                                    return discoverOptionalFactory(
+                                            classLoader,
+                                            TableSinkFactory.class,
+                                            sinkConfig.getString(PLUGIN_NAME.key()));
+                                })
                         .distinct()
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
@@ -98,7 +107,6 @@ public class SinkExecuteProcessor
     public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams)
             throws TaskExecuteException {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
-        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
         Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
                 sinkPluginDiscovery::createPluginInstance;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 50b3a88814..5ccbb59cbd 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -30,6 +30,8 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -52,11 +54,10 @@ import java.util.function.Function;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
-import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;
 
 @SuppressWarnings("rawtypes")
 public class SourceExecuteProcessor extends SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
-    private static final String PLUGIN_TYPE = "source";
     private Map envOption = new HashMap<String, String>();
 
     public SourceExecuteProcessor(
@@ -124,13 +125,15 @@ public class SourceExecuteProcessor extends SparkAbstractPluginExecuteProcessor<
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier =
                     PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key()));
+                            EngineType.SEATUNNEL.getEngine(),
+                            PluginType.SOURCE.getType(),
+                            sourceConfig.getString(PLUGIN_NAME.key()));
             jars.addAll(
                     sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source =
                     FactoryUtil.createAndPrepareSource(
                             ReadonlyConfig.fromConfig(sourceConfig),
-                            Thread.currentThread().getContextClassLoader(),
+                            classLoader,
                             pluginIdentifier.getPluginName(),
                             fallbackCreateSource,
                             null);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
index d9de016446..42b9155483 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
@@ -48,7 +48,7 @@ public abstract class SparkAbstractPluginExecuteProcessor<T>
     protected final List<? extends Config> pluginConfigs;
     protected final JobContext jobContext;
     protected final List<T> plugins;
-    protected static final String ENGINE_TYPE = "seatunnel";
+    protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 
     protected SparkAbstractPluginExecuteProcessor(
             SparkRuntimeEnvironment sparkRuntimeEnvironment,
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
index 7e31ca463b..e9bc4e07c9 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 
 import org.apache.spark.SparkConf;
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 492af1ad73..f10135e724 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,9 +17,11 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -29,8 +31,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.common.constants.EngineType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
@@ -56,6 +59,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
 
 @Slf4j
@@ -71,25 +75,39 @@ public class TransformExecuteProcessor
 
     @Override
     protected List<TableTransformFactory> initializePlugins(List<? extends Config> pluginConfigs) {
+
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
                 new SeaTunnelTransformPluginDiscovery();
 
         SeaTunnelFactoryDiscovery factoryDiscovery =
                 new SeaTunnelFactoryDiscovery(TableTransformFactory.class);
+
         List<URL> pluginJars = new ArrayList<>();
         List<TableTransformFactory> transforms =
                 pluginConfigs.stream()
                         .map(
-                                transformConfig ->
-                                        PluginUtil.createTransformFactory(
-                                                factoryDiscovery,
-                                                transformPluginDiscovery,
-                                                transformConfig,
-                                                new ArrayList<>()))
+                                transformConfig -> {
+                                    pluginJars.addAll(
+                                            transformPluginDiscovery.getPluginJarPaths(
+                                                    Lists.newArrayList(
+                                                            PluginIdentifier.of(
+                                                                    EngineType.SEATUNNEL
+                                                                            .getEngine(),
+                                                                    PluginType.TRANSFORM.getType(),
+                                                                    transformConfig.getString(
+                                                                            PLUGIN_NAME.key())))));
+                                    return Optional.of(
+                                            (TableTransformFactory)
+                                                    factoryDiscovery.createPluginInstance(
+                                                            PluginIdentifier.of(
+                                                                    EngineType.SEATUNNEL
+                                                                            .getEngine(),
+                                                                    PluginType.TRANSFORM.getType(),
+                                                                    transformConfig.getString(
+                                                                            PLUGIN_NAME.key()))));
+                                })
                         .distinct()
-                        .filter(Optional::isPresent)
                         .map(Optional::get)
-                        .map(e -> (TableTransformFactory) e)
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
         return transforms;
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
index 7a37fd340c..308ef35905 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.core.starter.seatunnel;
 
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
index 96a3e32e51..a340c0acb6 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.core.starter.seatunnel;
 
+import org.apache.seatunnel.common.constants.EngineType;
 import org.apache.seatunnel.core.starter.SeaTunnel;
-import org.apache.seatunnel.core.starter.enums.EngineType;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 903db02f80..6ec3eaf0cf 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -46,7 +46,6 @@ import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
@@ -386,7 +385,7 @@ public class MultipleTableJobConfigParser {
         String actionName = JobConfigParser.createSourceActionName(configIndex, factoryId);
         SeaTunnelSource<Object, SourceSplit, Serializable> source = tuple2._1();
         source.setJobContext(jobConfig.getJobContext());
-        PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
+        FactoryUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
         SourceAction<Object, SourceSplit, Serializable> action =
                 new SourceAction<>(id, actionName, tuple2._1(), factoryUrls, new HashSet<>());
         action.setParallelism(parallelism);

Reply via email to