This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e0e376a90 [Improve][Zeta] Support restore execute savemode (#9059)
5e0e376a90 is described below

commit 5e0e376a90c34e8f8523e627831bceb83db3aa66
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Tue Apr 8 13:32:34 2025 +0800

    [Improve][Zeta] Support restore execute savemode (#9059)
---
 .../seatunnel/api/sink/DefaultSaveModeHandler.java | 10 ++++
 .../apache/seatunnel/api/sink/SaveModeHandler.java |  2 +
 .../api/sink/DefaultSaveModeHandlerTest.java       | 64 ++++++++++++++++++++++
 .../e2e/sink/inmemory/InMemorySaveModeHandler.java |  3 +
 .../core/parse/MultipleTableJobConfigParser.java   | 21 +++++++
 .../seatunnel/engine/server/master/JobMaster.java  | 13 +++--
 6 files changed, 109 insertions(+), 4 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index e34b8ba437..07200b29cb 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -117,6 +117,16 @@ public class DefaultSaveModeHandler implements SaveModeHandler {
         }
     }
 
+    @Override
+    public void handleSchemaSaveModeWithRestore() {
+        if (SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST == schemaSaveMode) {
+            errorWhenSchemaNotExist();
+        } else if (SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST == schemaSaveMode
+                || SchemaSaveMode.RECREATE_SCHEMA == schemaSaveMode) {
+            createSchemaWhenNotExist();
+        }
+    }
+
     protected void recreateSchema() {
         if (tableExists()) {
             dropTable();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
index 3eddaf0514..5b67f934f0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
@@ -28,6 +28,8 @@ public interface SaveModeHandler extends AutoCloseable {
 
     void handleDataSaveMode();
 
+    void handleSchemaSaveModeWithRestore();
+
     SchemaSaveMode getSchemaSaveMode();
 
     DataSaveMode getDataSaveMode();
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java
index 6f4785ce22..56b6b99155 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java
@@ -22,15 +22,24 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.InMemoryCatalog;
 import org.apache.seatunnel.api.table.catalog.InMemoryCatalogFactory;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class DefaultSaveModeHandlerTest {
 
@@ -115,6 +124,61 @@ public class DefaultSaveModeHandlerTest {
                 "Should not truncate data for recreated table");
     }
 
+    @Test
+    public void handlesErrorWhenSchemaNotExist() {
+        Catalog catalog = mock(Catalog.class);
+        CatalogTable catalogTable = createCatalogTable("notExistsTable");
+        when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
+        DefaultSaveModeHandler handler =
+                new DefaultSaveModeHandler(
+                        SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST,
+                        DataSaveMode.APPEND_DATA,
+                        catalog,
+                        catalogTable,
+                        null);
+
+        assertThrows(SeaTunnelRuntimeException.class, handler::handleSchemaSaveModeWithRestore);
+    }
+
+    @Test
+    public void createsSchemaWhenNotExist() {
+        CatalogTable catalogTable = createCatalogTable("notExistsTable");
+
+        Catalog catalog = mock(Catalog.class);
+        when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
+        DefaultSaveModeHandler handler =
+                new DefaultSaveModeHandler(
+                        SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+                        DataSaveMode.APPEND_DATA,
+                        catalog,
+                        catalogTable,
+                        null);
+
+        handler.handleSchemaSaveModeWithRestore();
+
+        verify(catalog, times(1))
+                .createTable(any(TablePath.class), any(CatalogTable.class), eq(true));
+    }
+
+    @Test
+    public void recreatesSchemaWhenNotExist() {
+        CatalogTable catalogTable = createCatalogTable("notExistsTable");
+        Catalog catalog = mock(Catalog.class);
+        when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
+        DefaultSaveModeHandler handler =
+                new DefaultSaveModeHandler(
+                        SchemaSaveMode.RECREATE_SCHEMA,
+                        DataSaveMode.APPEND_DATA,
+                        catalog,
+                        catalogTable,
+                        null);
+
+        handler.handleSchemaSaveModeWithRestore();
+
+        verify(catalog, times(1))
+                .createTable(any(TablePath.class), any(CatalogTable.class), eq(true));
+    }
+
     private CatalogTable createCatalogTable(String tableName) {
         return CatalogTableUtil.getCatalogTable("", "st", "public", tableName, rowType);
     }
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
index 61084819a2..a0645672d9 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
@@ -55,6 +55,9 @@ public class InMemorySaveModeHandler implements SaveModeHandler {
         log.info("handle data savemode with table path: {}", catalogTable.getTablePath());
     }
 
+    @Override
+    public void handleSchemaSaveModeWithRestore() {}
+
     @Override
     public SchemaSaveMode getSchemaSaveMode() {
         return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index e3414b8dbd..d2b47f5009 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -690,6 +690,8 @@ public class MultipleTableJobConfigParser {
                         actionConfig);
         if (!isStartWithSavePoint) {
             handleSaveMode(sink);
+        } else {
+            handleSchemaSaveModeWithRestore(sink);
         }
         sinkAction.setParallelism(parallelism);
         return sinkAction;
@@ -716,6 +718,25 @@ public class MultipleTableJobConfigParser {
         }
     }
 
+    public void handleSchemaSaveModeWithRestore(SeaTunnelSink<?, ?, ?, ?> sink) {
+        if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
+            SupportSaveMode saveModeSink = (SupportSaveMode) sink;
+            if (envOptions
+                    .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+                    .equals(SaveModeExecuteLocation.CLIENT)) {
+                Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
+                if (saveModeHandler.isPresent()) {
+                    try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.open();
+                        handler.handleSchemaSaveModeWithRestore();
+                    } catch (Exception e) {
+                        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                    }
+                }
+            }
+        }
+    }
+
     private List<URL> getSourcePluginJarPaths(Config sourceConfig) {
         SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
         PluginIdentifier pluginIdentifier =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 7881920318..e512a7fbc7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -271,7 +271,8 @@ public class JobMaster {
                                                     logicalVertexIdClassLoaderMap.get(
                                                             sink.getId()));
                                     JobMaster.handleSaveMode(
-                                            ((SinkAction<?, ?, ?, ?>) sink).getSink());
+                                            ((SinkAction<?, ?, ?, ?>) sink).getSink(),
+                                            logicalDag.isStartWithSavePoint());
                                 });
                 Thread.currentThread().setContextClassLoader(appClassLoader);
             }
@@ -556,14 +557,18 @@ public class JobMaster {
         }
     }
 
-    public static void handleSaveMode(SeaTunnelSink sink) {
+    public static void handleSaveMode(SeaTunnelSink sink, boolean isStartWithSavePoint) {
         if (sink instanceof SupportSaveMode) {
             Optional<SaveModeHandler> saveModeHandler =
                     ((SupportSaveMode) sink).getSaveModeHandler();
             if (saveModeHandler.isPresent()) {
                 try (SaveModeHandler handler = saveModeHandler.get()) {
                     handler.open();
-                    new SaveModeExecuteWrapper(handler).execute();
+                    if (!isStartWithSavePoint) {
+                        new SaveModeExecuteWrapper(handler).execute();
+                    } else {
+                        handler.handleSchemaSaveModeWithRestore();
+                    }
                 } catch (Exception e) {
                     throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                 }
@@ -571,7 +576,7 @@ public class JobMaster {
         } else if (sink instanceof MultiTableSink) {
             Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
             for (SeaTunnelSink seaTunnelSink : sinks.values()) {
-                handleSaveMode(seaTunnelSink);
+                handleSaveMode(seaTunnelSink, isStartWithSavePoint);
             }
         }
     }

Reply via email to