Thank you both!

Looks much better and hopefully works!

Daniel.

On Thu, Aug 12, 2021 at 10:35 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Good morning Daniel,
>
>
>
> … so my guess was not the cause of your problem 😊. Anyway, it seems like
> you always want to use your LogSessionProducer with Session?
>
> In that case you could drop the generics from the class like this:
>
>
>
> class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
>   extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
>
>   ...
>
>   override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
>
>   ...
>
>
>
> As to your assumption that the problem could be in your override def open() …
>
> … I don’t see you invoking the super.open(…) function, which would leave the
> producer only half initialized.
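>
> For illustration, a rough, untested sketch of how the two points could look
> together (Session, KinesisEventSerializer, LazyLogging and the metric names
> are taken from your snippets, so treat them as assumptions and adjust as needed):
>
> import java.util.Properties
> import org.apache.flink.api.common.serialization.SerializationSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.metrics.Counter
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
>
> // non-generic: the producer always deals with Session, so no cast is needed
> class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
>   extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
>
>   @transient private var sessionsWritten: Counter = _
>   @transient private var sessionDuration: Long = 0L
>
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters) // lets FlinkKinesisProducer finish its own initialization
>     val metrics = getRuntimeContext.getMetricGroup
>     sessionsWritten = metrics.counter("sessionsWritten")
>     // register your gauges (sessionDuration, sessionSize) here, as in your current class
>   }
>
>   override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
>     sessionDuration = value.getSessionDuration // no asInstanceOf needed any more
>     super.invoke(value, context)
>     sessionsWritten.inc()
>   }
> }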
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Daniel Vol <vold...@gmail.com>
> *Sent:* Donnerstag, 12. August 2021 08:01
> *To:* Guowei Ma <guowei....@gmail.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Odd Serialization exception
>
>
>
> Hi Guowei,
>
>
>
> I am running on EMR 5.32.0 with Flink 1.11.2
>
>
>
> In the meantime I did some tests and commented out part of the new code:
>
> override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>     try {
> //      val session = value.asInstanceOf[Session]
> //      sessionDuration = 17L //session.getSessionDuration
> //      sessionSize = 19 //session.getSessionTotalEvents
>       super.invoke(value, context)
>       sessionsWritten.inc()
>     }
>
> Though I still get Caused by: org.apache.flink.util.SerializedThrowable: null
>
> So my assumption is that something is wrong with the "override def open()" method.
>
>
>
> Thanks!
>
>
>
> On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> Hi, Daniel
>
> Could you tell me the version of Flink you use? I want to look at the
> corresponding code.
>
> Best,
>
> Guowei
>
>
>
>
>
> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vold...@gmail.com> wrote:
>
> Hi Matthias,
>
>
>
> First, thanks for a fast reply.
> I am new to Flink, so I probably miss a lot in terms of the flow and the
> objects passed.
>
>
>
> The motivation is to get internal data from the transferred OUT object in
> order to send metrics. So I do downcast it, but from my perspective it is not
> forwarded (super is called with the original value) and it is not expected to
> be used in later steps (it is meant to be a local-scope variable).
>
> As I suspect that you are right - can you point me to how I can get internal
> data from OUT without changing it or affecting the next steps?
>
> Also, when I create the object I specify the OUT type (which is Session):
>
> val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)
>
> "… but of course I might be completely be mistaken due to incomplete
> information."
>
> What kind of information can I supply?
>
>
>
> Thanks a lot!
>
>
>
> Daniel
>
>
>
> On 11 Aug 2021, at 17:28, Schwalbe Matthias <matthias.schwa...@viseca.ch>
> wrote:
>
>
>
> Hi Daniel,
>
>
>
> At first glance, there is one thing that catches my eye:
>
> In line ‘val session = value.asInstanceOf[Session]' it looks like you are
> downcasting the event from OUT to Session.
>
> In Flink this is a dangerous thing to do … a DataStream[OUT] uses a specific
> serializer[OUT] to transport events from one operator to the next (or at
> least from one task to the next, if configured this way).
>
> These serializers usually only understand one type, OUT in your case. Only
> in certain circumstances is the Java object (the event) transported directly
> from one operator to the next.
>
>
>
> I guess this is what happened: your serializer, which only understands OUT,
> cannot cope with a Session object …
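>
> To make that concrete, a tiny sketch (Session, LogSessionProducer,
> KinesisEventSerializer and producerConfig are names from your code, so this
> is an assumption about your setup): if the stream feeding the sink is a
> DataStream[Session], then OUT is Session and the schema only ever sees
> Session objects:
>
> import org.apache.flink.streaming.api.scala._
>
> // assuming the windowed/aggregated stream emits Session elements
> val sessions: DataStream[Session] = ... // your windowed stream
> sessions.addSink(new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig))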
>
>
>
> … but of course I might be completely mistaken due to incomplete
> information.
>
>
>
> I hope this helps 😊
>
>
>
> Feel free to get back to me for clarifications (on the mailing list)
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Daniel Vol <vold...@gmail.com>
> *Sent:* Mittwoch, 11. August 2021 14:47
> *To:* user@flink.apache.org
> *Subject:* Odd Serialization exception
>
>
>
> I started to get the following exception:
>
>
>
> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8)] INFO
> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not
> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8). Failure reason: Checkpoint was declined.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8). Failure reason: Checkpoint was declined.
>     at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
>     at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
>     at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
>     at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: null
>     at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
>     at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
>     at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
>     at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
>     at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
>     ... 23 common frames omitted
>
>
>
> The only change from the version that does not have this issue is the
> metrics I added to the producer:
>
> class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
>   extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {
>
>   @transient private var sessionsProcessed: Counter = _
>   @transient private var sessionsWritten: Counter = _
>   @transient private var sessionWriteFailed: Counter = _
>
>   @transient private var sessionDuration: Long = 0
>   @transient private var sessionSize: Int = 0
>
>   override def open(parameters: Configuration): Unit = {
>     val metrics = getRuntimeContext.getMetricGroup
>     sessionsProcessed = metrics.counter("sessionsProcessed")
>     sessionsWritten = metrics.counter("sessionsWritten")
>     sessionWriteFailed = metrics.counter("sessionWriteFailed")
>     metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
>     metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
>   }
>
>   @throws[Exception]
>   override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>     try {
>       val session = value.asInstanceOf[Session]
>       sessionDuration = session.getSessionDuration
>       sessionSize = session.getSessionTotalEvents
>       super.invoke(value, context)
>       sessionsWritten.inc()
>     }
>     catch {
>       case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
>         logger.error("failed session ended = " + value, e)
>         sessionWriteFailed.inc()
>       case _: Throwable => sessionWriteFailed.inc()
>     } finally sessionsProcessed.inc()
>   }
> }
>
> Is anyone familiar with this and able to point out what may cause this
> exception and how I should solve it?
>
> Thanks!
>