This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new d46cf16e5a [Feature] Split transform and move jar into connectors directory (#7218) d46cf16e5a is described below commit d46cf16e5a4bb0e4820feaf4dd03e7e9038d6281 Author: Jarvis <liunaijie1...@163.com> AuthorDate: Sat Aug 10 23:47:12 2024 +0800 [Feature] Split transform and move jar into connectors directory (#7218) --- plugin-mapping.properties | 13 +++++- .../core/starter/execution/PluginUtil.java | 16 +++---- .../flink/execution/TransformExecuteProcessor.java | 14 +++++- .../spark/execution/TransformExecuteProcessor.java | 11 ++++- .../src/main/assembly/assembly-bin-ci.xml | 48 ++++++++------------ seatunnel-dist/src/main/assembly/assembly-bin.xml | 6 +-- .../seatunnel/e2e/common/util/ContainerUtil.java | 4 +- .../core/parse/MultipleTableJobConfigParser.java | 36 ++++++++++----- .../SeaTunnelTransformPluginDiscovery.java | 2 +- .../common/AbstractCatalogSupportTransform.java | 16 ++++++- .../common/AbstractSeaTunnelTransform.java | 51 ---------------------- .../seatunnel/transform/sql/SQLTransform.java | 15 ++----- 12 files changed, 108 insertions(+), 124 deletions(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 1942f875d7..579bf2dac0 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -129,4 +129,15 @@ seatunnel.source.ObsFile = connector-file-obs seatunnel.sink.ObsFile = connector-file-obs seatunnel.source.Milvus = connector-milvus seatunnel.sink.Milvus = connector-milvus -seatunnel.sink.ActiveMQ = connector-activemq \ No newline at end of file +seatunnel.sink.ActiveMQ = connector-activemq + +seatunnel.transform.Sql = seatunnel-transforms-v2 +seatunnel.transform.FieldMapper = seatunnel-transforms-v2 +seatunnel.transform.Filter = seatunnel-transforms-v2 +seatunnel.transform.FilterRowKind = seatunnel-transforms-v2 +seatunnel.transform.JsonPath = seatunnel-transforms-v2 +seatunnel.transform.Replace = seatunnel-transforms-v2 +seatunnel.transform.Split = seatunnel-transforms-v2 +seatunnel.transform.Copy = seatunnel-transforms-v2 +seatunnel.transform.DynamicCompile = seatunnel-transforms-v2 +seatunnel.transform.LLM = seatunnel-transforms-v2 \ No newline at end of file diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index 0dc4209a8b..166e581e2d 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -31,7 +31,6 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryException; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; -import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.utils.SeaTunnelException; @@ -49,7 +48,6 @@ import java.util.Optional; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID; -import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory; /** The util used for Spark/Flink to create to SeaTunnelSource etc. */ public class PluginUtil { @@ -130,21 +128,21 @@ public class PluginUtil { return source; } - public static TableTransformFactory createTransformFactory( + public static Optional<? extends Factory> createTransformFactory( + SeaTunnelFactoryDiscovery factoryDiscovery, SeaTunnelTransformPluginDiscovery transformPluginDiscovery, Config transformConfig, List<URL> pluginJars) { PluginIdentifier pluginIdentifier = PluginIdentifier.of( ENGINE_TYPE, "transform", transformConfig.getString(PLUGIN_NAME.key())); - final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(transformConfig); - final String factoryId = readonlyConfig.get(PLUGIN_NAME); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final TableTransformFactory factory = - discoverFactory(classLoader, TableTransformFactory.class, factoryId); pluginJars.addAll( transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - return factory; + try { + return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); + } catch (FactoryException e) { + return Optional.empty(); + } } public static Optional<? extends Factory> createSinkFactory( diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index d91bb9d3da..1ff2cf6437 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; @@ -41,6 +42,7 @@ import org.apache.flink.types.Row; import java.net.URL; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; @@ -59,15 +61,23 @@ public class TransformExecuteProcessor @Override protected List<TableTransformFactory> initializePlugins( List<URL> jarPaths, List<? extends Config> pluginConfigs) { + + SeaTunnelFactoryDiscovery factoryDiscovery = + new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER); SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); - return pluginConfigs.stream() .map( transformConfig -> PluginUtil.createTransformFactory( - transformPluginDiscovery, transformConfig, jarPaths)) + factoryDiscovery, + transformPluginDiscovery, + transformConfig, + jarPaths)) .distinct() + .filter(Optional::isPresent) + .map(Optional::get) + .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index bc7cd5cdbe..fc4a9e00d0 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter; import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; @@ -50,6 +51,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; @@ -69,16 +71,23 @@ public class TransformExecuteProcessor protected List<TableTransformFactory> initializePlugins(List<? extends Config> pluginConfigs) { SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); + + SeaTunnelFactoryDiscovery factoryDiscovery = + new SeaTunnelFactoryDiscovery(TableTransformFactory.class); List<URL> pluginJars = new ArrayList<>(); List<TableTransformFactory> transforms = pluginConfigs.stream() .map( transformConfig -> PluginUtil.createTransformFactory( + factoryDiscovery, transformPluginDiscovery, transformConfig, - pluginJars)) + new ArrayList<>())) .distinct() + .filter(Optional::isPresent) + .map(Optional::get) + .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); return transforms; diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml index cc48ac86a2..4510579d81 100644 --- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml +++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml @@ -140,7 +140,7 @@ <scope>provided</scope> </dependencySet> - <!-- ============ Connectors Jars ============ --> + <!-- ============ Connectors Jars And Transforms V2 Jar ============ --> <!-- SeaTunnel connectors --> <dependencySet> <useProjectArtifact>false</useProjectArtifact> @@ -148,6 +148,7 @@ <unpack>false</unpack> <includes> <include>org.apache.seatunnel:connector-*:jar</include> + <include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include> </includes> <excludes> <exclude>org.apache.seatunnel:connector-common</exclude> @@ -160,36 +161,7 @@ <scope>provided</scope> </dependencySet> - <!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 Uber Jar ============ --> - <dependencySet> - <useProjectArtifact>false</useProjectArtifact> - <useTransitiveDependencies>true</useTransitiveDependencies> - <unpack>false</unpack> - <includes> - <include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include> - <include>org.apache.hadoop:hadoop-aws:jar</include> - <include>com.amazonaws:aws-java-sdk-bundle:jar</include> - <include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include> - <!--Add hadoop aliyun jar --> - <include>org.apache.hadoop:hadoop-aliyun:jar</include> - <include>com.aliyun.oss:aliyun-sdk-oss:jar</include> - <include>org.jdom:jdom:jar</include> - - <!--Add netty buffer jar --> - <include>io.netty:netty-buffer:jar</include> - <include>io.netty:netty-common:jar</include> - - <!--Add hive exec jar --> - <include>org.apache.hive:hive-exec:jar</include> - <include>org.apache.hive:hive-service:jar</include> - <include>org.apache.thrift:libfb303:jar</include> - </includes> - <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping> - <outputDirectory>/lib</outputDirectory> - <scope>provided</scope> - </dependencySet> - - <!-- =================== JDBC Connector Drivers =================== --> + <!-- =================== JDBC Connector Drivers And SeaTunnel Hadoop3 Uber Jar =================== --> <dependencySet> <useProjectArtifact>false</useProjectArtifact> <useTransitiveDependencies>true</useTransitiveDependencies> @@ -209,6 +181,20 @@ <include>com.amazon.redshift:redshift-jdbc42:jar</include> <include>net.snowflake.snowflake-jdbc:jar</include> <include>com.xugudb:xugu-jdbc:jar</include> + <include>org.apache.hadoop:hadoop-aws:jar</include> + <include>com.amazonaws:aws-java-sdk-bundle:jar</include> + <include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include> + <!--Add hadoop aliyun jar --> + <include>org.apache.hadoop:hadoop-aliyun:jar</include> + <include>com.aliyun.oss:aliyun-sdk-oss:jar</include> + <include>org.jdom:jdom:jar</include> + <!--Add netty buffer jar --> + <include>io.netty:netty-buffer:jar</include> + <include>io.netty:netty-common:jar</include> + <!--Add hive exec jar --> + <include>org.apache.hive:hive-exec:jar</include> + <include>org.apache.hive:hive-service:jar</include> + <include>org.apache.thrift:libfb303:jar</include> </includes> <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping> <outputDirectory>/lib</outputDirectory> diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml index 30fc5a6336..f16841f7a9 100644 --- a/seatunnel-dist/src/main/assembly/assembly-bin.xml +++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml @@ -161,13 +161,12 @@ <scope>provided</scope> </dependencySet> - <!-- ============ SeaTunnel Transforms V2 Jars And SeaTunnel Hadoop3 Uber Jar============ --> + <!-- ============ SeaTunnel Hadoop3 Uber Jar============ --> <dependencySet> <useProjectArtifact>false</useProjectArtifact> <useTransitiveDependencies>true</useTransitiveDependencies> <unpack>false</unpack> <includes> - <include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include> <include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include> </includes> <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping> @@ -175,7 +174,7 @@ <scope>provided</scope> </dependencySet> - <!-- ============ Connectors Jars ============ --> + <!-- ============ Connectors Jars And Transforms V2 Jar ============ --> <!-- SeaTunnel connectors for Demo --> <dependencySet> <useProjectArtifact>false</useProjectArtifact> @@ -184,6 +183,7 @@ <includes> <include>org.apache.seatunnel:connector-fake:jar</include> <include>org.apache.seatunnel:connector-console:jar</include> + <include>org.apache.seatunnel:seatunnel-transforms-v2:jar</include> </includes> <outputDirectory>/connectors</outputDirectory> <scope>provided</scope> diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index 1c590bb69a..6c6a8e5cdd 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -195,13 +195,13 @@ public final class ContainerUtil { MountableFile.forHostPath(startJarPath), Paths.get(seatunnelHomeInContainer, "starter", startJarName).toString()); - // copy lib + // copy transform String transformJar = "seatunnel-transforms-v2.jar"; Path transformJarPath = Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2", "target", transformJar); container.withCopyFileToContainer( MountableFile.forHostPath(transformJarPath), - Paths.get(seatunnelHomeInContainer, "lib", transformJar).toString()); + Paths.get(seatunnelHomeInContainer, "connectors", transformJar).toString()); // copy bin final String startBinPath = startModulePath + File.separator + "src/main/bin/"; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 40a6640c35..d02a76a4c5 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -182,7 +182,7 @@ public class MultipleTableJobConfigParser { TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "sink", Collections.emptyList()); - List<URL> connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs); + List<URL> connectorJars = getConnectorJarList(sourceConfigs, transformConfigs, sinkConfigs); if (!commonPluginJars.isEmpty()) { connectorJars.addAll(commonPluginJars); } @@ -238,18 +238,32 @@ public class MultipleTableJobConfigParser { } private List<URL> getConnectorJarList( - List<? extends Config> sourceConfigs, List<? extends Config> sinkConfigs) { + List<? extends Config> sourceConfigs, + List<? extends Config> transformConfigs, + List<? extends Config> sinkConfigs) { List<PluginIdentifier> factoryIds = Stream.concat( - sourceConfigs.stream() - .map(ConfigParserUtil::getFactoryId) - .map( - factory -> - PluginIdentifier.of( - CollectionConstants - .SEATUNNEL_PLUGIN, - CollectionConstants.SOURCE_PLUGIN, - factory)), + Stream.concat( + sourceConfigs.stream() + .map(ConfigParserUtil::getFactoryId) + .map( + factory -> + PluginIdentifier.of( + CollectionConstants + .SEATUNNEL_PLUGIN, + CollectionConstants + .SOURCE_PLUGIN, + factory)), + transformConfigs.stream() + .map(ConfigParserUtil::getFactoryId) + .map( + factory -> + PluginIdentifier.of( + CollectionConstants + .SEATUNNEL_PLUGIN, + CollectionConstants + .TRANSFORM_PLUGIN, + factory))), sinkConfigs.stream() .map(ConfigParserUtil::getFactoryId) .map( diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java index 445bf14628..606cd0d7ca 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java @@ -34,7 +34,7 @@ import java.util.List; public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> { public SeaTunnelTransformPluginDiscovery() { - super(Common.libDir()); + super(Common.connectorDir()); } @Override diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java index 5670bcc129..632d3af1e4 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java @@ -20,10 +20,12 @@ package org.apache.seatunnel.transform.common; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; import lombok.NonNull; -public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform { +public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform<SeaTunnelRow> { protected CatalogTable inputCatalogTable; protected volatile CatalogTable outputCatalogTable; @@ -32,6 +34,18 @@ public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelT this.inputCatalogTable = inputCatalogTable; } + @Override + public SeaTunnelRow map(SeaTunnelRow row) { + return transformRow(row); + } + + /** + * Outputs transformed row data. + * + * @param inputRow upstream input row data + */ + protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow); + @Override public CatalogTable getProducedCatalogTable() { if (outputCatalogTable == null) { diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java deleted file mode 100644 index 1892881c27..0000000000 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.transform.common; - -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.transform.SeaTunnelTransform; - -public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<SeaTunnelRow> { - - protected String inputTableName; - protected SeaTunnelRowType inputRowType; - - protected SeaTunnelRowType outputRowType; - - @Override - public SeaTunnelRow map(SeaTunnelRow row) { - return transformRow(row); - } - - /** - * Outputs transformed row data. - * - * @param inputRow upstream input row data - */ - protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow); - - @Override - public CatalogTable getProducedCatalogTable() { - throw new UnsupportedOperationException( - String.format( - "Connector %s must implement TableTransformFactory.createTransform method", - getPluginName())); - } -} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index a9d04b0739..00316bba8e 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -62,6 +62,8 @@ public class SQLTransform extends AbstractCatalogSupportTransform { private transient SQLEngine sqlEngine; + private final String inputTableName; + public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) { super(catalogTable); this.query = config.get(KEY_QUERY); @@ -77,15 +79,6 @@ public class SQLTransform extends AbstractCatalogSupportTransform { } else { this.inputTableName = catalogTable.getTableId().getTableName(); } - List<Column> columns = catalogTable.getTableSchema().getColumns(); - String[] fieldNames = new String[columns.size()]; - SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[columns.size()]; - for (int i = 0; i < columns.size(); i++) { - Column column = columns.get(i); - fieldNames[i] = column.getName(); - fieldTypes[i] = column.getDataType(); - } - this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes); } @Override @@ -98,8 +91,8 @@ public class SQLTransform extends AbstractCatalogSupportTransform { sqlEngine = SQLEngineFactory.getSQLEngine(engineType); sqlEngine.init( inputTableName, - inputCatalogTable != null ? inputCatalogTable.getTableId().getTableName() : null, - inputRowType, + inputCatalogTable.getTableId().getTableName(), + inputCatalogTable.getSeaTunnelRowType(), query); }