cuspymd commented on a change in pull request #4098:
URL: https://github.com/apache/zeppelin/pull/4098#discussion_r616012945
##########
File path:
zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf,
RecoveryStorage rec
}
envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+
+ // yarn application mode specific logic
+ if (context.getProperties().getProperty("flink.execution.mode")
+ .equalsIgnoreCase("yarn_application")) {
+ updateEnvsForYarnApplicationMode(envs, context);
+ }
+
return envs;
}
+
+ private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
+ InterpreterLaunchContext
context) {
+ envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+
+ StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+
+ // Extract yarn.ship-files, add hive-site.xml automatically if hive is
enabled
+ // and HIVE_CONF_DIR is specified
+ String hiveConfDirProperty =
context.getProperties().getProperty("HIVE_CONF_DIR");
+ List<String> yarnShipFiles = new ArrayList<>();
+ if (StringUtils.isNotBlank(hiveConfDirProperty) &&
+ Boolean.parseBoolean(context.getProperties()
+ .getProperty("zeppelin.flink.enableHive", "false"))) {
+ File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml");
+ if (hiveSiteFile.isFile() && hiveSiteFile.exists()) {
+ yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
+ } else {
+ LOGGER.warn("Hive site file: {} doesn't exist or is not a directory",
hiveSiteFile);
+ }
+ }
+ if (context.getProperties().containsKey("yarn.ship-files")) {
+
yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files"));
+ }
Review comment:
Extracting this logic into a separate `getYarnShipFiles()` method would
improve readability.
##########
File path:
zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
##########
@@ -55,6 +64,65 @@ public FlinkInterpreterLauncher(ZeppelinConfiguration zConf,
RecoveryStorage rec
}
envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+
+ // yarn application mode specific logic
+ if (context.getProperties().getProperty("flink.execution.mode")
+ .equalsIgnoreCase("yarn_application")) {
+ updateEnvsForYarnApplicationMode(envs, context);
+ }
+
return envs;
}
+
+ private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
+ InterpreterLaunchContext
context) {
+ envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+
+ StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+
+ // Extract yarn.ship-files, add hive-site.xml automatically if hive is
enabled
+ // and HIVE_CONF_DIR is specified
+ String hiveConfDirProperty =
context.getProperties().getProperty("HIVE_CONF_DIR");
+ List<String> yarnShipFiles = new ArrayList<>();
+ if (StringUtils.isNotBlank(hiveConfDirProperty) &&
+ Boolean.parseBoolean(context.getProperties()
+ .getProperty("zeppelin.flink.enableHive", "false"))) {
+ File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml");
+ if (hiveSiteFile.isFile() && hiveSiteFile.exists()) {
+ yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
+ } else {
+ LOGGER.warn("Hive site file: {} doesn't exist or is not a directory",
hiveSiteFile);
+ }
+ }
+ if (context.getProperties().containsKey("yarn.ship-files")) {
+
yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files"));
+ }
+ if (!yarnShipFiles.isEmpty()) {
+ flinkYarnApplicationConfBuilder.append(
+ " -D yarn.ship-files=" +
yarnShipFiles.stream().collect(Collectors.joining(",")));
+ }
+
+ // specify yarn.application.name
+ String yarnAppName =
context.getProperties().getProperty("flink.yarn.appName");
+ if (StringUtils.isNotBlank(yarnAppName)) {
+ // flink run command can not contains whitespace, so replace it with _
+ flinkYarnApplicationConfBuilder.append(
+ " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_")
+ "");
+ }
+
+ // add other yarn and python configuration.
+ for (Map.Entry<Object, Object> entry : context.getProperties().entrySet())
{
+ if (!entry.getKey().toString().equalsIgnoreCase("yarn.ship-files") &&
+
!entry.getKey().toString().equalsIgnoreCase("flink.yarn.appName")) {
+ if
(CharMatcher.whitespace().matchesAnyOf(entry.getValue().toString())) {
+ LOGGER.warn("flink configuration key {} is skipped because it
contains white space",
+ entry.getValue().toString());
+ } else {
+ flinkYarnApplicationConfBuilder.append(
+ " -D " + entry.getKey().toString() + "=" +
entry.getValue().toString() + "");
+ }
+ }
+ }
Review comment:
`entry.getKey().toString()` and `entry.getValue().toString()` are
repeated several times, so it would be cleaner to store them in explicitly
declared local variables (e.g. `key` and `value`) and reuse those.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org