Hi Matthias,

First, thanks for the fast reply.
I am new to Flink, so I am probably missing a lot about the flow and the
objects being passed.

The motivation is to read internal data from the OUT object handed to the
sink in order to send metrics. So I do downcast it, but as far as I can tell
the cast result is not forwarded (super is called with the original value)
and is not meant to be used in later steps (it is a local-scope variable).
As I suspect that you are right: can you point me to how I can get
internal data from OUT without changing it or affecting the next steps?
Also, when I create the object I specify the OUT type (which is Session):

val flinkKinesisProducer = new LogSessionProducer[Session](
  new KinesisEventSerializer[Session], producerConfig)

"… but of course I might be completely mistaken due to incomplete
information."
What kind of information can I supply?
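
On the question of reading data from OUT without casting: one option is to give the sink a typed accessor when it is constructed, so the generic class never needs to know about Session. This is only a minimal sketch with hypothetical stand-ins (`Session`, `SessionStats`, `MeteredSink` and the `forward` callback are all invented for illustration and are not Flink classes):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the poster's Session class.
final case class Session(durationMs: Long, totalEvents: Int)

// Hypothetical holder for the values the gauges would expose.
final case class SessionStats(durationMs: Long, totalEvents: Int)

// Hypothetical sink wrapper: generic in OUT, but it never casts.
// `stats` knows how to read an OUT; `forward` stands in for super.invoke.
class MeteredSink[OUT](stats: OUT => SessionStats, forward: OUT => Unit) {
  @volatile private var last: SessionStats = SessionStats(0L, 0)

  def lastStats: SessionStats = last

  def invoke(value: OUT): Unit = {
    last = stats(value) // read-only access, `value` is not modified
    forward(value)      // the original object is passed on unchanged
  }
}

object MeteredSinkDemo {
  def main(args: Array[String]): Unit = {
    val forwarded = ArrayBuffer.empty[Session]
    val sink = new MeteredSink[Session](
      s => SessionStats(s.durationMs, s.totalEvents),
      s => { forwarded += s; () }
    )
    val session = Session(durationMs = 1800L, totalEvents = 42)
    sink.invoke(session)
    println(sink.lastStats)            // the values read from the record
    println(forwarded.head eq session) // the very same object was forwarded
  }
}
```

In a real Flink sink the accessor would be a constructor argument of the producer subclass and would have to be serializable, since sink functions are shipped to the task managers.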

Thanks a lot!

Daniel

On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:



Hi Daniel,



At first glance, one thing catches my eye:

In the line 'val session = value.asInstanceOf[Session]' it looks like you
are downcasting the event from OUT to Session.

In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific
serializer[OUT] to transport events from one operator to the next (or at
least from one task to the next, if configured this way).

These serializers usually understand only one type, OUT in your case. Only
in certain circumstances is the Java object (the event) passed directly
from one operator to the next.

I guess this is what happened: your serializer, which only understands OUT,
cannot cope with a Session object …

… but of course I might be completely mistaken due to incomplete
information.
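
For reference, a Scala `asInstanceOf` cast does not convert or copy an object: it returns the same reference, or throws `ClassCastException` when the runtime type does not match. The sketch below is independent of Flink and uses a hypothetical `Session` stand-in; it also shows a pattern match as a way to read a field without risking that exception.

```scala
import scala.util.Try

// Hypothetical stand-in for the poster's Session class.
final class Session(val totalEvents: Int)

object CastDemo {
  // Mirrors the cast in the sink: OUT is erased at runtime, Session is not,
  // so the JVM checks the runtime class at this point.
  def asSession[OUT](value: OUT): Session = value.asInstanceOf[Session]

  // Pattern matching gives the same read access but returns None
  // instead of throwing when the value is not a Session.
  def totalEvents[OUT](value: OUT): Option[Int] = value match {
    case s: Session => Some(s.totalEvents)
    case _          => None
  }

  def main(args: Array[String]): Unit = {
    val session = new Session(42)
    println(asSession(session) eq session)             // same reference
    println(Try(asSession("not a session")).isFailure) // cast is rejected
    println(totalEvents(session))
    println(totalEvents("not a session"))
  }
}
```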



I hope this helps 😊

Feel free to get back to me for clarifications (on the mailing list)

Cheers

Thias









*From:* Daniel Vol <vold...@gmail.com>
*Sent:* Wednesday, 11 August 2021 14:47
*To:* user@flink.apache.org
*Subject:* Odd Serialization exception



I started to get the following exception:



2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted



The only change from the version that does not have this issue is the
addition of some metrics to the producer (highlighted in red in the
original message):

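import java.util.Properties

// Assumption: LazyLogging comes from scala-logging; the thread does not say.
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer

// Session is the poster's own class; its import is not shown in the thread.
class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  // Counters are created in open(), so they are not serialized with the sink.
  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  // Latest values exposed through the gauges registered in open().
  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  // NOTE: as posted, this override does not call super.open(parameters).
  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration",
      ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize",
      ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      // The cast result is only read here; the original value is forwarded.
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      case e: IllegalArgumentException
          if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      // Any other failure is only counted: it is neither logged nor rethrown.
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}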

Is anyone familiar with this who can point out what may cause this
exception and how I should solve it?

Thanks!

