This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 17a8324cd1 [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
17a8324cd1 is described below

commit 17a8324cd1bb65a40232261722f42ae07c1cd951
Author: chenlinzhong <490103...@qq.com>
AuthorDate: Wed Oct 26 12:54:55 2022 +0800

    [enhancement](cherry-pick)(spark-load)support dynamic set env (#12276) #13672
---
 .../org/apache/doris/catalog/SparkResource.java    | 49 ++++++++++++++++++++--
 .../doris/load/loadv2/SparkEtlJobHandler.java      | 32 ++++++++++++--
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |  2 +-
 3 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index e2feb65326..00bc2d7c69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -75,6 +75,7 @@ public class SparkResource extends Resource {
     private static final String YARN_MASTER = "yarn";
     private static final String SPARK_CONFIG_PREFIX = "spark.";
     private static final String BROKER_PROPERTY_PREFIX = "broker.";
+    private static final String ENV_PREFIX = "env.";
     // spark uses hadoop configs in the form of spark.hadoop.*
     private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop.";
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
@@ -104,19 +105,22 @@ public class SparkResource extends Resource {
     // broker username and password
     @SerializedName(value = "brokerProperties")
     private Map<String, String> brokerProperties;
+    @SerializedName(value = "envConfigs")
+    private Map<String, String> envConfigs;
 
     public SparkResource(String name) {
-        this(name, Maps.newHashMap(), null, null, Maps.newHashMap());
+        this(name, Maps.newHashMap(), null, null, Maps.newHashMap(), Maps.newHashMap());
     }
 
     // "public" for testing
     public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
-                         Map<String, String> brokerProperties) {
+                         Map<String, String> brokerProperties, Map<String, String> envConfigs) {
         super(name, ResourceType.SPARK);
         this.sparkConfigs = sparkConfigs;
         this.workingDir = workingDir;
         this.broker = broker;
         this.brokerProperties = brokerProperties;
+        this.envConfigs = envConfigs;
     }
 
     public String getMaster() {
@@ -150,12 +154,25 @@ public class SparkResource extends Resource {
         return sparkConfigs;
     }
 
+    public Map<String, String> getEnvConfigsWithoutPrefix() {
+        Map<String, String> envConfig = Maps.newHashMap();
+        if (envConfigs != null) {
+            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+                if (entry.getKey().startsWith(ENV_PREFIX)) {
+                    String key = entry.getKey().substring(ENV_PREFIX.length());
+                    envConfig.put(key, entry.getValue());
+                }
+            }
+        }
+        return envConfig;
+    }
+
     public Pair<String, String> getYarnResourcemanagerAddressPair() {
         return Pair.create(YARN_RESOURCE_MANAGER_ADDRESS, sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_ADDRESS));
     }
 
     public SparkResource getCopiedResource() {
-        return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties);
+        return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties, envConfigs);
     }
 
     @Override
@@ -230,6 +247,15 @@ public class SparkResource extends Resource {
             broker = properties.get(BROKER);
         }
         brokerProperties.putAll(getBrokerProperties(properties));
+        Map<String, String> env = getEnvConfig(properties);
+        if (env.size() > 0) {
+            if (envConfigs == null) {
+                envConfigs = env;
+            } else {
+                envConfigs.putAll(env);
+            }
+        }
+        LOG.info("updateProperties,{},{}", properties, envConfigs);
     }
 
     @Override
@@ -238,6 +264,8 @@ public class SparkResource extends Resource {
 
         // get spark configs
         sparkConfigs = getSparkConfig(properties);
+        envConfigs = getEnvConfig(properties);
+        LOG.info("setProperties,{},{}", properties, envConfigs);
         // check master and deploy mode
         if (getMaster() == null) {
             throw new DdlException("Missing " + SPARK_MASTER + " in properties");
@@ -281,6 +309,16 @@ public class SparkResource extends Resource {
         return sparkConfig;
     }
 
+    private Map<String, String> getEnvConfig(Map<String, String> properties) {
+        Map<String, String> envConfig = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(ENV_PREFIX)) {
+                envConfig.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return envConfig;
+    }
+
     private Map<String, String> getSparkHadoopConfig(Map<String, String> properties) {
         Map<String, String> sparkConfig = Maps.newHashMap();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -333,5 +371,10 @@ public class SparkResource extends Resource {
         for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
            result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
         }
+        if (envConfigs != null) {
+            for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
+                result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
+            }
+        }
     }
 }
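The SparkResource change above stores every property whose key starts with "env." verbatim in
envConfigs (getEnvConfig keeps the prefix), and getEnvConfigsWithoutPrefix() strips the prefix
again when the variables are exported to a child process. A minimal standalone sketch of that
round trip, with plain HashMap standing in for Guava's Maps and purely illustrative keys and
values (HADOOP_USER_NAME, JAVA_HOME):

    import java.util.HashMap;
    import java.util.Map;

    public class EnvPrefixDemo {
        private static final String ENV_PREFIX = "env.";

        public static void main(String[] args) {
            // Properties as a user might pass them when creating or altering the resource.
            Map<String, String> properties = new HashMap<>();
            properties.put("spark.master", "yarn");                  // a spark config, not an env config
            properties.put("env.HADOOP_USER_NAME", "hadoop");        // picked up by getEnvConfig
            properties.put("env.JAVA_HOME", "/usr/lib/jvm/java-8");  // picked up by getEnvConfig

            // Mirrors getEnvConfig: keep the "env." prefix for storage and serialization.
            Map<String, String> envConfigs = new HashMap<>();
            for (Map.Entry<String, String> e : properties.entrySet()) {
                if (e.getKey().startsWith(ENV_PREFIX)) {
                    envConfigs.put(e.getKey(), e.getValue());
                }
            }

            // Mirrors getEnvConfigsWithoutPrefix: strip the prefix before export.
            Map<String, String> exported = new HashMap<>();
            for (Map.Entry<String, String> e : envConfigs.entrySet()) {
                exported.put(e.getKey().substring(ENV_PREFIX.length()), e.getValue());
            }

            // Prints HADOOP_USER_NAME=hadoop and JAVA_HOME=/usr/lib/jvm/java-8 (order may vary).
            exported.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }

Keeping the prefix in the stored map also means the envConfigs rows added to SHOW RESOURCES output
(the change at the end of the file) stay distinguishable from spark.* and broker.* properties.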
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index f26793ab82..b89817890e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -123,7 +123,10 @@ public class SparkEtlJobHandler {
             throw new LoadException(e.getMessage());
         }
 
-        SparkLauncher launcher = new SparkLauncher();
+        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+        LOG.info("submit etl job,env:{}", envParams);
+
+        SparkLauncher launcher = new SparkLauncher(envParams);
         // master | deployMode
         // ------------|-------------
         //  yarn       |  cluster
@@ -195,7 +198,19 @@ public class SparkEtlJobHandler {
             // command: yarn --config configDir application -status appId
             String yarnStatusCmd = String.format(YARN_STATUS_CMD, yarnClient, configDir, appId);
             LOG.info(yarnStatusCmd);
-            String[] envp = { "LC_ALL=" + Config.locale };
+
+            Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+            int envNums = envParams.size() + 1;
+            String[] envp = new String[envNums];
+            int idx = 0;
+            envp[idx++] = "LC_ALL=" + Config.locale;
+            if (envParams.size() > 0) {
+                for (Map.Entry<String, String> entry : envParams.entrySet()) {
+                    String envItem = entry.getKey() + "=" + entry.getValue();
+                    envp[idx++] = envItem;
+                }
+            }
+            LOG.info("getEtlJobStatus,appId:{}, loadJobId:{}, env:{},resource:{}", appId, loadJobId, envp, resource);
             CommandResult result = Util.executeCommand(yarnStatusCmd, envp, EXEC_CMD_TIMEOUT_MS);
             if (result.getReturnCode() != 0) {
                 String stderr = result.getStderr();
@@ -283,7 +298,18 @@ public class SparkEtlJobHandler {
             // command: yarn --config configDir application -kill appId
             String yarnKillCmd = String.format(YARN_KILL_CMD, yarnClient, configDir, appId);
             LOG.info(yarnKillCmd);
-            String[] envp = { "LC_ALL=" + Config.locale };
+            Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
+            int envNums = envParams.size() + 1;
+            String[] envp = new String[envNums];
+            int idx = 0;
+            envp[idx++] = "LC_ALL=" + Config.locale;
+            if (envParams.size() > 0) {
+                for (Map.Entry<String, String> entry : envParams.entrySet()) {
+                    String envItem = entry.getKey() + "=" + entry.getValue();
+                    envp[idx++] = envItem;
+                }
+            }
+            LOG.info("killEtlJob, env:{}", envp);
             CommandResult result = Util.executeCommand(yarnKillCmd, envp, EXEC_CMD_TIMEOUT_MS);
             LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout());
             if (result.getReturnCode() != 0) {
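Both consumers in SparkEtlJobHandler thread the same map through: submitEtlJob hands it to
SparkLauncher's Map-based constructor, which sets the variables in the spawned spark-submit
process, while getEtlJobStatus and killEtlJob flatten it into the envp array passed to the
forked yarn command. A short sketch of that envp contract, using java.lang.Runtime in place of
the Doris-internal Util.executeCommand (the HADOOP_USER_NAME entry and the locale value are
illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class EnvpDemo {
        public static void main(String[] args) throws Exception {
            // Illustrative env configs, as returned by getEnvConfigsWithoutPrefix().
            Map<String, String> envParams = new LinkedHashMap<>();
            envParams.put("HADOOP_USER_NAME", "hadoop");

            // Same construction as the patch: LC_ALL first, then one "KEY=VALUE" per entry.
            String[] envp = new String[envParams.size() + 1];
            int idx = 0;
            envp[idx++] = "LC_ALL=en_US.UTF-8"; // Config.locale in Doris
            for (Map.Entry<String, String> entry : envParams.entrySet()) {
                envp[idx++] = entry.getKey() + "=" + entry.getValue();
            }

            // "env" (on Unix-like systems) echoes the environment the child actually received.
            Process p = Runtime.getRuntime().exec("env", envp);
            p.waitFor();
        }
    }

Note that a non-null envp replaces the child's environment entirely rather than extending the
parent's, so the array must carry everything the yarn script needs; that is why LC_ALL is still
set explicitly alongside the user-supplied entries.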
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 9908f9b804..b220dfdeb6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -493,7 +493,7 @@ public class SparkLoadJobTest {
                                     @Mocked ResourceMgr resourceMgr) throws Exception {
         long dbId = 1000L;
         SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos",
-                Maps.newHashMap());
+                Maps.newHashMap(), Maps.newHashMap());
         new Expectations() {
             {
                 catalog.getResourceMgr();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org