This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 5e0e376a90 [Improve][Zeta] Support restore execute savemode (#9059) 5e0e376a90 is described below commit 5e0e376a90c34e8f8523e627831bceb83db3aa66 Author: hailin0 <wanghai...@apache.org> AuthorDate: Tue Apr 8 13:32:34 2025 +0800 [Improve][Zeta] Support restore execute savemode (#9059) --- .../seatunnel/api/sink/DefaultSaveModeHandler.java | 10 ++++ .../apache/seatunnel/api/sink/SaveModeHandler.java | 2 + .../api/sink/DefaultSaveModeHandlerTest.java | 64 ++++++++++++++++++++++ .../e2e/sink/inmemory/InMemorySaveModeHandler.java | 3 + .../core/parse/MultipleTableJobConfigParser.java | 21 +++++++ .../seatunnel/engine/server/master/JobMaster.java | 13 +++-- 6 files changed, 109 insertions(+), 4 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index e34b8ba437..07200b29cb 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -117,6 +117,16 @@ public class DefaultSaveModeHandler implements SaveModeHandler { } } + @Override + public void handleSchemaSaveModeWithRestore() { + if (SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST == schemaSaveMode) { + errorWhenSchemaNotExist(); + } else if (SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST == schemaSaveMode + || SchemaSaveMode.RECREATE_SCHEMA == schemaSaveMode) { + createSchemaWhenNotExist(); + } + } + protected void recreateSchema() { if (tableExists()) { dropTable(); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java index 3eddaf0514..5b67f934f0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java @@ -28,6 +28,8 @@ public interface SaveModeHandler extends AutoCloseable { void handleDataSaveMode(); + void handleSchemaSaveModeWithRestore(); + SchemaSaveMode getSchemaSaveMode(); DataSaveMode getDataSaveMode(); diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java index 6f4785ce22..56b6b99155 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java @@ -22,15 +22,24 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.InMemoryCatalog; import org.apache.seatunnel.api.table.catalog.InMemoryCatalogFactory; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DefaultSaveModeHandlerTest { @@ -115,6 +124,61 @@ public class DefaultSaveModeHandlerTest { "Should not truncate data for recreated table"); } + @Test + public void handlesErrorWhenSchemaNotExist() { + Catalog catalog = mock(Catalog.class); + CatalogTable catalogTable = createCatalogTable("notExistsTable"); + when(catalog.tableExists(any(TablePath.class))).thenReturn(false); + DefaultSaveModeHandler handler = + new DefaultSaveModeHandler( + SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST, + DataSaveMode.APPEND_DATA, + catalog, + catalogTable, + null); + + assertThrows(SeaTunnelRuntimeException.class, handler::handleSchemaSaveModeWithRestore); + } + + @Test + public void createsSchemaWhenNotExist() { + CatalogTable catalogTable = createCatalogTable("notExistsTable"); + + Catalog catalog = mock(Catalog.class); + when(catalog.tableExists(any(TablePath.class))).thenReturn(false); + DefaultSaveModeHandler handler = + new DefaultSaveModeHandler( + SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST, + DataSaveMode.APPEND_DATA, + catalog, + catalogTable, + null); + + handler.handleSchemaSaveModeWithRestore(); + + verify(catalog, times(1)) + .createTable(any(TablePath.class), any(CatalogTable.class), eq(true)); + } + + @Test + public void recreatesSchemaWhenNotExist() { + CatalogTable catalogTable = createCatalogTable("notExistsTable"); + Catalog catalog = mock(Catalog.class); + when(catalog.tableExists(any(TablePath.class))).thenReturn(false); + DefaultSaveModeHandler handler = + new DefaultSaveModeHandler( + SchemaSaveMode.RECREATE_SCHEMA, + DataSaveMode.APPEND_DATA, + catalog, + catalogTable, + null); + + handler.handleSchemaSaveModeWithRestore(); + + verify(catalog, times(1)) + .createTable(any(TablePath.class), any(CatalogTable.class), eq(true)); + } + private CatalogTable createCatalogTable(String tableName) { return CatalogTableUtil.getCatalogTable("", "st", "public", tableName, rowType); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java index 61084819a2..a0645672d9 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java @@ -55,6 +55,9 @@ public class InMemorySaveModeHandler implements SaveModeHandler { log.info("handle data savemode with table path: {}", catalogTable.getTablePath()); } + @Override + public void handleSchemaSaveModeWithRestore() {} + @Override public SchemaSaveMode getSchemaSaveMode() { return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index e3414b8dbd..d2b47f5009 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -690,6 +690,8 @@ public class MultipleTableJobConfigParser { actionConfig); if (!isStartWithSavePoint) { handleSaveMode(sink); + } else { + handleSchemaSaveModeWithRestore(sink); } sinkAction.setParallelism(parallelism); return sinkAction; @@ -716,6 +718,25 @@ public class MultipleTableJobConfigParser { } } + public void handleSchemaSaveModeWithRestore(SeaTunnelSink<?, ?, ?, ?> sink) { + if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) { + SupportSaveMode saveModeSink = (SupportSaveMode) sink; + if (envOptions + .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) + .equals(SaveModeExecuteLocation.CLIENT)) { + Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler(); + if (saveModeHandler.isPresent()) { + try (SaveModeHandler handler = saveModeHandler.get()) { + handler.open(); + handler.handleSchemaSaveModeWithRestore(); + } catch (Exception e) { + throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); + } + } + } + } + } + private List<URL> getSourcePluginJarPaths(Config sourceConfig) { SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery(); PluginIdentifier pluginIdentifier = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 7881920318..e512a7fbc7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -271,7 +271,8 @@ public class JobMaster { logicalVertexIdClassLoaderMap.get( sink.getId())); JobMaster.handleSaveMode( - ((SinkAction<?, ?, ?, ?>) sink).getSink()); + ((SinkAction<?, ?, ?, ?>) sink).getSink(), + logicalDag.isStartWithSavePoint()); }); Thread.currentThread().setContextClassLoader(appClassLoader); } @@ -556,14 +557,18 @@ public class JobMaster { } } - public static void handleSaveMode(SeaTunnelSink sink) { + public static void handleSaveMode(SeaTunnelSink sink, boolean isStartWithSavePoint) { if (sink instanceof SupportSaveMode) { Optional<SaveModeHandler> saveModeHandler = ((SupportSaveMode) sink).getSaveModeHandler(); if (saveModeHandler.isPresent()) { try (SaveModeHandler handler = saveModeHandler.get()) { handler.open(); - new SaveModeExecuteWrapper(handler).execute(); + if (!isStartWithSavePoint) { + new SaveModeExecuteWrapper(handler).execute(); + } else { + handler.handleSchemaSaveModeWithRestore(); + } } catch (Exception e) { throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); } @@ -571,7 +576,7 @@ public class JobMaster { } else if (sink instanceof MultiTableSink) { Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks(); for (SeaTunnelSink seaTunnelSink : sinks.values()) { - handleSaveMode(seaTunnelSink); + handleSaveMode(seaTunnelSink, isStartWithSavePoint); } } }