cuspymd commented on a change in pull request #4128:
URL: https://github.com/apache/zeppelin/pull/4128#discussion_r644049588



##########
File path: 
flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
##########
@@ -25,45 +26,74 @@
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URLClassLoader;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
- * Interpreter for flink scala. It delegates all the function to 
FlinkScalaInterpreter.
+ * Interpreter for flink scala. It delegates all the function to 
FlinkScalaInterpreter
+ * which is implemented by flink scala shell underneath.
  */
 public class FlinkInterpreter extends Interpreter {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkInterpreter.class);
 
+  private Map<String, String> innerInterpreterClassMap = new HashMap<>();
   private FlinkScalaInterpreter innerIntp;
-  private FlinkZeppelinContext z;
+  private ZeppelinContext z;
 
   public FlinkInterpreter(Properties properties) {
     super(properties);
+    innerInterpreterClassMap.put("2.11", 
"org.apache.zeppelin.flink.FlinkScala211Interpreter");
+    innerInterpreterClassMap.put("2.12", 
"org.apache.zeppelin.flink.FlinkScala212Interpreter");
   }
 
-  private void checkScalaVersion() throws InterpreterException {
+  private String extractScalaVersion() throws InterpreterException {
     String scalaVersionString = scala.util.Properties.versionString();
     LOGGER.info("Using Scala: " + scalaVersionString);
     if (scalaVersionString.contains("version 2.11")) {
-      return;
+      return "2.11";
+    } else if (scalaVersionString.contains("version 2.12")) {
+      return "2.12";
     } else {
       throw new InterpreterException("Unsupported scala version: " + 
scalaVersionString +
-              ", Only scala 2.11 is supported");
+              ", Only scala 2.11/2.12 is supported");
     }
   }
 
   @Override
   public void open() throws InterpreterException {
-    checkScalaVersion();
-    
-    this.innerIntp = new FlinkScalaInterpreter(getProperties());
-    this.innerIntp.open();
-    this.z = this.innerIntp.getZeppelinContext();
+    try {
+      this.innerIntp = loadFlinkScalaInterpreter();
+      this.innerIntp.open();
+      this.z = this.innerIntp.getZeppelinContext();
+    } catch (Exception e) {
+      throw new InterpreterException("Fail to open FlinkInterpreter", e);

Review comment:
       It seems strange to wrap the exception in a new `InterpreterException` 
again when `loadFlinkScalaInterpreter()` already throws an exception of type 
`InterpreterException`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to