zjffdu commented on a change in pull request #4098:
URL: https://github.com/apache/zeppelin/pull/4098#discussion_r616455596



##########
File path: 
zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, 
RecoveryStorage rec
     }
     envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
     envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+
+    // yarn application mode specific logic
+    if (context.getProperties().getProperty("flink.execution.mode")
+            .equalsIgnoreCase("yarn_application")) {
+      updateEnvsForYarnApplicationMode(envs, context);
+    }
+
     return envs;
   }
+
+  private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
+                                                InterpreterLaunchContext 
context) {
+    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+
+    StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+
+    // Extract yarn.ship-files, add hive-site.xml automatically if hive is 
enabled
+    // and HIVE_CONF_DIR is specified
+    String hiveConfDirProperty = 
context.getProperties().getProperty("HIVE_CONF_DIR");
+    List<String> yarnShipFiles = new ArrayList<>();
+    if (StringUtils.isNotBlank(hiveConfDirProperty) &&
+            Boolean.parseBoolean(context.getProperties()
+                    .getProperty("zeppelin.flink.enableHive", "false"))) {
+      File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml");
+      if (hiveSiteFile.isFile() && hiveSiteFile.exists()) {
+        yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
+      } else {
+        LOGGER.warn("Hive site file: {} doesn't exist or is not a directory", 
hiveSiteFile);
+      }
+    }
+    if (context.getProperties().containsKey("yarn.ship-files")) {
+      
yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files"));
+    }
+    if (!yarnShipFiles.isEmpty()) {
+      flinkYarnApplicationConfBuilder.append(
+              " -D yarn.ship-files=" + 
yarnShipFiles.stream().collect(Collectors.joining(",")));
+    }
+
+    // specify yarn.application.name
+    String yarnAppName = 
context.getProperties().getProperty("flink.yarn.appName");
+    if (StringUtils.isNotBlank(yarnAppName)) {
+      // flink run command can not contains whitespace, so replace it with _
+      flinkYarnApplicationConfBuilder.append(
+              " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_") 
+ "");
+    }
+
+    // add other yarn and python configuration.
+    for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) 
{
+      if (!entry.getKey().toString().equalsIgnoreCase("yarn.ship-files") &&
+              
!entry.getKey().toString().equalsIgnoreCase("flink.yarn.appName")) {
+        if 
(CharMatcher.whitespace().matchesAnyOf(entry.getValue().toString())) {
+          LOGGER.warn("flink configuration key {} is skipped because it 
contains white space",
+                  entry.getValue().toString());
+        } else {
+          flinkYarnApplicationConfBuilder.append(
+                  " -D " + entry.getKey().toString() + "=" + 
entry.getValue().toString() + "");
+        }
+      }
+    }

Review comment:
       Thanks for the careful review @cuspymd 
   Comments are addressed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to