Liu created FLINK-39267:
---------------------------

             Summary: Support dynamically updating checkpoint configuration at 
runtime via REST API
                 Key: FLINK-39267
                 URL: https://issues.apache.org/jira/browse/FLINK-39267
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
            Reporter: Liu


### Motivation

Currently, checkpoint configuration parameters such as 
`execution.checkpointing.timeout` are fixed at job submission time and cannot 
be changed without restarting the job. This creates operational pain in several 
real-world scenarios:

**Scenario 1: Resolving consecutive checkpoint timeout failures**

When a job experiences consecutive checkpoint timeout failures (e.g., due to 
state growth or temporary I/O slowdowns), users often urgently need a 
successful checkpoint for:
- Committing offsets in exactly-once sinks (e.g., Kafka transactions)
- Performing job rescaling (which requires a recent checkpoint/savepoint)
- Preventing job failure when 
`execution.checkpointing.tolerable-failed-checkpoints` is exceeded

The current workaround — stop-with-savepoint → modify config → restart — 
introduces significant downtime and is impractical when the checkpoint itself 
is failing.

**Scenario 2: Adapting to changing workload characteristics**

Long-running streaming jobs may experience varying checkpoint durations across 
different time periods (e.g., peak vs. off-peak hours, backlog processing vs. 
normal processing). A static timeout value forces users to choose between:
- A large timeout that delays failure detection when checkpoints are truly stuck
- A small timeout that causes unnecessary failures during legitimate slow 
checkpoints

**Scenario 3: Avoiding wasted checkpoint work (Future Phase)**

When a checkpoint has completed 80%+ of task acknowledgements but is about to 
expire, all the snapshot I/O work is wasted. Dynamically extending the timeout 
could save significant cluster resources. This is proposed as a future 
enhancement (see "Phased Approach" below).

### Phased Approach

This improvement is designed to be implemented incrementally:

**Phase 1 (This Issue): Dynamic checkpoint timeout**
- Change `CheckpointCoordinator.checkpointTimeout` from `final` to `volatile`.
- Add a setter method to update the timeout at runtime.
- Expose a new REST API endpoint for runtime configuration updates.
- The new timeout takes effect for the **next** triggered checkpoint. Already 
in-flight checkpoints (whose `CheckpointCanceller` has been scheduled) are not 
affected.
- This follows the same design pattern as `setIsProcessingBacklog()` 
(FLIP-309), which dynamically switches checkpoint intervals at runtime without 
affecting in-flight checkpoints.

**Phase 2 (Follow-up): Additional checkpoint parameters**
- Extend the REST API to support dynamically updating other checkpoint 
configuration parameters, such as `checkpointInterval` and 
`minPauseBetweenCheckpoints`, following the same pattern.

**Phase 3 (Future): Timeout extension for in-flight checkpoints**
- Reschedule the `CheckpointCanceller` for pending checkpoints when timeout is 
updated, so that the new timeout also applies to currently running checkpoints.
- This requires modifying `PendingCheckpoint` to support resetting its 
canceller handle (currently `setCancellerHandle()` throws 
`IllegalStateException` if called twice).
- Edge cases (new timeout already elapsed, concurrent modifications) need 
careful design.
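The rescheduling logic could look roughly like the self-contained sketch below. All names here are simplified stand-ins for illustration only (this is not Flink's actual `PendingCheckpoint`, which currently forbids resetting the canceller handle); it assumes a plain `ScheduledExecutorService` as the timer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for a pending checkpoint whose cancellation task can be
// rescheduled when the timeout changes (the Phase 3 behavior).
class ReschedulablePendingCheckpoint {
    private final long triggerTimestamp;
    private final Runnable cancelAction;
    private ScheduledFuture<?> cancellerHandle;

    ReschedulablePendingCheckpoint(long triggerTimestamp, Runnable cancelAction) {
        this.triggerTimestamp = triggerTimestamp;
        this.cancelAction = cancelAction;
    }

    // Applies a new timeout to this in-flight checkpoint. Returns false if the
    // new timeout has already elapsed and the checkpoint was cancelled.
    synchronized boolean rescheduleCanceller(
            ScheduledExecutorService timer, long newTimeoutMillis) {
        long remaining =
                triggerTimestamp + newTimeoutMillis - System.currentTimeMillis();
        if (remaining <= 0) {
            // Edge case: new timeout already elapsed -> cancel immediately.
            if (cancellerHandle != null) {
                cancellerHandle.cancel(false);
            }
            cancelAction.run();
            return false;
        }
        if (cancellerHandle != null) {
            cancellerHandle.cancel(false); // drop the old expiry task
        }
        cancellerHandle = timer.schedule(cancelAction, remaining, TimeUnit.MILLISECONDS);
        return true;
    }
}

public class CancellerRescheduleSketch {
    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        final boolean[] cancelled = {false};
        ReschedulablePendingCheckpoint cp = new ReschedulablePendingCheckpoint(
                System.currentTimeMillis(), () -> cancelled[0] = true);

        // Extending the timeout while the checkpoint is still running succeeds.
        boolean extended = cp.rescheduleCanceller(timer, 60_000);
        System.out.println("extended=" + extended + " cancelled=" + cancelled[0]);

        // A new timeout that has already elapsed cancels immediately.
        boolean stillPending = cp.rescheduleCanceller(timer, -1);
        System.out.println("stillPending=" + stillPending + " cancelled=" + cancelled[0]);

        timer.shutdownNow();
    }
}
```

The `synchronized` guard addresses the concurrent-modification edge case mentioned above: a REST update and the timer firing must not race on the handle.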

### Public API Changes

**New REST API endpoint:**

```
PATCH /jobs/:jobid/checkpointing/configuration

Request body (Phase 1):
{
  "checkpointTimeout": 600000
}

Response: 200 OK
```

The endpoint path `/checkpointing/configuration` is intentionally designed to 
be extensible — additional parameters (interval, minPause, etc.) can be added 
to the request body in Phase 2 without changing the API contract.

This is consistent with the existing `PUT /jobs/:jobid/resource-requirements` 
pattern for runtime job configuration updates.
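For illustration, a client could invoke the proposed endpoint as follows. This is only a sketch: the endpoint does not exist in Flink yet, and the host, port, and job ID are placeholders. Java's `HttpClient` has no `PATCH` shortcut, so the generic `method()` builder is used.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CheckpointConfigPatchExample {
    public static void main(String[] args) {
        // Placeholder job ID and REST address; the endpoint itself is the one
        // proposed in this issue and is not part of Flink today.
        String jobId = "a1b2c3d4e5f60718293a4b5c6d7e8f90";
        String body = "{\"checkpointTimeout\": 600000}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId
                        + "/checkpointing/configuration"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(request.method() + " " + request.uri().getPath());
        // Actually sending it would be:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```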

### Design Details (Phase 1)

- The `checkpointTimeout` field in `CheckpointCoordinator` is read once per 
checkpoint in `createPendingCheckpoint()` when scheduling the 
`CheckpointCanceller`. Making it `volatile` ensures visibility across threads 
with minimal performance impact.
- No changes needed to `PendingCheckpoint`, 
`CheckpointCoordinatorConfiguration`, or Task-side code.
- The REST endpoint routes through `RestfulGateway` → `JobMaster` → 
`CheckpointCoordinator`.
- Validation: the new timeout must be a positive value.

**Core change in CheckpointCoordinator:**

```java
// Change field from:
private final long checkpointTimeout;
// To:
private volatile long checkpointTimeout;

// New setter method:
public void setCheckpointTimeout(long newTimeout) {
    Preconditions.checkArgument(
            newTimeout > 0,
            "Checkpoint timeout must be positive, but was %s",
            newTimeout);
    this.checkpointTimeout = newTimeout;
    LOG.info("Checkpoint timeout for job {} updated to {} ms.", job, newTimeout);
}
```
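The intended semantics (the new value is picked up only when the next checkpoint is created, while in-flight checkpoints keep the timeout they were created with) can be demonstrated with a minimal, runnable stand-in. `CoordinatorStandIn` is a simplified illustration of the volatile-field pattern, not Flink's actual `CheckpointCoordinator`:

```java
// Minimal stand-in illustrating the Phase 1 semantics.
public class CoordinatorStandIn {
    // volatile: the REST thread writes, the checkpoint-trigger thread reads.
    private volatile long checkpointTimeout = 600_000L;

    public void setCheckpointTimeout(long newTimeout) {
        if (newTimeout <= 0) {
            throw new IllegalArgumentException(
                    "Checkpoint timeout must be positive, but was " + newTimeout);
        }
        this.checkpointTimeout = newTimeout;
    }

    // Mirrors createPendingCheckpoint(): the timeout is read once per
    // checkpoint, so an in-flight checkpoint keeps its original value.
    public long createPendingCheckpointTimeout() {
        return checkpointTimeout;
    }

    public static void main(String[] args) {
        CoordinatorStandIn coordinator = new CoordinatorStandIn();
        long inFlight = coordinator.createPendingCheckpointTimeout();
        coordinator.setCheckpointTimeout(900_000L); // REST update arrives
        long next = coordinator.createPendingCheckpointTimeout();
        System.out.println("inFlight=" + inFlight + " next=" + next);

        try {
            coordinator.setCheckpointTimeout(0); // validation rejects non-positive
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```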

### Compatibility, Deprecation, and Migration Plan

- Fully backward compatible. If the REST API is not called, behavior is 
identical to the current implementation.
- No configuration deprecation.
- No changes to existing REST APIs.

### Related Issues / Prior Art

- **FLIP-309** (`setIsProcessingBacklog`) — dynamic checkpoint interval 
switching at runtime, same pattern
- **FLIP-291** / `JobResourcesRequirementsUpdateHeaders` — REST API for runtime 
job config updates



--
This message was sent by Atlassian Jira
(v8.20.10#820010)