alexott commented on a change in pull request #3556: [ZEPPELIN-4488]. Support 
Flink 1.10
URL: https://github.com/apache/zeppelin/pull/3556#discussion_r370933873
 
 

 ##########
 File path: 
flink/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
 ##########
 @@ -37,51 +38,80 @@ public FlinkStreamSqlInterpreter(Properties properties) {
     super(properties);
   }
 
-
   @Override
-  public void open() throws InterpreterException {
-    this.flinkInterpreter =
-            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
-    this.tbenv = flinkInterpreter.getStreamTableEnvironment();
+  protected boolean isBatch() {
+    return false;
   }
 
   @Override
-  public void close() throws InterpreterException {
-
+  public void open() throws InterpreterException {
+    super.open();
+    this.tbenv = flinkInterpreter.getJavaStreamTableEnvironment();
   }
 
   @Override
-  protected void checkLocalProperties(Map<String, String> localProperties)
-          throws InterpreterException {
+  public void close() throws InterpreterException {
 
   }
 
   @Override
   public void callSelect(String sql, InterpreterContext context) throws 
IOException {
-    String streamType = context.getLocalProperties().get("type");
-    if (streamType == null) {
-      throw new IOException("type must be specified for stream sql");
+    String savepointDir = context.getLocalProperties().get("savepointDir");
+    if (!StringUtils.isBlank(savepointDir)) {
+      Object savepointPath = flinkInterpreter.getZeppelinContext()
+              .angular(context.getParagraphId() + "_savepointpath", 
context.getNoteId(), null);
+      if (savepointPath == null) {
+        LOGGER.info("savepointPath is null because it is the first run");
+      } else {
+        LOGGER.info("set savepointPath to: " + savepointPath.toString());
+        this.flinkInterpreter.getFlinkConfiguration()
+                .setString("execution.savepoint.path", 
savepointPath.toString());
+      }
     }
-    if (streamType.equalsIgnoreCase("single")) {
-      SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
-              flinkInterpreter.getStreamExecutionEnvironment(),
-              flinkInterpreter.getStreamTableEnvironment(), context,
-              flinkInterpreter.getDefaultParallelism());
-      streamJob.run(sql);
-    } else if (streamType.equalsIgnoreCase("ts")) {
-      TimeSeriesStreamSqlJob streamJob = new TimeSeriesStreamSqlJob(
-              flinkInterpreter.getStreamExecutionEnvironment(),
-              flinkInterpreter.getStreamTableEnvironment(), context,
-              flinkInterpreter.getDefaultParallelism());
-      streamJob.run(sql);
-    } else if (streamType.equalsIgnoreCase("retract")) {
-      RetractStreamSqlJob streamJob = new RetractStreamSqlJob(
-              flinkInterpreter.getStreamExecutionEnvironment(),
-              flinkInterpreter.getStreamTableEnvironment(), context,
-              flinkInterpreter.getDefaultParallelism());
-      streamJob.run(sql);
-    } else {
-      throw new IOException("Unrecognized stream type: " + streamType);
+    int defaultSqlParallelism = this.tbenv.getConfig().getConfiguration()
+            
.getInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+    try {
+      if (context.getLocalProperties().containsKey("parallelism")) {
+        this.tbenv.getConfig().getConfiguration()
+                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                        
Integer.parseInt(context.getLocalProperties().get("parallelism")));
+      }
+
+      String streamType = context.getLocalProperties().get("type");
+      if (streamType == null) {
+        throw new IOException("type must be specified for stream sql");
+      }
+      if (streamType.equalsIgnoreCase("single")) {
+        SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
+                flinkInterpreter.getStreamExecutionEnvironment(),
+                flinkInterpreter.getStreamTableEnvironment(),
+                flinkInterpreter.getJobManager(),
+                context,
+                flinkInterpreter.getDefaultParallelism());
+        streamJob.run(sql);
 
 Review comment:
   I'm wondering whether we can generalize this using constructor references, etc.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to