This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 6b6d8fae54 [Improve] re-struct Zeta Engine config options (#8741) 6b6d8fae54 is described below commit 6b6d8fae54a62fd36f79b1eaeb4fbc1afe45cefa Author: Jarvis <jar...@apache.org> AuthorDate: Wed Feb 19 13:52:43 2025 +0800 [Improve] re-struct Zeta Engine config options (#8741) --- .../engine/common/config/EngineConfig.java | 48 +- .../config/YamlSeaTunnelDomConfigProcessor.java | 221 +++++--- .../common/config/server/CheckpointConfig.java | 12 +- .../config/server/CheckpointStorageConfig.java | 6 +- .../config/server/ConnectorJarHAStorageConfig.java | 4 +- .../config/server/ConnectorJarStorageConfig.java | 17 +- .../config/server/CoordinatorServiceConfig.java | 14 +- .../engine/common/config/server/HttpConfig.java | 15 +- .../common/config/server/ServerConfigOptions.java | 560 +++++++++++---------- .../common/config/server/SlotServiceConfig.java | 10 +- .../server/service/jar/StorageStrategyFactory.java | 2 +- .../server/utils/SystemLoadCalculateTest.java | 8 +- 12 files changed, 552 insertions(+), 365 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index dd043aba55..8e0be8b83e 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -41,35 +41,44 @@ import static com.hazelcast.internal.util.Preconditions.checkPositive; @Data public class EngineConfig { - private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue(); + private int backupCount = + ServerConfigOptions.MasterServerConfigOptions.BACKUP_COUNT.defaultValue(); private int printExecutionInfoInterval = - ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + .defaultValue(); private int printJobMetricsInfoInterval = - ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + .defaultValue(); private int jobMetricsBackupInterval = - ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + .defaultValue(); private ThreadShareMode taskExecutionThreadShareMode = - ServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE.defaultValue(); + ServerConfigOptions.WorkerServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE + .defaultValue(); - private SlotServiceConfig slotServiceConfig = ServerConfigOptions.SLOT_SERVICE.defaultValue(); + private SlotServiceConfig slotServiceConfig = + ServerConfigOptions.WorkerServerConfigOptions.SLOT_SERVICE.defaultValue(); - private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue(); + private CheckpointConfig checkpointConfig = + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT.defaultValue(); private CoordinatorServiceConfig coordinatorServiceConfig = - ServerConfigOptions.COORDINATOR_SERVICE.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.COORDINATOR_SERVICE.defaultValue(); private ConnectorJarStorageConfig connectorJarStorageConfig = - ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG + .defaultValue(); private boolean classloaderCacheMode = ServerConfigOptions.CLASSLOADER_CACHE_MODE.defaultValue(); - private QueueType queueType = ServerConfigOptions.QUEUE_TYPE.defaultValue(); + private QueueType queueType = + ServerConfigOptions.WorkerServerConfigOptions.QUEUE_TYPE.defaultValue(); private int historyJobExpireMinutes = - ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue(); private ClusterRole clusterRole = ClusterRole.MASTER_AND_WORKER; @@ -81,9 +90,10 @@ public class EngineConfig { private TelemetryConfig telemetryConfig = ServerConfigOptions.TELEMETRY.defaultValue(); private ScheduleStrategy scheduleStrategy = - ServerConfigOptions.JOB_SCHEDULE_STRATEGY.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.JOB_SCHEDULE_STRATEGY.defaultValue(); - private HttpConfig httpConfig = ServerConfigOptions.HTTP.defaultValue(); + private HttpConfig httpConfig = + ServerConfigOptions.MasterServerConfigOptions.HTTP.defaultValue(); public void setBackupCount(int newBackupCount) { checkBackupCount(newBackupCount, 0); @@ -97,21 +107,24 @@ public class EngineConfig { public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) { checkPositive( printExecutionInfoInterval, - ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + " must be > 0"); + ServerConfigOptions.MasterServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + + " must be > 0"); this.printExecutionInfoInterval = printExecutionInfoInterval; } public void setPrintJobMetricsInfoInterval(int printJobMetricsInfoInterval) { checkPositive( printJobMetricsInfoInterval, - ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0"); + ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + + " must be > 0"); this.printJobMetricsInfoInterval = printJobMetricsInfoInterval; } public void setJobMetricsBackupInterval(int jobMetricsBackupInterval) { checkPositive( jobMetricsBackupInterval, - ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + " must be > 0"); + ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + + " must be > 0"); this.jobMetricsBackupInterval = jobMetricsBackupInterval; } @@ -123,7 +136,8 @@ public class EngineConfig { public void setHistoryJobExpireMinutes(int historyJobExpireMinutes) { checkPositive( historyJobExpireMinutes, - ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + " must be > 0"); + ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + + " must be > 0"); this.historyJobExpireMinutes = historyJobExpireMinutes; } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 4c4d62132c..7b4c968dd2 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -96,12 +96,16 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor SlotServiceConfig slotServiceConfig = new SlotServiceConfig(); for (Node node : childElements(slotServiceNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.DYNAMIC_SLOT.key().equals(name)) { + if (ServerConfigOptions.WorkerServerConfigOptions.DYNAMIC_SLOT.key().equals(name)) { slotServiceConfig.setDynamicSlot(getBooleanValue(getTextContent(node))); - } else if (ServerConfigOptions.SLOT_NUM.key().equals(name)) { + } else if (ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM.key().equals(name)) { slotServiceConfig.setSlotNum( - getIntegerValue(ServerConfigOptions.SLOT_NUM.key(), getTextContent(node))); - } else if (ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.key().equals(name)) { + getIntegerValue( + ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM.key(), + getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY + .key() + .equals(name)) { slotServiceConfig.setAllocateStrategy( AllocateStrategy.valueOf(getTextContent(node).toUpperCase())); } else { @@ -115,14 +119,18 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor CoordinatorServiceConfig coordinatorServiceConfig = new CoordinatorServiceConfig(); for (Node node : childElements(coordinatorServiceNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) { coordinatorServiceConfig.setMaxThreadNum( getIntegerValue( - ServerConfigOptions.MAX_THREAD_NUM.key(), getTextContent(node))); - } else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name)) { + ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM.key(), + getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM + .key() + .equals(name)) { coordinatorServiceConfig.setCoreThreadNum( getIntegerValue( - ServerConfigOptions.CORE_THREAD_NUM.key(), getTextContent(node))); + ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM.key(), + getTextContent(node))); } else { LOGGER.warning("Unrecognized element: " + name); } @@ -134,56 +142,91 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor final EngineConfig engineConfig = config.getEngineConfig(); for (Node node : childElements(engineNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.BACKUP_COUNT.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.BACKUP_COUNT.key().equals(name)) { engineConfig.setBackupCount( getIntegerValue( - ServerConfigOptions.BACKUP_COUNT.key(), getTextContent(node))); - } else if (ServerConfigOptions.QUEUE_TYPE.key().equals(name)) { + ServerConfigOptions.MasterServerConfigOptions.BACKUP_COUNT.key(), + getTextContent(node))); + } else if (ServerConfigOptions.WorkerServerConfigOptions.QUEUE_TYPE + .key() + .equals(name)) { engineConfig.setQueueType( QueueType.valueOf(getTextContent(node).toUpperCase(Locale.ROOT))); - } else if (ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + .key() + .equals(name)) { engineConfig.setPrintExecutionInfoInterval( getIntegerValue( - ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(), + ServerConfigOptions.MasterServerConfigOptions + .PRINT_EXECUTION_INFO_INTERVAL + .key(), getTextContent(node))); - } else if (ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + .key() + .equals(name)) { engineConfig.setPrintJobMetricsInfoInterval( getIntegerValue( - ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(), + ServerConfigOptions.MasterServerConfigOptions + .PRINT_JOB_METRICS_INFO_INTERVAL + .key(), getTextContent(node))); - } else if (ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + .key() + .equals(name)) { engineConfig.setJobMetricsBackupInterval( getIntegerValue( - ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key(), + ServerConfigOptions.MasterServerConfigOptions + .JOB_METRICS_BACKUP_INTERVAL + .key(), getTextContent(node))); - } else if (ServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE.key().equals(name)) { + } else if (ServerConfigOptions.WorkerServerConfigOptions + .TASK_EXECUTION_THREAD_SHARE_MODE + .key() + .equals(name)) { String mode = getTextContent(node).toUpperCase(Locale.ROOT); if (!Arrays.asList("ALL", "OFF", "PART").contains(mode)) { throw new IllegalArgumentException( - ServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE + ServerConfigOptions.WorkerServerConfigOptions + .TASK_EXECUTION_THREAD_SHARE_MODE + " must in [ALL, OFF, PART]"); } engineConfig.setTaskExecutionThreadShareMode(ThreadShareMode.valueOf(mode)); - } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) { + } else if (ServerConfigOptions.WorkerServerConfigOptions.SLOT_SERVICE + .key() + .equals(name)) { engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node)); - } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT + .key() + .equals(name)) { engineConfig.setCheckpointConfig(parseCheckpointConfig(node)); - } else if (ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + .key() + .equals(name)) { engineConfig.setHistoryJobExpireMinutes( getIntegerValue( - ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key(), + ServerConfigOptions.MasterServerConfigOptions + .HISTORY_JOB_EXPIRE_MINUTES + .key(), getTextContent(node))); - } else if (ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG + .key() + .equals(name)) { engineConfig.setConnectorJarStorageConfig(parseConnectorJarStorageConfig(node)); } else if (ServerConfigOptions.CLASSLOADER_CACHE_MODE.key().equals(name)) { engineConfig.setClassloaderCacheMode(getBooleanValue(getTextContent(node))); - } else if (ServerConfigOptions.EVENT_REPORT_HTTP.equalsIgnoreCase(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.EVENT_REPORT_HTTP + .equalsIgnoreCase(name)) { NamedNodeMap attributes = node.getAttributes(); - Node urlNode = attributes.getNamedItem(ServerConfigOptions.EVENT_REPORT_HTTP_URL); + Node urlNode = + attributes.getNamedItem( + ServerConfigOptions.MasterServerConfigOptions + .EVENT_REPORT_HTTP_URL); if (urlNode != null) { engineConfig.setEventReportHttpApi(getTextContent(urlNode)); Node headersNode = - attributes.getNamedItem(ServerConfigOptions.EVENT_REPORT_HTTP_HEADERS); + attributes.getNamedItem( + ServerConfigOptions.MasterServerConfigOptions + .EVENT_REPORT_HTTP_HEADERS); if (headersNode != null) { Map<String, String> headers = new LinkedHashMap<>(); NodeList nodeList = headersNode.getChildNodes(); @@ -196,12 +239,16 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor } } else if (ServerConfigOptions.TELEMETRY.key().equals(name)) { engineConfig.setTelemetryConfig(parseTelemetryConfig(node)); - } else if (ServerConfigOptions.JOB_SCHEDULE_STRATEGY.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.JOB_SCHEDULE_STRATEGY + .key() + .equals(name)) { engineConfig.setScheduleStrategy( ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT))); - } else if (ServerConfigOptions.HTTP.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.HTTP.key().equals(name)) { engineConfig.setHttpConfig(parseHttpConfig(node)); - } else if (ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.COORDINATOR_SERVICE + .key() + .equals(name)) { engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node)); } else { LOGGER.warning("Unrecognized element: " + name); @@ -219,22 +266,35 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor CheckpointConfig checkpointConfig = new CheckpointConfig(); for (Node node : childElements(checkpointNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.CHECKPOINT_INTERVAL.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL + .key() + .equals(name)) { checkpointConfig.setCheckpointInterval( getIntegerValue( - ServerConfigOptions.CHECKPOINT_INTERVAL.key(), + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL + .key(), getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_TIMEOUT.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT + .key() + .equals(name)) { checkpointConfig.setCheckpointTimeout( getIntegerValue( - ServerConfigOptions.CHECKPOINT_TIMEOUT.key(), + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT + .key(), getTextContent(node))); - } else if (ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions + .SCHEMA_CHANGE_CHECKPOINT_TIMEOUT + .key() + .equals(name)) { checkpointConfig.setSchemaChangeCheckpointTimeout( getIntegerValue( - ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(), + ServerConfigOptions.MasterServerConfigOptions + .SCHEMA_CHANGE_CHECKPOINT_TIMEOUT + .key(), getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE + .key() + .equals(name)) { checkpointConfig.setStorage(parseCheckpointStorageConfig(node)); } else { LOGGER.warning("Unrecognized element: " + name); @@ -248,14 +308,23 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor CheckpointStorageConfig checkpointStorageConfig = new CheckpointStorageConfig(); for (Node node : childElements(checkpointStorageConfigNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_TYPE + .key() + .equals(name)) { checkpointStorageConfig.setStorage(getTextContent(node)); - } else if (ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED + .key() + .equals(name)) { checkpointStorageConfig.setMaxRetainedCheckpoints( getIntegerValue( - ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key(), + ServerConfigOptions.MasterServerConfigOptions + .CHECKPOINT_STORAGE_MAX_RETAINED + .key(), getTextContent(node))); - } else if (ServerConfigOptions.CHECKPOINT_STORAGE_PLUGIN_CONFIG.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions + .CHECKPOINT_STORAGE_PLUGIN_CONFIG + .key() + .equals(name)) { Map<String, String> pluginConfig = parseCheckpointPluginConfig(node); checkpointStorageConfig.setStoragePluginConfig(pluginConfig); } else { @@ -285,30 +354,47 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor ConnectorJarStorageConfig connectorJarStorageConfig = new ConnectorJarStorageConfig(); for (Node node : childElements(connectorJarStorageConfigNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE + .key() + .equals(name)) { connectorJarStorageConfig.setEnable(getBooleanValue(getTextContent(node))); - } else if (ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE + .key() + .equals(name)) { String mode = getTextContent(node).toUpperCase(); if (StringUtils.isNotBlank(mode) && !Arrays.asList("SHARED", "ISOLATED").contains(mode)) { throw new IllegalArgumentException( - ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE + " must in [SHARED, ISOLATED]"); } connectorJarStorageConfig.setStorageMode(ConnectorJarStorageMode.valueOf(mode)); - } else if (ServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH + .key() + .equals(name)) { connectorJarStorageConfig.setStoragePath(getTextContent(node)); - } else if (ServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions + .CONNECTOR_JAR_CLEANUP_TASK_INTERVAL + .key() + .equals(name)) { connectorJarStorageConfig.setCleanupTaskInterval( getIntegerValue( - ServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL.key(), + ServerConfigOptions.MasterServerConfigOptions + .CONNECTOR_JAR_CLEANUP_TASK_INTERVAL + .key(), getTextContent(node))); - } else if (ServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME + .key() + .equals(name)) { connectorJarStorageConfig.setConnectorJarExpiryTime( getIntegerValue( - ServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.key(), + ServerConfigOptions.MasterServerConfigOptions + .CONNECTOR_JAR_EXPIRY_TIME + .key(), getTextContent(node))); - } else if (ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG + .key() + .equals(name)) { connectorJarStorageConfig.setConnectorJarHAStorageConfig( parseConnectorJarHAStorageConfig(node)); } else { @@ -323,16 +409,20 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor ConnectorJarHAStorageConfig connectorJarHAStorageConfig = new ConnectorJarHAStorageConfig(); for (Node node : childElements(connectorJarHAStorageConfigNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE + .key() + .equals(name)) { String type = getTextContent(node); if (StringUtils.isNotBlank(type) && !Arrays.asList("localfile", "hdfs").contains(type)) { throw new IllegalArgumentException( - ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE + ServerConfigOptions.MasterServerConfigOptions + .CONNECTOR_JAR_HA_STORAGE_TYPE + " must in [localfile, hdfs]"); } connectorJarHAStorageConfig.setType(type); - } else if (ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG + } else if (ServerConfigOptions.MasterServerConfigOptions + .CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG .key() .equals(name)) { Map<String, String> connectorJarHAStoragePluginConfig = @@ -404,19 +494,30 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor HttpConfig httpConfig = new HttpConfig(); for (Node node : childElements(httpNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.PORT.key().equals(name)) { + if (ServerConfigOptions.MasterServerConfigOptions.PORT.key().equals(name)) { httpConfig.setPort( - getIntegerValue(ServerConfigOptions.PORT.key(), getTextContent(node))); - } else if (ServerConfigOptions.CONTEXT_PATH.key().equals(name)) { + getIntegerValue( + ServerConfigOptions.MasterServerConfigOptions.PORT.key(), + getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.CONTEXT_PATH + .key() + .equals(name)) { httpConfig.setContextPath(getTextContent(node)); - } else if (ServerConfigOptions.ENABLE_HTTP.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.ENABLE_HTTP + .key() + .equals(name)) { httpConfig.setEnabled(getBooleanValue(getTextContent(node))); - } else if (ServerConfigOptions.ENABLE_DYNAMIC_PORT.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.ENABLE_DYNAMIC_PORT + .key() + .equals(name)) { httpConfig.setEnableDynamicPort(getBooleanValue(getTextContent(node))); - } else if (ServerConfigOptions.PORT_RANGE.key().equals(name)) { + } else if (ServerConfigOptions.MasterServerConfigOptions.PORT_RANGE + .key() + .equals(name)) { httpConfig.setPortRange( getIntegerValue( - ServerConfigOptions.PORT_RANGE.key(), getTextContent(node))); + ServerConfigOptions.MasterServerConfigOptions.PORT_RANGE.key(), + getTextContent(node))); } else { LOGGER.warning("Unrecognized element: " + name); } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java index 76fb99f4d8..2a1c959704 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java @@ -28,12 +28,16 @@ public class CheckpointConfig implements Serializable { public static final long MINIMAL_CHECKPOINT_TIME = 10; - private long checkpointInterval = ServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue(); - private long checkpointTimeout = ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue(); + private long checkpointInterval = + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue(); + private long checkpointTimeout = + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue(); private long schemaChangeCheckpointTimeout = - ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT + .defaultValue(); - private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue(); + private CheckpointStorageConfig storage = + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE.defaultValue(); private boolean checkpointEnable = true; diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java index b6b4983d85..61dde630c8 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java @@ -25,10 +25,12 @@ import java.util.Map; @Data public class CheckpointStorageConfig { - private String storage = ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue(); + private String storage = + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue(); private int maxRetainedCheckpoints = - ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED + .defaultValue(); /** Storage plugin instance configuration */ private Map<String, String> storagePluginConfig = new HashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java index 811d6cdc6a..86593e33c0 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java @@ -25,7 +25,9 @@ import java.util.Map; @Data public class ConnectorJarHAStorageConfig { - private String type = ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE.defaultValue(); + private String type = + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE + .defaultValue(); /** Storage plugin instance configuration */ private Map<String, String> storagePluginConfig = new HashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java index 4048ddb859..d83568195c 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java @@ -23,21 +23,26 @@ import static com.hazelcast.internal.util.Preconditions.checkNotNull; @Data public class ConnectorJarStorageConfig { - private Boolean enable = ServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE.defaultValue(); + private Boolean enable = + ServerConfigOptions.MasterServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE + .defaultValue(); private ConnectorJarStorageMode storageMode = - ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE.defaultValue(); - private String storagePath = ServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH.defaultValue(); + private String storagePath = + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH.defaultValue(); private Integer cleanupTaskInterval = - ServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL + .defaultValue(); private Integer connectorJarExpiryTime = - ServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.defaultValue(); private ConnectorJarHAStorageConfig connectorJarHAStorageConfig = - ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG + .defaultValue(); public ConnectorJarStorageConfig setStorageMode(ConnectorJarStorageMode storageMode) { checkNotNull(storageMode); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java index 3ed455e373..fb8316f2f5 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java @@ -26,17 +26,23 @@ import static com.hazelcast.internal.util.Preconditions.checkPositive; @Data public class CoordinatorServiceConfig implements Serializable { - private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue(); + private int coreThreadNum = + ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM.defaultValue(); - private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue(); + private int maxThreadNum = + ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM.defaultValue(); public void setCoreThreadNum(int coreThreadNum) { - checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + " must be >= 0"); + checkPositive( + coreThreadNum, + ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM + " must be >= 0"); this.coreThreadNum = coreThreadNum; } public void setMaxThreadNum(int maxThreadNum) { - checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + " must be > 0"); + checkPositive( + maxThreadNum, + ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM + " must be > 0"); this.maxThreadNum = maxThreadNum; } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java index d0227263a5..4aaabc49b1 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java @@ -26,18 +26,21 @@ import static com.hazelcast.internal.util.Preconditions.checkPositive; @Data public class HttpConfig implements Serializable { - private boolean enabled = ServerConfigOptions.ENABLE_HTTP.defaultValue(); + private boolean enabled = + ServerConfigOptions.MasterServerConfigOptions.ENABLE_HTTP.defaultValue(); - private int port = ServerConfigOptions.PORT.defaultValue(); + private int port = ServerConfigOptions.MasterServerConfigOptions.PORT.defaultValue(); - private String contextPath = ServerConfigOptions.CONTEXT_PATH.defaultValue(); + private String contextPath = + ServerConfigOptions.MasterServerConfigOptions.CONTEXT_PATH.defaultValue(); - private boolean enableDynamicPort = ServerConfigOptions.ENABLE_DYNAMIC_PORT.defaultValue(); + private boolean enableDynamicPort = + ServerConfigOptions.MasterServerConfigOptions.ENABLE_DYNAMIC_PORT.defaultValue(); - private int portRange = ServerConfigOptions.PORT_RANGE.defaultValue(); + private int portRange = ServerConfigOptions.MasterServerConfigOptions.PORT_RANGE.defaultValue(); public void setPort(int port) { - checkPositive(port, ServerConfigOptions.HTTP + " must be > 0"); + checkPositive(port, ServerConfigOptions.MasterServerConfigOptions.HTTP + " must be > 0"); this.port = port; } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 119d8c9c61..6aab8dfa79 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -26,210 +26,6 @@ import java.util.Map; public class ServerConfigOptions { - public static final Option<Integer> BACKUP_COUNT = - Options.key("backup-count") - .intType() - .defaultValue(1) - .withDescription("The number of backup copies of each partition."); - - public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL = - Options.key("print-execution-info-interval") - .intType() - .defaultValue(60) - .withDescription( - "The interval (in seconds) between two consecutive executions of the print execution info task."); - - public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL = - Options.key("print-job-metrics-info-interval") - .intType() - .defaultValue(60) - .withDescription("The interval (in seconds) of job print metrics info"); - - public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL = - Options.key("job-metrics-backup-interval") - .intType() - .defaultValue(10) - .withDescription("The interval (in seconds) of job metrics backups"); - - public static final Option<ThreadShareMode> TASK_EXECUTION_THREAD_SHARE_MODE = - Options.key("task_execution_thread_share_mode") - .type(new TypeReference<ThreadShareMode>() {}) - .defaultValue(ThreadShareMode.OFF) - .withDescription( - "The thread sharing mode of TaskExecutionServer, including ALL, OFF, PART. Default is OFF"); - - public static final Option<Boolean> DYNAMIC_SLOT = - Options.key("dynamic-slot") - .booleanType() - .defaultValue(true) - .withDescription("Whether to use dynamic slot."); - - public static final Option<Integer> SLOT_NUM = - Options.key("slot-num") - .intType() - .defaultValue(2) - .withDescription( - "The number of slots. Only valid when dynamic slot is disabled."); - - public static final Option<Integer> CHECKPOINT_INTERVAL = - Options.key("interval") - .intType() - .defaultValue(300000) - .withDescription( - "The interval (in milliseconds) between two consecutive checkpoints."); - - public static final Option<Integer> CHECKPOINT_TIMEOUT = - Options.key("timeout") - .intType() - .defaultValue(30000) - .withDescription("The timeout (in milliseconds) for a checkpoint."); - - public static final Option<Integer> SCHEMA_CHANGE_CHECKPOINT_TIMEOUT = - Options.key("schema-change-timeout") - .intType() - .defaultValue(30000) - .withDescription( - "The timeout (in milliseconds) for a schema change checkpoint."); - - public static final Option<String> CHECKPOINT_STORAGE_TYPE = - Options.key("type") - .stringType() - .defaultValue("localfile") - .withDescription("The checkpoint storage type."); - - public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED = - Options.key("max-retained") - .intType() - .defaultValue(20) - .withDescription("The maximum number of retained checkpoints."); - - public static final Option<QueueType> QUEUE_TYPE = - Options.key("queue-type") - .type(new TypeReference<QueueType>() {}) - .defaultValue(QueueType.BLOCKINGQUEUE) - .withDescription("The internal data cache queue type."); - - public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE = - Options.key("storage") - .type(new TypeReference<CheckpointStorageConfig>() {}) - .defaultValue(new CheckpointStorageConfig()) - .withDescription("The checkpoint storage configuration."); - - public static final Option<AllocateStrategy> SLOT_ALLOCATE_STRATEGY = - Options.key("slot-allocate-strategy") - .enumType(AllocateStrategy.class) - .defaultValue(AllocateStrategy.RANDOM) - .withDescription( - "When the strategy is SLOT_RATIO, the system allocates tasks based on the slot usage ratio, with priority given to workers with low usage rates; When the strategy is SYSTEM_LOAD, the system allocates tasks based on server load, with priority given to workers with lower load."); - - public static final Option<SlotServiceConfig> SLOT_SERVICE = - Options.key("slot-service") - .type(new TypeReference<SlotServiceConfig>() {}) - .defaultValue(new SlotServiceConfig()) - .withDescription("The slot service configuration."); - - public static final Option<CheckpointConfig> CHECKPOINT = - Options.key("checkpoint") - .type(new TypeReference<CheckpointConfig>() {}) - .defaultValue(new CheckpointConfig()) - .withDescription("The checkpoint configuration."); - - public static final Option<Map<String, String>> CHECKPOINT_STORAGE_PLUGIN_CONFIG = - Options.key("plugin-config") - .type(new TypeReference<Map<String, String>>() {}) - .noDefaultValue() - .withDescription("The checkpoint storage instance configuration."); - - public static final Option<Integer> CORE_THREAD_NUM = - Options.key("core-thread-num") - .intType() - .defaultValue(10) - .withDescription("The core thread num of coordinator service"); - - public static final Option<Integer> MAX_THREAD_NUM = - Options.key("max-thread-num") - .intType() - .defaultValue(Integer.MAX_VALUE) - .withDescription("The max thread num of coordinator service"); - - public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE = - Options.key("coordinator-service") - .type(new TypeReference<CoordinatorServiceConfig>() {}) - .defaultValue(new CoordinatorServiceConfig()) - .withDescription("The coordinator service configuration."); - - public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES = - Options.key("history-job-expire-minutes") - .intType() - .defaultValue(1440) - .withDescription("The expire time of history jobs.time unit minute"); - - public static final Option<ScheduleStrategy> JOB_SCHEDULE_STRATEGY = - Options.key("job-schedule-strategy") - .enumType(ScheduleStrategy.class) - .defaultValue(ScheduleStrategy.REJECT) - .withDescription( - "When the policy is REJECT, when the task queue is full, the task will be rejected; when the policy is WAIT, when the task queue is full, the task will wait"); - - public static final Option<Boolean> ENABLE_CONNECTOR_JAR_STORAGE = - Options.key("enable") - .booleanType() - .defaultValue(Boolean.FALSE) - .withDescription( - "Enable the engine server Jar package storage service," - + " automatically upload connector Jar packages and dependent third-party Jar packages" - + " to the server before job execution." - + " Enabling this configuration does not require the server to hold all connector Jar packages"); - - public static final Option<ConnectorJarStorageMode> CONNECTOR_JAR_STORAGE_MODE = - Options.key("connector-jar-storage-mode") - .enumType(ConnectorJarStorageMode.class) - .defaultValue(ConnectorJarStorageMode.SHARED) - .withDescription( - "The storage mode of the connector jar package, including SHARED, ISOLATED. Default is SHARED"); - - public static final Option<String> CONNECTOR_JAR_STORAGE_PATH = - Options.key("connector-jar-storage-path") - .stringType() - .defaultValue("") - .withDescription("The user defined connector jar storage path."); - - public static final Option<Integer> CONNECTOR_JAR_CLEANUP_TASK_INTERVAL = - Options.key("connector-jar-cleanup-task-interval") - .intType() - .defaultValue(3600) - .withDescription("The user defined connector jar cleanup task interval."); - - public static final Option<Integer> CONNECTOR_JAR_EXPIRY_TIME = - Options.key("connector-jar-expiry-time") - .intType() - .defaultValue(600) - .withDescription("The user defined connector jar expiry time."); - - public static final Option<String> CONNECTOR_JAR_HA_STORAGE_TYPE = - Options.key("type") - .stringType() - .defaultValue("localfile") - .withDescription("The connector jar HA storage type."); - - public static final Option<Map<String, String>> CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG = - Options.key("plugin-config") - .mapType() - .noDefaultValue() - .withDescription("The connector jar HA storage instance configuration."); - - public static final Option<ConnectorJarHAStorageConfig> CONNECTOR_JAR_HA_STORAGE_CONFIG = - Options.key("jar-ha-storage") - .type(new TypeReference<ConnectorJarHAStorageConfig>() {}) - .defaultValue(new ConnectorJarHAStorageConfig()) - .withDescription("The connector jar ha storage configuration."); - - public static final Option<ConnectorJarStorageConfig> CONNECTOR_JAR_STORAGE_CONFIG = - Options.key("jar-storage") - .type(new TypeReference<ConnectorJarStorageConfig>() {}) - .defaultValue(new ConnectorJarStorageConfig()) - .withDescription("The connector jar storage configuration."); - public static final Option<Boolean> CLASSLOADER_CACHE_MODE = Options.key("classloader-cache-mode") .booleanType() @@ -237,6 +33,20 @@ public class ServerConfigOptions { .withDescription( "Whether to use classloader cache mode. With cache mode, all jobs share the same classloader if the jars are the same"); + ///////////////////////////////////////////////// + // The options for metrics start + public static final Option<Boolean> TELEMETRY_METRIC_ENABLED = + Options.key("enabled") + .booleanType() + .defaultValue(false) + .withDescription("Whether open metrics export."); + + public static final Option<TelemetryMetricConfig> TELEMETRY_METRIC = + Options.key("metric") + .type(new TypeReference<TelemetryMetricConfig>() {}) + .defaultValue(new TelemetryMetricConfig()) + .withDescription("The telemetry metric configuration."); + public static final Option<Boolean> TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE = Options.key("scheduled-deletion-enable") .booleanType() @@ -251,63 +61,297 @@ public class ServerConfigOptions { .defaultValue(new TelemetryLogsConfig()) .withDescription("The telemetry logs configuration."); - public static final Option<Boolean> TELEMETRY_METRIC_ENABLED = - Options.key("enabled") - .booleanType() - .defaultValue(false) - .withDescription("Whether open metrics export."); - - public static final Option<TelemetryMetricConfig> TELEMETRY_METRIC = - Options.key("metric") - .type(new TypeReference<TelemetryMetricConfig>() {}) - .defaultValue(new TelemetryMetricConfig()) - .withDescription("The telemetry metric configuration."); - public static final Option<TelemetryConfig> TELEMETRY = Options.key("telemetry") .type(new TypeReference<TelemetryConfig>() {}) .defaultValue(new TelemetryConfig()) .withDescription("The telemetry configuration."); - - public static final Option<Integer> PORT = - Options.key("port") - .intType() - .defaultValue(8080) - .withDescription("The port of the http server."); - - public static final Option<Boolean> ENABLE_HTTP = - Options.key("enable-http") - .booleanType() - .defaultValue(false) - .withDescription("Whether to enable the http server."); - - public static final Option<String> CONTEXT_PATH = - Options.key("context-path") - .stringType() - .defaultValue("") - .withDescription("The context path of the http server."); - - public static final Option<Boolean> ENABLE_DYNAMIC_PORT = - Options.key("enable-dynamic-port") - .booleanType() - .defaultValue(false) - .withDescription( - "Whether to enable the dynamic port of the http server. If true, We will use the unused port"); - - public static final Option<Integer> PORT_RANGE = - Options.key("port-range") - .intType() - .defaultValue(100) - .withDescription( - "The port range of the http server. If enable-dynamic-port is true, We will use the unused port in the range"); - - public static final Option<HttpConfig> HTTP = - Options.key("http") - .type(new TypeReference<HttpConfig>() {}) - .defaultValue(new HttpConfig()) - .withDescription("The http configuration."); - - public static final String EVENT_REPORT_HTTP = "event-report-http"; - public static final String EVENT_REPORT_HTTP_URL = "url"; - public static final String EVENT_REPORT_HTTP_HEADERS = "headers"; + // The options for metrics end + ///////////////////////////////////////////////// + + /** The options for master. */ + public static class MasterServerConfigOptions { + + public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL = + Options.key("print-execution-info-interval") + .intType() + .defaultValue(60) + .withDescription( + "The interval (in seconds) between two consecutive executions of the print execution info task."); + + public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL = + Options.key("print-job-metrics-info-interval") + .intType() + .defaultValue(60) + .withDescription("The interval (in seconds) of job print metrics info"); + + public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL = + Options.key("job-metrics-backup-interval") + .intType() + .defaultValue(10) + .withDescription("The interval (in seconds) of job metrics backups"); + + ///////////////////////////////////////////////// + // The options about Hazelcast IMAP store start + public static final Option<Integer> BACKUP_COUNT = + Options.key("backup-count") + .intType() + .defaultValue(1) + .withDescription("The number of backup copies of each partition."); + + public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES = + Options.key("history-job-expire-minutes") + .intType() + .defaultValue(1440) + .withDescription("The expire time of history jobs.time unit minute"); + // The options about Hazelcast IMAP store end + ///////////////////////////////////////////////// + + ///////////////////////////////////////////////// + // The options for checkpoint start + public static final Option<Integer> CHECKPOINT_INTERVAL = + Options.key("interval") + .intType() + .defaultValue(300000) + .withDescription( + "The interval (in milliseconds) between two consecutive checkpoints."); + + public static final Option<Integer> CHECKPOINT_TIMEOUT = + Options.key("timeout") + .intType() + .defaultValue(30000) + .withDescription("The timeout (in milliseconds) for a checkpoint."); + + public static final Option<String> CHECKPOINT_STORAGE_TYPE = + Options.key("type") + .stringType() + .defaultValue("localfile") + .withDescription("The checkpoint storage type."); + + public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED = + Options.key("max-retained") + .intType() + .defaultValue(20) + .withDescription("The maximum number of retained checkpoints."); + + public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE = + Options.key("storage") + .type(new TypeReference<CheckpointStorageConfig>() {}) + .defaultValue(new CheckpointStorageConfig()) + .withDescription("The checkpoint storage configuration."); + + public static final Option<Integer> SCHEMA_CHANGE_CHECKPOINT_TIMEOUT = + Options.key("schema-change-timeout") + .intType() + .defaultValue(30000) + .withDescription( + "The timeout (in milliseconds) for a schema change checkpoint."); + + public static final Option<Map<String, String>> CHECKPOINT_STORAGE_PLUGIN_CONFIG = + Options.key("plugin-config") + .type(new TypeReference<Map<String, String>>() {}) + .noDefaultValue() + .withDescription("The checkpoint storage instance configuration."); + + public static final Option<CheckpointConfig> CHECKPOINT = + Options.key("checkpoint") + .type(new TypeReference<CheckpointConfig>() {}) + .defaultValue(new CheckpointConfig()) + .withDescription("The checkpoint configuration."); + // The options for checkpoint end + ///////////////////////////////////////////////// + + ///////////////////////////////////////////////////// + // The options for job scheduler start + public static final Option<AllocateStrategy> SLOT_ALLOCATE_STRATEGY = + Options.key("slot-allocate-strategy") + .enumType(AllocateStrategy.class) + .defaultValue(AllocateStrategy.RANDOM) + .withDescription( + "When the strategy is SLOT_RATIO, the system allocates tasks based on the slot usage ratio, with priority given to workers with low usage rates; When the strategy is SYSTEM_LOAD, the system allocates tasks based on server load, with priority given to workers with lower load."); + + public static final Option<ScheduleStrategy> JOB_SCHEDULE_STRATEGY = + Options.key("job-schedule-strategy") + .enumType(ScheduleStrategy.class) + .defaultValue(ScheduleStrategy.REJECT) + .withDescription( + "When the policy is REJECT, when the task queue is full, the task will be rejected; when the policy is WAIT, when the task queue is full, the task will wait"); + // The options for job scheduler end + ///////////////////////////////////////////////////// + + ///////////////////////////////////////////////////// + // The options for http server start + public static final Option<Integer> PORT = + Options.key("port") + .intType() + .defaultValue(8080) + .withDescription("The port of the http server."); + + public static final Option<Boolean> ENABLE_HTTP = + Options.key("enable-http") + .booleanType() + .defaultValue(false) + .withDescription("Whether to enable the http server."); + + public static final Option<String> CONTEXT_PATH = + Options.key("context-path") + .stringType() + .defaultValue("") + .withDescription("The context path of the http server."); + + public static final Option<Boolean> ENABLE_DYNAMIC_PORT = + Options.key("enable-dynamic-port") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable the dynamic port of the http server. If true, We will use the unused port"); + + public static final Option<Integer> PORT_RANGE = + Options.key("port-range") + .intType() + .defaultValue(100) + .withDescription( + "The port range of the http server. If enable-dynamic-port is true, We will use the unused port in the range"); + + public static final Option<HttpConfig> HTTP = + Options.key("http") + .type(new TypeReference<HttpConfig>() {}) + .defaultValue(new HttpConfig()) + .withDescription("The http configuration."); + + public static final String EVENT_REPORT_HTTP = "event-report-http"; + public static final String EVENT_REPORT_HTTP_URL = "url"; + public static final String EVENT_REPORT_HTTP_HEADERS = "headers"; + + // The options for http server end + ///////////////////////////////////////////////////// + + ///////////////////////////////////////////////// + // The options for connector jar storage start + public static final Option<Boolean> ENABLE_CONNECTOR_JAR_STORAGE = + Options.key("enable") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "Enable the engine server Jar package storage service," + + " automatically upload connector Jar packages and dependent third-party Jar packages" + + " to the server before job execution." + + " Enabling this configuration does not require the server to hold all connector Jar packages"); + + public static final Option<ConnectorJarStorageMode> CONNECTOR_JAR_STORAGE_MODE = + Options.key("connector-jar-storage-mode") + .enumType(ConnectorJarStorageMode.class) + .defaultValue(ConnectorJarStorageMode.SHARED) + .withDescription( + "The storage mode of the connector jar package, including SHARED, ISOLATED. Default is SHARED"); + + public static final Option<String> CONNECTOR_JAR_STORAGE_PATH = + Options.key("connector-jar-storage-path") + .stringType() + .defaultValue("") + .withDescription("The user defined connector jar storage path."); + + public static final Option<Integer> CONNECTOR_JAR_CLEANUP_TASK_INTERVAL = + Options.key("connector-jar-cleanup-task-interval") + .intType() + .defaultValue(3600) + .withDescription("The user defined connector jar cleanup task interval."); + + public static final Option<Integer> CONNECTOR_JAR_EXPIRY_TIME = + Options.key("connector-jar-expiry-time") + .intType() + .defaultValue(600) + .withDescription("The user defined connector jar expiry time."); + + public static final Option<String> CONNECTOR_JAR_HA_STORAGE_TYPE = + Options.key("type") + .stringType() + .defaultValue("localfile") + .withDescription("The connector jar HA storage type."); + + public static final Option<Map<String, String>> CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG = + Options.key("plugin-config") + .mapType() + .noDefaultValue() + .withDescription("The connector jar HA storage instance configuration."); + + public static final Option<ConnectorJarHAStorageConfig> CONNECTOR_JAR_HA_STORAGE_CONFIG = + Options.key("jar-ha-storage") + .type(new TypeReference<ConnectorJarHAStorageConfig>() {}) + .defaultValue(new ConnectorJarHAStorageConfig()) + .withDescription("The connector jar ha storage configuration."); + + public static final Option<ConnectorJarStorageConfig> CONNECTOR_JAR_STORAGE_CONFIG = + Options.key("jar-storage") + .type(new TypeReference<ConnectorJarStorageConfig>() {}) + .defaultValue(new ConnectorJarStorageConfig()) + .withDescription("The connector jar storage configuration."); + // The options for connector jar storage end + ///////////////////////////////////////////////// + + ///////////////////////////////////////////////// + // The options for coordinator service start + public static final Option<Integer> CORE_THREAD_NUM = + Options.key("core-thread-num") + .intType() + .defaultValue(10) + .withDescription("The core thread num of coordinator service"); + + public static final Option<Integer> MAX_THREAD_NUM = + Options.key("max-thread-num") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription("The max thread num of coordinator service"); + + public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE = + Options.key("coordinator-service") + .type(new TypeReference<CoordinatorServiceConfig>() {}) + .defaultValue(new CoordinatorServiceConfig()) + .withDescription("The coordinator service configuration."); + // The options for coordinator service end + ///////////////////////////////////////////////// + + } + + /** The options for worker. */ + public static class WorkerServerConfigOptions { + + public static final Option<ThreadShareMode> TASK_EXECUTION_THREAD_SHARE_MODE = + Options.key("task_execution_thread_share_mode") + .type(new TypeReference<ThreadShareMode>() {}) + .defaultValue(ThreadShareMode.OFF) + .withDescription( + "The thread sharing mode of TaskExecutionServer, including ALL, OFF, PART. Default is OFF"); + + public static final Option<QueueType> QUEUE_TYPE = + Options.key("queue-type") + .type(new TypeReference<QueueType>() {}) + .defaultValue(QueueType.BLOCKINGQUEUE) + .withDescription("The internal data cache queue type."); + + ///////////////////////////////////////////////// + // The options for slot start + public static final Option<Boolean> DYNAMIC_SLOT = + Options.key("dynamic-slot") + .booleanType() + .defaultValue(true) + .withDescription("Whether to use dynamic slot."); + + public static final Option<Integer> SLOT_NUM = + Options.key("slot-num") + .intType() + .defaultValue(2) + .withDescription( + "The number of slots. Only valid when dynamic slot is disabled."); + + public static final Option<SlotServiceConfig> SLOT_SERVICE = + Options.key("slot-service") + .type(new TypeReference<SlotServiceConfig>() {}) + .defaultValue(new SlotServiceConfig()) + .withDescription("The slot service configuration."); + + // The options for slot end + ///////////////////////////////////////////////// + + } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java index 92f70fda3b..158f2803e9 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java @@ -27,14 +27,16 @@ import static com.hazelcast.internal.util.Preconditions.checkPositive; public class SlotServiceConfig implements Serializable { private AllocateStrategy allocateStrategy = - ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue(); + ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue(); - private boolean dynamicSlot = ServerConfigOptions.DYNAMIC_SLOT.defaultValue(); + private boolean dynamicSlot = + ServerConfigOptions.WorkerServerConfigOptions.DYNAMIC_SLOT.defaultValue(); - private int slotNum = ServerConfigOptions.SLOT_NUM.defaultValue(); + private int slotNum = ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM.defaultValue(); public void setSlotNum(int slotNum) { - checkPositive(slotNum, ServerConfigOptions.SLOT_NUM + " must be > 0"); + checkPositive( + slotNum, ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM + " must be > 0"); this.slotNum = slotNum; } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java index f0fe4d9b84..f64eb014d5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java @@ -39,7 +39,7 @@ public class StorageStrategyFactory { connectorJarStorageConfig, seaTunnelServer); default: throw new IllegalArgumentException( - ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE + ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE + " must in [SHARED, ISOLATED]"); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java index b98bad5f6c..e4e2ba2c45 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java @@ -354,7 +354,9 @@ public class SystemLoadCalculateTest { when(rm.getEngineConfig().getSlotServiceConfig()) .thenReturn(Mockito.mock(SlotServiceConfig.class)); when(rm.getEngineConfig().getSlotServiceConfig().getAllocateStrategy()) - .thenReturn(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue()); + .thenReturn( + ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY + .defaultValue()); // Simulate ResourceRequestHandler to call calculateWeight to calculate weight SystemLoadStrategy systemLoadStrategy = new SystemLoadStrategy(workerLoadMap); systemLoadStrategy.calculateWeight(workerProfile2, workerAssignedSlots2); @@ -459,7 +461,9 @@ public class SystemLoadCalculateTest { when(rm.getEngineConfig().getSlotServiceConfig()) .thenReturn(Mockito.mock(SlotServiceConfig.class)); when(rm.getEngineConfig().getSlotServiceConfig().getAllocateStrategy()) - .thenReturn(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue()); + .thenReturn( + ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY + .defaultValue()); WorkerProfile workerProfile1 = Mockito.mock(WorkerProfile.class); when(workerProfile1.getAssignedSlots()).thenReturn(new SlotProfile[0]);