This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 80cf91318d [Improve][Zeta] Move SaveMode behavior to master (#6843)
80cf91318d is described below

commit 80cf91318d6daebf4096a73c431cf270ef053785
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Jun 13 11:29:33 2024 +0800

    [Improve][Zeta] Move SaveMode behavior to master (#6843)
---
 docs/en/concept/JobEnvConfig.md                    |   6 +
 .../apache/seatunnel/api/env/EnvCommonOptions.java |   7 ++
 .../apache/seatunnel/api/env/EnvOptionRule.java    |   1 +
 .../api/sink/SaveModeExecuteLocation.java          |  24 ++++
 .../seatunnel/connectors/doris/sink/DorisSink.java |   6 +
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |   5 +
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |   5 +
 .../e2e/sink/inmemory/InMemorySaveModeHandler.java | 137 +++++++++++++++++++++
 .../seatunnel/e2e/sink/inmemory/InMemorySink.java  |  15 ++-
 .../e2e/sink/inmemory/InMemorySinkFactory.java     |   2 +-
 .../seatunnel/engine/e2e/JobClientJobProxyIT.java  |  45 +++++++
 .../engine/e2e/classloader/ClassLoaderITBase.java  |   2 +-
 .../savemode/fake_to_inmemory_savemode.conf        |  57 +++++++++
 .../savemode/fake_to_inmemory_savemode_client.conf |  58 +++++++++
 .../engine/core/dag/logical/LogicalDag.java        |  13 ++
 .../core/dag/logical/LogicalDagGenerator.java      |  13 +-
 .../engine/core/job/AbstractJobEnvironment.java    |   2 +-
 .../engine/core/parse/JobConfigParser.java         |   6 +-
 .../core/parse/MultipleTableJobConfigParser.java   |  32 ++---
 .../seatunnel/engine/server/master/JobMaster.java  |  62 +++++++++-
 20 files changed, 469 insertions(+), 29 deletions(-)

diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 32bf089e92..66104f7fbc 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -33,6 +33,12 @@ This parameter configures the parallelism of source and sink.
 
 Used to control the default retry times when a job fails. The default value is 
3, and it only works in the Zeta engine.
 
+### savemode.execute.location
+
+This parameter is used to specify the location of the savemode when the job is 
executed in the Zeta engine.
+The default value is `CLUSTER`, which means that the savemode is executed on 
the cluster. If you want to execute the savemode on the client,
+you can set it to `CLIENT`. Please use `CLUSTER` mode as much as possible, 
because when there are no problems with `CLUSTER` mode, we will remove `CLIENT` 
mode.
+
 ### shade.identifier
 
 Specify the method of encryption, if you didn't have the requirement for 
encrypting or decrypting config files, this option can be ignored.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 75e58a5f5b..cabf0856dc 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.env;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
 import org.apache.seatunnel.common.constants.JobMode;
 
 import java.util.Map;
@@ -76,6 +77,12 @@ public interface EnvCommonOptions {
                     .noDefaultValue()
                     .withDescription("The timeout (in milliseconds) for a 
checkpoint.");
 
+    Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION =
+            Options.key("savemode.execute.location")
+                    .enumType(SaveModeExecuteLocation.class)
+                    .defaultValue(SaveModeExecuteLocation.CLUSTER)
+                    .withDescription("The location of save mode execute.");
+
     Option<String> JARS =
             Options.key("jars")
                     .stringType()
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index d4caa710d8..262cfe065e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -33,6 +33,7 @@ public class EnvOptionRule {
                         EnvCommonOptions.CHECKPOINT_TIMEOUT,
                         EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
                         EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
+                        EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
                         EnvCommonOptions.CUSTOM_PARAMETERS)
                 .build();
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java
new file mode 100644
index 0000000000..3184bce3fb
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public enum SaveModeExecuteLocation {
+    @Deprecated
+    CLIENT,
+    CLUSTER
+}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index e0ef8e1893..e14b64d9a2 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -106,6 +106,12 @@ public class DorisSink
 
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
+        // Load the JDBC driver in to DriverManager
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
         CatalogFactory catalogFactory =
                 discoverFactory(
                         Thread.currentThread().getContextClassLoader(),
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 384319d022..69b01f10d2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -167,6 +167,11 @@ public class JdbcSink
 
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
+        try {
+            
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
         if (catalogTable != null) {
             if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
                 return Optional.empty();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 1cf08c4c43..d8c7b661c5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -77,6 +77,11 @@ public class JdbcSinkWriter
     public MultiTableResourceManager<ConnectionPoolManager> 
initMultiTableResourceManager(
             int tableSize, int queueSize) {
         HikariDataSource ds = new HikariDataSource();
+        try {
+            
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
         ds.setIdleTimeout(30 * 1000);
         ds.setMaximumPoolSize(queueSize);
         ds.setJdbcUrl(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
new file mode 100644
index 0000000000..e2c28bd447
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.sink.inmemory;
+
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class InMemorySaveModeHandler implements SaveModeHandler {
+
+    private final CatalogTable catalogTable;
+
+    public InMemorySaveModeHandler(CatalogTable catalogTable) {
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public void handleSchemaSaveMode() {
+        log.info("handle schema savemode with table path: {}", 
catalogTable.getTablePath());
+    }
+
+    @Override
+    public void handleDataSaveMode() {
+        log.info("handle data savemode with table path: {}", 
catalogTable.getTablePath());
+    }
+
+    @Override
+    public SchemaSaveMode getSchemaSaveMode() {
+        return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST;
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        return DataSaveMode.APPEND_DATA;
+    }
+
+    @Override
+    public TablePath getHandleTablePath() {
+        return catalogTable.getTablePath();
+    }
+
+    @Override
+    public Catalog getHandleCatalog() {
+        return new Catalog() {
+            @Override
+            public void open() throws CatalogException {}
+
+            @Override
+            public void close() throws CatalogException {}
+
+            @Override
+            public String name() {
+                return "InMemoryCatalog";
+            }
+
+            @Override
+            public String getDefaultDatabase() throws CatalogException {
+                return null;
+            }
+
+            @Override
+            public boolean databaseExists(String databaseName) throws 
CatalogException {
+                return false;
+            }
+
+            @Override
+            public List<String> listDatabases() throws CatalogException {
+                return null;
+            }
+
+            @Override
+            public List<String> listTables(String databaseName)
+                    throws CatalogException, DatabaseNotExistException {
+                return null;
+            }
+
+            @Override
+            public boolean tableExists(TablePath tablePath) throws 
CatalogException {
+                return false;
+            }
+
+            @Override
+            public CatalogTable getTable(TablePath tablePath)
+                    throws CatalogException, TableNotExistException {
+                return null;
+            }
+
+            @Override
+            public void createTable(TablePath tablePath, CatalogTable table, 
boolean ignoreIfExists)
+                    throws TableAlreadyExistException, 
DatabaseNotExistException,
+                            CatalogException {}
+
+            @Override
+            public void dropTable(TablePath tablePath, boolean 
ignoreIfNotExists)
+                    throws TableNotExistException, CatalogException {}
+
+            @Override
+            public void createDatabase(TablePath tablePath, boolean 
ignoreIfExists)
+                    throws DatabaseAlreadyExistException, CatalogException {}
+
+            @Override
+            public void dropDatabase(TablePath tablePath, boolean 
ignoreIfNotExists)
+                    throws DatabaseNotExistException, CatalogException {}
+        };
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
index 3efe0ad00e..8f1eba9af4 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
@@ -20,10 +20,13 @@ package org.apache.seatunnel.e2e.sink.inmemory;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import java.io.IOException;
@@ -35,11 +38,14 @@ public class InMemorySink
                         InMemoryState,
                         InMemoryCommitInfo,
                         InMemoryAggregatedCommitInfo>,
-                SupportMultiTableSink {
+                SupportMultiTableSink,
+                SupportSaveMode {
 
     private ReadonlyConfig config;
+    private CatalogTable catalogTable;
 
-    public InMemorySink(ReadonlyConfig config) {
+    public InMemorySink(CatalogTable catalogTable, ReadonlyConfig config) {
+        this.catalogTable = catalogTable;
         this.config = config;
     }
 
@@ -69,4 +75,9 @@ public class InMemorySink
     public Optional<Serializer<InMemoryAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
+
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        return Optional.of(new InMemorySaveModeHandler(catalogTable));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 16f1c0dc44..7b06ec99d9 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -56,6 +56,6 @@ public class InMemorySinkFactory
     @Override
     public TableSink<SeaTunnelRow, InMemoryState, InMemoryCommitInfo, 
InMemoryAggregatedCommitInfo>
             createSink(TableSinkFactoryContext context) {
-        return () -> new InMemorySink(context.getOptions());
+        return () -> new InMemorySink(context.getCatalogTable(), 
context.getOptions());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index bcbf40b9f7..3d871adb5a 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -70,6 +70,51 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
         Assertions.assertNotEquals(0, execResult.getExitCode());
     }
 
+    @Test
+    public void testSaveModeOnMasterOrClient() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult =
+                executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        int serverLogLength = 0;
+        String serverLogs = server.getLogs();
+        Assertions.assertTrue(
+                serverLogs.contains(
+                        
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema 
savemode with table path: test.table1"));
+        Assertions.assertTrue(
+                serverLogs.contains(
+                        
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data 
savemode with table path: test.table1"));
+        Assertions.assertTrue(
+                serverLogs.contains(
+                        
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema 
savemode with table path: test.table2"));
+        Assertions.assertTrue(
+                serverLogs.contains(
+                        
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data 
savemode with table path: test.table2"));
+
+        // restore will not execute savemode
+        execResult = restoreJob(server, 
"/savemode/fake_to_inmemory_savemode.conf", "1");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // clear old logs
+        serverLogLength += serverLogs.length();
+        serverLogs = server.getLogs().substring(serverLogLength);
+        Assertions.assertFalse(serverLogs.contains("handle schema savemode 
with table path"));
+        Assertions.assertFalse(serverLogs.contains("handle data savemode with 
table path"));
+
+        // test savemode on client side
+        Container.ExecResult execResult2 =
+                executeJob(server, 
"/savemode/fake_to_inmemory_savemode_client.conf");
+        Assertions.assertEquals(0, execResult2.getExitCode());
+        // clear old logs
+        serverLogLength += serverLogs.length();
+        serverLogs = server.getLogs().substring(serverLogLength);
+        Assertions.assertFalse(serverLogs.contains("handle schema savemode 
with table path"));
+        Assertions.assertFalse(serverLogs.contains("handle data savemode with 
table path"));
+
+        Assertions.assertTrue(
+                execResult2.getStdout().contains("handle schema savemode with 
table path"));
+        Assertions.assertTrue(
+                execResult2.getStdout().contains("handle data savemode with 
table path"));
+    }
+
     @Test
     public void testJobFailedWillThrowException() throws IOException, 
InterruptedException {
         Container.ExecResult execResult = 
executeSeaTunnelJob("/batch_slot_not_enough.conf");
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
index 60907b3c0e..cdeef180f6 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
@@ -158,7 +158,7 @@ public abstract class ClassLoaderITBase extends 
SeaTunnelContainer {
             if (cacheMode()) {
                 Assertions.assertTrue(3 >= getClassLoaderCount());
             } else {
-                Assertions.assertTrue(2 + i >= getClassLoaderCount());
+                Assertions.assertTrue(3 + 2 * i >= getClassLoaderCount());
             }
         }
     }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf
new file mode 100644
index 0000000000..677589b0ac
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+        {
+            row.num = 1
+            schema = {
+                  table = "test.table1"
+                  columns = [
+                    {
+                        name = id
+                        type = bigint
+                    }
+                  ]
+            }
+        },
+        {
+            row.num = 1
+            schema = {
+                  table = "test.table2"
+                  columns = [
+                    {
+                        name = id
+                        type = bigint
+                    }
+                  ]
+            }
+        }
+    ]
+  }
+}
+
+sink{
+  InMemory {
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf
new file mode 100644
index 0000000000..e7b15a25f2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  savemode.execute.location = client
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+        {
+            row.num = 1
+            schema = {
+                  table = "test.table1"
+                  columns = [
+                    {
+                        name = id
+                        type = bigint
+                    }
+                  ]
+            }
+        },
+        {
+            row.num = 1
+            schema = {
+                  table = "test.table2"
+                  columns = [
+                    {
+                        name = id
+                        type = bigint
+                    }
+                  ]
+            }
+        }
+    ]
+  }
+}
+
+sink{
+  InMemory {
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
index 38fd74f3ee..8739a82d2a 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
@@ -61,6 +61,7 @@ public class LogicalDag implements IdentifiedDataSerializable 
{
     private final Set<LogicalEdge> edges = new LinkedHashSet<>();
     private final Map<Long, LogicalVertex> logicalVertexMap = new 
LinkedHashMap<>();
     private IdGenerator idGenerator;
+    private boolean isStartWithSavePoint = false;
 
     public LogicalDag() {}
 
@@ -85,6 +86,14 @@ public class LogicalDag implements 
IdentifiedDataSerializable {
         return logicalVertexMap;
     }
 
+    public boolean isStartWithSavePoint() {
+        return isStartWithSavePoint;
+    }
+
+    public void setStartWithSavePoint(boolean startWithSavePoint) {
+        isStartWithSavePoint = startWithSavePoint;
+    }
+
     @NonNull public JsonObject getLogicalDagAsJson() {
         JsonObject logicalDag = new JsonObject();
         JsonArray vertices = new JsonArray();
@@ -143,6 +152,8 @@ public class LogicalDag implements 
IdentifiedDataSerializable {
 
         out.writeObject(jobConfig);
         out.writeObject(idGenerator);
+
+        out.writeBoolean(isStartWithSavePoint);
     }
 
     @Override
@@ -165,6 +176,8 @@ public class LogicalDag implements 
IdentifiedDataSerializable {
 
         jobConfig = in.readObject();
         idGenerator = in.readObject();
+
+        isStartWithSavePoint = in.readBoolean();
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index f847c2127b..6a56240b99 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -38,6 +38,7 @@ public class LogicalDagGenerator {
     private List<Action> actions;
     private JobConfig jobConfig;
     private IdGenerator idGenerator;
+    private boolean isStartWithSavePoint;
 
     private final Map<Long, LogicalVertex> logicalVertexMap = new 
LinkedHashMap<>();
 
@@ -51,10 +52,19 @@ public class LogicalDagGenerator {
             @NonNull List<Action> actions,
             @NonNull JobConfig jobConfig,
             @NonNull IdGenerator idGenerator) {
+        this(actions, jobConfig, idGenerator, false);
+    }
+
+    public LogicalDagGenerator(
+            @NonNull List<Action> actions,
+            @NonNull JobConfig jobConfig,
+            @NonNull IdGenerator idGenerator,
+            boolean isStartWithSavePoint) {
         this.actions = actions;
         this.jobConfig = jobConfig;
         this.idGenerator = idGenerator;
-        if (actions.size() <= 0) {
+        this.isStartWithSavePoint = isStartWithSavePoint;
+        if (actions.isEmpty()) {
             throw new IllegalStateException("No actions define in the job. 
Cannot execute.");
         }
     }
@@ -65,6 +75,7 @@ public class LogicalDagGenerator {
         LogicalDag logicalDag = new LogicalDag(jobConfig, idGenerator);
         logicalDag.getEdges().addAll(logicalEdges);
         logicalDag.getLogicalVertexMap().putAll(logicalVertexMap);
+        logicalDag.setStartWithSavePoint(isStartWithSavePoint);
         return logicalDag;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index 9f54db03d5..49c9b9275d 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -146,7 +146,7 @@ public abstract class AbstractJobEnvironment {
     protected abstract MultipleTableJobConfigParser getJobConfigParser();
 
     protected LogicalDagGenerator getLogicalDagGenerator() {
-        return new LogicalDagGenerator(actions, jobConfig, idGenerator);
+        return new LogicalDagGenerator(actions, jobConfig, idGenerator, 
isStartWithSavePoint);
     }
 
     protected abstract LogicalDag getLogicalDag();
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 981b85049a..2ec19cabc9 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -53,21 +53,23 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
-import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
 
 @Data
 public class JobConfigParser {
     private static final ILogger LOGGER = 
Logger.getLogger(JobConfigParser.class);
     private IdGenerator idGenerator;
     private boolean isStartWithSavePoint;
+    private MultipleTableJobConfigParser multipleTableJobConfigParser;
     private List<URL> commonPluginJars;
 
     public JobConfigParser(
             @NonNull IdGenerator idGenerator,
             @NonNull List<URL> commonPluginJars,
+            MultipleTableJobConfigParser multipleTableJobConfigParser,
             boolean isStartWithSavePoint) {
         this.idGenerator = idGenerator;
         this.commonPluginJars = commonPluginJars;
+        this.multipleTableJobConfigParser = multipleTableJobConfigParser;
         this.isStartWithSavePoint = isStartWithSavePoint;
     }
 
@@ -166,7 +168,7 @@ public class JobConfigParser {
         sink.setJobContext(jobConfig.getJobContext());
         sink.setTypeInfo(rowType);
         if (!isStartWithSavePoint) {
-            handleSaveMode(sink);
+            multipleTableJobConfigParser.handleSaveMode(sink);
         }
         final String actionName =
                 createSinkActionName(configIndex, 
tuple.getLeft().getPluginName());
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 6e5fa3decd..c1ff66c0d3 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -146,7 +147,7 @@ public class MultipleTableJobConfigParser {
         this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.fallbackParser =
-                new JobConfigParser(idGenerator, commonPluginJars, 
isStartWithSavePoint);
+                new JobConfigParser(idGenerator, commonPluginJars, this, 
isStartWithSavePoint);
     }
 
     public MultipleTableJobConfigParser(
@@ -162,7 +163,7 @@ public class MultipleTableJobConfigParser {
         this.seaTunnelJobConfig = seaTunnelJobConfig;
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.fallbackParser =
-                new JobConfigParser(idGenerator, commonPluginJars, 
isStartWithSavePoint);
+                new JobConfigParser(idGenerator, commonPluginJars, this, 
isStartWithSavePoint);
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService 
classLoaderService) {
@@ -268,13 +269,6 @@ public class MultipleTableJobConfigParser {
                 });
     }
 
-    void addCommonPluginJarsToAction(Action action) {
-        action.getJarUrls().addAll(commonPluginJars);
-        if (!action.getUpstream().isEmpty()) {
-            action.getUpstream().forEach(this::addCommonPluginJarsToAction);
-        }
-    }
-
     private void fillJobConfig() {
         
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
         if (StringUtils.isEmpty(jobConfig.getName())
@@ -660,15 +654,21 @@ public class MultipleTableJobConfigParser {
         return sinkAction;
     }
 
-    public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
+    public void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
         if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
             SupportSaveMode saveModeSink = (SupportSaveMode) sink;
-            Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
-            if (saveModeHandler.isPresent()) {
-                try (SaveModeHandler handler = saveModeHandler.get()) {
-                    new SaveModeExecuteWrapper(handler).execute();
-                } catch (Exception e) {
-                    throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+            if (envOptions
+                    .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+                    .equals(SaveModeExecuteLocation.CLIENT)) {
+                log.warn(
+                        "SaveMode execute location on CLIENT is deprecated, 
please use SERVER instead.");
+                Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
+                if (saveModeHandler.isPresent()) {
+                    try (SaveModeHandler handler = saveModeHandler.get()) {
+                        new SaveModeExecuteWrapper(handler).execute();
+                    } catch (Exception e) {
+                        throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                    }
                 }
             }
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2e9e168af2..8f54402d80 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,8 +19,16 @@ package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
@@ -32,7 +40,9 @@ import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -80,11 +90,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 import static org.apache.seatunnel.common.constants.JobMode.BATCH;
 
 public class JobMaster {
@@ -210,11 +222,22 @@ public class JobMaster {
                         nodeEngine.getSerializationService(),
                         classLoader,
                         jobImmutableInformation.getLogicalDag());
-        seaTunnelServer
-                .getClassLoaderService()
-                .releaseClassLoader(
-                        jobImmutableInformation.getJobId(),
-                        jobImmutableInformation.getPluginJarsUrls());
+        if (!restart
+                && !logicalDag.isStartWithSavePoint()
+                && 
ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
+                        .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+                        .equals(SaveModeExecuteLocation.CLUSTER)) {
+            try {
+                Thread.currentThread().setContextClassLoader(classLoader);
+                logicalDag.getLogicalVertexMap().values().stream()
+                        .map(LogicalVertex::getAction)
+                        .filter(action -> action instanceof SinkAction)
+                        .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
+                        .forEach(JobMaster::handleSaveMode);
+            } finally {
+                Thread.currentThread().setContextClassLoader(appClassLoader);
+            }
+        }
 
         final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
                 PlanUtils.fromLogicalDAG(
@@ -228,6 +251,11 @@ public class JobMaster {
                         runningJobStateTimestampsIMap,
                         engineConfig.getQueueType(),
                         engineConfig);
+        seaTunnelServer
+                .getClassLoaderService()
+                .releaseClassLoader(
+                        jobImmutableInformation.getJobId(),
+                        jobImmutableInformation.getPluginJarsUrls());
         // revert to app class loader, it may be changed by 
PlanUtils.fromLogicalDAG
         Thread.currentThread().setContextClassLoader(appClassLoader);
         this.physicalPlan = planTuple.f0();
@@ -333,6 +361,30 @@ public class JobMaster {
         }
     }
 
+    public static void handleSaveMode(SeaTunnelSink sink) {
+        if (sink instanceof SupportSaveMode) {
+            Optional<SaveModeHandler> saveModeHandler =
+                    ((SupportSaveMode) sink).getSaveModeHandler();
+            if (saveModeHandler.isPresent()) {
+                try (SaveModeHandler handler = saveModeHandler.get()) {
+                    new SaveModeExecuteWrapper(handler).execute();
+                } catch (Exception e) {
+                    throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                }
+            }
+        } else if (sink.getClass()
+                .getName()
+                .equals(
+                        
"org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink"))
 {
+            // TODO we should not use class name to judge the sink type
+            Map<String, SeaTunnelSink> sinks =
+                    (Map<String, SeaTunnelSink>) 
ReflectionUtils.getField(sink, "sinks").get();
+            for (SeaTunnelSink seaTunnelSink : sinks.values()) {
+                handleSaveMode(seaTunnelSink);
+            }
+        }
+    }
+
     public void handleCheckpointError(long pipelineId, boolean neverRestore) {
         if (neverRestore) {
             this.neverNeedRestore();


Reply via email to