hackergin commented on code in PR #25988:
URL: https://github.com/apache/flink/pull/25988#discussion_r1919899767


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), 
mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;

Review Comment:
   Sure, I missed this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to