Hi Tao,

I'm afraid your Flink job is still under high backpressure, so none of the subsequent checkpoints ever ran 'FromElementsFunctionT#snapshotState', which means the code that throws the exception was never executed. You could check those expired checkpoints to see whether the task containing 'FromElementsFunctionT' ever completed its part of them.
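To illustrate the mechanism described above, here is a toy model in plain Java. It is not Flink code. It assumes, as the `SynchronizedStreamTaskActionExecutor` frame in the log suggests, that the checkpoint runs while holding the same checkpoint lock that the legacy source takes in `run()`. A source thread stuck inside that lock (simulated with `Thread.sleep`, standing in for a `collect()` blocked by backpressure) keeps the checkpoint from ever reaching its snapshot code before the timeout. `CheckpointLockModel`, `tryCheckpoint` and `simulate` are hypothetical names, and `ReentrantLock.tryLock` with a timeout is a simplified stand-in for Flink's `synchronized` lock plus its checkpoint timeout. The model only shows the mechanism; it does not by itself show that this is what happens in your job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Toy model, not Flink code: a "source" thread holds the checkpoint lock while it is
 * blocked, and a "checkpoint" that needs the same lock gives up after a timeout
 * without ever running its snapshot logic.
 */
final class CheckpointLockModel {

    /** Runs {@code snapshot} under the lock; returns false if the lock was not obtained in time. */
    static boolean tryCheckpoint(ReentrantLock checkpointLock, long timeoutMillis, Runnable snapshot)
            throws InterruptedException {
        if (!checkpointLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // analogous to "Checkpoint N ... expired before completing"
        }
        try {
            snapshot.run(); // analogous to reaching FromElementsFunctionT#snapshotState
            return true;
        } finally {
            checkpointLock.unlock();
        }
    }

    /** The source holds the lock for blockedMillis; returns whether the snapshot ran within timeoutMillis. */
    static boolean simulate(long blockedMillis, long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread source = new Thread(() -> {
            lock.lock();
            try {
                lockHeld.countDown();
                Thread.sleep(blockedMillis); // stands in for a collect() stuck under backpressure
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        source.start();
        try {
            lockHeld.await(); // make sure the source owns the lock before the checkpoint starts
            boolean[] snapshotRan = {false};
            boolean completed = tryCheckpoint(lock, timeoutMillis, () -> snapshotRan[0] = true);
            source.join();
            return completed && snapshotRan[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(500, 100));  // false: lock held longer than the timeout
        System.out.println(simulate(50, 1_000)); // true: lock released before the timeout
    }
}
```

Running `main` prints `false` and then `true`: the snapshot code only runs when the lock is released before the timeout.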
Best,
Yun Tang
________________________________
From: tao xiao <xiaotao...@gmail.com>
Sent: Saturday, June 26, 2021 16:40
To: user <user@flink.apache.org>
Subject: Re: Exception in snapshotState suppresses subsequent checkpoints

Btw, here is the checkpoint-related log:

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: Checkpoint was declined. (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-runtime_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-runtime_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
Caused by: org.apache.flink.util.SerializedThrowable: npe
    at com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111) ~[classes/:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
    ... 20 more
[2021-06-26 16:08:55,357] INFO Checkpoint 1 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
output
[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
33
34
35
[2021-06-26 16:09:15,349] INFO Checkpoint 2 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)

On Sat, Jun 26, 2021 at 4:36 PM tao xiao <xiaotao...@gmail.com> wrote:

Hi team,

I ran a simple Flink 1.12.1 job in the IDE with TolerableCheckpointFailureNumber set, and I intentionally throw an exception in the source function's snapshotState to verify how Flink behaves. What I found is that the first checkpoint throws the exception and eventually times out, while the main flow continues to work.
This is expected; however, none of the subsequent checkpoints reach the exception anymore, and each of them reports a timeout once the timeout elapses. I want to know whether this is expected behavior, i.e. that no later checkpoint can finish once one checkpoint has thrown an exception.

Below is the code to reproduce the behavior.

main:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
// Checkpoint every 20 s (at-least-once), expire a checkpoint after 3 s,
// and tolerate up to 1000 consecutive checkpoint failures before failing the job.
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
        .setParallelism(1)
        .print()
        .setParallelism(1);

env.execute("Demo");

Source function:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sample.flink;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * <p>Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be affected by the
 * serializability of the elements.
 *
 * <p><b>NOTE:</b> This source has a parallelism of 1.
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    /** The number of elements emitted already. */
    private volatile int numElementsEmitted;

    /** Flag to make the source cancelable. */
    private volatile boolean isRunning = true;

    private transient ListState<Integer> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");

        this.checkpointedState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "from-elements-state", IntSerializer.INSTANCE));

        if (context.isRestored()) {
            List<Integer> retrievedStates = new ArrayList<>();
            for (Integer entry : this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }

            // given that the parallelism of the function is 1, we can only have 1 state
            Preconditions.checkArgument(
                    retrievedStates.size() == 1,
                    getClass().getSimpleName() + " retrieved invalid state.");

            this.numElementsEmitted = retrievedStates.get(0);
        }
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
            Thread.sleep(1000);
            synchronized (lock) {
                ctx.collect(numElementsEmitted++);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // ------------------------------------------------------------------------
    //  Checkpointing
    // ------------------------------------------------------------------------

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState != null,
                "The " + getClass().getSimpleName() + " has not been properly initialized.");

        this.checkpointedState.clear();
        this.checkpointedState.add(this.numElementsEmitted);

        // Deliberately fail every snapshot to test how Flink handles checkpoint failures.
        throw new NullPointerException("npe");
    }
}

--
Regards,
Tao
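For the `setTolerableCheckpointFailureNumber(1000)` setting in the job above, here is a minimal sketch of the counting rule, assuming the behavior described in the Javadoc of `CheckpointConfig#setTolerableCheckpointFailureNumber` (how many consecutive checkpoint failures are tolerated before the job is failed over): each failed checkpoint increments a counter, a completed checkpoint resets it, and the job fails over once the counter exceeds the configured number. `ConsecutiveFailureCounter` and its methods are hypothetical, and which failure reasons Flink actually counts (and how that varies by version) is not modeled. Under these assumptions, with every checkpoint failing at a 20-second interval, the job keeps running until the 1001st consecutive failure, roughly 20,000 seconds (about 5.6 hours) in.

```java
/**
 * Simplified model, not Flink's implementation: counts consecutive failed checkpoints,
 * resets on success, and requests a failover once the count exceeds the tolerable number.
 */
final class ConsecutiveFailureCounter {
    private final int tolerableFailures;
    private int consecutiveFailures;

    ConsecutiveFailureCounter(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a failed (e.g. declined or expired) checkpoint; true means "fail over the job". */
    boolean onCheckpointFailed() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint ends the streak of failures. */
    void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }

    /** How many back-to-back failed checkpoints it takes before the model requests a failover. */
    static int failuresUntilFailover(int tolerableFailures) {
        ConsecutiveFailureCounter counter = new ConsecutiveFailureCounter(tolerableFailures);
        int failures = 0;
        boolean failover = false;
        while (!failover) {
            failures++;
            failover = counter.onCheckpointFailed();
        }
        return failures;
    }

    public static void main(String[] args) {
        long intervalSeconds = 20; // enableCheckpointing(20_000, ...) in the job above
        int failures = failuresUntilFailover(1000); // setTolerableCheckpointFailureNumber(1000)
        System.out.println(failures);                   // 1001
        System.out.println(failures * intervalSeconds); // 20020 (seconds, roughly 5.6 hours)
    }
}
```

The estimate ignores the offset of the first trigger and the 3-second timeout; it only shows the order of magnitude.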