I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is the
addition of some metrics to the producer (marked in red):

class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    }
    catch {
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}

Is anyone familiar with this who can point out what may cause this
exception and how I should solve it?
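
One possible cause, stated as an assumption rather than a confirmed diagnosis: the new open() override does not call super.open(parameters). If FlinkKinesisProducer creates its Kinesis client in open(), that client would never be created, the resulting failures in invoke() would be swallowed by the catch-all case, and the first unguarded use would happen at checkpoint time in flushSync. A "SerializedThrowable: null" would fit a NullPointerException there. The sketch below reproduces that pattern with hypothetical stand-in classes (BaseSink, BrokenSink, FixedSink and OpenDemo are illustrative names, not Flink API):

```scala
// Hypothetical stand-ins that only mimic the suspected pattern; not Flink classes.
class BaseSink {
  // Created in open(), the way a connector might build its client there (assumption).
  protected var client: java.util.ArrayList[String] = null

  def open(): Unit = { client = new java.util.ArrayList[String]() }

  def invoke(value: String): Unit = {
    if (client == null) throw new RuntimeException("client has not been initialized")
    client.add(value)
  }

  // Stands in for the flush done on checkpoint; dereferences the client unconditionally.
  def flush(): Int = client.size()
}

// Same shape as the class in question: open() is replaced and invoke() swallows every error.
class BrokenSink extends BaseSink {
  var failed = 0
  override def open(): Unit = { /* metrics setup only, super.open() is never called */ }
  override def invoke(value: String): Unit =
    try super.invoke(value)
    catch { case _: Throwable => failed += 1 }
}

// Identical, except that open() delegates to the parent before doing its own setup.
class FixedSink extends BaseSink {
  var failed = 0
  override def open(): Unit = { super.open() /* then metrics setup */ }
  override def invoke(value: String): Unit =
    try super.invoke(value)
    catch { case _: Throwable => failed += 1 }
}

object OpenDemo {
  // Opens the sink, writes one record, then reports what the flush did.
  def flushOutcome(sink: BaseSink): String = {
    sink.open()
    sink.invoke("session-1")
    scala.util.Try(sink.flush()) match {
      case scala.util.Success(n) => s"flushed $n"
      case scala.util.Failure(e) => e.getClass.getSimpleName
    }
  }
}
```

With these stand-ins, OpenDemo.flushOutcome(new BrokenSink) reports a NullPointerException from the flush, while OpenDemo.flushOutcome(new FixedSink) reports "flushed 1". If the same mechanism applies to the real producer, calling super.open(parameters) at the start of the override would be the first thing to try, and logging the exception in the catch-all case would show whether invoke() was failing all along.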

Thanks!
