pnowojski commented on code in PR #23679:
URL: https://github.com/apache/flink/pull/23679#discussion_r1386250291


##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##########
@@ -611,6 +636,22 @@ public static void printHelpForSavepoint(Collection<CustomCommandLine> customCom
         System.out.println();
     }
 
+    public static void printHelpForCheckpoint(Collection<CustomCommandLine> customCommandLines) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+
+        System.out.println(
+                "\nAction \"checkpoint\" triggers checkpoints for a running job or disposes existing ones.");

Review Comment:
   > or disposes existing ones
   
   Is that a typo from copying/pasting? Users shouldn't be allowed to dispose 
checkpoints 🤔  



##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##########
@@ -157,6 +157,20 @@ public class CliFrontendParser {
                             + " for changing state backends, native = a specific format for the"
                             + " chosen state backend, might be faster to take and restore from.");
 
+    static final Option CHECKPOINT_FULL_OPTION =
+            new Option(
+                    "full",
+                    "full",
+                    false,
+                    "Defines whether to trigger this checkpoint as a full one.");
+
+    static final Option CHECKPOINT_INCREMENTAL_OPTION =
+            new Option(
+                    "incremental",
+                    "incremental",
+                    false,
+                    "Defines whether to trigger this checkpoint as a incremental one.");

Review Comment:
   nit: `as an incremental`



##########
docs/content.zh/docs/deployment/cli.md:
##########
@@ -153,6 +153,35 @@ $ ./bin/flink savepoint \
 Triggering the savepoint disposal through the `savepoint` action does not only remove the data from
 the storage but makes Flink clean up the savepoint-related metadata as well.
 
+### Creating a Checkpoint
+[Checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) can also be manually created to save the
+current state. To get the difference between checkpoint and savepoint, please refer to
+[Checkpoints vs. Savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}). All that's
+needed to trigger a checkpoint manually is the JobID:
+```bash
+$ ./bin/flink checkpoint \
+      $JOB_ID
+```
+```
+Triggering checkpoint for job 99c59fead08c613763944f533bf90c0f.
+Waiting for response...
+Checkpoint(CONFIGURED) 26 for job 99c59fead08c613763944f533bf90c0f completed.
+You can resume your program from this checkpoint with the run command.
+```
+If you want to trigger a full checkpoint while the job periodically triggering incremental checkpoints,
+please use the `--full` option.

Review Comment:
   I would rephrase it to something like:
   > By default, this command triggers the same type of checkpoint that is configured for the given job. If you want to override this behaviour, you can force either a full or an incremental checkpoint via the `--full` or `--incremental` flag, respectively.
   > (code example)

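To make the proposed wording concrete, here is a minimal Java sketch of the flag semantics it describes: no flag keeps the job's configured checkpoint type, while `--full` or `--incremental` overrides it. `CheckpointTypeResolver` and its nested `CheckpointType` enum are hypothetical stand-ins written for this illustration, not the classes from the PR. Rejecting both flags at once is also an assumption of the sketch; the PR may resolve that case differently.

```java
/** Hypothetical helper, not part of Flink: maps the two CLI flags to a checkpoint type. */
final class CheckpointTypeResolver {

    /** Local stand-in enum; CONFIGURED means "use whatever the job is configured with". */
    enum CheckpointType {
        CONFIGURED,
        FULL,
        INCREMENTAL
    }

    static CheckpointType resolve(boolean full, boolean incremental) {
        if (full && incremental) {
            // Assumption of this sketch: the two flags are treated as mutually exclusive.
            throw new IllegalArgumentException(
                    "Options --full and --incremental cannot be used together.");
        }
        if (full) {
            return CheckpointType.FULL;
        }
        if (incremental) {
            return CheckpointType.INCREMENTAL;
        }
        // Neither flag given: fall back to the type configured for the job.
        return CheckpointType.CONFIGURED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, false));
        System.out.println(resolve(true, false));
        System.out.println(resolve(false, true));
    }
}
```

With this shape the default path needs no flag at all, which matches the "by default" sentence in the suggested documentation text.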


##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java:
##########
@@ -842,6 +844,85 @@ private void disposeSavepoint(
         logAndSysout("Savepoint '" + savepointPath + "' disposed.");
     }
 
+    /**
+     * Executes the CHECKPOINT action.
+     *
+     * @param args Command line arguments for the checkpoint action.
+     */
+    protected void checkpoint(String[] args) throws Exception {
+        LOG.info("Running 'checkpoint' command.");
+
+        final Options commandOptions = CliFrontendParser.getCheckpointCommandOptions();
+
+        final CommandLine commandLine = getCommandLine(commandOptions, args, false);
+
+        final CheckpointOptions checkpointOptions = new CheckpointOptions(commandLine);
+
+        // evaluate help flag
+        if (checkpointOptions.isPrintHelp()) {
+            CliFrontendParser.printHelpForCheckpoint(customCommandLines);
+            return;
+        }
+
+        final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
+
+        String[] cleanedArgs = checkpointOptions.getArgs();
+
+        final JobID jobId;
+
+        if (cleanedArgs.length >= 1) {
+            String jobIdString = cleanedArgs[0];
+
+            jobId = parseJobId(jobIdString);
+        } else {
+            throw new CliArgsException(
+                    "Missing JobID. " + "Specify a Job ID to manipulate a checkpoint.");
+        }
+        runClusterAction(
+                activeCommandLine,
+                commandLine,
+                (clusterClient, effectiveConfiguration) ->
+                        triggerCheckpoint(
+                                clusterClient,
+                                jobId,
+                                checkpointOptions.getCheckpointType(),
+                                getClientTimeout(effectiveConfiguration)));
+    }
+
+    /** Sends a CheckpointTriggerMessage to the job manager. */
+    private void triggerCheckpoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            CheckpointType checkpointType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering checkpoint for job " + jobId + '.');
+
+        CompletableFuture<Long> checkpointFuture =
+                clusterClient.triggerCheckpoint(jobId, checkpointType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final long checkpointId =
+                    checkpointFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+            logAndSysout(
+                    "Checkpoint("
+                            + checkpointType
+                            + ") "
+                            + checkpointId
+                            + " for job "
+                            + jobId
+                            + " completed.");
+            logAndSysout("You can resume your program from this checkpoint with the run command.");
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+            throw new FlinkException(
+                    "Triggering a checkpoint for the job " + jobId + " failed.", cause);

Review Comment:
   nitty nit: the message would be slightly easier to read with `failed` moved to the beginning.
   
   > Failed to trigger a checkpoint for the job (...)

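As a rough illustration of the reworded message in context, the following self-contained Java sketch waits on a future and wraps any failure with the "Failed to ..." wording. `CheckpointFailureMessage`, `failureMessage`, and `awaitCheckpoint` are hypothetical names for this example. It throws a plain `RuntimeException` and unwraps `ExecutionException` by hand where the PR uses `FlinkException` and `ExceptionUtils.stripExecutionException`, so treat it as an approximation of the catch block rather than the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch, not Flink code: builds and uses the reworded failure message. */
final class CheckpointFailureMessage {

    /** Puts "Failed" first so the outcome is visible before the job ID. */
    static String failureMessage(String jobId) {
        return "Failed to trigger a checkpoint for the job " + jobId + ".";
    }

    /** Waits for the checkpoint ID; on any failure, rethrows with the reworded message. */
    static long awaitCheckpoint(CompletableFuture<Long> future, String jobId, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            // Manual stand-in for stripping the ExecutionException wrapper.
            Throwable cause =
                    (e instanceof ExecutionException && e.getCause() != null) ? e.getCause() : e;
            // RuntimeException is used here only to keep the sketch dependency-free.
            throw new RuntimeException(failureMessage(jobId), cause);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("job not running"));
        try {
            awaitCheckpoint(failed, "99c59fead08c613763944f533bf90c0f", 1000L);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```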


##########
docs/content/docs/deployment/cli.md:
##########
@@ -151,6 +151,35 @@ $ ./bin/flink savepoint \
 Triggering the savepoint disposal through the `savepoint` action does not only remove the data from
 the storage but makes Flink clean up the savepoint-related metadata as well.
 
+### Creating a Checkpoint
+[Checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) can also be manually created to save the
+current state. To get the difference between checkpoint and savepoint, please refer to
+[Checkpoints vs. Savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}). All that's
+needed to trigger a checkpoint manually is the JobID:
+```bash
+$ ./bin/flink checkpoint \
+      $JOB_ID
+```
+```
+Triggering checkpoint for job 99c59fead08c613763944f533bf90c0f.
+Waiting for response...
+Checkpoint(CONFIGURED) 26 for job 99c59fead08c613763944f533bf90c0f completed.
+You can resume your program from this checkpoint with the run command.
+```
+If you want to trigger a full checkpoint while the job periodically triggering incremental checkpoints,
+please use the `--full` option.

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
