zjffdu commented on a change in pull request #3397: [ZEPPELIN-4208] Cluster 
synchronize InterpreterSetting
URL: https://github.com/apache/zeppelin/pull/3397#discussion_r300923978
 
 

 ##########
 File path: 
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
 ##########
 @@ -978,4 +1041,85 @@ public void onParagraphUpdate(Paragraph p) throws 
IOException {
   public void onParagraphStatusChange(Paragraph p, Job.Status status) throws 
IOException {
 
   }
+
+  @Override
+  public void onClusterEvent(String msg) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("onClusterEvent : {}", msg);
+    }
+
+    try {
+      Gson gson = new Gson();
+      ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+      String id = message.get("id");
+      String name = message.get("name");
+      String group = message.get("group");
+      InterpreterOption option = null;
+      Map<String, InterpreterProperty> properties = null;
+      List<Dependency> dependencies = null;
+      String jsonOption = message.get("option");
+      if (!StringUtils.isBlank(jsonOption)) {
+        option = InterpreterOption.fromJson(jsonOption);
+      }
+      String jsonProperties = message.get("properties");
+      if (!StringUtils.isBlank(jsonProperties)) {
+        properties = gson.fromJson(jsonProperties,
+            new TypeToken<Map<String, InterpreterProperty>>() {}.getType());
+      }
+      String jsonDependencies = message.get("dependencies");
+      if (!StringUtils.isBlank(jsonOption)) {
+        dependencies = gson.fromJson(jsonDependencies, new 
TypeToken<List<Dependency>>() {}.getType());
+      }
+
+      switch (message.clusterEvent) {
+        case CREATE_INTP_SETTING:
+          inlineCreateNewSetting(name, group, dependencies, option, 
properties);
+          break;
+        case UPDATE_INTP_SETTING:
+          inlineSetPropertyAndRestart(id, option, properties, dependencies, 
false);
+          break;
+        case DELETE_INTP_SETTING:
+          inlineRemove(id, false);
+          break;
+        default:
+          LOGGER.error("Unknown clusterEvent:{}, msg:{} ", 
message.clusterEvent, msg);
+          break;
+      }
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+    } catch (InterpreterException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  // broadcast cluster event
+  private void broadcastClusterEvent(ClusterEvent event, InterpreterSetting 
intpSetting) {
+    if (!conf.isClusterMode()) {
+      return;
+    }
+
+    List<Dependency> dependencies = intpSetting.getDependencies();
+    Map<String, InterpreterProperty> properties
+        = (Map<String, InterpreterProperty>) intpSetting.getProperties();
+    InterpreterOption intpOption = intpSetting.getOption();
+
+    HashMap<String, String> params = new HashMap<>();
+    String jsonDep = gson.toJson(dependencies, new 
TypeToken<List<Dependency>>() {
+    }.getType());
+    String jsonProps = gson.toJson(properties, new TypeToken<Map<String, 
InterpreterProperty>>() {
+    }.getType());
+    params.put("id", intpSetting.getId());
 
 Review comment:
   It won't be too large. Many fields of InterpreterSetting is transient which 
won't be serialized. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to