link3280 commented on code in PR #20159:
URL: https://github.com/apache/flink/pull/20159#discussion_r924224755


##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java:
##########
@@ -322,4 +339,96 @@ public List<String> listJars(String sessionId) {
         final SessionContext context = getSessionContext(sessionId);
         return context.listJars();
     }
+
+    @Override
+    public Optional<String> stopJob(
+            String sessionId, String jobId, boolean isWithSavepoint, boolean 
isWithDrain)
+            throws SqlExecutionException {
+        Duration clientTimeout = 
getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT);
+        try {
+            return runClusterAction(
+                    sessionId,
+                    clusterClient -> {
+                        if (isWithSavepoint) {
+                            // blocking get savepoint path
+                            try {
+                                String savepoint =
+                                        clusterClient
+                                                .stopWithSavepoint(
+                                                        
JobID.fromHexString(jobId),
+                                                        isWithDrain,
+                                                        null,
+                                                        
SavepointFormatType.DEFAULT)
+                                                .get(
+                                                        
clientTimeout.toMillis(),
+                                                        TimeUnit.MILLISECONDS);
+                                return Optional.of(savepoint);
+                            } catch (Exception e) {
+                                throw new FlinkException(
+                                        "Could not stop job "
+                                                + jobId
+                                                + " in session "
+                                                + sessionId
+                                                + ".",
+                                        e);
+                            }
+                        } else {
+                            clusterClient.cancel(JobID.fromHexString(jobId));
+                            return Optional.empty();
+                        }
+                    });
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not stop job " + jobId + " in session " + sessionId 
+ ".", e);
+        }
+    }
+
+    /**
+     * Retrieves the {@link ClusterClient} from the session and runs the given 
{@link ClusterAction}
+     * against it.
+     *
+     * @param sessionId the specified session ID
+     * @param clusterAction the cluster action to run against the retrieved 
{@link ClusterClient}.
+     * @param <ClusterID> type of the cluster id
+     * @param <Result>> type of the result
+     * @throws FlinkException if something goes wrong
+     */
+    private <ClusterID, Result> Result runClusterAction(

Review Comment:
   LGTM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to