This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 99fa19d2ea [Refactor][core] Unify transformFactory creation logic (#8574) 99fa19d2ea is described below commit 99fa19d2eab95b68dd79e27866233bd950efb166 Author: Guangdong Liu <804167...@qq.com> AuthorDate: Thu Feb 6 11:36:28 2025 +0800 [Refactor][core] Unify transformFactory creation logic (#8574) --- .../seatunnel/api/table/factory/FactoryUtil.java | 41 +++++++++- .../seatunnel/common/constants}/EngineType.java | 2 +- .../seatunnel/core/starter/enums/PluginType.java | 35 --------- .../core/starter/execution/PluginUtil.java | 87 ---------------------- .../seatunnel/core/starter/flink/FlinkStarter.java | 2 +- .../core/starter/flink/SeaTunnelFlink.java | 2 +- .../flink/execution/SinkExecuteProcessor.java | 38 ++++++++-- .../seatunnel/core/starter/flink/FlinkStarter.java | 2 +- .../core/starter/flink/SeaTunnelFlink.java | 2 +- .../FlinkAbstractPluginExecuteProcessor.java | 3 +- .../flink/execution/SinkExecuteProcessor.java | 39 ++++++++-- .../flink/execution/SourceExecuteProcessor.java | 12 +-- .../flink/execution/TransformExecuteProcessor.java | 37 ++++++--- .../core/starter/spark/SeaTunnelSpark.java | 2 +- .../seatunnel/core/starter/spark/SparkStarter.java | 4 +- .../spark/execution/SinkExecuteProcessor.java | 34 +++++---- .../core/starter/spark/SeaTunnelSpark.java | 2 +- .../seatunnel/core/starter/spark/SparkStarter.java | 4 +- .../spark/execution/SinkExecuteProcessor.java | 34 +++++---- .../spark/execution/SourceExecuteProcessor.java | 11 ++- .../SparkAbstractPluginExecuteProcessor.java | 2 +- .../spark/execution/SparkRuntimeEnvironment.java | 2 +- .../spark/execution/TransformExecuteProcessor.java | 36 ++++++--- .../core/starter/seatunnel/SeaTunnelClient.java | 2 +- .../core/starter/seatunnel/SeaTunnelServer.java | 2 +- .../core/parse/MultipleTableJobConfigParser.java | 3 +- 26 files changed, 225 insertions(+), 215 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 30e7b00864..131d50852e 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; @@ -37,6 +38,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.utils.ExceptionUtils; import org.slf4j.Logger; @@ -106,7 +110,10 @@ public final class FactoryUtil { if (fallback) { source = fallbackCreateSource.apply( - PluginIdentifier.of("seatunnel", "source", factoryId)); + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SOURCE.getType(), + factoryId)); source.prepare(options.toConfig()); } else { @@ -205,7 +212,10 @@ public final class FactoryUtil { if (fallback) { SeaTunnelSink sink = fallbackCreateSink.apply( - PluginIdentifier.of("seatunnel", "sink", factoryId)); + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SINK.getType(), + factoryId)); sink.prepare(config.toConfig()); sink.setTypeInfo(catalogTable.getSeaTunnelRowType()); @@ -273,6 +283,23 @@ public final class FactoryUtil { return factory.getClass().getProtectionDomain().getCodeSource().getLocation(); } + public static <T extends Factory> Optional<T> discoverOptionalFactory( + ClassLoader classLoader, + Class<T> factoryClass, + String factoryIdentifier, + Function<String, T> discoverOptionalFactoryFunction) { + + if (discoverOptionalFactoryFunction != null) { + T apply = discoverOptionalFactoryFunction.apply(factoryIdentifier); + if (apply != null) { + return Optional.of(apply); + } else { + return Optional.empty(); + } + } + return discoverOptionalFactory(classLoader, factoryClass, factoryIdentifier); + } + public static <T extends Factory> Optional<T> discoverOptionalFactory( ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) { final List<T> foundFactories = discoverFactories(classLoader, factoryClass); @@ -436,4 +463,14 @@ public final class FactoryUtil { } return false; } + + public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) { + if (jobContext.getJobMode() == JobMode.BATCH + && source.getBoundedness() + == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) { + throw new UnsupportedOperationException( + String.format( + "'%s' source don't support off-line job.", source.getPluginName())); + } + } } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java similarity index 97% rename from seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java rename to seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java index 355fa838d4..6f5c79c1ed 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/EngineType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.core.starter.enums; +package org.apache.seatunnel.common.constants; /** Engine type enum */ public enum EngineType { diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java deleted file mode 100644 index 5fdec8b71b..0000000000 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/PluginType.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.core.starter.enums; - -/** Plugin type enum */ -public enum PluginType { - SOURCE("source"), - TRANSFORM("transform"), - SINK("sink"); - - private final String type; - - PluginType(String type) { - this.type = type; - } - - public String getType() { - return type; - } -} diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java deleted file mode 100644 index b2b47854e3..0000000000 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.core.starter.execution; - -import org.apache.seatunnel.shade.com.google.common.collect.Lists; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.common.PluginIdentifier; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.FactoryException; -import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; - -import java.net.URL; -import java.util.List; -import java.util.Optional; - -import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; - -/** The util used for Spark/Flink to create to SeaTunnelSource etc. */ -@SuppressWarnings("rawtypes") -public class PluginUtil { - - protected static final String ENGINE_TYPE = "seatunnel"; - - public static Optional<? extends Factory> createTransformFactory( - SeaTunnelFactoryDiscovery factoryDiscovery, - SeaTunnelTransformPluginDiscovery transformPluginDiscovery, - Config transformConfig, - List<URL> pluginJars) { - PluginIdentifier pluginIdentifier = - PluginIdentifier.of( - ENGINE_TYPE, "transform", transformConfig.getString(PLUGIN_NAME.key())); - pluginJars.addAll( - transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - try { - return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); - } catch (FactoryException e) { - return Optional.empty(); - } - } - - public static Optional<? extends Factory> createSinkFactory( - SeaTunnelFactoryDiscovery factoryDiscovery, - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - Config sinkConfig, - List<URL> pluginJars) { - PluginIdentifier pluginIdentifier = - PluginIdentifier.of(ENGINE_TYPE, "sink", sinkConfig.getString(PLUGIN_NAME.key())); - pluginJars.addAll( - sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - try { - return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); - } catch (FactoryException e) { - return Optional.empty(); - } - } - - public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) { - if (jobContext.getJobMode() == JobMode.BATCH - && source.getBoundedness() - == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) { - throw new UnsupportedOperationException( - String.format( - "'%s' source don't support off-line job.", source.getPluginName())); - } - } -} diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index e9d0ba7df2..7d106abbce 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.core.starter.flink; import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.Starter; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.enums.MasterType; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java index 8d1b434801..4356e85bfb 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.core.starter.flink; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.SeaTunnel; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.exception.CommandException; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 4466844536..b700af5b46 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.core.starter.flink.execution; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.CommonOptions; @@ -35,9 +36,10 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; @@ -56,6 +58,7 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @SuppressWarnings({"unchecked", "rawtypes"}) @Slf4j @@ -77,14 +80,34 @@ public class SinkExecuteProcessor new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER); SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); + Function<String, TableSinkFactory> discoverOptionalFactoryFunction = + pluginName -> + (TableSinkFactory) + factoryDiscovery + .createOptionalPluginInstance( + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SINK.getType(), + pluginName)) + .orElse(null); + return pluginConfigs.stream() .map( - sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - jarPaths)) + sinkConfig -> { + jarPaths.addAll( + sinkPluginDiscovery.getPluginJarPaths( + Lists.newArrayList( + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SINK.getType(), + sinkConfig.getString( + PLUGIN_NAME.key()))))); + return discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key()), + discoverOptionalFactoryFunction); + }) .distinct() .collect(Collectors.toList()); } @@ -95,7 +118,6 @@ public class SinkExecuteProcessor SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java index 06cfd5f449..54d1984bf4 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.core.starter.flink; import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.Starter; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.enums.MasterType; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java index 1595da686a..dbae7e5fe9 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.core.starter.flink; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.SeaTunnel; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.exception.CommandException; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java index 00614b1d88..aef7d57416 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java @@ -36,8 +36,6 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_INPUT; public abstract class FlinkAbstractPluginExecuteProcessor<T> implements PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment> { - protected static final String ENGINE_TYPE = "seatunnel"; - protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> { if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) { @@ -57,6 +55,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T> protected JobContext jobContext; protected final List<T> plugins; protected final Config envConfig; + protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); protected FlinkAbstractPluginExecuteProcessor( List<URL> jarPaths, diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index a82d2392e6..d6d8518a28 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.core.starter.flink.execution; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.CommonOptions; @@ -35,9 +36,10 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; @@ -57,6 +59,7 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @Slf4j @SuppressWarnings("unchecked,rawtypes") @@ -74,18 +77,39 @@ public class SinkExecuteProcessor @Override protected List<Optional<? extends Factory>> initializePlugins( List<URL> jarPaths, List<? extends Config> pluginConfigs) { + SeaTunnelFactoryDiscovery factoryDiscovery = new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER); SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); + Function<String, TableSinkFactory> discoverOptionalFactoryFunction = + pluginName -> + (TableSinkFactory) + factoryDiscovery + .createOptionalPluginInstance( + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SINK.getType(), + pluginName)) + .orElse(null); + return pluginConfigs.stream() .map( - sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - jarPaths)) + sinkConfig -> { + jarPaths.addAll( + sinkPluginDiscovery.getPluginJarPaths( + Lists.newArrayList( + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SINK.getType(), + sinkConfig.getString( + PLUGIN_NAME.key()))))); + return discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key()), + discoverOptionalFactoryFunction); + }) .distinct() .collect(Collectors.toList()); } @@ -96,7 +120,6 @@ public class SinkExecuteProcessor SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index d637f2256b..deafcc6cb2 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -30,7 +30,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.core.starter.enums.PluginType; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.execution.SourceTableInfo; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; @@ -53,12 +54,11 @@ import java.util.function.Function; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; -import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch; @Slf4j @SuppressWarnings("unchecked,rawtypes") public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<SourceTableInfo> { - private static final String PLUGIN_TYPE = PluginType.SOURCE.getType(); public SourceExecuteProcessor( List<URL> jarPaths, @@ -113,14 +113,16 @@ public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor< for (Config sourceConfig : pluginConfigs) { PluginIdentifier pluginIdentifier = PluginIdentifier.of( - ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key())); + EngineType.SEATUNNEL.getEngine(), + PluginType.SOURCE.getType(), + sourceConfig.getString(PLUGIN_NAME.key())); jars.addAll( sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source = FactoryUtil.createAndPrepareSource( ReadonlyConfig.fromConfig(sourceConfig), - Thread.currentThread().getContextClassLoader(), + classLoader, pluginIdentifier.getPluginName(), fallbackCreateSource, (TableSourceFactory) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index 615876c417..fa1380cd00 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -17,9 +17,11 @@ package org.apache.seatunnel.core.starter.flink.execution; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.table.factory.TableTransformFactory; @@ -28,8 +30,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; @@ -49,6 +52,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; @SuppressWarnings("unchecked,rawtypes") @@ -66,23 +70,32 @@ public class TransformExecuteProcessor @Override protected List<TableTransformFactory> initializePlugins( List<URL> jarPaths, List<? extends Config> pluginConfigs) { - - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER); SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); + SeaTunnelFactoryDiscovery factoryDiscovery = + new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER); return pluginConfigs.stream() .map( - transformConfig -> - PluginUtil.createTransformFactory( - factoryDiscovery, - transformPluginDiscovery, - transformConfig, - jarPaths)) + transformConfig -> { + jarPaths.addAll( + transformPluginDiscovery.getPluginJarPaths( + Lists.newArrayList( + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.TRANSFORM.getType(), + transformConfig.getString( + PLUGIN_NAME.key()))))); + return Optional.of( + (TableTransformFactory) + factoryDiscovery.createPluginInstance( + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.TRANSFORM.getType(), + transformConfig.getString( + PLUGIN_NAME.key())))); + }) .distinct() - .filter(Optional::isPresent) .map(Optional::get) - .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java index ca7b2ed4be..fd7dea9287 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.core.starter.spark; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.SeaTunnel; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.exception.CommandException; import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index e9ee482e9c..3325ae38a5 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -23,9 +23,9 @@ import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.Starter; -import org.apache.seatunnel.core.starter.enums.EngineType; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; import org.apache.seatunnel.core.starter.utils.CompressionUtils; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 6e22576c91..5a9ff3d2e4 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.core.starter.spark.execution; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.CommonOptions; @@ -33,10 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; @@ -56,10 +57,10 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; public class SinkExecuteProcessor extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> { - private static final String PLUGIN_TYPE = PluginType.SINK.getType(); protected SinkExecuteProcessor( SparkRuntimeEnvironment sparkRuntimeEnvironment, @@ -71,19 +72,27 @@ public class SinkExecuteProcessor @Override protected List<Optional<? extends Factory>> initializePlugins( List<? extends Config> pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSinkFactory.class); - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); List<URL> pluginJars = new ArrayList<>(); + SeaTunnelFactoryDiscovery sinkPluginDiscovery = + new SeaTunnelFactoryDiscovery(TableSinkFactory.class); List<Optional<? extends Factory>> sinks = pluginConfigs.stream() .map( - sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - pluginJars)) + sinkConfig -> { + pluginJars.addAll( + sinkPluginDiscovery.getPluginJarPaths( + Lists.newArrayList( + PluginIdentifier.of( + EngineType.SEATUNNEL + .getEngine(), + PluginType.SINK.getType(), + sinkConfig.getString( + PLUGIN_NAME.key()))))); + return discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key())); + }) .distinct() .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); @@ -94,7 +103,6 @@ public class SinkExecuteProcessor public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams) throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java index 9b3fde6fd1..e984e19182 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.core.starter.spark; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.SeaTunnel; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.exception.CommandException; import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index dcd6a804b2..3411b785d1 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -23,9 +23,9 @@ import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.Starter; -import org.apache.seatunnel.core.starter.enums.EngineType; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; import org.apache.seatunnel.core.starter.utils.CompressionUtils; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 74b0e1c1f1..19a07878ed 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.core.starter.spark.execution; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.CommonOptions; @@ -33,10 +34,10 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; @@ -59,11 +60,11 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @Slf4j public class SinkExecuteProcessor extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> { - private static final String PLUGIN_TYPE = PluginType.SINK.getType(); protected SinkExecuteProcessor( SparkRuntimeEnvironment sparkRuntimeEnvironment, @@ -75,19 +76,27 @@ public class SinkExecuteProcessor @Override protected List<Optional<? extends Factory>> initializePlugins( List<? extends Config> pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSinkFactory.class); - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); List<URL> pluginJars = new ArrayList<>(); + SeaTunnelFactoryDiscovery sinkPluginDiscovery = + new SeaTunnelFactoryDiscovery(TableSinkFactory.class); List<Optional<? extends Factory>> sinks = pluginConfigs.stream() .map( - sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - new ArrayList<>())) + sinkConfig -> { + pluginJars.addAll( + sinkPluginDiscovery.getPluginJarPaths( + Lists.newArrayList( + PluginIdentifier.of( + EngineType.SEATUNNEL + .getEngine(), + PluginType.SINK.getType(), + sinkConfig.getString( + PLUGIN_NAME.key()))))); + return discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key())); + }) .distinct() .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); @@ -98,7 +107,6 @@ public class SinkExecuteProcessor public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams) throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java index 50b3a88814..5ccbb59cbd 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java @@ -30,6 +30,8 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.common.Constants; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.utils.SerializationUtils; import org.apache.seatunnel.core.starter.execution.SourceTableInfo; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; @@ -52,11 +54,10 @@ import java.util.function.Function; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; -import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch; @SuppressWarnings("rawtypes") public class SourceExecuteProcessor extends SparkAbstractPluginExecuteProcessor<SourceTableInfo> { - private static final String PLUGIN_TYPE = "source"; private Map envOption = new HashMap<String, String>(); public SourceExecuteProcessor( @@ -124,13 +125,15 @@ public class SourceExecuteProcessor extends SparkAbstractPluginExecuteProcessor< for (Config sourceConfig : pluginConfigs) { PluginIdentifier pluginIdentifier = PluginIdentifier.of( - ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key())); + EngineType.SEATUNNEL.getEngine(), + PluginType.SOURCE.getType(), + sourceConfig.getString(PLUGIN_NAME.key())); jars.addAll( sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source = FactoryUtil.createAndPrepareSource( ReadonlyConfig.fromConfig(sourceConfig), - Thread.currentThread().getContextClassLoader(), + classLoader, pluginIdentifier.getPluginName(), fallbackCreateSource, null); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java index d9de016446..42b9155483 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java @@ -48,7 +48,7 @@ public abstract class SparkAbstractPluginExecuteProcessor<T> protected final List<? extends Config> pluginConfigs; protected final JobContext jobContext; protected final List<T> plugins; - protected static final String ENGINE_TYPE = "seatunnel"; + protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); protected SparkAbstractPluginExecuteProcessor( SparkRuntimeEnvironment sparkRuntimeEnvironment, diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java index 7e31ca463b..e9bc4e07c9 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.core.starter.enums.PluginType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment; import org.apache.spark.SparkConf; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index 492af1ad73..f10135e724 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -17,9 +17,11 @@ package org.apache.seatunnel.core.starter.spark.execution; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.table.catalog.CatalogTable; @@ -29,8 +31,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; @@ -56,6 +59,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; @Slf4j @@ -71,25 +75,39 @@ public class TransformExecuteProcessor @Override protected List<TableTransformFactory> initializePlugins(List<? extends Config> pluginConfigs) { + SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); SeaTunnelFactoryDiscovery factoryDiscovery = new SeaTunnelFactoryDiscovery(TableTransformFactory.class); + List<URL> pluginJars = new ArrayList<>(); List<TableTransformFactory> transforms = pluginConfigs.stream() .map( - transformConfig -> - PluginUtil.createTransformFactory( - factoryDiscovery, - transformPluginDiscovery, - transformConfig, - new ArrayList<>())) + transformConfig -> { + pluginJars.addAll( + transformPluginDiscovery.getPluginJarPaths( + Lists.newArrayList( + PluginIdentifier.of( + EngineType.SEATUNNEL + .getEngine(), + PluginType.TRANSFORM.getType(), + transformConfig.getString( + PLUGIN_NAME.key()))))); + return Optional.of( + (TableTransformFactory) + factoryDiscovery.createPluginInstance( + PluginIdentifier.of( + EngineType.SEATUNNEL + .getEngine(), + PluginType.TRANSFORM.getType(), + transformConfig.getString( + PLUGIN_NAME.key())))); + }) .distinct() - .filter(Optional::isPresent) .map(Optional::get) - .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); return transforms; diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java index 7a37fd340c..308ef35905 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.core.starter.seatunnel; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.SeaTunnel; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.exception.CommandException; import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java index 96a3e32e51..a340c0acb6 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.core.starter.seatunnel; +import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.SeaTunnel; -import org.apache.seatunnel.core.starter.enums.EngineType; import org.apache.seatunnel.core.starter.exception.CommandException; import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 903db02f80..6ec3eaf0cf 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -46,7 +46,6 @@ import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.exception.JobDefineCheckException; @@ -386,7 +385,7 @@ public class MultipleTableJobConfigParser { String actionName = JobConfigParser.createSourceActionName(configIndex, factoryId); SeaTunnelSource<Object, SourceSplit, Serializable> source = tuple2._1(); source.setJobContext(jobConfig.getJobContext()); - PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source); + FactoryUtil.ensureJobModeMatch(jobConfig.getJobContext(), source); SourceAction<Object, SourceSplit, Serializable> action = new SourceAction<>(id, actionName, tuple2._1(), factoryUrls, new HashSet<>()); action.setParallelism(parallelism);