This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 941bda5a20 [enhancement](spark-load)support dynamic set env (#12276)
941bda5a20 is described below

commit 941bda5a20843f81eaafbe6a634c0da908ee339f
Author: chenlinzhong <490103...@qq.com>
AuthorDate: Wed Sep 7 16:24:29 2022 +0800

    [enhancement](spark-load)support dynamic set env (#12276)

    * [enhancement](spark-load)support dynamic set env and display spark appid

    * [enhancement](spark-load)support dynamic set env
---
 .../org/apache/doris/catalog/SparkResource.java    | 49 ++++++++++++++++++++--
 .../doris/load/loadv2/SparkEtlJobHandler.java      | 32 ++++++++++++--
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |  2 +-
 3 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index c8b59bfa26..001c8ae0cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -74,6 +74,7 @@ public class SparkResource extends Resource {
     private static final String YARN_MASTER = "yarn";
     private static final String SPARK_CONFIG_PREFIX = "spark.";
     private static final String BROKER_PROPERTY_PREFIX = "broker.";
+    private static final String ENV_PREFIX = "env.";
     // spark uses hadoop configs in the form of spark.hadoop.*
     private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop.";
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
@@ -103,19 +104,22 @@ public class SparkResource extends Resource {
     // broker username and password
     @SerializedName(value = "brokerProperties")
     private Map<String, String> brokerProperties;
+    @SerializedName(value = "envConfigs")
+    private Map<String, String> envConfigs;

     public SparkResource(String name) {
-        this(name, Maps.newHashMap(), null, null, Maps.newHashMap());
+        this(name, Maps.newHashMap(), null, null, Maps.newHashMap(), Maps.newHashMap());
     }

     // "public" for testing
     public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
-                         Map<String, String> brokerProperties) {
+                         Map<String, String> brokerProperties, Map<String, String> envConfigs) {
         super(name, ResourceType.SPARK);
         this.sparkConfigs = sparkConfigs;
         this.workingDir = workingDir;
         this.broker = broker;
         this.brokerProperties = brokerProperties;
+        this.envConfigs = envConfigs;
     }

     public String getMaster() {
@@ -149,12 +153,25 @@ public class SparkResource extends Resource {
         return sparkConfigs;
     }

+    public Map<String, String> getEnvConfigsWithoutPrefix() {
+        Map<String, String> envConfig = Maps.newHashMap();
+        if (envConfigs != null) {
+            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+                if (entry.getKey().startsWith(ENV_PREFIX)) {
+                    String key = entry.getKey().substring(ENV_PREFIX.length());
+                    envConfig.put(key, entry.getValue());
+                }
+            }
+        }
+        return envConfig;
+    }
+
     public Pair<String, String> getYarnResourcemanagerAddressPair() {
         return Pair.of(YARN_RESOURCE_MANAGER_ADDRESS, sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_ADDRESS));
     }

     public SparkResource getCopiedResource() {
-        return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties);
+        return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties, envConfigs);
     }

     @Override
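For context, the "env."-prefixed resource properties are stored verbatim in envConfigs; getEnvConfigsWithoutPrefix() strips the prefix only when the map is handed to the launcher. A minimal standalone sketch of that contract (the class name and property values are hypothetical, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class EnvPrefixSketch {
        private static final String ENV_PREFIX = "env.";

        // Mirrors SparkResource.getEnvConfigsWithoutPrefix(): keep only "env.*"
        // entries and drop the prefix from their keys.
        static Map<String, String> withoutPrefix(Map<String, String> envConfigs) {
            Map<String, String> result = new HashMap<>();
            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
                if (entry.getKey().startsWith(ENV_PREFIX)) {
                    result.put(entry.getKey().substring(ENV_PREFIX.length()), entry.getValue());
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("env.HADOOP_CONF_DIR", "/etc/hadoop/conf"); // hypothetical value
            props.put("spark.master", "yarn");                    // no "env." prefix, ignored
            System.out.println(withoutPrefix(props));             // {HADOOP_CONF_DIR=/etc/hadoop/conf}
        }
    }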
@@ -233,6 +250,15 @@ public class SparkResource extends Resource {
             broker = properties.get(BROKER);
         }
         brokerProperties.putAll(getBrokerProperties(properties));
+        Map<String, String> env = getEnvConfig(properties);
+        if (env.size() > 0) {
+            if (envConfigs == null) {
+                envConfigs = env;
+            } else {
+                envConfigs.putAll(env);
+            }
+        }
+        LOG.info("updateProperties,{},{}", properties, envConfigs);
     }

     @Override
@@ -241,6 +267,8 @@ public class SparkResource extends Resource {
         // get spark configs
         sparkConfigs = getSparkConfig(properties);
+        envConfigs = getEnvConfig(properties);
+        LOG.info("setProperties,{},{}", properties, envConfigs);
         // check master and deploy mode
         if (getMaster() == null) {
             throw new DdlException("Missing " + SPARK_MASTER + " in properties");
         }
@@ -285,6 +313,16 @@ public class SparkResource extends Resource {
         return sparkConfig;
     }

+    private Map<String, String> getEnvConfig(Map<String, String> properties) {
+        Map<String, String> envConfig = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(ENV_PREFIX)) {
+                envConfig.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return envConfig;
+    }
+
     private Map<String, String> getSparkHadoopConfig(Map<String, String> properties) {
         Map<String, String> sparkConfig = Maps.newHashMap();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -338,5 +376,10 @@
         for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
             result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
         }
+        if (envConfigs != null) {
+            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+                result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
+            }
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index eb7ac15595..84954d3064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -123,7 +123,10 @@ public class SparkEtlJobHandler {
             throw new LoadException(e.getMessage());
         }

-        SparkLauncher launcher = new SparkLauncher();
+        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+        LOG.info("submit etl job,env:{}", envParams);
+
+        SparkLauncher launcher = new SparkLauncher(envParams);
         // master |  deployMode
         // ------------|-------------
         // yarn        |  cluster
@@ -195,7 +198,19 @@ public class SparkEtlJobHandler {
         // command: yarn --config configDir application -status appId
         String yarnStatusCmd = String.format(YARN_STATUS_CMD, yarnClient, configDir, appId);
         LOG.info(yarnStatusCmd);
-        String[] envp = { "LC_ALL=" + Config.locale };
+
+        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+        int envNums = envParams.size() + 1;
+        String[] envp = new String[envNums];
+        int idx = 0;
+        envp[idx++] = "LC_ALL=" + Config.locale;
+        if (envParams.size() > 0) {
+            for (Map.Entry<String, String> entry : envParams.entrySet()) {
+                String envItem = entry.getKey() + "=" + entry.getValue();
+                envp[idx++] = envItem;
+            }
+        }
+        LOG.info("getEtlJobStatus,appId:{}, loadJobId:{}, env:{},resource:{}", appId, loadJobId, envp, resource);
         CommandResult result = Util.executeCommand(yarnStatusCmd, envp, EXEC_CMD_TIMEOUT_MS);
         if (result.getReturnCode() != 0) {
             String stderr = result.getStderr();
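The envp construction above is the conventional "KEY=value" array for a child-process environment, with LC_ALL always in slot 0; the same pattern recurs in killEtlJob below. A rough standalone sketch, with java.lang.Runtime standing in for Doris's internal Util.executeCommand and illustrative values throughout:

    import java.util.Map;

    public class EnvpSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical env map, as if returned by getEnvConfigsWithoutPrefix().
            Map<String, String> envParams = Map.of("HADOOP_CONF_DIR", "/etc/hadoop/conf");
            String[] envp = new String[envParams.size() + 1];
            int idx = 0;
            envp[idx++] = "LC_ALL=en_US.UTF-8"; // stands in for "LC_ALL=" + Config.locale
            for (Map.Entry<String, String> entry : envParams.entrySet()) {
                envp[idx++] = entry.getKey() + "=" + entry.getValue();
            }
            // Runtime.exec(cmdarray, envp) stands in for Util.executeCommand(cmd, envp, timeout);
            // the POSIX "env" command simply prints the environment the child received.
            Process p = Runtime.getRuntime().exec(new String[] {"env"}, envp);
            p.getInputStream().transferTo(System.out);
            p.waitFor();
        }
    }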
@@ -284,7 +299,18 @@ public class SparkEtlJobHandler {
         // command: yarn --config configDir application -kill appId
         String yarnKillCmd = String.format(YARN_KILL_CMD, yarnClient, configDir, appId);
         LOG.info(yarnKillCmd);
-        String[] envp = { "LC_ALL=" + Config.locale };
+        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+        int envNums = envParams.size() + 1;
+        String[] envp = new String[envNums];
+        int idx = 0;
+        envp[idx++] = "LC_ALL=" + Config.locale;
+        if (envParams.size() > 0) {
+            for (Map.Entry<String, String> entry : envParams.entrySet()) {
+                String envItem = entry.getKey() + "=" + entry.getValue();
+                envp[idx++] = envItem;
+            }
+        }
+        LOG.info("killEtlJob, env:{}", envp);
         CommandResult result = Util.executeCommand(yarnKillCmd, envp, EXEC_CMD_TIMEOUT_MS);
         LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout());
         if (result.getReturnCode() != 0) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 40b0262673..6ce9d6090a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -500,7 +500,7 @@ public class SparkLoadJobTest {
                              @Mocked ResourceMgr resourceMgr) throws Exception {
         long dbId = 1000L;
         SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos",
-                Maps.newHashMap());
+                Maps.newHashMap(), Maps.newHashMap());
         new Expectations() {
             {
                 env.getResourceMgr();
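Taken together, the handler side of this change amounts to passing the resource's env map into SparkLauncher, which applies it to the spawned spark-submit process. A sketch under assumed settings (the jar path, main class, and env value are hypothetical, and spark-launcher must be on the classpath):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.launcher.SparkLauncher;

    public class LauncherEnvSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> env = new HashMap<>();
            // Hypothetical entry, as if taken from the resource's env.* properties.
            env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");

            Process spark = new SparkLauncher(env)       // env vars for the spark-submit child process
                    .setAppResource("/path/to/etl.jar")  // hypothetical application jar
                    .setMainClass("org.example.EtlMain") // hypothetical main class
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    .launch();
            spark.waitFor();
        }
    }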