lsyldliu commented on code in PR #25988:
URL: https://github.com/apache/flink/pull/25988#discussion_r1919828124


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1271,117 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private static JobExecutionResult executeRefreshJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, operationExecutor);
+        } else {
+            return executeNonApplicationJob(
+                    script, executionConfig, operationExecutor, operationHandle);
+        }
+    }
+
+    private static boolean isApplicationMode(Configuration configuration) {
+        return configuration.get(TARGET).endsWith("application");
+    }
+
+    private static JobExecutionResult executeNonApplicationJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        // Retrieve execute target
+        String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+
+        if (executeTarget == null || executeTarget.isEmpty()) {
+            String errorMessage =
+                    "Execute target is null or empty. Please specify a valid target in the session configuration.";
+            LOG.error(errorMessage);
+            throw new ValidationException(errorMessage);
+        }
+
+        // Retrieve and validate cluster ID if not running locally
+        String clusterId = null;
+        if (!"local".equals(executeTarget)) {
+            clusterId =
+                    operationExecutor
+                            .getSessionClusterId()
+                            .orElseThrow(
+                                    () -> {
+                                        String errorMessage =
+                                                String.format(
+                                                        "No cluster ID found when executing materialized table refresh task. Execute target: %s",

Review Comment:
   Please standardize the terminology and use "refresh job".



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1271,117 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private static JobExecutionResult executeRefreshJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, operationExecutor);
+        } else {
+            return executeNonApplicationJob(
+                    script, executionConfig, operationExecutor, operationHandle);
+        }
+    }
+
+    private static boolean isApplicationMode(Configuration configuration) {
+        return configuration.get(TARGET).endsWith("application");
+    }
+
+    private static JobExecutionResult executeNonApplicationJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        // Retrieve execute target
+        String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+
+        if (executeTarget == null || executeTarget.isEmpty()) {

Review Comment:
   ```suggestion
           if (executeTarget == null || executeTarget.isEmpty() || "local".equals(executeTarget)) {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1271,117 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private static JobExecutionResult executeRefreshJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, operationExecutor);
+        } else {
+            return executeNonApplicationJob(
+                    script, executionConfig, operationExecutor, operationHandle);
+        }
+    }
+
+    private static boolean isApplicationMode(Configuration configuration) {
+        return configuration.get(TARGET).endsWith("application");
+    }
+
+    private static JobExecutionResult executeNonApplicationJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        // Retrieve execute target
+        String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+
+        if (executeTarget == null || executeTarget.isEmpty()) {

Review Comment:
   Why check whether `execution.target` is configured here rather than before calling the `isApplicationMode` method? Otherwise `isApplicationMode` could throw an NPE.
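   A minimal sketch of the idea, assuming the validation moves up into `executeRefreshJob` (not the PR's actual code, just for illustration; names follow the diff):
   ```java
       // Sketch: validate execution.target once, before any mode dispatch,
       // so isApplicationMode never dereferences a null target.
       private static JobExecutionResult executeRefreshJob(
               String script,
               Configuration executionConfig,
               OperationExecutor operationExecutor,
               OperationHandle operationHandle) {
           Configuration sessionConf = operationExecutor.getSessionContext().getSessionConf();
           String executeTarget = sessionConf.get(TARGET);
           if (executeTarget == null || executeTarget.isEmpty()) {
               throw new ValidationException(
                       "Execute target is null or empty. Please specify a valid target in the session configuration.");
           }
           if (executeTarget.endsWith("application")) {
               return executeApplicationJob(script, executionConfig, operationExecutor);
           }
           return executeNonApplicationJob(script, executionConfig, operationExecutor, operationHandle);
       }
   ```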



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;

Review Comment:
   I'm a bit of a code cleaner, so why can't we organize the code as follows:
   ```
           private final String executionTarget;
           private final String clusterId;
           private final String jobId;
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {

Review Comment:
   For MiniCluster, is it not remote?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1271,117 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private static JobExecutionResult executeRefreshJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, operationExecutor);
+        } else {
+            return executeNonApplicationJob(
+                    script, executionConfig, operationExecutor, operationHandle);
+        }
+    }
+
+    private static boolean isApplicationMode(Configuration configuration) {
+        return configuration.get(TARGET).endsWith("application");
+    }
+
+    private static JobExecutionResult executeNonApplicationJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        // Retrieve execute target
+        String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+
+        if (executeTarget == null || executeTarget.isEmpty()) {
+            String errorMessage =
+                    "Execute target is null or empty. Please specify a valid target in the session configuration.";
+            LOG.error(errorMessage);
+            throw new ValidationException(errorMessage);
+        }
+
+        // Retrieve and validate cluster ID if not running locally
+        String clusterId = null;
+        if (!"local".equals(executeTarget)) {

Review Comment:
   Delete this code.
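   With the target validated up front, this method could simply propagate whatever cluster ID the session has, e.g. (a sketch only, mirroring the later `executeSessionJob` version, not complete code):
   ```java
       // Sketch: no special-casing of "local"; take the job ID from the submitted
       // statement's results and pass the optional cluster ID through as-is.
       List<RowData> results = fetchAllResults(resultFetcher);
       String jobId = results.get(0).getString(0).toString();
       String clusterId = operationExecutor.getSessionClusterId().orElse(null);
       return new JobExecutionResult(jobId, executeTarget, clusterId);
   ```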



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if (isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;
+
+        private final String executionTarget;
+
+        private final @Nullable String clusterId;
+
+        private final @Nullable String clusterIdKeyName;
+
+        public JobExecutionResult(
+                String jobId, String executionTarget, @Nullable String clusterId) {
+            this.jobId = jobId;
+            this.executionTarget = executionTarget;
+            this.clusterId = clusterId;
+            this.clusterIdKeyName = convertClusterIdKeyName(executionTarget);
+        }
+    }
+
+    private static @Nullable String convertClusterIdKeyName(String targetName) {
+        if (targetName.startsWith("yarn")) {
+            return "yarn.application.id";
+        } else if (targetName.startsWith("kubernetes")) {
+            return "kubernetes.cluster-id";
+        } else {
+            return null;

Review Comment:
   We can give it a specific key name for standalone mode.
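   For illustration, the extra branch might look like the sketch below. The standalone key name here is a placeholder I made up, not an existing Flink option; the actual name is still to be decided.
   ```java
       private static @Nullable String convertClusterIdKeyName(String targetName) {
           if (targetName.startsWith("yarn")) {
               return "yarn.application.id";
           } else if (targetName.startsWith("kubernetes")) {
               return "kubernetes.cluster-id";
           } else if (targetName.startsWith("remote")) {
               // Placeholder key for standalone/session clusters; real name TBD.
               return "standalone.cluster-id";
           } else {
               return null;
           }
       }
   ```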



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java:
##########
@@ -31,20 +29,22 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    // TODO: add clusterId for yarn and k8s resource manager
     private final String executionTarget;
+    private final String clusterId;
    private final String jobId;
+    private final String restorePath;
 
-    private @Nullable final String restorePath;

Review Comment:
   Why drop the @Nullable annotation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
