pnowojski commented on code in PR #23679:
URL: https://github.com/apache/flink/pull/23679#discussion_r1386250291

##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##########
@@ -611,6 +636,22 @@ public static void printHelpForSavepoint(Collection<CustomCommandLine> customCom
         System.out.println();
     }

+    public static void printHelpForCheckpoint(Collection<CustomCommandLine> customCommandLines) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+
+        System.out.println(
+                "\nAction \"checkpoint\" triggers checkpoints for a running job or disposes existing ones.");

Review Comment:
   > or disposes existing ones

   Is that a typo from copying/pasting? Users shouldn't be allowed to dispose checkpoints 🤔

##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java:
##########
@@ -157,6 +157,20 @@ public class CliFrontendParser {
                             + " for changing state backends, native = a specific format for the"
                             + " chosen state backend, might be faster to take and restore from.");

+    static final Option CHECKPOINT_FULL_OPTION =
+            new Option(
+                    "full",
+                    "full",
+                    false,
+                    "Defines whether to trigger this checkpoint as a full one.");
+
+    static final Option CHECKPOINT_INCREMENTAL_OPTION =
+            new Option(
+                    "incremental",
+                    "incremental",
+                    false,
+                    "Defines whether to trigger this checkpoint as a incremental one.");

Review Comment:
   nit: `as an incremental`

##########
docs/content.zh/docs/deployment/cli.md:
##########
@@ -153,6 +153,35 @@ $ ./bin/flink savepoint \
 Triggering the savepoint disposal through the `savepoint` action does not only remove the data from the storage but makes Flink clean up the savepoint-related metadata as well.

+### Creating a Checkpoint
+[Checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) can also be manually created to save the
+current state. To get the difference between checkpoint and savepoint, please refer to
+[Checkpoints vs. Savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}). All that's
+needed to trigger a checkpoint manually is the JobID:
+```bash
+$ ./bin/flink checkpoint \
+      $JOB_ID
+```
+```
+Triggering checkpoint for job 99c59fead08c613763944f533bf90c0f.
+Waiting for response...
+Checkpoint(CONFIGURED) 26 for job 99c59fead08c613763944f533bf90c0f completed.
+You can resume your program from this checkpoint with the run command.
+```
+If you want to trigger a full checkpoint while the job periodically triggering incremental checkpoints,
+please use the `--full` option.

Review Comment:
   I would rephrase it to something like:
   > By default this command triggers the same type of checkpoint that is configured for the given job. If you want to override this behaviour, you can force either a full or an incremental checkpoint to be performed via the `--full` or `--incremental` flags respectively.

   > (code example)

##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java:
##########
@@ -842,6 +844,85 @@ private void disposeSavepoint(
         logAndSysout("Savepoint '" + savepointPath + "' disposed.");
     }

+    /**
+     * Executes the CHECKPOINT action.
+     *
+     * @param args Command line arguments for the checkpoint action.
+     */
+    protected void checkpoint(String[] args) throws Exception {
+        LOG.info("Running 'checkpoint' command.");
+
+        final Options commandOptions = CliFrontendParser.getCheckpointCommandOptions();
+
+        final CommandLine commandLine = getCommandLine(commandOptions, args, false);
+
+        final CheckpointOptions checkpointOptions = new CheckpointOptions(commandLine);
+
+        // evaluate help flag
+        if (checkpointOptions.isPrintHelp()) {
+            CliFrontendParser.printHelpForCheckpoint(customCommandLines);
+            return;
+        }
+
+        final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
+
+        String[] cleanedArgs = checkpointOptions.getArgs();
+
+        final JobID jobId;
+
+        if (cleanedArgs.length >= 1) {
+            String jobIdString = cleanedArgs[0];
+
+            jobId = parseJobId(jobIdString);
+        } else {
+            throw new CliArgsException(
+                    "Missing JobID. " + "Specify a Job ID to manipulate a checkpoint.");
+        }
+        runClusterAction(
+                activeCommandLine,
+                commandLine,
+                (clusterClient, effectiveConfiguration) ->
+                        triggerCheckpoint(
+                                clusterClient,
+                                jobId,
+                                checkpointOptions.getCheckpointType(),
+                                getClientTimeout(effectiveConfiguration)));
+    }
+
+    /** Sends a CheckpointTriggerMessage to the job manager. */
+    private void triggerCheckpoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            CheckpointType checkpointType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering checkpoint for job " + jobId + '.');
+
+        CompletableFuture<Long> checkpointFuture =
+                clusterClient.triggerCheckpoint(jobId, checkpointType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final long checkpointId =
+                    checkpointFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+            logAndSysout(
+                    "Checkpoint("
+                            + checkpointType
+                            + ") "
+                            + checkpointId
+                            + " for job "
+                            + jobId
+                            + " completed.");
+            logAndSysout("You can resume your program from this checkpoint with the run command.");
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+            throw new FlinkException(
+                    "Triggering a checkpoint for the job " + jobId + " failed.", cause);

Review Comment:
   nitty nit: It would be slightly easier to read to move `failed` to the beginning of the message.

   > Failed to trigger a checkpoint for the job (...)

##########
docs/content/docs/deployment/cli.md:
##########
@@ -151,6 +151,35 @@ $ ./bin/flink savepoint \
 Triggering the savepoint disposal through the `savepoint` action does not only remove the data from the storage but makes Flink clean up the savepoint-related metadata as well.

+### Creating a Checkpoint
+[Checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) can also be manually created to save the
+current state. To get the difference between checkpoint and savepoint, please refer to
+[Checkpoints vs. Savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}). All that's
+needed to trigger a checkpoint manually is the JobID:
+```bash
+$ ./bin/flink checkpoint \
+      $JOB_ID
+```
+```
+Triggering checkpoint for job 99c59fead08c613763944f533bf90c0f.
+Waiting for response...
+Checkpoint(CONFIGURED) 26 for job 99c59fead08c613763944f533bf90c0f completed.
+You can resume your program from this checkpoint with the run command.
+```
+If you want to trigger a full checkpoint while the job periodically triggering incremental checkpoints,
+please use the `--full` option.

Review Comment:
   ditto
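
To make the wording suggested for the docs concrete, here is a minimal sketch of the flag semantics it describes: no flag keeps the job's configured checkpoint type, while `--full` or `--incremental` forces one. This is not the PR's implementation. The enum values `FULL` and `INCREMENTAL` (alongside the `CONFIGURED` value seen in the sample output), the helper name `resolveType`, and the decision to reject both flags together are assumptions made for illustration, and the hand-rolled argument scan merely stands in for the commons-cli `Option`s the PR defines.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: maps CLI flags to the checkpoint type to trigger. */
final class CheckpointFlagSketch {

    /** CONFIGURED appears in the sample output; FULL and INCREMENTAL are assumed names. */
    enum Type {
        CONFIGURED,
        FULL,
        INCREMENTAL
    }

    /** Returns the forced type, or CONFIGURED when neither flag is given. */
    static Type resolveType(String[] args) {
        List<String> argList = Arrays.asList(args);
        boolean full = argList.contains("--full");
        boolean incremental = argList.contains("--incremental");

        if (full && incremental) {
            // Assumption: the two overrides are mutually exclusive.
            throw new IllegalArgumentException(
                    "Specify at most one of --full and --incremental.");
        }
        if (full) {
            return Type.FULL;
        }
        if (incremental) {
            return Type.INCREMENTAL;
        }
        // Default: keep whatever checkpoint type the job is configured with.
        return Type.CONFIGURED;
    }

    public static void main(String[] args) {
        System.out.println(resolveType(args));
    }
}
```

Under these assumptions, `./bin/flink checkpoint $JOB_ID` would keep the configured type, and appending `--full` or `--incremental` would override it, which is the behaviour the rephrased paragraph tells users to expect.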