This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d46cf16e5a [Feature] Split transform and move jar into connectors 
directory (#7218)
d46cf16e5a is described below

commit d46cf16e5a4bb0e4820feaf4dd03e7e9038d6281
Author: Jarvis <liunaijie1...@163.com>
AuthorDate: Sat Aug 10 23:47:12 2024 +0800

    [Feature] Split transform and move jar into connectors directory (#7218)
---
 plugin-mapping.properties                          | 13 +++++-
 .../core/starter/execution/PluginUtil.java         | 16 +++----
 .../flink/execution/TransformExecuteProcessor.java | 14 +++++-
 .../spark/execution/TransformExecuteProcessor.java | 11 ++++-
 .../src/main/assembly/assembly-bin-ci.xml          | 48 ++++++++------------
 seatunnel-dist/src/main/assembly/assembly-bin.xml  |  6 +--
 .../seatunnel/e2e/common/util/ContainerUtil.java   |  4 +-
 .../core/parse/MultipleTableJobConfigParser.java   | 36 ++++++++++-----
 .../SeaTunnelTransformPluginDiscovery.java         |  2 +-
 .../common/AbstractCatalogSupportTransform.java    | 16 ++++++-
 .../common/AbstractSeaTunnelTransform.java         | 51 ----------------------
 .../seatunnel/transform/sql/SQLTransform.java      | 15 ++-----
 12 files changed, 108 insertions(+), 124 deletions(-)

diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1942f875d7..579bf2dac0 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -129,4 +129,15 @@ seatunnel.source.ObsFile = connector-file-obs
 seatunnel.sink.ObsFile = connector-file-obs
 seatunnel.source.Milvus = connector-milvus
 seatunnel.sink.Milvus = connector-milvus
-seatunnel.sink.ActiveMQ = connector-activemq
\ No newline at end of file
+seatunnel.sink.ActiveMQ = connector-activemq
+
+seatunnel.transform.Sql = seatunnel-transforms-v2
+seatunnel.transform.FieldMapper = seatunnel-transforms-v2
+seatunnel.transform.Filter = seatunnel-transforms-v2
+seatunnel.transform.FilterRowKind = seatunnel-transforms-v2
+seatunnel.transform.JsonPath = seatunnel-transforms-v2
+seatunnel.transform.Replace = seatunnel-transforms-v2
+seatunnel.transform.Split = seatunnel-transforms-v2
+seatunnel.transform.Copy = seatunnel-transforms-v2
+seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
+seatunnel.transform.LLM = seatunnel-transforms-v2
\ No newline at end of file
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
index 0dc4209a8b..166e581e2d 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java
@@ -31,7 +31,6 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryException;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
@@ -49,7 +48,6 @@ import java.util.Optional;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
 import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
-import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 
 /** The util used for Spark/Flink to create to SeaTunnelSource etc. */
 public class PluginUtil {
@@ -130,21 +128,21 @@ public class PluginUtil {
         return source;
     }
 
-    public static TableTransformFactory createTransformFactory(
+    public static Optional<? extends Factory> createTransformFactory(
+            SeaTunnelFactoryDiscovery factoryDiscovery,
             SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
             Config transformConfig,
             List<URL> pluginJars) {
         PluginIdentifier pluginIdentifier =
                 PluginIdentifier.of(
                         ENGINE_TYPE, "transform", 
transformConfig.getString(PLUGIN_NAME.key()));
-        final ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(transformConfig);
-        final String factoryId = readonlyConfig.get(PLUGIN_NAME);
-        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-        final TableTransformFactory factory =
-                discoverFactory(classLoader, TableTransformFactory.class, 
factoryId);
         pluginJars.addAll(
                 
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-        return factory;
+        try {
+            return 
factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
+        } catch (FactoryException e) {
+            return Optional.empty();
+        }
     }
 
     public static Optional<? extends Factory> createSinkFactory(
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index d91bb9d3da..1ff2cf6437 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
@@ -41,6 +42,7 @@ import org.apache.flink.types.Row;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -59,15 +61,23 @@ public class TransformExecuteProcessor
     @Override
     protected List<TableTransformFactory> initializePlugins(
             List<URL> jarPaths, List<? extends Config> pluginConfigs) {
+
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableTransformFactory.class, 
ADD_URL_TO_CLASSLOADER);
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
                 new SeaTunnelTransformPluginDiscovery();
-
         return pluginConfigs.stream()
                 .map(
                         transformConfig ->
                                 PluginUtil.createTransformFactory(
-                                        transformPluginDiscovery, 
transformConfig, jarPaths))
+                                        factoryDiscovery,
+                                        transformPluginDiscovery,
+                                        transformConfig,
+                                        jarPaths))
                 .distinct()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(e -> (TableTransformFactory) e)
                 .collect(Collectors.toList());
     }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index bc7cd5cdbe..fc4a9e00d0 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
+import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import 
org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
 import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
@@ -50,6 +51,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -69,16 +71,23 @@ public class TransformExecuteProcessor
     protected List<TableTransformFactory> initializePlugins(List<? extends 
Config> pluginConfigs) {
         SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
                 new SeaTunnelTransformPluginDiscovery();
+
+        SeaTunnelFactoryDiscovery factoryDiscovery =
+                new SeaTunnelFactoryDiscovery(TableTransformFactory.class);
         List<URL> pluginJars = new ArrayList<>();
         List<TableTransformFactory> transforms =
                 pluginConfigs.stream()
                         .map(
                                 transformConfig ->
                                         PluginUtil.createTransformFactory(
+                                                factoryDiscovery,
                                                 transformPluginDiscovery,
                                                 transformConfig,
-                                                pluginJars))
+                                                new ArrayList<>()))
                         .distinct()
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .map(e -> (TableTransformFactory) e)
                         .collect(Collectors.toList());
         sparkRuntimeEnvironment.registerPlugin(pluginJars);
         return transforms;
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml 
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index cc48ac86a2..4510579d81 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -140,7 +140,7 @@
             <scope>provided</scope>
         </dependencySet>
 
-        <!-- ============ Connectors Jars ============  -->
+        <!-- ============ Connectors Jars And Transforms V2 Jar ============  
-->
         <!-- SeaTunnel connectors -->
         <dependencySet>
             <useProjectArtifact>false</useProjectArtifact>
@@ -148,6 +148,7 @@
             <unpack>false</unpack>
             <includes>
                 <include>org.apache.seatunnel:connector-*:jar</include>
+                
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
             </includes>
             <excludes>
                 <exclude>org.apache.seatunnel:connector-common</exclude>
@@ -160,36 +161,7 @@
             <scope>provided</scope>
         </dependencySet>
 
-        <!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 
Uber Jar ============  -->
-        <dependencySet>
-            <useProjectArtifact>false</useProjectArtifact>
-            <useTransitiveDependencies>true</useTransitiveDependencies>
-            <unpack>false</unpack>
-            <includes>
-                
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
-                <include>org.apache.hadoop:hadoop-aws:jar</include>
-                <include>com.amazonaws:aws-java-sdk-bundle:jar</include>
-                
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
-                <!--Add hadoop aliyun jar -->
-                <include>org.apache.hadoop:hadoop-aliyun:jar</include>
-                <include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
-                <include>org.jdom:jdom:jar</include>
-
-                <!--Add netty buffer jar -->
-                <include>io.netty:netty-buffer:jar</include>
-                <include>io.netty:netty-common:jar</include>
-
-                <!--Add hive exec jar -->
-                <include>org.apache.hive:hive-exec:jar</include>
-                <include>org.apache.hive:hive-service:jar</include>
-                <include>org.apache.thrift:libfb303:jar</include>
-            </includes>
-            
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
-            <outputDirectory>/lib</outputDirectory>
-            <scope>provided</scope>
-        </dependencySet>
-
-        <!-- =================== JDBC Connector Drivers ===================  
-->
+        <!-- =================== JDBC Connector Drivers  And SeaTunnel Hadoop3 
Uber Jar ===================  -->
         <dependencySet>
             <useProjectArtifact>false</useProjectArtifact>
             <useTransitiveDependencies>true</useTransitiveDependencies>
@@ -209,6 +181,20 @@
                 <include>com.amazon.redshift:redshift-jdbc42:jar</include>
                 <include>net.snowflake.snowflake-jdbc:jar</include>
                 <include>com.xugudb:xugu-jdbc:jar</include>
+                <include>org.apache.hadoop:hadoop-aws:jar</include>
+                <include>com.amazonaws:aws-java-sdk-bundle:jar</include>
+                
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
+                <!--Add hadoop aliyun jar -->
+                <include>org.apache.hadoop:hadoop-aliyun:jar</include>
+                <include>com.aliyun.oss:aliyun-sdk-oss:jar</include>
+                <include>org.jdom:jdom:jar</include>
+                <!--Add netty buffer jar -->
+                <include>io.netty:netty-buffer:jar</include>
+                <include>io.netty:netty-common:jar</include>
+                <!--Add hive exec jar -->
+                <include>org.apache.hive:hive-exec:jar</include>
+                <include>org.apache.hive:hive-service:jar</include>
+                <include>org.apache.thrift:libfb303:jar</include>
             </includes>
             
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
             <outputDirectory>/lib</outputDirectory>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml 
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 30fc5a6336..f16841f7a9 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -161,13 +161,12 @@
             <scope>provided</scope>
         </dependencySet>
 
-        <!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 
Uber Jar============  -->
+        <!-- ============ SeaTunnel Hadoop3 Uber Jar============  -->
         <dependencySet>
             <useProjectArtifact>false</useProjectArtifact>
             <useTransitiveDependencies>true</useTransitiveDependencies>
             <unpack>false</unpack>
             <includes>
-                
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
                 
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
             </includes>
             
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
@@ -175,7 +174,7 @@
             <scope>provided</scope>
         </dependencySet>
 
-        <!-- ============ Connectors Jars ============  -->
+        <!-- ============ Connectors Jars And Transforms V2 Jar ============  
-->
         <!-- SeaTunnel connectors for Demo -->
         <dependencySet>
             <useProjectArtifact>false</useProjectArtifact>
@@ -184,6 +183,7 @@
             <includes>
                 <include>org.apache.seatunnel:connector-fake:jar</include>
                 <include>org.apache.seatunnel:connector-console:jar</include>
+                
<include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include>
             </includes>
             <outputDirectory>/connectors</outputDirectory>
             <scope>provided</scope>
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 1c590bb69a..6c6a8e5cdd 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -195,13 +195,13 @@ public final class ContainerUtil {
                 MountableFile.forHostPath(startJarPath),
                 Paths.get(seatunnelHomeInContainer, "starter", 
startJarName).toString());
 
-        // copy lib
+        // copy transform
         String transformJar = "seatunnel-transforms-v2.jar";
         Path transformJarPath =
                 Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2", 
"target", transformJar);
         container.withCopyFileToContainer(
                 MountableFile.forHostPath(transformJarPath),
-                Paths.get(seatunnelHomeInContainer, "lib", 
transformJar).toString());
+                Paths.get(seatunnelHomeInContainer, "connectors", 
transformJar).toString());
 
         // copy bin
         final String startBinPath = startModulePath + File.separator + 
"src/main/bin/";
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 40a6640c35..d02a76a4c5 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -182,7 +182,7 @@ public class MultipleTableJobConfigParser {
                 TypesafeConfigUtils.getConfigList(
                         seaTunnelJobConfig, "sink", Collections.emptyList());
 
-        List<URL> connectorJars = getConnectorJarList(sourceConfigs, 
sinkConfigs);
+        List<URL> connectorJars = getConnectorJarList(sourceConfigs, 
transformConfigs, sinkConfigs);
         if (!commonPluginJars.isEmpty()) {
             connectorJars.addAll(commonPluginJars);
         }
@@ -238,18 +238,32 @@ public class MultipleTableJobConfigParser {
     }
 
     private List<URL> getConnectorJarList(
-            List<? extends Config> sourceConfigs, List<? extends Config> 
sinkConfigs) {
+            List<? extends Config> sourceConfigs,
+            List<? extends Config> transformConfigs,
+            List<? extends Config> sinkConfigs) {
         List<PluginIdentifier> factoryIds =
                 Stream.concat(
-                                sourceConfigs.stream()
-                                        .map(ConfigParserUtil::getFactoryId)
-                                        .map(
-                                                factory ->
-                                                        PluginIdentifier.of(
-                                                                
CollectionConstants
-                                                                        
.SEATUNNEL_PLUGIN,
-                                                                
CollectionConstants.SOURCE_PLUGIN,
-                                                                factory)),
+                                Stream.concat(
+                                        sourceConfigs.stream()
+                                                
.map(ConfigParserUtil::getFactoryId)
+                                                .map(
+                                                        factory ->
+                                                                
PluginIdentifier.of(
+                                                                        
CollectionConstants
+                                                                               
 .SEATUNNEL_PLUGIN,
+                                                                        
CollectionConstants
+                                                                               
 .SOURCE_PLUGIN,
+                                                                        
factory)),
+                                        transformConfigs.stream()
+                                                
.map(ConfigParserUtil::getFactoryId)
+                                                .map(
+                                                        factory ->
+                                                                
PluginIdentifier.of(
+                                                                        
CollectionConstants
+                                                                               
 .SEATUNNEL_PLUGIN,
+                                                                        
CollectionConstants
+                                                                               
 .TRANSFORM_PLUGIN,
+                                                                        
factory))),
                                 sinkConfigs.stream()
                                         .map(ConfigParserUtil::getFactoryId)
                                         .map(
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
index 445bf14628..606cd0d7ca 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java
@@ -34,7 +34,7 @@ import java.util.List;
 public class SeaTunnelTransformPluginDiscovery extends 
AbstractPluginDiscovery<SeaTunnelTransform> {
 
     public SeaTunnelTransformPluginDiscovery() {
-        super(Common.libDir());
+        super(Common.connectorDir());
     }
 
     @Override
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index 5670bcc129..632d3af1e4 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.transform.common;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
 import lombok.NonNull;
 
-public abstract class AbstractCatalogSupportTransform extends 
AbstractSeaTunnelTransform {
+public abstract class AbstractCatalogSupportTransform implements 
SeaTunnelTransform<SeaTunnelRow> {
     protected CatalogTable inputCatalogTable;
 
     protected volatile CatalogTable outputCatalogTable;
@@ -32,6 +34,18 @@ public abstract class AbstractCatalogSupportTransform 
extends AbstractSeaTunnelT
         this.inputCatalogTable = inputCatalogTable;
     }
 
+    @Override
+    public SeaTunnelRow map(SeaTunnelRow row) {
+        return transformRow(row);
+    }
+
+    /**
+     * Outputs transformed row data.
+     *
+     * @param inputRow upstream input row data
+     */
+    protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
+
     @Override
     public CatalogTable getProducedCatalogTable() {
         if (outputCatalogTable == null) {
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
deleted file mode 100644
index 1892881c27..0000000000
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.transform.common;
-
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-
-public abstract class AbstractSeaTunnelTransform implements 
SeaTunnelTransform<SeaTunnelRow> {
-
-    protected String inputTableName;
-    protected SeaTunnelRowType inputRowType;
-
-    protected SeaTunnelRowType outputRowType;
-
-    @Override
-    public SeaTunnelRow map(SeaTunnelRow row) {
-        return transformRow(row);
-    }
-
-    /**
-     * Outputs transformed row data.
-     *
-     * @param inputRow upstream input row data
-     */
-    protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
-
-    @Override
-    public CatalogTable getProducedCatalogTable() {
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Connector %s must implement 
TableTransformFactory.createTransform method",
-                        getPluginName()));
-    }
-}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
index a9d04b0739..00316bba8e 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java
@@ -62,6 +62,8 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
 
     private transient SQLEngine sqlEngine;
 
+    private final String inputTableName;
+
     public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable 
catalogTable) {
         super(catalogTable);
         this.query = config.get(KEY_QUERY);
@@ -77,15 +79,6 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
         } else {
             this.inputTableName = catalogTable.getTableId().getTableName();
         }
-        List<Column> columns = catalogTable.getTableSchema().getColumns();
-        String[] fieldNames = new String[columns.size()];
-        SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType<?>[columns.size()];
-        for (int i = 0; i < columns.size(); i++) {
-            Column column = columns.get(i);
-            fieldNames[i] = column.getName();
-            fieldTypes[i] = column.getDataType();
-        }
-        this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
     }
 
     @Override
@@ -98,8 +91,8 @@ public class SQLTransform extends 
AbstractCatalogSupportTransform {
         sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
         sqlEngine.init(
                 inputTableName,
-                inputCatalogTable != null ? 
inputCatalogTable.getTableId().getTableName() : null,
-                inputRowType,
+                inputCatalogTable.getTableId().getTableName(),
+                inputCatalogTable.getSeaTunnelRowType(),
                 query);
     }
 

Reply via email to