This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 80cf91318d [Improve][Zeta] Move SaveMode behavior to master (#6843) 80cf91318d is described below commit 80cf91318d6daebf4096a73c431cf270ef053785 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Thu Jun 13 11:29:33 2024 +0800 [Improve][Zeta] Move SaveMode behavior to master (#6843) --- docs/en/concept/JobEnvConfig.md | 6 + .../apache/seatunnel/api/env/EnvCommonOptions.java | 7 ++ .../apache/seatunnel/api/env/EnvOptionRule.java | 1 + .../api/sink/SaveModeExecuteLocation.java | 24 ++++ .../seatunnel/connectors/doris/sink/DorisSink.java | 6 + .../connectors/seatunnel/jdbc/sink/JdbcSink.java | 5 + .../seatunnel/jdbc/sink/JdbcSinkWriter.java | 5 + .../e2e/sink/inmemory/InMemorySaveModeHandler.java | 137 +++++++++++++++++++++ .../seatunnel/e2e/sink/inmemory/InMemorySink.java | 15 ++- .../e2e/sink/inmemory/InMemorySinkFactory.java | 2 +- .../seatunnel/engine/e2e/JobClientJobProxyIT.java | 45 +++++++ .../engine/e2e/classloader/ClassLoaderITBase.java | 2 +- .../savemode/fake_to_inmemory_savemode.conf | 57 +++++++++ .../savemode/fake_to_inmemory_savemode_client.conf | 58 +++++++++ .../engine/core/dag/logical/LogicalDag.java | 13 ++ .../core/dag/logical/LogicalDagGenerator.java | 13 +- .../engine/core/job/AbstractJobEnvironment.java | 2 +- .../engine/core/parse/JobConfigParser.java | 6 +- .../core/parse/MultipleTableJobConfigParser.java | 32 ++--- .../seatunnel/engine/server/master/JobMaster.java | 62 +++++++++- 20 files changed, 469 insertions(+), 29 deletions(-) diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md index 32bf089e92..66104f7fbc 100644 --- a/docs/en/concept/JobEnvConfig.md +++ b/docs/en/concept/JobEnvConfig.md @@ -33,6 +33,12 @@ This parameter configures the parallelism of source and sink. Used to control the default retry times when a job fails. The default value is 3, and it only works in the Zeta engine. 
+### savemode.execute.location + +This parameter specifies where the save mode is executed when the job runs on the Zeta engine. +The default value is `CLUSTER`, which executes the save mode on the cluster. To execute the save mode on the client instead, +set it to `CLIENT`. Prefer `CLUSTER` whenever possible: `CLIENT` is deprecated and will be removed once `CLUSTER` mode has proven stable. + ### shade.identifier Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting config files, this option can be ignored. diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java index 75e58a5f5b..cabf0856dc 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.api.env; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.sink.SaveModeExecuteLocation; import org.apache.seatunnel.common.constants.JobMode; import java.util.Map; @@ -76,6 +77,12 @@ public interface EnvCommonOptions { .noDefaultValue() .withDescription("The timeout (in milliseconds) for a checkpoint."); + Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION = + Options.key("savemode.execute.location") + .enumType(SaveModeExecuteLocation.class) + .defaultValue(SaveModeExecuteLocation.CLUSTER) + .withDescription("The location of save mode execute."); + Option<String> JARS = Options.key("jars") .stringType() diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java index d4caa710d8..262cfe065e 100644 --- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java @@ -33,6 +33,7 @@ public class EnvOptionRule { EnvCommonOptions.CHECKPOINT_TIMEOUT, EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND, EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND, + EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION, EnvCommonOptions.CUSTOM_PARAMETERS) .build(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java new file mode 100644 index 0000000000..3184bce3fb --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.api.sink; + +public enum SaveModeExecuteLocation { + @Deprecated + CLIENT, + CLUSTER +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index e0ef8e1893..e14b64d9a2 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -106,6 +106,12 @@ public class DorisSink @Override public Optional<SaveModeHandler> getSaveModeHandler() { + // Load the JDBC driver in to DriverManager + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } CatalogFactory catalogFactory = discoverFactory( Thread.currentThread().getContextClassLoader(), diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 384319d022..69b01f10d2 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -167,6 +167,11 @@ public class JdbcSink @Override public Optional<SaveModeHandler> getSaveModeHandler() { + try { + Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } if (catalogTable != null) { if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) { return Optional.empty(); diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java index 1cf08c4c43..d8c7b661c5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -77,6 +77,11 @@ public class JdbcSinkWriter public MultiTableResourceManager<ConnectionPoolManager> initMultiTableResourceManager( int tableSize, int queueSize) { HikariDataSource ds = new HikariDataSource(); + try { + Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } ds.setIdleTimeout(30 * 1000); ds.setMaximumPoolSize(queueSize); ds.setJdbcUrl(jdbcSinkConfig.getJdbcConnectionConfig().getUrl()); diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java new file mode 100644 index 0000000000..e2c28bd447 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.sink.inmemory; + +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SaveModeHandler; +import org.apache.seatunnel.api.sink.SchemaSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j +public class InMemorySaveModeHandler implements SaveModeHandler { + + private final CatalogTable catalogTable; + + public InMemorySaveModeHandler(CatalogTable catalogTable) { + this.catalogTable = catalogTable; + } + + @Override + public void handleSchemaSaveMode() { + log.info("handle schema savemode with table path: {}", catalogTable.getTablePath()); + } + + @Override + public void handleDataSaveMode() { + log.info("handle data savemode with table path: {}", catalogTable.getTablePath()); + } + + @Override + public SchemaSaveMode getSchemaSaveMode() { + return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST; + } + + @Override + public DataSaveMode getDataSaveMode() { + return 
DataSaveMode.APPEND_DATA; + } + + @Override + public TablePath getHandleTablePath() { + return catalogTable.getTablePath(); + } + + @Override + public Catalog getHandleCatalog() { + return new Catalog() { + @Override + public void open() throws CatalogException {} + + @Override + public void close() throws CatalogException {} + + @Override + public String name() { + return "InMemoryCatalog"; + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return null; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return false; + } + + @Override + public List<String> listDatabases() throws CatalogException { + return null; + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + return null; + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + return false; + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + return null; + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, + CatalogException {} + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException {} + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException {} + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException {} + }; + } + + @Override + public void close() throws Exception {} +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java index 3efe0ad00e..8f1eba9af4 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java @@ -20,10 +20,13 @@ package org.apache.seatunnel.e2e.sink.inmemory; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import java.io.IOException; @@ -35,11 +38,14 @@ public class InMemorySink InMemoryState, InMemoryCommitInfo, InMemoryAggregatedCommitInfo>, - SupportMultiTableSink { + SupportMultiTableSink, + SupportSaveMode { private ReadonlyConfig config; + private CatalogTable catalogTable; - public InMemorySink(ReadonlyConfig config) { + public InMemorySink(CatalogTable catalogTable, ReadonlyConfig config) { + this.catalogTable = catalogTable; this.config = config; } @@ -69,4 +75,9 @@ public class InMemorySink public Optional<Serializer<InMemoryAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() { return Optional.of(new DefaultSerializer<>()); } + + @Override + public Optional<SaveModeHandler> getSaveModeHandler() { + return Optional.of(new InMemorySaveModeHandler(catalogTable)); + } } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java index 16f1c0dc44..7b06ec99d9 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java @@ -56,6 +56,6 @@ public class InMemorySinkFactory @Override public TableSink<SeaTunnelRow, InMemoryState, InMemoryCommitInfo, InMemoryAggregatedCommitInfo> createSink(TableSinkFactoryContext context) { - return () -> new InMemorySink(context.getOptions()); + return () -> new InMemorySink(context.getCatalogTable(), context.getOptions()); } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java index bcbf40b9f7..3d871adb5a 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -70,6 +70,51 @@ public class JobClientJobProxyIT extends SeaTunnelContainer { Assertions.assertNotEquals(0, execResult.getExitCode()); } + @Test + public void testSaveModeOnMasterOrClient() throws IOException, InterruptedException { + Container.ExecResult execResult = + executeJob(server, "/savemode/fake_to_inmemory_savemode.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + int serverLogLength = 0; + String serverLogs = server.getLogs(); + Assertions.assertTrue( + serverLogs.contains( + "org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema savemode with table path: test.table1")); + Assertions.assertTrue( + 
serverLogs.contains( + "org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data savemode with table path: test.table1")); + Assertions.assertTrue( + serverLogs.contains( + "org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema savemode with table path: test.table2")); + Assertions.assertTrue( + serverLogs.contains( + "org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data savemode with table path: test.table2")); + + // restore will not execute savemode + execResult = restoreJob(server, "/savemode/fake_to_inmemory_savemode.conf", "1"); + Assertions.assertEquals(0, execResult.getExitCode()); + // clear old logs + serverLogLength += serverLogs.length(); + serverLogs = server.getLogs().substring(serverLogLength); + Assertions.assertFalse(serverLogs.contains("handle schema savemode with table path")); + Assertions.assertFalse(serverLogs.contains("handle data savemode with table path")); + + // test savemode on client side + Container.ExecResult execResult2 = + executeJob(server, "/savemode/fake_to_inmemory_savemode_client.conf"); + Assertions.assertEquals(0, execResult2.getExitCode()); + // clear old logs + serverLogLength += serverLogs.length(); + serverLogs = server.getLogs().substring(serverLogLength); + Assertions.assertFalse(serverLogs.contains("handle schema savemode with table path")); + Assertions.assertFalse(serverLogs.contains("handle data savemode with table path")); + + Assertions.assertTrue( + execResult2.getStdout().contains("handle schema savemode with table path")); + Assertions.assertTrue( + execResult2.getStdout().contains("handle data savemode with table path")); + } + @Test public void testJobFailedWillThrowException() throws IOException, InterruptedException { Container.ExecResult execResult = executeSeaTunnelJob("/batch_slot_not_enough.conf"); diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java index 60907b3c0e..cdeef180f6 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java @@ -158,7 +158,7 @@ public abstract class ClassLoaderITBase extends SeaTunnelContainer { if (cacheMode()) { Assertions.assertTrue(3 >= getClassLoaderCount()); } else { - Assertions.assertTrue(2 + i >= getClassLoaderCount()); + Assertions.assertTrue(3 + 2 * i >= getClassLoaderCount()); } } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf new file mode 100644 index 0000000000..677589b0ac --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 1 + schema = { + table = "test.table1" + columns = [ + { + name = id + type = bigint + } + ] + } + }, + { + row.num = 1 + schema = { + table = "test.table2" + columns = [ + { + name = id + type = bigint + } + ] + } + } + ] + } +} + +sink{ + InMemory { + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf new file mode 100644 index 0000000000..e7b15a25f2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + savemode.execute.location = client +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 1 + schema = { + table = "test.table1" + columns = [ + { + name = id + type = bigint + } + ] + } + }, + { + row.num = 1 + schema = { + table = "test.table2" + columns = [ + { + name = id + type = bigint + } + ] + } + } + ] + } +} + +sink{ + InMemory { + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java index 38fd74f3ee..8739a82d2a 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java @@ -61,6 +61,7 @@ public class LogicalDag implements IdentifiedDataSerializable { private final Set<LogicalEdge> edges = new LinkedHashSet<>(); private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>(); private IdGenerator idGenerator; + private boolean isStartWithSavePoint = false; public LogicalDag() {} @@ -85,6 +86,14 @@ public class LogicalDag implements IdentifiedDataSerializable { return logicalVertexMap; } + public boolean isStartWithSavePoint() { + return isStartWithSavePoint; + } + + public void setStartWithSavePoint(boolean startWithSavePoint) { + isStartWithSavePoint = startWithSavePoint; + } + @NonNull public JsonObject getLogicalDagAsJson() { JsonObject logicalDag = new JsonObject(); JsonArray vertices = new JsonArray(); @@ -143,6 +152,8 @@ public class LogicalDag implements IdentifiedDataSerializable { out.writeObject(jobConfig); out.writeObject(idGenerator); + + 
out.writeBoolean(isStartWithSavePoint); } @Override @@ -165,6 +176,8 @@ public class LogicalDag implements IdentifiedDataSerializable { jobConfig = in.readObject(); idGenerator = in.readObject(); + + isStartWithSavePoint = in.readBoolean(); } @Override diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java index f847c2127b..6a56240b99 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java @@ -38,6 +38,7 @@ public class LogicalDagGenerator { private List<Action> actions; private JobConfig jobConfig; private IdGenerator idGenerator; + private boolean isStartWithSavePoint; private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>(); @@ -51,10 +52,19 @@ public class LogicalDagGenerator { @NonNull List<Action> actions, @NonNull JobConfig jobConfig, @NonNull IdGenerator idGenerator) { + this(actions, jobConfig, idGenerator, false); + } + + public LogicalDagGenerator( + @NonNull List<Action> actions, + @NonNull JobConfig jobConfig, + @NonNull IdGenerator idGenerator, + boolean isStartWithSavePoint) { this.actions = actions; this.jobConfig = jobConfig; this.idGenerator = idGenerator; - if (actions.size() <= 0) { + this.isStartWithSavePoint = isStartWithSavePoint; + if (actions.isEmpty()) { throw new IllegalStateException("No actions define in the job. 
Cannot execute."); } } @@ -65,6 +75,7 @@ public class LogicalDagGenerator { LogicalDag logicalDag = new LogicalDag(jobConfig, idGenerator); logicalDag.getEdges().addAll(logicalEdges); logicalDag.getLogicalVertexMap().putAll(logicalVertexMap); + logicalDag.setStartWithSavePoint(isStartWithSavePoint); return logicalDag; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java index 9f54db03d5..49c9b9275d 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java @@ -146,7 +146,7 @@ public abstract class AbstractJobEnvironment { protected abstract MultipleTableJobConfigParser getJobConfigParser(); protected LogicalDagGenerator getLogicalDagGenerator() { - return new LogicalDagGenerator(actions, jobConfig, idGenerator); + return new LogicalDagGenerator(actions, jobConfig, idGenerator, isStartWithSavePoint); } protected abstract LogicalDag getLogicalDag(); diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java index 981b85049a..2ec19cabc9 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java @@ -53,21 +53,23 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals; -import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode; @Data public class JobConfigParser { private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class); private IdGenerator idGenerator; private boolean isStartWithSavePoint; + private MultipleTableJobConfigParser multipleTableJobConfigParser; private List<URL> commonPluginJars; public JobConfigParser( @NonNull IdGenerator idGenerator, @NonNull List<URL> commonPluginJars, + MultipleTableJobConfigParser multipleTableJobConfigParser, boolean isStartWithSavePoint) { this.idGenerator = idGenerator; this.commonPluginJars = commonPluginJars; + this.multipleTableJobConfigParser = multipleTableJobConfigParser; this.isStartWithSavePoint = isStartWithSavePoint; } @@ -166,7 +168,7 @@ public class JobConfigParser { sink.setJobContext(jobConfig.getJobContext()); sink.setTypeInfo(rowType); if (!isStartWithSavePoint) { - handleSaveMode(sink); + multipleTableJobConfigParser.handleSaveMode(sink); } final String actionName = createSinkActionName(configIndex, tuple.getLeft().getPluginName()); diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 6e5fa3decd..c1ff66c0d3 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.api.sink.SaveModeExecuteLocation; import 
org.apache.seatunnel.api.sink.SaveModeExecuteWrapper; import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -146,7 +147,7 @@ public class MultipleTableJobConfigParser { this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables); this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); this.fallbackParser = - new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); + new JobConfigParser(idGenerator, commonPluginJars, this, isStartWithSavePoint); } public MultipleTableJobConfigParser( @@ -162,7 +163,7 @@ public class MultipleTableJobConfigParser { this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); this.fallbackParser = - new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); + new JobConfigParser(idGenerator, commonPluginJars, this, isStartWithSavePoint); } public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) { @@ -268,13 +269,6 @@ public class MultipleTableJobConfigParser { }); } - void addCommonPluginJarsToAction(Action action) { - action.getJarUrls().addAll(commonPluginJars); - if (!action.getUpstream().isEmpty()) { - action.getUpstream().forEach(this::addCommonPluginJarsToAction); - } - } - private void fillJobConfig() { jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE)); if (StringUtils.isEmpty(jobConfig.getName()) @@ -660,15 +654,21 @@ public class MultipleTableJobConfigParser { return sinkAction; } - public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) { + public void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) { if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) { SupportSaveMode saveModeSink = (SupportSaveMode) sink; - Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler(); - if (saveModeHandler.isPresent()) { - try 
(SaveModeHandler handler = saveModeHandler.get()) { - new SaveModeExecuteWrapper(handler).execute(); - } catch (Exception e) { - throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); + if (envOptions + .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) + .equals(SaveModeExecuteLocation.CLIENT)) { + log.warn( + "SaveMode execute location on CLIENT is deprecated, please use SERVER instead."); + Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler(); + if (saveModeHandler.isPresent()) { + try (SaveModeHandler handler = saveModeHandler.get()) { + new SaveModeExecuteWrapper(handler).execute(); + } catch (Exception e) { + throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); + } } } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 2e9e168af2..8f54402d80 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -19,8 +19,16 @@ package org.apache.seatunnel.engine.server.master; import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.api.common.metrics.RawJobMetrics; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.api.sink.SaveModeExecuteLocation; +import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper; +import org.apache.seatunnel.api.sink.SaveModeHandler; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.common.utils.ExceptionUtils; +import 
org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; @@ -32,7 +40,9 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; @@ -80,11 +90,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch; +import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; import static org.apache.seatunnel.common.constants.JobMode.BATCH; public class JobMaster { @@ -210,11 +222,22 @@ public class JobMaster { nodeEngine.getSerializationService(), classLoader, jobImmutableInformation.getLogicalDag()); - seaTunnelServer - .getClassLoaderService() - .releaseClassLoader( - jobImmutableInformation.getJobId(), - jobImmutableInformation.getPluginJarsUrls()); + if (!restart + && !logicalDag.isStartWithSavePoint() + && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions()) + .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) + 
.equals(SaveModeExecuteLocation.CLUSTER)) { + try { + Thread.currentThread().setContextClassLoader(classLoader); + logicalDag.getLogicalVertexMap().values().stream() + .map(LogicalVertex::getAction) + .filter(action -> action instanceof SinkAction) + .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink()) + .forEach(JobMaster::handleSaveMode); + } finally { + Thread.currentThread().setContextClassLoader(appClassLoader); + } + } final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG( @@ -228,6 +251,11 @@ public class JobMaster { runningJobStateTimestampsIMap, engineConfig.getQueueType(), engineConfig); + seaTunnelServer + .getClassLoaderService() + .releaseClassLoader( + jobImmutableInformation.getJobId(), + jobImmutableInformation.getPluginJarsUrls()); // revert to app class loader, it may be changed by PlanUtils.fromLogicalDAG Thread.currentThread().setContextClassLoader(appClassLoader); this.physicalPlan = planTuple.f0(); @@ -333,6 +361,30 @@ public class JobMaster { } } + public static void handleSaveMode(SeaTunnelSink sink) { + if (sink instanceof SupportSaveMode) { + Optional<SaveModeHandler> saveModeHandler = + ((SupportSaveMode) sink).getSaveModeHandler(); + if (saveModeHandler.isPresent()) { + try (SaveModeHandler handler = saveModeHandler.get()) { + new SaveModeExecuteWrapper(handler).execute(); + } catch (Exception e) { + throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); + } + } + } else if (sink.getClass() + .getName() + .equals( + "org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink")) { + // TODO we should not use class name to judge the sink type + Map<String, SeaTunnelSink> sinks = + (Map<String, SeaTunnelSink>) ReflectionUtils.getField(sink, "sinks").get(); + for (SeaTunnelSink seaTunnelSink : sinks.values()) { + handleSaveMode(seaTunnelSink); + } + } + } + public void handleCheckpointError(long pipelineId, boolean neverRestore) { if (neverRestore) { 
this.neverNeedRestore();