Hi Daniel,

At first glance, one thing catches my eye:
In the line ‘val session = value.asInstanceOf[Session]’ it looks like you are
downcasting the event from OUT to Session.
In Flink this is a dangerous thing to do: a DataStream[OUT] uses a specific
serializer for OUT to transport events from one operator to the next (or at least
from one task to the next, if configured this way).
These serializers usually understand only one type, OUT in your case. Only in
certain circumstances is the Java object (the event) passed directly from
one operator to the next.

I guess this is what happened: your serializer, which only understands OUT, cannot
cope with a Session object …
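One way to avoid the downcast altogether is to fix the type parameter instead of
keeping the producer generic, i.e. extend FlinkKinesisProducer[Session] and pass it a
SerializationSchema[Session], so that invoke already receives a Session. The
plain-Scala sketch below models only that idea and does not use Flink at all:
Session, GenericSink and SessionSink are hypothetical stand-ins, not Flink classes.

```scala
object TypedSinkSketch {
  // Hypothetical stand-in for the application's Session class (not shown in the thread).
  final case class Session(sessionDuration: Long, sessionTotalEvents: Int)

  // Hypothetical stand-in for a generic sink such as FlinkKinesisProducer[OUT].
  abstract class GenericSink[OUT] {
    def invoke(value: OUT): Unit
  }

  // Binding OUT to Session means `value` is statically a Session inside invoke,
  // so no asInstanceOf cast is needed to read its fields.
  final class SessionSink extends GenericSink[Session] {
    var lastDuration: Long = 0L
    var lastSize: Int = 0

    override def invoke(value: Session): Unit = {
      lastDuration = value.sessionDuration
      lastSize = value.sessionTotalEvents
    }
  }
}
```

With a real producer the same change means declaring the class as
FlinkKinesisProducer[Session] rather than FlinkKinesisProducer[OUT]; the compiler
then rejects any stream whose element type is not Session.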

But of course I might be completely mistaken, given the incomplete information.
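Independently of the cast, the catch-all case at the end of the catch block in
invoke counts and then discards every exception thrown by super.invoke, so a broken
producer only becomes visible later, for example when the checkpoint calls
flushSync. It may also be worth checking the open override: as quoted, it does not
appear to call super.open(parameters), and FlinkKinesisProducer creates its internal
Kinesis producer in open(), so a producer that was never created would be consistent
with the null-message exception from flushSync in the stack trace. A minimal sketch of
a narrower catch follows; Counters and write are hypothetical names, and send stands
in for super.invoke(value, context).

```scala
object NarrowCatchSketch {
  // Hypothetical stand-ins for the sessionWriteFailed / sessionsProcessed counters.
  final class Counters {
    var failed: Long = 0L
    var processed: Long = 0L
  }

  // Message fragment the original code matches for oversized records.
  private val SizeLimitMessage = "Data must be less than or equal to"

  // Only the known "record too large" error is counted and dropped; anything else
  // (e.g. a NullPointerException) propagates so the task fails visibly.
  def write(send: () => Unit, counters: Counters): Unit =
    try {
      send()
    } catch {
      case e: IllegalArgumentException if Option(e.getMessage).exists(_.contains(SizeLimitMessage)) =>
        counters.failed += 1
    } finally {
      counters.processed += 1
    }
}
```

Option(e.getMessage) also guards against an exception without a message, which
e.getMessage.contains(...) would turn into a NullPointerException inside the handler.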

I hope this helps 😊

Feel free to get back to me (on the mailing list) if anything needs clarification.

Cheers

Thias




From: Daniel Vol <vold...@gmail.com>
Sent: Wednesday, 11 August 2021 14:47
To: user@flink.apache.org
Subject: Odd Serialization exception

I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO  o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is the addition of
some metrics to the producer (marked in red in the original message):


import java.util.Properties

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
// Session is the application's own class; its import was not included in the message.

class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}

Is anyone familiar with this, and can point out what may cause this exception and
how I should solve it?

Thanks!