I started to get the following exception:

```
2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted
```
The only change from the version that did not have this issue is that I added some metrics to the producer (the metric fields and calls below):

```scala
class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
    extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  // Metrics added in this version
  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _
  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    // Register counters and gauges on this operator's metric group
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      // Update the gauge values from the current session, then write it
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      // Record rejected for exceeding the size limit: log it and count it
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      // Any other failure is counted but not rethrown
      case _: Throwable =>
        sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}
```

Is anyone familiar with this who can point out what may cause this exception and how I should solve it? Thanks!
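One hypothesis that would fit this trace, not confirmed here: the overridden `open()` in `LogSessionProducer` never calls `super.open(parameters)`. `FlinkKinesisProducer.open()` is where the connector builds its underlying KPL producer, so skipping it would plausibly leave that producer `null`. `invoke()` would then throw on every record, the `case _: Throwable` clause would count and swallow that failure, and the first checkpoint would reach `flushSync()`, which dereferences the producer. A JVM-raised `NullPointerException` on Java 8 carries no message, which would match `SerializedThrowable: null`. A `sessionWriteFailed` counter that keeps climbing while `sessionsWritten` stays at zero would be a quick way to check this. The sketch below reproduces that lifecycle without Flink; `Client`, `BaseSink`, `BrokenSink`, `FixedSink` and `Demo` are hypothetical stand-ins, not connector classes.

```scala
// Hypothetical stand-ins (NOT Flink classes) mimicking a sink whose open()
// creates the client that invoke() and snapshot() rely on.
class Client {
  private var outstanding = 0
  def send(record: String): Unit = { outstanding += 1 }
  def flush(): Unit = { outstanding = 0 }
  def outstandingCount: Int = outstanding
}

class BaseSink {
  // Stays null until open() runs, like a producer created in open().
  protected var client: Client = null

  def open(): Unit = { client = new Client }

  def isOpen: Boolean = client != null

  def invoke(value: String): Unit = {
    if (client == null) throw new RuntimeException("client has been closed")
    client.send(value)
  }

  // Flush-on-checkpoint loop: dereferences the client unconditionally,
  // so it throws NullPointerException if open() never created it.
  def snapshot(): Unit =
    while (client.outstandingCount > 0) client.flush()
}

// Overrides open() for its own setup but never calls super.open().
class BrokenSink extends BaseSink {
  var failures = 0
  override def open(): Unit = () // metric registration only
  override def invoke(value: String): Unit =
    try super.invoke(value)
    catch { case _: Throwable => failures += 1 } // counted, then swallowed
}

// Calls super.open() first so the client exists before any extra setup.
class FixedSink extends BaseSink {
  override def open(): Unit = {
    super.open()
    // metric registration would go here
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val broken = new BrokenSink
    broken.open()
    broken.invoke("session")
    println(s"broken: open=${broken.isOpen}, swallowed failures=${broken.failures}")
    try broken.snapshot()
    catch { case e: NullPointerException => println(s"broken snapshot failed: $e") }

    val fixed = new FixedSink
    fixed.open()
    fixed.invoke("session")
    fixed.snapshot()
    println(s"fixed: open=${fixed.isOpen}, snapshot completed")
  }
}
```

If the hypothesis holds, the corresponding change in the real class would be to make `super.open(parameters)` the first statement of `LogSessionProducer.open()`, before the metrics are registered.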