XComp commented on code in PR #27921:
URL: https://github.com/apache/flink/pull/27921#discussion_r3225990963
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -2000,17 +2001,21 @@ public long getCheckpointTimeout() {
}
/**
- * Returns {@code -1} if a checkpoint is already in flight, otherwise the
remaining time (in ms)
- * until {@code minPauseBetweenCheckpoints} is satisfied ({@code 0} =
trigger now). All checks
- * are made under the coordinator lock.
+ * Returns {@link Optional#empty()} if a checkpoint is already in flight
(triggering or
+ * pending). Otherwise returns the remaining {@link Duration} until {@code
Review Comment:
Usually, you would mention the actual value first and the fallback
afterwards.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +190,111 @@ public ScheduledFuture<?> scheduleOperation(Runnable
callback, Duration delay) {
return context.runIfState(this, callback, delay);
}
+ @Override
+ public void requestActiveCheckpointTrigger() {
+ if (!activeCheckpointTriggerEnabled) {
+ return;
+ }
+
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+ return;
+ }
+
+ final Optional<Duration> triggerDelay =
+ checkpointCoordinator.getActiveCheckpointTriggerDelay();
+ if (triggerDelay.isEmpty()) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale:
checkpoint already in progress.");
+ return;
+ }
+ scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+ }
+
+ private boolean shouldTriggerActiveCheckpoint(
Review Comment:
All callers of this method negate the result. We could rename the method to
`skipActiveCheckpointTriggering` and invert its logic to avoid the negations.
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -745,6 +745,24 @@ public InlineElement getDescription() {
.key()))
.build());
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean>
SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
+
key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "When enabled, the Adaptive
Scheduler actively triggers a checkpoint when resources change and rescaling is
desired, "
+ + "rather than waiting for
the next periodic checkpoint. "
+ + "This reduces rescaling
latency, especially when checkpoint intervals are large. "
+ + "The active trigger
respects min-pause and "
Review Comment:
Why did we remove the reference to the `min-pause` parameter? 🤔
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ ManualClock clock = new ManualClock();
+ CheckpointCoordinatorConfiguration coordConfig =
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10_000L)
+ .setMinPauseBetweenCheckpoints(10_000L)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(coordConfig)
+ .setTimer(checkpointTimer)
+ .setClock(clock)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("A retry should be scheduled while min-pause is not
satisfied")
+ .isNotEmpty();
+
+ ctx.setExpectStopWithSavepoint(assertNonNull());
+ exec.stopWithSavepoint("file:///tmp/target", true,
SavepointFormatType.CANONICAL);
+
+ ctxExecutor.triggerAll();
+ checkpointTimer.triggerAll();
+ int pendingAfterSavepoint =
coordinator.getNumberOfPendingCheckpoints();
+
+ // Trigger the previously scheduled retry; it must be a no-op
because the state
+ // transitioned to StopWithSavepoint (runIfState gates the action
on
+ // hadStateTransition).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("Active checkpoint trigger retry must not fire after
stopWithSavepoint")
+ .isEqualTo(pendingAfterSavepoint);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return null;
+ }
+ };
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ int baseline =
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("No checkpoint trigger should be scheduled when
coordinator is null")
+ .hasSize(baseline);
+ }
+ }
+
+ @Test
+ void
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+
.setCheckpointInterval(Long.MAX_VALUE)
+ .build())
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Set up parallelism change so the only blocking guard is
periodic checkpointing
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+
+
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+ exec.requestActiveCheckpointTrigger();
Review Comment:
```suggestion
exec.requestActiveCheckpointTrigger();
((ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor()).triggerNonPeriodicScheduledTasks();
```
We're missing the state transition here. Without it, the test doesn't exercise
what it is meant to test (you can verify this by commenting out the periodic
checkpoint check in `Executing`).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +190,111 @@ public ScheduledFuture<?> scheduleOperation(Runnable
callback, Duration delay) {
return context.runIfState(this, callback, delay);
}
+ @Override
+ public void requestActiveCheckpointTrigger() {
+ if (!activeCheckpointTriggerEnabled) {
+ return;
+ }
+
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+ return;
+ }
+
+ final Optional<Duration> triggerDelay =
+ checkpointCoordinator.getActiveCheckpointTriggerDelay();
+ if (triggerDelay.isEmpty()) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale:
checkpoint already in progress.");
+ return;
+ }
+ scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+ }
+
+ private boolean shouldTriggerActiveCheckpoint(
+ @Nullable CheckpointCoordinator checkpointCoordinator) {
+ if (checkpointCoordinator == null
+ || !checkpointCoordinator.isPeriodicCheckpointingConfigured())
{
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale:
checkpointing not configured.");
+ return false;
+ }
+ if (!parallelismChanged()) {
Review Comment:
Generally, checking the unit tests makes me feel like we have too many
different locations where we verify whether active checkpoint triggering is
allowed or not. 🤔
The parallelism change should only be evaluated in
`tryFireActiveCheckpointAfterRetry` because that's where it's relevant. In
contrast, the `checkpointCoordinator` null check and the periodic checkpointing
config check should not need to be evaluated in the scheduled task but should
only live in the code that would eventually schedule the checkpoint triggering.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java:
##########
@@ -400,6 +400,59 @@ private static void assertFinalStateTransitionHappened(
assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
}
+ @Test
+ void testActiveCheckpointTriggerCalledOnEnteringStabilizing() {
+ final TestingStateTransitionManagerContext ctx =
+ TestingStateTransitionManagerContext.stableContext();
+ ctx.withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ assertThat(testInstance.getPhase()).isInstanceOf(Idling.class);
+ ctx.clearActiveCheckpointTriggerCount();
+
+ testInstance.onChange(true);
+
+ assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+
assertThat(ctx.getActiveCheckpointTriggerCount()).isGreaterThanOrEqualTo(1);
Review Comment:
Why do we assert `isGreaterThanOrEqualTo` here? Shouldn't we be able to
come up with an exact expected value? The test succeeds even when removing the
`clearActiveCheckpointTriggerCount` call.
That applies to all three newly added tests.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +190,111 @@ public ScheduledFuture<?> scheduleOperation(Runnable
callback, Duration delay) {
return context.runIfState(this, callback, delay);
}
+ @Override
+ public void requestActiveCheckpointTrigger() {
+ if (!activeCheckpointTriggerEnabled) {
+ return;
+ }
+
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+ return;
+ }
+
+ final Optional<Duration> triggerDelay =
+ checkpointCoordinator.getActiveCheckpointTriggerDelay();
+ if (triggerDelay.isEmpty()) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale:
checkpoint already in progress.");
+ return;
+ }
+ scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+ }
+
+ private boolean shouldTriggerActiveCheckpoint(
+ @Nullable CheckpointCoordinator checkpointCoordinator) {
+ if (checkpointCoordinator == null
+ || !checkpointCoordinator.isPeriodicCheckpointingConfigured())
{
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale:
checkpointing not configured.");
+ return false;
+ }
+ if (!parallelismChanged()) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale:
parallelism unchanged.");
+ return false;
+ }
+ return true;
+ }
+
+ private void scheduleActiveCheckpointTriggerRetry(Duration delay) {
+ if (activeCheckpointTriggerScheduled) {
+ return;
+ }
+ activeCheckpointTriggerScheduled = true;
+ if (!delay.isZero()) {
+ getLogger()
+ .debug(
+ "Min pause not satisfied, scheduling active
checkpoint trigger retry in {} ms.",
+ delay.toMillis());
+ }
+ context.runIfState(this, this::tryFireActiveCheckpointAfterRetry,
delay);
+ }
+
+ private void tryFireActiveCheckpointAfterRetry() {
+ activeCheckpointTriggerScheduled = false;
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
Review Comment:
nit: I feel like we're choosing the wrong location for the null check here.
The subsequent calls like `getActiveCheckpointTriggerDelay` rely on
`checkpointCoordinator` being non-null. But this check is hidden in
`shouldTriggerActiveCheckpoint`. Doing the null check here instead would make
the code more readable.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ ManualClock clock = new ManualClock();
+ CheckpointCoordinatorConfiguration coordConfig =
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10_000L)
+ .setMinPauseBetweenCheckpoints(10_000L)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(coordConfig)
+ .setTimer(checkpointTimer)
+ .setClock(clock)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("A retry should be scheduled while min-pause is not
satisfied")
+ .isNotEmpty();
+
+ ctx.setExpectStopWithSavepoint(assertNonNull());
+ exec.stopWithSavepoint("file:///tmp/target", true,
SavepointFormatType.CANONICAL);
+
+ ctxExecutor.triggerAll();
+ checkpointTimer.triggerAll();
+ int pendingAfterSavepoint =
coordinator.getNumberOfPendingCheckpoints();
+
+ // Trigger the previously scheduled retry; it must be a no-op
because the state
+ // transitioned to StopWithSavepoint (runIfState gates the action
on
+ // hadStateTransition).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("Active checkpoint trigger retry must not fire after
stopWithSavepoint")
+ .isEqualTo(pendingAfterSavepoint);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return null;
+ }
+ };
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ int baseline =
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
Review Comment:
This test also succeeds if the existence of the checkpointCoordinator is not
evaluated because we're not changing the parallelism.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java:
##########
@@ -400,6 +400,59 @@ private static void assertFinalStateTransitionHappened(
assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
}
+ @Test
+ void testActiveCheckpointTriggerCalledOnEnteringStabilizing() {
+ final TestingStateTransitionManagerContext ctx =
+ TestingStateTransitionManagerContext.stableContext();
+ ctx.withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ assertThat(testInstance.getPhase()).isInstanceOf(Idling.class);
+ ctx.clearActiveCheckpointTriggerCount();
+
+ testInstance.onChange(true);
+
+ assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+
assertThat(ctx.getActiveCheckpointTriggerCount()).isGreaterThanOrEqualTo(1);
+ }
+
+ @Test
+ void testActiveCheckpointTriggerCalledOnChangeInStabilizing() {
+ final TestingStateTransitionManagerContext ctx =
+ TestingStateTransitionManagerContext.stableContext();
+ ctx.withSufficientResources();
+ final DefaultStateTransitionManager testInstance =
+ ctx.createTestInstanceThatPassedCooldownPhase();
+
+ testInstance.onChange(true);
+ assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+ ctx.clearActiveCheckpointTriggerCount();
+
+ testInstance.onChange(true);
+
+ assertThat(testInstance.getPhase()).isInstanceOf(Stabilizing.class);
+
assertThat(ctx.getActiveCheckpointTriggerCount()).isGreaterThanOrEqualTo(1);
Review Comment:
Wouldn't it be nicer to actually count the trigger calls instead of cleaning
the variable up in between? We do two onChange calls so we expect the trigger
to be called twice.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ ManualClock clock = new ManualClock();
+ CheckpointCoordinatorConfiguration coordConfig =
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10_000L)
+ .setMinPauseBetweenCheckpoints(10_000L)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(coordConfig)
+ .setTimer(checkpointTimer)
+ .setClock(clock)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("A retry should be scheduled while min-pause is not
satisfied")
+ .isNotEmpty();
+
+ ctx.setExpectStopWithSavepoint(assertNonNull());
+ exec.stopWithSavepoint("file:///tmp/target", true,
SavepointFormatType.CANONICAL);
+
+ ctxExecutor.triggerAll();
+ checkpointTimer.triggerAll();
+ int pendingAfterSavepoint =
coordinator.getNumberOfPendingCheckpoints();
+
+ // Trigger the previously scheduled retry; it must be a no-op
because the state
+ // transitioned to StopWithSavepoint (runIfState gates the action
on
+ // hadStateTransition).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("Active checkpoint trigger retry must not fire after
stopWithSavepoint")
+ .isEqualTo(pendingAfterSavepoint);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return null;
+ }
+ };
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ int baseline =
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("No checkpoint trigger should be scheduled when
coordinator is null")
+ .hasSize(baseline);
+ }
+ }
+
+ @Test
+ void
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+
.setCheckpointInterval(Long.MAX_VALUE)
+ .build())
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Set up parallelism change so the only blocking guard is
periodic checkpointing
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+
+
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+ exec.requestActiveCheckpointTrigger();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as(
+ "No checkpoint should be triggered when periodic
checkpointing is not configured")
+ .isEqualTo(0);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+
AccessExecutionJobVertex::getParallelism))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
Review Comment:
```suggestion
exec.requestActiveCheckpointTrigger();
((ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor()).triggerNonPeriodicScheduledTasks();
```
Same here: we're not testing the exact code path the test is supposed to
verify.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ ManualClock clock = new ManualClock();
+ CheckpointCoordinatorConfiguration coordConfig =
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10_000L)
+ .setMinPauseBetweenCheckpoints(10_000L)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(coordConfig)
+ .setTimer(checkpointTimer)
+ .setClock(clock)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("A retry should be scheduled while min-pause is not
satisfied")
+ .isNotEmpty();
+
+ ctx.setExpectStopWithSavepoint(assertNonNull());
+ exec.stopWithSavepoint("file:///tmp/target", true,
SavepointFormatType.CANONICAL);
+
+ ctxExecutor.triggerAll();
+ checkpointTimer.triggerAll();
+ int pendingAfterSavepoint =
coordinator.getNumberOfPendingCheckpoints();
+
+ // Trigger the previously scheduled retry; it must be a no-op
because the state
+ // transitioned to StopWithSavepoint (runIfState gates the action
on
+ // hadStateTransition).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("Active checkpoint trigger retry must not fire after
stopWithSavepoint")
+ .isEqualTo(pendingAfterSavepoint);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return null;
+ }
+ };
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ int baseline =
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("No checkpoint trigger should be scheduled when
coordinator is null")
+ .hasSize(baseline);
+ }
+ }
+
+ @Test
+ void
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+
.setCheckpointInterval(Long.MAX_VALUE)
+ .build())
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Set up parallelism change so the only blocking guard is
periodic checkpointing
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+
+
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+ exec.requestActiveCheckpointTrigger();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as(
+ "No checkpoint should be triggered when periodic
checkpointing is not configured")
+ .isEqualTo(0);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+
AccessExecutionJobVertex::getParallelism))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("No checkpoint should be triggered when parallelism is
unchanged")
+ .isEqualTo(0);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenCheckpointInProgress() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ coordinator.triggerCheckpoint(false);
+ checkpointTimer.triggerAll();
+
+ int pendingBefore = coordinator.getNumberOfPendingCheckpoints();
+ assertThat(pendingBefore).isGreaterThan(0);
+ exec.requestActiveCheckpointTrigger();
Review Comment:
we're missing the state transition here as well
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,517 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ ManualClock clock = new ManualClock();
+ CheckpointCoordinatorConfiguration coordConfig =
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10_000L)
+ .setMinPauseBetweenCheckpoints(10_000L)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(coordConfig)
+ .setTimer(checkpointTimer)
+ .setClock(clock)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("A retry should be scheduled while min-pause is not
satisfied")
+ .isNotEmpty();
+
+ ctx.setExpectStopWithSavepoint(assertNonNull());
+ exec.stopWithSavepoint("file:///tmp/target", true,
SavepointFormatType.CANONICAL);
+
+ ctxExecutor.triggerAll();
+ checkpointTimer.triggerAll();
+ int pendingAfterSavepoint =
coordinator.getNumberOfPendingCheckpoints();
+
+ // Trigger the previously scheduled retry; it must be a no-op
because the state
+ // transitioned to StopWithSavepoint (runIfState gates the action
on
+ // hadStateTransition).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("Active checkpoint trigger retry must not fire after
stopWithSavepoint")
+ .isEqualTo(pendingAfterSavepoint);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return null;
+ }
+ };
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor)
ctx.getMainThreadExecutor();
+ int baseline =
ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("No checkpoint trigger should be scheduled when
coordinator is null")
+ .hasSize(baseline);
+ }
+ }
+
+ @Test
+ void
testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(
+ new CheckpointCoordinatorConfiguration
+
.CheckpointCoordinatorConfigurationBuilder()
+
.setCheckpointInterval(Long.MAX_VALUE)
+ .build())
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Set up parallelism change so the only blocking guard is
periodic checkpointing
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() +
1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+
+
assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+ exec.requestActiveCheckpointTrigger();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as(
+ "No checkpoint should be triggered when periodic
checkpointing is not configured")
+ .isEqualTo(0);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws
Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable<ExecutionJobVertex>
getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+
AccessExecutionJobVertex::getJobVertexId,
+
AccessExecutionJobVertex::getParallelism))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("No checkpoint should be triggered when parallelism is
unchanged")
+ .isEqualTo(0);
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenCheckpointInProgress() throws
Exception {
Review Comment:
This test passes because no parallelism change is recorded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]