Good morning Daniel,

… so my guess was not the cause of your problem 😊. Anyway, it seems like you 
always want to use your LogSessionProducer with Session?
In that case you could drop the generics from the class like this:

class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
  extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
  ...
  override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
  ...
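A plain-Scala sketch of that idea, with no Flink dependency. TypedSink is a hypothetical stand-in for FlinkKinesisProducer[OUT], and Session is a hypothetical stand-in for your class that only models the two getters used in this thread. Once the type parameter is fixed to Session, invoke() receives a Session directly, so its fields can be read without asInstanceOf.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the user's Session class; only the two getters
// used in this thread are modelled.
final case class Session(duration: Long, totalEvents: Int) {
  def getSessionDuration: Long = duration
  def getSessionTotalEvents: Int = totalEvents
}

// Hypothetical generic parent playing the role of FlinkKinesisProducer[OUT].
class TypedSink[T] {
  val written: ArrayBuffer[T] = ArrayBuffer.empty[T]
  def invoke(value: T): Unit = { written += value; () }
}

// With the type parameter fixed to Session, invoke() receives a Session,
// so its fields can be read without asInstanceOf.
class SessionSink extends TypedSink[Session] {
  var lastDuration: Long = 0L
  var lastSize: Int = 0

  override def invoke(value: Session): Unit = {
    lastDuration = value.getSessionDuration
    lastSize = value.getSessionTotalEvents
    super.invoke(value) // forward the unchanged value, as in the original sink
  }
}
```

With this shape, handing invoke() anything other than a Session is a compile-time error instead of a cast that can only fail at runtime.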

As to your assumption that the problem could be in your override def open() …
… I don’t see you invoking super.open(…) there, which would leave the 
producer only half initialized.
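To make the half-initialized case concrete, here is a plain-Scala model without Flink. Client, KinesisLikeSink, CountingSink, BrokenSessionSink and FixedSessionSink are all hypothetical; the assumption is that, like FlinkKinesisProducer, the parent creates its client in open() and needs it again in snapshotState(). Because the catch-all in your invoke() only counts failures, a missing super.open(…) could go unnoticed until the next checkpoint, where the flush hits the uninitialized client. On Java 8 such a NullPointerException has a null message, which would fit "SerializedThrowable: null", but that is a plausible reading, not a confirmed diagnosis.

```scala
// Hypothetical stand-in for the external client that the parent sink owns.
final class Client {
  private var pending = 0
  def add(): Unit = pending += 1
  // Returns how many records were flushed.
  def flush(): Int = { val n = pending; pending = 0; n }
}

// Hypothetical parent sink: by assumption it only creates its client in
// open() and needs it again in snapshotState().
class KinesisLikeSink {
  protected var client: Client = null

  def open(): Unit = { client = new Client }

  def invoke(value: String): Unit = {
    if (client == null) throw new IllegalStateException("client not initialized")
    client.add()
  }

  // Called on checkpoints; throws NullPointerException if open() never ran here.
  def snapshotState(): Int = client.flush()
}

// Mirrors the catch-all in LogSessionProducer.invoke(): failures are only counted.
abstract class CountingSink extends KinesisLikeSink {
  var writeFailed = 0
  override def invoke(value: String): Unit =
    try super.invoke(value) catch { case _: Throwable => writeFailed += 1 }
}

// open() is overridden for metrics setup, but super.open() is never called.
class BrokenSessionSink extends CountingSink {
  override def open(): Unit = ()
}

// Same override, but the parent is initialized first.
class FixedSessionSink extends CountingSink {
  override def open(): Unit = {
    super.open()
    // register counters and gauges here
  }
}
```

In the real LogSessionProducer the equivalent fix is calling super.open(parameters) at the top of the overridden open(parameters: Configuration).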

Thias




From: Daniel Vol <vold...@gmail.com>
Sent: Thursday, 12 August 2021 08:01
To: Guowei Ma <guowei....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Odd Serialization exception

Hi Guowei,

I am running on EMR 5.32.0 with Flink 1.11.2

In the meantime I ran some tests and commented out part of the new code:

override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
  try {
    // val session = value.asInstanceOf[Session]
    // sessionDuration = 17L //session.getSessionDuration
    // sessionSize = 19 //session.getSessionTotalEvents
    super.invoke(value, context)
    sessionsWritten.inc()
  }
Even so, I still get "Caused by: org.apache.flink.util.SerializedThrowable: null".
So my assumption is that something is wrong with the "override def open()" method.

Thanks!

On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:
Hi, Daniel
Could you tell me the version of Flink you use? I want to look at the 
corresponding code.
Best,
Guowei


On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
Hi Matthias,

First, thanks for the fast reply.
I am new to Flink, so I am probably missing a lot about the data flow and the objects being passed.

The motivation is to read internal data from the transferred OUT object in order 
to send metrics. So I do downcast it, but from my perspective the downcast value 
is neither forwarded (super is called with the original value) nor expected to be 
used in later steps (it is meant to be a local-scope variable).
Since I suspect that you are right, can you point me to how I can get internal 
data from OUT without changing it or affecting the next steps?
Also, when I create the object, I specify the OUT type (which is Session):

val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)
"… but of course I might be completely mistaken due to incomplete 
information."
What kind of information can I supply?

Thanks a lot!

Daniel


On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

Hi Daniel,

At first glance, one thing catches my eye:
In the line ‘val session = value.asInstanceOf[Session]’, it looks like you are 
downcasting the event from OUT to Session.
In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific 
serializer[OUT] to transport events from one operator to the next (or at least 
from one task to the next, if configured this way).
These serializers usually only understand one type, OUT in your case. Only in 
certain circumstances is the Java object (the event) passed directly from 
one operator to the next.

I guess this is what happened: your serializer, which only understands OUT, 
cannot cope with a Session object …

… but of course I might be completely mistaken due to incomplete information.

I hope this helps 😊

Feel free to get back to me for clarifications (on the mailing list)

Cheers

Thias




From: Daniel Vol <vold...@gmail.com>
Sent: Wednesday, 11 August 2021 14:47
To: user@flink.apache.org
Subject: Odd Serialization exception

I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO  o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is the addition 
of some metrics to the producer (marked in red):

class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}
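For reference, a Flink-free sketch of how the two gauges registered in open() behave: each is registered once as a closure over a field, and every poll re-evaluates the closure, so it reports whatever invoke() wrote last. GaugeRegistry and SessionMetrics are hypothetical; in Flink the registry role is played by the MetricGroup from getRuntimeContext.getMetricGroup, and ScalaGauge wraps a () => T in a similar way.

```scala
import scala.collection.mutable

// Hypothetical minimal registry standing in for Flink's MetricGroup.
final class GaugeRegistry {
  private val gauges = mutable.LinkedHashMap.empty[String, () => Long]
  def gauge(name: String, read: () => Long): Unit = gauges.update(name, read)
  // A reporter polls each gauge, which evaluates its closure at that moment.
  def report(): Map[String, Long] = gauges.map { case (n, f) => n -> f() }.toMap
}

// Mirrors the metrics part of LogSessionProducer: fields are updated per
// record, and the gauges are registered once as closures over those fields.
final class SessionMetrics(registry: GaugeRegistry) {
  private var sessionDuration: Long = 0L
  private var sessionSize: Int = 0

  def open(): Unit = {
    registry.gauge("sessionDuration", () => sessionDuration)
    registry.gauge("sessionSize", () => sessionSize.toLong)
  }

  def record(duration: Long, size: Int): Unit = {
    sessionDuration = duration
    sessionSize = size
  }
}
```

Because the closures read the fields at poll time, no re-registration is needed after each record; the gauge only ever shows the most recent session.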

Is anyone familiar with this, and can you point out what may cause this 
exception and how I should solve it?

Thanks!
This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
